Netty源码—5.Pipeline和Handler
大纲
1.Pipeline和Handler的作用和构成
2.ChannelHandler的分类
3.几个特殊的ChannelHandler
4.ChannelHandler的生命周期
5.ChannelPipeline的事件处理
6.关于ChannelPipeline的问题整理
7.ChannelPipeline主要包括三部分内容
8.ChannelPipeline的初始化
9.ChannelPipeline添加ChannelHandler
10.ChannelPipeline删除ChannelHandler
11.Inbound事件的传播
12.Outbound事件的传播
13.ChannelPipeline中异常的传播
14.ChannelPipeline总结
1.Pipeline和Handler的作用和构成
(1)Pipeline和Handler的作用
(2)Pipeline和Handler的构成
(1)Pipeline和Handler的作用
可以在处理复杂的业务逻辑时避免if else的泛滥,可以实现对业务逻辑的模块化处理,不同的逻辑放置到单独的类中进行处理。最后将这些逻辑串联起来,形成一个完整的逻辑处理链。
Netty通过责任链模式来组织代码逻辑,能够支持逻辑的动态添加和删除,能够支持各类协议的扩展。
(2)Pipeline和Handler的构成
在Netty里,一个连接对应着一个Channel。这个Channel的所有处理逻辑都在一个叫ChannelPipeline的对象里,ChannelPipeline是双向链表结构,它和Channel之间是一对一的关系。
ChannelPipeline里的每个结点都是一个ChannelHandlerContext对象,这个ChannelHandlerContext对象能够获得和Channel相关的所有上下文信息。每个ChannelHandlerContext对象都包含一个逻辑处理器ChannelHandler,每个逻辑处理器ChannelHandler都处理一块独立的逻辑。
2.ChannelHandler的分类
ChannelHandler有两大子接口,分别为Inbound和Outbound类型:第一个子接口是ChannelInboundHandler,用于处理读数据逻辑,最重要的方法是channelRead()。第二个子接口是ChannelOutboundHandler,用于处理写数据逻辑,最重要的方法是write()。
这两个子接口默认的实现分别是:ChannelInboundHandlerAdapter和ChannelOutboundHandlerAdapter。它们分别实现了两个子接口的所有功能,在默认情况下会把读写事件传播到下一个Handler。
InboundHandler的事件通常只会传播到下一个InboundHandler,OutboundHandler的事件通常只会传播到下一个OuboundHandler,InboundHandler的执行顺序与实际addLast的添加顺序相同,OutboundHandler的执行顺序与实际addLast的添加顺序相反。
Inbound事件通常由IO线程触发,如TCP链路的建立事件、关闭事件、读事件、异常通知事件等。其触发方法一般带有fire字眼,如下所示:
ctx.fireChannelRegister()、
ctx.fireChannelActive()、
ctx.fireChannelRead()、
ctx.fireChannelReadComplete()、
ctx.fireChannelInactive()。
Outbound事件通常由用户主动发起的网络IO操作触发,如用户发起的连接操作、绑定操作、消息发送等操作。其触发方法一般如:ctx.bind()、ctx.connect()、ctx.write()、ctx.flush()、ctx.read()、ctx.disconnect()、ctx.close()。
3.几个特殊的ChannelHandler
(1)ChannelInboundHandlerAdapter
(2)ChannelOutboundHandlerAdapter
(3)ByteToMessageDecoder
(4)SimpleChannelInboundHandler
(5)MessageToByteEncoder
(1)ChannelInboundHandlerAdapter
ChannelInboundHandlerAdapter主要用于实现ChannelInboundHandler接口的所有方法,这样我们在继承它编写自己的ChannelHandler时就不需要实现ChannelHandler里的每种方法了,从而避免了直接实现ChannelHandler时需要实现其所有方法而导致代码显得冗余和臃肿。
//Handles an I/O event or intercepts an I/O operation, and forwards it to its next handler in its ChannelPipeline. public interface ChannelHandler { //Gets called after the ChannelHandler was added to the actual context and it's ready to handle events. void handlerAdded(ChannelHandlerContext ctx) throws Exception; //Gets called after the ChannelHandler was removed from the actual context and it doesn't handle events anymore. void handlerRemoved(ChannelHandlerContext ctx) throws Exception; //Gets called if a Throwable was thrown. @Deprecated void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception; //Indicates that the same instance of the annotated ChannelHandler can be added to one or more ChannelPipelines multiple times without a race condition. @Inherited @Documented @Target(ElementType.TYPE) @Retention(RetentionPolicy.RUNTIME) @interface Sharable { // no value } } //Skeleton implementation of a ChannelHandler. public abstract class ChannelHandlerAdapter implements ChannelHandler { // Not using volatile because it's used only for a sanity check. boolean added; //Return true if the implementation is Sharable and so can be added to different ChannelPipelines. public boolean isSharable() { Class<?> clazz = getClass(); Map<Class<?>, Boolean> cache = InternalThreadLocalMap.get().handlerSharableCache(); Boolean sharable = cache.get(clazz); if (sharable == null) { sharable = clazz.isAnnotationPresent(Sharable.class); cache.put(clazz, sharable); } return sharable; } //Do nothing by default, sub-classes may override this method. @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception { // NOOP } //Do nothing by default, sub-classes may override this method. @Override public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { // NOOP } //Calls ChannelHandlerContext#fireExceptionCaught(Throwable) to forward to the next ChannelHandler in the ChannelPipeline. @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.fireExceptionCaught(cause); } } //ChannelHandler which adds callbacks for state changes. //This allows the user to hook in to state changes easily. public interface ChannelInboundHandler extends ChannelHandler { //The Channel of the ChannelHandlerContext was registered with its EventLoop void channelRegistered(ChannelHandlerContext ctx) throws Exception; //The Channel of the ChannelHandlerContext was unregistered from its EventLoop void channelUnregistered(ChannelHandlerContext ctx) throws Exception; //The Channel of the ChannelHandlerContext is now active void channelActive(ChannelHandlerContext ctx) throws Exception; //The Channel of the ChannelHandlerContext was registered is now inactive and reached its end of lifetime. void channelInactive(ChannelHandlerContext ctx) throws Exception; //Invoked when the current Channel has read a message from the peer. void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception; //Invoked when the last message read by the current read operation has been consumed by #channelRead(ChannelHandlerContext, Object). //If ChannelOption#AUTO_READ is off, no further attempt to read an inbound data from the current Channel will be made until ChannelHandlerContext#read() is called. void channelReadComplete(ChannelHandlerContext ctx) throws Exception; //Gets called if an user event was triggered. void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception; //Gets called once the writable state of a Channel changed. //You can check the state with Channel#isWritable(). void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception; //Gets called if a Throwable was thrown. @Override @SuppressWarnings("deprecation") void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception; } //Abstract base class for ChannelInboundHandler implementations which provide implementations of all of their methods. public class ChannelInboundHandlerAdapter extends ChannelHandlerAdapter implements ChannelInboundHandler { //Calls ChannelHandlerContext#fireChannelRegistered() to forward to the next ChannelInboundHandler in the ChannelPipeline. @Override public void channelRegistered(ChannelHandlerContext ctx) throws Exception { ctx.fireChannelRegistered(); } //Calls ChannelHandlerContext#fireChannelUnregistered() to forward to the next ChannelInboundHandler in the ChannelPipeline. @Override public void channelUnregistered(ChannelHandlerContext ctx) throws Exception { ctx.fireChannelUnregistered(); } //Calls ChannelHandlerContext#fireChannelActive() to forward to the next ChannelInboundHandler in the ChannelPipeline. @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { ctx.fireChannelActive(); } //Calls ChannelHandlerContext#fireChannelInactive() to forward to the next ChannelInboundHandler in the ChannelPipeline. @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { ctx.fireChannelInactive(); } //Calls ChannelHandlerContext#fireChannelRead(Object) to forward to the next ChannelInboundHandler in the ChannelPipeline. @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ctx.fireChannelRead(msg); } //Calls ChannelHandlerContext#fireChannelReadComplete() to forward to the next ChannelInboundHandler in the ChannelPipeline. @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ctx.fireChannelReadComplete(); } //Calls ChannelHandlerContext#fireUserEventTriggered(Object) to forward to the next ChannelInboundHandler in the ChannelPipeline. @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { ctx.fireUserEventTriggered(evt); } //Calls ChannelHandlerContext#fireChannelWritabilityChanged() to forward to the next ChannelInboundHandler in the ChannelPipeline. @Override public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception { ctx.fireChannelWritabilityChanged(); } //Calls ChannelHandlerContext#fireExceptionCaught(Throwable) to forward to the next ChannelHandler in the ChannelPipeline. @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.fireExceptionCaught(cause); } }
(2)ChannelOutboundHandlerAdapter
ChannelOutboundHandlerAdapter主要用于实现ChannelOutboundHandler接口的所有方法,这样我们在继承它编写自己的ChannelHandler时就不需要实现ChannelHandler里的每种方法了,从而避免了直接实现ChannelHandler时需要实现其所有方法而导致代码显得冗余和臃肿。
//ChannelHandler which will get notified for IO-outbound-operations. public interface ChannelOutboundHandler extends ChannelHandler { //Called once a bind operation is made. void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception; //Called once a connect operation is made. void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) throws Exception; //Called once a disconnect operation is made. void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception; //Called once a close operation is made. void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception; //Called once a deregister operation is made from the current registered EventLoop. void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception; //Intercepts ChannelHandlerContext#read(). void read(ChannelHandlerContext ctx) throws Exception; //Called once a write operation is made. The write operation will write the messages through the ChannelPipeline. //Those are then ready to be flushed to the actual Channel once Channel#flush() is called. void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception; //Called once a flush operation is made. The flush operation will try to flush out all previous written messages that are pending. void flush(ChannelHandlerContext ctx) throws Exception; } //Skeleton implementation of a ChannelOutboundHandler. This implementation just forwards each method call via the ChannelHandlerContext. public class ChannelOutboundHandlerAdapter extends ChannelHandlerAdapter implements ChannelOutboundHandler { //Calls ChannelHandlerContext#bind(SocketAddress, ChannelPromise) to forward to the next ChannelOutboundHandler in the ChannelPipeline. @Override public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception { ctx.bind(localAddress, promise); } //Calls ChannelHandlerContext#connect(SocketAddress, SocketAddress, ChannelPromise) to forward to the next ChannelOutboundHandler in the ChannelPipeline. @Override public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) throws Exception { ctx.connect(remoteAddress, localAddress, promise); } //Calls ChannelHandlerContext#disconnect(ChannelPromise) to forward to the next ChannelOutboundHandler in the ChannelPipeline. @Override public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { ctx.disconnect(promise); } //Calls ChannelHandlerContext#close(ChannelPromise) to forward to the next ChannelOutboundHandler in the ChannelPipeline. @Override public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { ctx.close(promise); } //Calls ChannelHandlerContext#deregister(ChannelPromise) to forward to the next ChannelOutboundHandler in the ChannelPipeline. @Override public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { ctx.deregister(promise); } //Calls ChannelHandlerContext#read() to forward to the next ChannelOutboundHandler in the ChannelPipeline. @Override public void read(ChannelHandlerContext ctx) throws Exception { ctx.read(); } //Calls ChannelHandlerContext#write(Object, ChannelPromise)} to forward to the next ChannelOutboundHandler in the ChannelPipeline. @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { ctx.write(msg, promise); } //Calls ChannelHandlerContext#flush() to forward to the next ChannelOutboundHandler in the ChannelPipeline. @Override public void flush(ChannelHandlerContext ctx) throws Exception { ctx.flush(); } }
(3)ByteToMessageDecoder
基于这个ChannelHandler可以实现自定义解码,而不用关心ByteBuf的强转和解码结果的传递。Netty里的ByteBuf默认下使用的是堆外内存,ByteToMessageDecoder会自动进行内存的释放,不用操心内存管理。我们自定义的ChannelHandler继承了ByteToMessageDecoder后,需要实现decode()方法。
(4)SimpleChannelInboundHandler
基于这个ChannelHandler可以实现每一种指令的处理,不再需要强转、不再有冗长的if else逻辑、不再需要手动传递对象。
同时还可以自动释放没有往下传播的ByteBuf,因为我们编写指令处理ChannelHandler时,可能会编写不用关心的if else判断,然后手动传递无法处理的对象至下一个指令处理器。
//xxxHandler.java if (packet instanceof xxxPacket) { //进行处理 } else { ctx.fireChannelRead(packet); }
(5)MessageToByteEncoder
基于这个ChannelHandler可以实现自定义编码,而不用关心ByteBuf的创建,不用把创建完的ByteBuf进行返回。
4.ChannelHandler的生命周期
(1)ChannelHandler回调方法的执行顺序
ChannelHandler回调方法的执行顺序可以称为ChannelHandler的生命周期。
新建连接时ChannelHandler回调方法的执行顺序是:handlerAdded() -> channelRegistered() -> channelActive() -> channelRead() -> channelReadComplete()。
关闭连接时ChannelHandler回调方法的执行顺序是:channelInactive() -> channelUnregistered() -> handlerRemoved()。
接下来是ChannelHandler具体的回调方法说明,其中一二三的顺序可以参考AbstractChannel的内部类AbstractUnsafe的register0()方法。
一.handlerAdded()
检测到新连接后调用"ch.pipeline().addLast(...)"之后的回调,表示当前Channel已成功添加一个ChannelHandler。
二.channelRegistered()
表示当前Channel已和某个NioEventLoop线程建立了绑定关系,已经创建了一个Reactor线程来处理当前这个Channel的读写。
三.channelActive()
当Channel的Pipeline已经添加完所有的ChannelHandler以及绑定好一个NioEventLoop线程,这个Channel对应的连接才算真正被激活,接下来就会回调该方法。
四.channelRead()
服务端每次收到客户端发送的数据时都会回调该方法,表示有数据可读。
五.channelReadComplete()
服务端每次读完一条完整的数据都会回调该方法,表示数据读取完毕。
六.channelInactive()
表示这个连接已经被关闭,该连接在TCP层已经不再是ESTABLISH状态。
七.channelUnregister()
表示与这个连接对应的NioEventLoop线程移除了对这个连接的处理。
八.handlerRemoved()
表示给这个连接添加的所有的ChannelHandler都被移除了。
(2)ChannelHandler回调方法的应用场景
一.handlerAdded()方法与handlerRemoved()方法通常可用于一些资源的申请和释放。
二.channelActive()方法与channelInactive()方法表示的是TCP连接的建立与释放,可用于统计单机连接数或IP过滤。
三.channelRead()方法可用于根据自定义协议进行拆包。每次读到一定数据就累加到一个容器里,然后看看能否拆出完整的包。
四.channelReadComplete()方法可用于实现批量刷新。如果每次向客户端写数据都通过writeAndFlush()方法写数据并刷新到底层,其实并不高效。所以可以把调用writeAndFlush()方法的地方换成调用write()方法,然后再在channelReadComplete()方法里调用ctx.channel().flush()。
5.ChannelPipeline的事件处理
(1)消息读取和发送被Pipeline处理的过程
(2)ChannelPipeline的主要特征
(1)消息读取和发送被Pipeline处理的过程
消息的读取和发送被ChannelPipeline的ChannelHandler链拦截和处理的全过程:
一.首先AbstractNioChannel内部类NioUnsafe的read()方法读取ByteBuf时会触发ChannelRead事件,也就是由NioEventLoop线程调用ChannelPipeline的fireChannelRead()方法将ByteBuf消息传输到ChannelPipeline中。
二.然后ByteBuf消息会依次被HeadContext、xxxChannelHandler、...、TailContext拦截处理。在这个过程中,任何ChannelHandler都可以中断当前的流程,结束消息的传递。
三.接着用户可能会调用ChannelHandlerContext的write()方法发送ByteBuf消息。此时ByteBuf消息会从TailContext开始,途径xxxChannelHandler、...、HeadContext,最终被添加到消息发送缓冲区中等待刷新和发送。在这个过程中,任何ChannelHandler都可以中断当前的流程,中断消息的传递。
(2)ChannelPipeline的主要特征
一.ChannelPipeline支持运行时动态地添加或者删除ChannelHandler
例如业务高峰时对系统做拥塞保护。处于业务高峰期时,则动态地向当前的ChannelPipeline添加ChannelHandler。高峰期过后,再移除ChannelHandler。
二.ChannelPipeline是线程安全的
多个业务线程可以并发操作ChannelPipeline,因为使用了synchronized关键字。但ChannelHandler却不一定是线程安全的,这由用户保证。
6.关于ChannelPipeline的问题整理
一.Netty是如何判断ChannelHandler类型的?
即如何判断一个ChannelHandler是Inbound类型还是Outbound类型?
答:当调用Pipeline去添加一个ChannelHandler结点时,旧版Netty会使用instanceof关键字来判断该结点是Inbound类型还是Outbound类型,并分别用一个布尔类型的变量来进行标识。新版Netty则使用一个整形的executionMask来具体区分详细的Inbound事件和Outbound事件。这个executionMask对应一个16位的二进制数,是哪一种事件就对应哪一个Mask。
//Inbound事件的Mask MASK_EXEPTION_CAUGHT = 1; MASK_CHANNEL_REGISTER = 1 << 1; MASK_CHANNEL_UNREGISTER = 1 << 2; MASK_CHANNEL_ACTIVE = 1 << 3; MASK_CHANNEL_INACTIVE = 1 << 4; MASK_CHANNEL_READ = 1 << 5; MASK_CHANNEL_READ_COMPLETE = 1 << 6; MASK_CHANNEL_USER_EVENT_TRIGGERED = 1 << 7; MASK_CHANNEL_WRITABLITY_CHANGED = 1 << 8; //Outbound事件的Mask MASK_BIND = 1 << 9; MASK_CONNECT = 1 << 10; MASK_DISCONNECT = 1 << 11; MASK_CLOSE = 1 << 12; MASK_DEREGISTER = 1 << 13; MASK_READ = 1 << 14; MASK_WRITE = 1 << 15; MASK_FLUSH = 1 << 16;
二.添加ChannelHandler时应遵循什么样的顺序?
答:Inbound类型的事件传播跟添加ChannelHandler的顺序一样,Outbound类型的事件传播跟添加ChannelHandler的顺序相反。
三.用户手动触发事件传播的两种方式有什么区别?
这两种方式是分别是:ctx.writeAndFlush()和ctx.channel().writeAndFlush()。
答:当通过Channel去触发一个事件时,那么该事件会沿整个ChannelPipeline传播。如果是Inbound类型事件,则从HeadContext结点开始向后传播到最后一个Inbound类型的结点。如果是Outbound类型事件,则从TailContext结点开始向前传播到第一个Outbound类型的结点。当通过当前结点去触发一个事件时,那么该事件只会从当前结点开始传播。如果是Inbound类型事件,则从当前结点开始一直向后传播到最后一个Inbound类型的结点。如果是Outbound类型事件,则从当前结点开始一直向前传播到第一个Outbound类型的结点。
7.ChannelPipeline主要包括三部分内容
一.ChannelPipeline的初始化
服务端Channel和客户端Channel在何时初始化ChannelPipeline?在初始化时又做了什么事情?
二.添加和删除ChannelHandler
Netty是如何实现业务逻辑处理器动态编织的?
三.事件和异常的传播
读写事件和异常在ChannelPipeline中的传播。
8.ChannelPipeline的初始化
(1)ChannelPipeline的初始化时机
(2)ChannelPipeline的初始化内容
(3)ChannelPipeline的说明
(1)ChannelPipeline的初始化时机
在服务端启动和客户端连接接入的过程中,在创建NioServerSocketChannel和NioSocketChannel时,会逐层执行父类的构造方法,最后执行到AbstractChannel的构造方法。AbstractChannel的构造方法会将Netty的核心组件创建出来。而核心组件中就包含了DefaultChannelPipeline类型的ChannelPipeline组件。
//A skeletal Channel implementation. public abstract class AbstractChannel extends DefaultAttributeMap implements Channel { private final Channel parent; private final ChannelId id; private final Unsafe unsafe; private final DefaultChannelPipeline pipeline; ... //Creates a new instance. protected AbstractChannel(Channel parent) { this.parent = parent; id = newId(); unsafe = newUnsafe(); pipeline = newChannelPipeline(); } //Returns a new DefaultChannelPipeline instance. protected DefaultChannelPipeline newChannelPipeline() { //创建ChannelPipeline组件 return new DefaultChannelPipeline(this); } ... } //The default ChannelPipeline implementation. //It is usually created by a Channel implementation when the Channel is created. public class DefaultChannelPipeline implements ChannelPipeline { final AbstractChannelHandlerContext head; final AbstractChannelHandlerContext tail; private final Channel channel;//保存了Channel的引用 ... protected DefaultChannelPipeline(Channel channel) { //保存Channel的引用到Pipeline组件的成员变量 this.channel = ObjectUtil.checkNotNull(channel, "channel"); ... tail = new TailContext(this); head = new HeadContext(this); head.next = tail; tail.prev = head; } ... }
(2)ChannelPipeline的初始化内容
ChannelPipeline的初始化主要涉及三部分内容:
一.Pipeline在创建Channel时被创建
二.Pipeline的结点是ChannelHandlerContext
三.Pipeline两大哨兵HeadContext和TailContext
(3)ChannelPipeline的说明
ChannelPipeline中保存了Channel的引用,ChannelPipeline中每个结点都是一个ChannelHandlerContext对象,每个ChannelHandlerContext结点都包裹着一个ChannelHandler执行器,每个ChannelHandlerContext结点都保存了它包裹的执行器ChannelHandler执行操作时所需要的上下文ChannelPipeline。由于ChannelPipeline又保存了Channel的引用,所以每个ChannelHandlerContext结点都可以拿到所有的上下文信息。
ChannelHandlerContext接口多继承自AttributeMap、ChannelInboundInvoker、ChannelOutboundInvoker。
ChannelHandlerContext的关键方法有:channel()、executor()、handler()、pipeline()、alloc()。ChannelHandlerContext默认是由AbstractChannelHandlerContext去实现的,它实现了大部分功能。
ChannelPipeline初始化时会初始化两个结点:HeadContext和TailContext,并构成双向链表。HeadContext结点会比TailContext结点多一个unsafe成员变量。
public class DefaultChannelPipeline implements ChannelPipeline { //ChannelPipeline中每个结点都是一个ChannelHandlerContext对象 final AbstractChannelHandlerContext head; final AbstractChannelHandlerContext tail; private final Channel channel;//ChannelPipeline中保存了Channel的引用 ... protected DefaultChannelPipeline(Channel channel) { //保存Channel的引用到Pipeline组件的成员变量 this.channel = ObjectUtil.checkNotNull(channel, "channel"); ... //ChannelPipeline初始化时会初始化两个结点:HeadContext和TailContext,并构成双向链表 tail = new TailContext(this); head = new HeadContext(this); head.next = tail; tail.prev = head; } final class HeadContext extends AbstractChannelHandlerContext implements ChannelOutboundHandler, ChannelInboundHandler { //HeadContext结点会比TailContext结点多一个unsafe成员变量 private final Unsafe unsafe; HeadContext(DefaultChannelPipeline pipeline) { super(pipeline, null, HEAD_NAME, false, true); unsafe = pipeline.channel().unsafe(); setAddComplete(); } ... } final class TailContext extends AbstractChannelHandlerContext implements ChannelInboundHandler { TailContext(DefaultChannelPipeline pipeline) { super(pipeline, null, TAIL_NAME, true, false); setAddComplete(); } ... } ... } //ChannelHandlerContext默认是由AbstractChannelHandlerContext去实现的,它实现了大部分功能 abstract class AbstractChannelHandlerContext extends DefaultAttributeMap implements ChannelHandlerContext, ResourceLeakHint { //每个ChannelHandlerContext结点都保存了它包裹的执行器ChannelHandler执行操作时所需要的上下文ChannelPipeline private final DefaultChannelPipeline pipeline; ... AbstractChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor, String name, boolean inbound, boolean outbound) { this.name = ObjectUtil.checkNotNull(name, "name"); this.pipeline = pipeline; this.executor = executor; this.inbound = inbound; this.outbound = outbound; ordered = executor == null || executor instanceof OrderedEventExecutor; } } public interface ChannelHandlerContext extends AttributeMap, ChannelInboundInvoker, ChannelOutboundInvoker { //Return the Channel which is bound to the ChannelHandlerContext. Channel channel(); //Returns the EventExecutor which is used to execute an arbitrary task. EventExecutor executor(); //The unique name of the ChannelHandlerContext. //The name was used when then ChannelHandler was added to the ChannelPipeline. //This name can also be used to access the registered ChannelHandler from the ChannelPipeline. String name(); //The ChannelHandler that is bound this ChannelHandlerContext. ChannelHandler handler(); //Return true if the ChannelHandler which belongs to this context was removed from the ChannelPipeline. //Note that this method is only meant to be called from with in the EventLoop. boolean isRemoved(); ChannelHandlerContext fireChannelRegistered(); ChannelHandlerContext fireChannelUnregistered(); ChannelHandlerContext fireChannelActive(); ChannelHandlerContext fireChannelInactive(); ChannelHandlerContext fireExceptionCaught(Throwable cause); ChannelHandlerContext fireUserEventTriggered(Object evt); ChannelHandlerContext fireChannelRead(Object msg); ChannelHandlerContext fireChannelReadComplete(); ChannelHandlerContext fireChannelWritabilityChanged(); ChannelHandlerContext read(); ChannelHandlerContext flush(); //Return the assigned ChannelPipeline ChannelPipeline pipeline(); //Return the assigned ByteBufAllocator which will be used to allocate ByteBufs. ByteBufAllocator alloc(); ... }
9.ChannelPipeline添加ChannelHandler
(1)常见的客户端代码
(2)ChannelPipeline添加ChannelHandler入口
(3)DefaultChannelPipeline的addLast()方法
(4)检查是否重复添加ChannelHandler结点
(5)创建ChannelHandlerContext结点
(6)添加ChannelHandlerContext结点
(7)回调handerAdded()方法
(8)ChannelPipeline添加ChannelHandler总结
(1)常见的客户端代码
首先用一个拆包器Spliter对二进制数据流进行拆包,然后解码器Decoder会将拆出来的包进行解码,接着业务处理器BusinessHandler会处理解码出来的Java对象,最后编码器Encoder会将业务处理完的结果编码成二进制数据进行输出。
bootstrap.childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ChannelPipeline p = ch.pipeline(); p.addLast(newSpliter()); p.addLast(new Decoder()); p.addLast(new BusinessHandler()); p.addLast(new Encoder()); } });
整个ChannelPipeline的结构如下所示:
这里共有两种不同类型的结点,结点之间通过双向链表连接。一种是ChannelInboundHandler,用来处理Inbound事件,比如读取数据流进行加工处理。一种是ChannelOutboundHandler,用来处理Outbound事件,比如当调用writeAndFlush()方法时就会经过这种类型的Handler。
(2)ChannelPipeline添加ChannelHandler入口
当服务端Channel的Reactor线程轮询到新连接接入的事件时,就会调用AbstractNioChannel的内部类NioUnsafe的read()方法,也就是调用AbstractNioMessageChannel的内部类NioMessageUnsafe的read()方法。
然后会触发执行代码pipeline.fireChannelRead()传播ChannelRead事件,从而最终触发调用ServerBootstrapAcceptor接入器的channelRead()方法。
在ServerBootstrapAcceptor的channelRead()方法中,便会通过执行代码channel.pipeline().addLast()添加ChannelHandler,也就是通过调用DefaultChannelPipeline的addLast()方法添加ChannelHandler。
//SingleThreadEventLoop implementation which register the Channel's to a Selector and so does the multi-plexing of these in the event loop. public final class NioEventLoop extends SingleThreadEventLoop { Selector selector; private SelectedSelectionKeySet selectedKeys; private boolean needsToSelectAgain; private int cancelledKeys; ... @Override protected void run() { for (;;) { ... //1.调用select()方法执行一次事件轮询 select(wakenUp.getAndSet(false)); if (wakenUp.get()) { selector.wakeup(); } ... //2.处理产生IO事件的Channel needsToSelectAgain = false; processSelectedKeys(); ... //3.执行外部线程放入TaskQueue的任务 runAllTasks(ioTime * (100 - ioRatio) / ioRatio); } } private void processSelectedKeys() { if (selectedKeys != null) { //selectedKeys.flip()会返回一个数组 processSelectedKeysOptimized(selectedKeys.flip()); } else { processSelectedKeysPlain(selector.selectedKeys()); } } private void processSelectedKeysOptimized(SelectionKey[] selectedKeys) { for (int i = 0;; i ++) { //1.首先取出IO事件 final SelectionKey k = selectedKeys[i]; if (k == null) { break; } selectedKeys[i] = null;//Help GC //2.然后获取对应的Channel和处理该Channel //默认情况下,这个a就是NioChannel,也就是服务端启动时经过Netty封装的Channel final Object a = k.attachment(); if (a instanceof AbstractNioChannel) { //网络事件的处理 processSelectedKey(k, (AbstractNioChannel) a); } else { //NioTask主要用于当一个SelectableChannel注册到Selector时,执行的一些任务 NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a; processSelectedKey(k, task); } //3.最后判断是否应该再进行一次轮询 if (needsToSelectAgain) { for (;;) { i++; if (selectedKeys[i] == null) { break; } selectedKeys[i] = null; } selectAgain(); //selectedKeys.flip()会返回一个数组 selectedKeys = this.selectedKeys.flip(); i = -1; } } } private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) { final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe(); ... try { int readyOps = k.readyOps(); ... //boss的Reactor线程已经轮询到有ACCEPT事件,即表明有新连接接入 //此时将调用Channel的unsafe变量来进行实际操作 if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) { //调用AbstractNioMessageChannel的NioMessageUnsafe.read()方法 //进行新连接接入处理 unsafe.read(); if (!ch.isOpen()) { return; } } } catch (CancelledKeyException ignored) { unsafe.close(unsafe.voidPromise()); } } ... } //AbstractNioChannel base class for Channels that operate on messages. public abstract class AbstractNioMessageChannel extends AbstractNioChannel { ... private final class NioMessageUnsafe extends AbstractNioUnsafe { //临时存放读到的连接NioSocketChannel private final List<Object> readBuf = new ArrayList<Object>(); @Override public void read() { //断言确保该read()方法必须来自Reactor线程调用 assert eventLoop().inEventLoop(); //获得Channel对应的Pipeline final ChannelPipeline pipeline = pipeline(); //获得Channel对应的RecvByteBufAllocator.Handle final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle(); do { //1.调用NioServerSocketChannel的doReadMessages()方法创建NioSocketChannel //通过JDK的accept()方法去创建JDK Channel,然后把它包装成Netty自定义的Channel int localRead = doReadMessages(readBuf); if (localRead == 0) { break; } } while (allocHandle.continueReading());//控制连接的接入速率,默认一次性读取16个连接 //2.设置并绑定NioSocketChannel int size = readBuf.size(); for (int i = 0; i < size; i ++) { //调用DefaultChannelPipeline的fireChannelRead()方法 pipeline.fireChannelRead(readBuf.get(i)); } //3.清理容器并触发DefaultChannelPipeline的fireChannelReadComplete()方法 readBuf.clear(); pipeline.fireChannelReadComplete(); } } ... } //The default ChannelPipeline implementation. //It is usually created by a Channel implementation when the Channel is created. public class DefaultChannelPipeline implements ChannelPipeline { final AbstractChannelHandlerContext head; final AbstractChannelHandlerContext tail; ... protected DefaultChannelPipeline(Channel channel) { ... tail = new TailContext(this); head = new HeadContext(this); head.next = tail; tail.prev = head; } @Override public final ChannelPipeline fireChannelRead(Object msg) { //从Pipeline的第一个HeadContext处理器开始调用 AbstractChannelHandlerContext.invokeChannelRead(head, msg); return this; } final class HeadContext extends AbstractChannelHandlerContext implements ChannelOutboundHandler, ChannelInboundHandler { ... @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { //调用AbstractChannelHandlerContext的fireChannelRead()方法 ctx.fireChannelRead(msg); } @Override public ChannelHandler handler() { return this; } ... } ... } public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerChannel> { ... //初始化服务端Channel时,会向其Pipeline添加ServerBootstrapAcceptor处理器 @Override void init(Channel channel) throws Exception { //1.设置服务端Channel的Option与Attr final Map<ChannelOption<?>, Object> options = options0(); synchronized (options) { channel.config().setOptions(options); } final Map<AttributeKey<?>, Object> attrs = attrs0(); synchronized (attrs) { for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) { @SuppressWarnings("unchecked") AttributeKey<Object> key = (AttributeKey<Object>) e.getKey(); channel.attr(key).set(e.getValue()); } } //2.设置客户端Channel的Option与Attr final EventLoopGroup currentChildGroup = childGroup; final ChannelHandler currentChildHandler = childHandler; final Entry<ChannelOption<?>, Object>[] currentChildOptions; final Entry<AttributeKey<?>, Object>[] currentChildAttrs; synchronized (childOptions) { currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size())); } synchronized (childAttrs) { currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size())); } //3.配置服务端启动逻辑 ChannelPipeline p = channel.pipeline(); //p.addLast()用于定义服务端启动过程中需要执行哪些逻辑 p.addLast(new ChannelInitializer<Channel>() { @Override public void initChannel(Channel ch) throws Exception { //一.添加用户自定义的Handler,注意这是handler,而不是childHandler final ChannelPipeline pipeline = ch.pipeline(); ChannelHandler handler = config.handler(); if (handler != null) pipeline.addLast(handler); //二.添加一个特殊的Handler用于接收新连接 //自定义的childHandler会作为参数传入连接器ServerBootstrapAcceptor ch.eventLoop().execute(new Runnable() { @Override public void run() { //调用DefaultChannelPipeline的addLast()方法 pipeline.addLast(new ServerBootstrapAcceptor( currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs) ); } }); } }); } private static class ServerBootstrapAcceptor extends ChannelInboundHandlerAdapter { private final EventLoopGroup childGroup; private final ChannelHandler childHandler; private final Entry<ChannelOption<?>, Object>[] childOptions; private final Entry<AttributeKey<?>, Object>[] childAttrs; ... //channelRead()方法在新连接接入时被调用 @Override @SuppressWarnings("unchecked") public void channelRead(ChannelHandlerContext ctx, Object msg) { final Channel child = (Channel) msg; //1.给新连接的Channel添加用户自定义的Handler处理器 //这里的childHandler其实是一个特殊的Handler: ChannelInitializer child.pipeline().addLast(childHandler); //2.设置ChannelOption,主要和TCP连接一些底层参数及Netty自身对一个连接的参数有关 for (Entry<ChannelOption<?>, Object> e: childOptions) { if (!child.config().setOption((ChannelOption<Object>) e.getKey(), e.getValue())) { logger.warn("Unknown channel option: " + e); } } //3.设置新连接Channel的属性 for (Entry<AttributeKey<?>, Object> e: childAttrs) { child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue()); } //4.绑定Reactor线程 //childGroup是一个NioEventLoopGroup,所以下面会调用其父类的register()方法 childGroup.register(child); } ... } ... }
(3)DefaultChannelPipeline的addLast()方法
使用synchronized关键字是为了防止多线程并发操作ChannelPipeline底层的双向链表,添加ChannelHandler结点的过程主要分为4个步骤:
步骤一:判断ChannelHandler是否重复添加
步骤二:创建结点
步骤三:添加结点到链表
步骤四:回调添加完成事件
这个结点便是ChannelHandlerContext,Pipeline里每个结点都是一个ChannelHandlerContext。addLast()方法便是把ChannelHandler包装成一个ChannelHandlerContext,然后添加到链表。
public class DefaultChannelPipeline implements ChannelPipeline { ... @Override public final ChannelPipeline addLast(ChannelHandler... handlers) { return addLast(null, handlers); } @Override public final ChannelPipeline addLast(EventExecutorGroup executor, ChannelHandler... handlers) { if (handlers == null) throw new NullPointerException("handlers"); for (ChannelHandler h: handlers) { if (h == null) break; addLast(executor, null, h); } return this; } @Override public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) { final AbstractChannelHandlerContext newCtx; synchronized (this) { //1.检查是否有重复的ChannelHandler结点 checkMultiplicity(handler); //2.创建ChannelHandlerContext结点 newCtx = newContext(group, filterName(name, handler), handler); //3.添加ChannelHandlerContext结点 addLast0(newCtx); ... } //4.回调用户方法 //通过这个方法告诉用户这个ChannelHandler已添加完成,用户在回调方法里可以处理事情了 callHandlerAdded0(newCtx); return this; } ... }
(4)检查是否重复添加ChannelHandler结点
Netty使用了一个成员变量added来表示一个ChannelHandler是否已经添加。如果当前要添加的ChannelHandler是非共享的并且已经添加过,那么抛出异常,否则标识该ChannelHandler已添加。
如果一个ChannelHandler支持共享,那么它就可以无限次被添加到ChannelPipeline中。如果要让一个ChannelHandler支持共享,只需要加一个@Sharable注解即可。而ChannelHandlerAdapter的isSharable()方法正是通过判断该ChannelHandler对应的类是否标有@Sharable注解来实现的。
Netty为了性能优化,还使用了ThreadLocal来缓存ChannelHandler是否共享的情况。在高并发海量连接下,每次有新连接添加ChannelHandler都会调用isSharable()方法,从而优化性能。
public class DefaultChannelPipeline implements ChannelPipeline { ... private static void checkMultiplicity(ChannelHandler handler) { if (handler instanceof ChannelHandlerAdapter) { ChannelHandlerAdapter h = (ChannelHandlerAdapter) handler; if (!h.isSharable() && h.added) { throw new ChannelPipelineException(h.getClass().getName() + " is not a @Sharable handler, so can't be added or removed multiple times."); } h.added = true; } } ... } //Skeleton implementation of a ChannelHandler. public abstract class ChannelHandlerAdapter implements ChannelHandler { //Not using volatile because it's used only for a sanity check. boolean added; //Return true if the implementation is Sharable and so can be added to different ChannelPipelines. public boolean isSharable() { //Cache the result of Sharable annotation detection to workaround a condition. //We use a ThreadLocal and WeakHashMap to eliminate the volatile write/reads. //Using different WeakHashMap instances per Thread is good enough for us and the number of Threads are quite limited anyway. Class<?> clazz = getClass(); Map<Class<?>, Boolean> cache = InternalThreadLocalMap.get().handlerSharableCache(); Boolean sharable = cache.get(clazz); if (sharable == null) { sharable = clazz.isAnnotationPresent(Sharable.class); cache.put(clazz, sharable); } return sharable; } ... }
(5)创建ChannelHandlerContext结点
根据ChannelHandler创建ChannelHandlerContext类型的结点时,会将该ChannelHandler的引用保存到结点的成员变量中。
public class DefaultChannelPipeline implements ChannelPipeline { ... @Override public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) { final AbstractChannelHandlerContext newCtx; synchronized (this) { //1.检查是否有重复的ChannelHandler结点 checkMultiplicity(handler); //2.创建ChannelHandlerContext结点 newCtx = newContext(group, filterName(name, handler), handler); //3.添加ChannelHandlerContext结点 addLast0(newCtx); ... } //4.回调用户方法 //通过这个方法告诉用户这个ChannelHandler已添加完成,用户在回调方法里可以处理事情了 callHandlerAdded0(newCtx); return this; } //给ChannelHandler创建一个唯一性的名字 private String filterName(String name, ChannelHandler handler) { if (name == null) { return generateName(handler); } checkDuplicateName(name); return name; } //根据ChannelHandler创建一个ChannelHandlerContext结点 private AbstractChannelHandlerContext newContext(EventExecutorGroup group, String name, ChannelHandler handler) { return new DefaultChannelHandlerContext(this, childExecutor(group), name, handler); } ... } final class DefaultChannelHandlerContext extends AbstractChannelHandlerContext { private final ChannelHandler handler; DefaultChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor, String name, ChannelHandler handler) { super(pipeline, executor, name, isInbound(handler), isOutbound(handler)); if (handler == null) { throw new NullPointerException("handler"); } this.handler = handler; } ... } abstract class AbstractChannelHandlerContext extends DefaultAttributeMap implements ChannelHandlerContext, ResourceLeakHint { ... AbstractChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor, String name, boolean inbound, boolean outbound) { this.name = ObjectUtil.checkNotNull(name, "name"); this.pipeline = pipeline; this.executor = executor; this.inbound = inbound; this.outbound = outbound; ordered = executor == null || executor instanceof OrderedEventExecutor; } ... }
(6)添加ChannelHandlerContext结点
使用尾插法向双向链表添加结点。
public class DefaultChannelPipeline implements ChannelPipeline { ... private void addLast0(AbstractChannelHandlerContext newCtx) { AbstractChannelHandlerContext prev = tail.prev; newCtx.prev = prev; newCtx.next = tail; prev.next = newCtx; tail.prev = newCtx; } ... } abstract class AbstractChannelHandlerContext extends DefaultAttributeMap implements ChannelHandlerContext, ResourceLeakHint { volatile AbstractChannelHandlerContext next; volatile AbstractChannelHandlerContext prev; ... }
(7)回调handerAdded()方法
向ChannelPipeline添加完新结点后,会使用CAS修改结点的状态为ADD_COMPLETE表示结点添加完成,然后执行ctx.handler().handlerAdded(ctx),回调用户在这个要添加的ChannelHandler中实现的handerAdded()方法。
public class DefaultChannelPipeline implements ChannelPipeline { ... private void callHandlerAdded0(final AbstractChannelHandlerContext ctx) { //使用CAS修改结点的状态为ADD_COMPLETE表示结点添加完成 ctx.setAddComplete(); //回调用户在这个要添加的ChannelHandler中实现的handerAdded()方法 ctx.handler().handlerAdded(ctx); } ... } //DemoHandler是用户定义的ChannelHandler public class DemoHandler extends SimpleChannelInboundHandler<...> { @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception { //这个DemoHandler结点被添加到ChannelPipeline之后,就会回调这里的方法 } ... }
最典型的一个回调就是用户代码的ChannelInitializer被添加完成后,会先调用其initChannel()方法将用户自定义的ChannelHandler添加到ChannelPipeline,然后再调用pipeline.remove()方法将自身结点进行删除。
public class NettyServer { private int port; public NettyServer(int port) { this.port = port; } public void start() throws Exception { EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap .group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class)//监听端口的ServerSocketChannel .option(ChannelOption.SO_BACKLOG, 128) .childOption(ChannelOption.SO_KEEPALIVE, true) //设置一个ChannelInitializer类型的childHandler //新连接接入时,会执行ServerBootstrapAcceptor.channelRead()中的代码"child.pipeline().addLast(childHandler)" //也就是会把这个ChannelInitializer类型的结点会被添加到新连接Channel的Pipeline中 //添加完这个结点后会回调ChannelInitializer的handlerAdded()方法 //其中会调用ChannelInitializer的initChannel()方法给Pipeline添加真正的结点 //执行完initChannel()方法后,就会移除ChannelInitializer这个结点 .childHandler(new ChannelInitializer<SocketChannel>() {//处理每个客户端连接的SocketChannel @Override protected void initChannel(SocketChannel socketChannel) throws Exception { socketChannel.pipeline() .addLast(new StringDecoder()) .addLast(new StringEncoder()) .addLast(new NettyServerHandler());//针对网络请求的处理逻辑 } }); ChannelFuture channelFuture = serverBootstrap.bind(port).sync();//同步等待启动服务器监控端口 channelFuture.channel().closeFuture().sync();//同步等待关闭启动服务器的结果 } catch (Exception e) { e.printStackTrace(); } finally { workerGroup.shutdownGracefully(); bossGroup.shutdownGracefully(); } } public static void main(String[] args) throws Exception{ System.out.println("Starting Netty Server..."); int port = 8998; if (args.length > 0) { port = Integer.parseInt(args[0]); } new NettyServer(port).start(); } } @Sharable public abstract class ChannelInitializer<C extends Channel> extends ChannelInboundHandlerAdapter { ... @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception { if (ctx.channel().isRegistered()) { initChannel(ctx); } } @SuppressWarnings("unchecked") private boolean initChannel(ChannelHandlerContext ctx) throws Exception { if (initMap.putIfAbsent(ctx, Boolean.TRUE) == null) { // Guard against re-entrance. try { initChannel((C) ctx.channel()); } catch (Throwable cause) { exceptionCaught(ctx, cause); } finally { remove(ctx); } return true; } return false; } private void remove(ChannelHandlerContext ctx) { try { ChannelPipeline pipeline = ctx.pipeline(); if (pipeline.context(this) != null) { //ChannelPipeline删除ChannelHandler结点(ChannelInitializer) pipeline.remove(this); } } finally { initMap.remove(ctx); } } ... }
(8)ChannelPipeline添加ChannelHandler总结
一.判断ChannelHandler是否重复添加的依据是:如果该ChannelHandler不是共享的且已被添加过,则拒绝添加。
二.否则就创建一个ChannelHandlerContext结点(ctx),并把这个ChannelHandler包装进去,也就是保存ChannelHandler的引用到ChannelHandlerContext的成员变量中。由于创建ctx时保存了ChannelHandler的引用、ChannelPipeline的引用到成员变量,ChannelPipeline又保存了Channel的引用,所以每个ctx都拥有一个Channel的所有信息。
三.接着通过双向链表的尾插法,将这个ChannelHandlerContext结点添加到ChannelPipeline中。
四.最后回调用户在这个要添加的ChannelHandler中实现的handerAdded()方法。
10.ChannelPipeline删除ChannelHandler
Netty最大的特征之一就是ChannelHandler是可插拔的,可以动态编织ChannelPipeline。比如在客户端首次连接服务端时,需要进行权限认证,认证通过后就可以不用再认证了。下面的AuthHandler便实现了只对第一个传来的数据包进行认证校验。如果通过验证则删除此AuthHandler,这样后续传来的数据包便不会再校验了。
public class AuthHandler extends SimpleChannelInboundHandler<ByteBuf> { ... protected void channelRead0(ChannelHandlerContext ctx, ByteBuf data) throw Exception { if (verify(data)) { ctx.pipeline().remove(this); } else { ctx.close(); } } }
DefaultChannelPipeline的remove()方法如下:
public class DefaultChannelPipeline implements ChannelPipeline { ... @Override public final ChannelPipeline remove(ChannelHandler handler) { remove(getContextOrDie(handler)); return this; } private AbstractChannelHandlerContext getContextOrDie(ChannelHandler handler) { AbstractChannelHandlerContext ctx = (AbstractChannelHandlerContext) context(handler); if (ctx == null) { throw new NoSuchElementException(handler.getClass().getName()); } else { return ctx; } } @Override public final ChannelHandlerContext context(ChannelHandler handler) { ... AbstractChannelHandlerContext ctx = head.next; //遍历双向链表 for (;;) { ... if (ctx.handler() == handler) return ctx; ctx = ctx.next; } } private AbstractChannelHandlerContext remove(final AbstractChannelHandlerContext ctx) { //Pipeline中的head和tail结点不能被删除 assert ctx != head && ctx != tail; synchronized (this) { //调整链表指针并删除 remove0(ctx); ... } //回调用户在这个要删除的ChannelHandler实现的handlerRemoved()方法 callHandlerRemoved0(ctx); return ctx; } private static void remove0(AbstractChannelHandlerContext ctx) { AbstractChannelHandlerContext prev = ctx.prev; AbstractChannelHandlerContext next = ctx.next; prev.next = next; next.prev = prev; } private void callHandlerRemoved0(final AbstractChannelHandlerContext ctx) { ... ctx.handler().handlerRemoved(ctx); ... } ... }
ChannelPipeline删除ChannelHandler的步骤:
一.遍历双向链表,根据ChannelHandler找到对应的ChannelHandlerContext结点。
二.通过调整ChannelPipeline中双向链表的指针来删除对应的ChannelHandlerContext结点。
三.回调用户在这个要删除的ChannelHandler实现的handlerRemoved()方法,比如进行资源清理。
11.Inbound事件的传播
(1)Unsafe的介绍
(2)Unsafe的继承结构
(3)Unsafe的分类
(4)ChannelPipeline中Inbound事件传播
(5)ChannelPipeline中的头结点和尾结点
(6)Inbound事件的传播总结
(1)Unsafe的介绍
Unsafe和ChannelPipeline密切相关,ChannelPipeline中有关IO的操作最终都会落地到Unsafe的。Unsafe是不安全的意思,即不要在应用程序里直接使用Unsafe及它的衍生类对象。Unsafe是在Channel中定义的,是属于Channel的内部类。Unsafe中的接口操作都和JDK底层相关,包括:分配内存、Socket四元组信息、注册事件循环、绑定端口、Socket的连接和关闭、Socket的读写。
//A nexus to a network socket or a component which is capable of I/O operations such as read, write, connect, and bind. public interface Channel extends AttributeMap, ChannelOutboundInvoker, Comparable<Channel> { //Returns the globally unique identifier of this Channel. ChannelId id(); //Return the EventLoop this Channel was registered to. EventLoop eventLoop(); //Returns the parent of this channel. Channel parent(); //Returns the configuration of this channel. ChannelConfig config(); //Returns true if the Channel is open and may get active later boolean isOpen(); //Returns true if the Channel is registered with an EventLoop. boolean isRegistered(); //Return true if the Channel is active and so connected. boolean isActive(); //Return the ChannelMetadata of the Channel which describe the nature of the Channel. ChannelMetadata metadata(); //Returns the local address where this channel is bound to. //The returned SocketAddress is supposed to be down-cast into more concrete type such as InetSocketAddress to retrieve the detailed information. SocketAddress localAddress(); //Returns the remote address where this channel is connected to. //The returned SocketAddress is supposed to be down-cast into more concrete type such as InetSocketAddress to retrieve the detailed information. SocketAddress remoteAddress(); //Returns the ChannelFuture which will be notified when this channel is closed. //This method always returns the same future instance. ChannelFuture closeFuture(); //Returns true if and only if the I/O thread will perform the requested write operation immediately. //Any write requests made when this method returns false are queued until the I/O thread is ready to process the queued write requests. boolean isWritable(); //Get how many bytes can be written until #isWritable() returns false. //This quantity will always be non-negative. If #isWritable() is false then 0. long bytesBeforeUnwritable(); //Get how many bytes must be drained from underlying buffers until #isWritable() returns true. //This quantity will always be non-negative. If #isWritable() is true then 0. long bytesBeforeWritable(); //Returns an <em>internal-use-only</em> object that provides unsafe operations. Unsafe unsafe(); //Return the assigned ChannelPipeline. ChannelPipeline pipeline(); //Return the assigned ByteBufAllocator which will be used to allocate ByteBufs. ByteBufAllocator alloc(); @Override Channel read(); @Override Channel flush(); //Unsafe operations that should never be called from user-code. //These methods are only provided to implement the actual transport, and must be invoked from an I/O thread except for the following methods: //#invoker() //#localAddress() //#remoteAddress() //#closeForcibly() //#register(EventLoop, ChannelPromise) //#deregister(ChannelPromise) //#voidPromise() interface Unsafe { //Return the assigned RecvByteBufAllocator.Handle which will be used to allocate ByteBuf's when receiving data. RecvByteBufAllocator.Handle recvBufAllocHandle(); //Return the SocketAddress to which is bound local or null if none. SocketAddress localAddress(); //Return the SocketAddress to which is bound remote or null if none is bound yet. SocketAddress remoteAddress(); //Register the Channel of the ChannelPromise and notify the ChannelFuture once the registration was complete. void register(EventLoop eventLoop, ChannelPromise promise); //Bind the SocketAddress to the Channel of the ChannelPromise and notify it once its done. void bind(SocketAddress localAddress, ChannelPromise promise); //Connect the Channel of the given ChannelFuture with the given remote SocketAddress. //If a specific local SocketAddress should be used it need to be given as argument. Otherwise just pass null to it. //he ChannelPromise will get notified once the connect operation was complete. void connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise); //Disconnect the Channel of the ChannelFuture and notify the ChannelPromise once the operation was complete. void disconnect(ChannelPromise promise); //Close the Channel of the ChannelPromise and notify the ChannelPromise once the operation was complete. void close(ChannelPromise promise); //Closes the Channel immediately without firing any events. Probably only useful when registration attempt failed. void closeForcibly(); //Deregister the Channel of the ChannelPromise from EventLoop and notify the ChannelPromise once the operation was complete. void deregister(ChannelPromise promise); //Schedules a read operation that fills the inbound buffer of the first ChannelInboundHandler in the ChannelPipeline. //If there's already a pending read operation, this method does nothing. void beginRead(); //Schedules a write operation. void write(Object msg, ChannelPromise promise); //Flush out all write operations scheduled via #write(Object, ChannelPromise). void flush(); //Return a special ChannelPromise which can be reused and passed to the operations in Unsafe. //It will never be notified of a success or error and so is only a placeholder for operations //that take a ChannelPromise as argument but for which you not want to get notified. ChannelPromise voidPromise(); //Returns the ChannelOutboundBuffer of the Channel where the pending write requests are stored. ChannelOutboundBuffer outboundBuffer(); } } public abstract class AbstractNioChannel extends AbstractChannel { ... public interface NioUnsafe extends Unsafe { //Return underlying SelectableChannel SelectableChannel ch(); //Finish connect void finishConnect(); //Read from underlying SelectableChannel void read(); void forceFlush(); } ... }
(2)Unsafe的继承结构
一.NioUnsafe增加了可以访问底层JDK的SelectableChannel的功能,定义了从SelectableChannel读取数据的read()方法。
二.AbstractUnsafe实现了大部分Unsafe的功能。
三.AbstractNioUnsafe主要是通过代理到其外部类AbstractNioChannel获得与JDK NIO相关的一些信息,比如SelectableChannel、SelectionKey等。
四.NioMessageUnsafe和NioByteUnsafe是处在同一层次的抽象,Netty将一个新连接的建立也当作一个IO操作来处理,这里Message的含义可以当作一个SelectableChannel,读的意思就是接收一个SelectableChannel。
(3)Unsafe的分类
有两种类型的Unsafe:一种是与连接的字节数据读写相关的NioByteUnsafe,另一种是与新连接建立操作相关的NioMessageUnsafe。
一.NioByteUnsafe的读和写
NioByteUnsafe的读会被委托到NioByteChannel的doReadBytes()方法进行读取处理,doReadBytes()方法会将JDK的SelectableChannel的字节数据读取到Netty的ByteBuf中。
NioByteUnsafe中的写有两个方法,一个是write()方法,一个是flush()方法。write()方法是将数据添加到Netty的缓冲区,flush()方法是将Netty缓冲区的字节流写到TCP缓冲区,并最终委托到NioSocketChannel的doWrite()方法通过JDK底层Channel的write()方法写数据。
//AbstractNioChannel base class for Channels that operate on bytes. public abstract class AbstractNioByteChannel extends AbstractNioChannel { ... protected class NioByteUnsafe extends AbstractNioUnsafe { ... //NioByteUnsafe的读 @Override public final void read() { ... doReadBytes(byteBuf); ... } } } public class NioSocketChannel extends AbstractNioByteChannel implements SocketChannel { ... @Override protected int doReadBytes(ByteBuf byteBuf) throws Exception { final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle(); allocHandle.attemptedBytesRead(byteBuf.writableBytes()); return byteBuf.writeBytes(javaChannel(), allocHandle.attemptedBytesRead()); } @Override protected void doWrite(ChannelOutboundBuffer in) throws Exception { ... ByteBuffer[] nioBuffers = in.nioBuffers(); SocketChannel ch = javaChannel(); ... ByteBuffer nioBuffer = nioBuffers[0]; ... ch.write(nioBuffer) ... } } public abstract class AbstractChannel extends DefaultAttributeMap implements Channel { ... protected abstract class AbstractUnsafe implements Unsafe { private volatile ChannelOutboundBuffer outboundBuffer = new ChannelOutboundBuffer(AbstractChannel.this); ... //NioByteUnsafe的写 @Override public final void write(Object msg, ChannelPromise promise) { assertEventLoop(); ChannelOutboundBuffer outboundBuffer = this.outboundBuffer; ... outboundBuffer.addMessage(msg, size, promise); } //NioByteUnsafe的写 @Override public final void flush() { assertEventLoop(); ChannelOutboundBuffer outboundBuffer = this.outboundBuffer; ... outboundBuffer.addFlush(); flush0(); } @SuppressWarnings("deprecation") protected void flush0() { ... doWrite(outboundBuffer); ... } } }
二.NioMessageUnsafe的读
NioMessageUnsafe的读会委托到NioServerSocketChannel的doReadMessages()方法进行处理。doReadMessages()方法会调用JDK的accept()方法新建立一个连接,并将这个连接放到一个List里以方便后续进行批量处理。
public abstract class AbstractNioMessageChannel extends AbstractNioChannel { ... private final class NioMessageUnsafe extends AbstractNioUnsafe { private final List<Object> readBuf = new ArrayList<Object>(); //NioMessageUnsafe的读 @Override public void read() { ... doReadMessages(readBuf) ... } } } public class NioServerSocketChannel extends AbstractNioMessageChannel implements ServerSocketChannel { ... @Override protected int doReadMessages(List<Object> buf) throws Exception { SocketChannel ch = javaChannel().accept(); if (ch != null) { buf.add(new NioSocketChannel(this, ch)); return 1; } return 0; } }
(4)ChannelPipeline中Inbound事件传播
当新连接已准备接入或者已经存在的连接有数据可读时,会在NioEventLoop的processSelectedKey()方法中执行unsafe.read()。
如果是新连接已准备接入,执行的是NioMessageUnsafe的read()方法。如果是已经存在的连接有数据可读,执行的是NioByteUnsafe的read()方法。
最后都会执行pipeline.fireChannelRead()引发ChannelPipeline的读事件传播。首先会从HeadContext结点开始,也就是调用HeadContext的channelRead()方法。然后触发调用AbstractChannelHandlerContext的fireChannelRead()方法,接着通过findContextInbound()方法找到HeadContext的下一个结点,然后通过invokeChannelRead()方法继续调用该结点的channelRead()方法,直到最后一个结点TailContext。
public final class NioEventLoop extends SingleThreadEventLoop { ... private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) { final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe(); ... //新连接已准备接入或者已经存在的连接有数据可读 if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) { //如果是新连接已准备接入,则调用NioMessageUnsafe的read()方法 //如果是已经存在的连接有数据可读,执行的是NioByteUnsafe的read()方法 unsafe.read(); if (!ch.isOpen()) { return; } } } ... } public abstract class AbstractNioMessageChannel extends AbstractNioChannel { ... private final class NioMessageUnsafe extends AbstractNioUnsafe { private final List<Object> readBuf = new ArrayList<Object>(); @Override public void read() { assert eventLoop().inEventLoop(); final ChannelConfig config = config(); final ChannelPipeline pipeline = pipeline(); //创建ByteBuf分配器 final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle(); allocHandle.reset(config); ... do { int localRead = doReadMessages(readBuf); ... allocHandle.incMessagesRead(localRead); } while (allocHandle.continueReading()); int size = readBuf.size(); for (int i = 0; i < size; i ++) { readPending = false; //调用DefaultChannelPipeline的fireChannelRead()方法从Head结点开始传播事件 pipeline.fireChannelRead(readBuf.get(i)); } readBuf.clear(); allocHandle.readComplete(); //调用DefaultChannelPipeline的fireChannelReadComplete()方法从Head结点开始传播事件 pipeline.fireChannelReadComplete(); ... } ... } } public abstract class AbstractNioByteChannel extends AbstractNioChannel { ... protected class NioByteUnsafe extends AbstractNioUnsafe { ... @Override public final void read() { final ChannelConfig config = config(); final ChannelPipeline pipeline = pipeline(); //创建ByteBuf分配器 final ByteBufAllocator allocator = config.getAllocator(); final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle(); allocHandle.reset(config); ByteBuf byteBuf = null; do { //1.分配一个ByteBuf byteBuf = allocHandle.allocate(allocator); //2.将数据读取到分配的ByteBuf中 allocHandle.lastBytesRead(doReadBytes(byteBuf)); if (allocHandle.lastBytesRead() <= 0) { byteBuf.release(); byteBuf = null; close = allocHandle.lastBytesRead() < 0; break; } ... //3.调用DefaultChannelPipeline的fireChannelRead()方法从Head结点开始传播事件 pipeline.fireChannelRead(byteBuf); byteBuf = null; } while (allocHandle.continueReading()); allocHandle.readComplete(); //4.调用DefaultChannelPipeline的fireChannelReadComplete()方法从Head结点开始传播事件 pipeline.fireChannelReadComplete(); ... } } } public class DefaultChannelPipeline implements ChannelPipeline { //ChannelPipeline的头结点 final AbstractChannelHandlerContext head; ... @Override public final ChannelPipeline fireChannelRead(Object msg) { AbstractChannelHandlerContext.invokeChannelRead(head, msg); return this; } final class HeadContext extends AbstractChannelHandlerContext implements ChannelOutboundHandler, ChannelInboundHandler { ... @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { //调用AbstractChannelHandlerContext的fireChannelRead()方法 ctx.fireChannelRead(msg); } } } abstract class AbstractChannelHandlerContext extends DefaultAttributeMap implements ChannelHandlerContext, ResourceLeakHint { volatile AbstractChannelHandlerContext next; volatile AbstractChannelHandlerContext prev; ... static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) { final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next); EventExecutor executor = next.executor(); if (executor.inEventLoop()) { //调用AbstractChannelHandlerContext的invokeChannelRead()方法 next.invokeChannelRead(m); } else { executor.execute(new Runnable() { @Override public void run() { next.invokeChannelRead(m); } }); } } private void invokeChannelRead(Object msg) { if (invokeHandler()) { try { //比如调用HeadContext的channelRead()方法 ((ChannelInboundHandler) handler()).channelRead(this, msg); } catch (Throwable t) { notifyHandlerException(t); } } else { fireChannelRead(msg); } } @Override public ChannelHandlerContext fireChannelRead(final Object msg) { invokeChannelRead(findContextInbound(), msg); return this; } //寻找下一个结点 private AbstractChannelHandlerContext findContextInbound() { AbstractChannelHandlerContext ctx = this; do { ctx = ctx.next; } while (!ctx.inbound); return ctx; } }
(5)ChannelPipeline中的头结点和尾结点
HeadContext是一个同时属于Inbound类型和Outbound类型的ChannelHandler,TailContext则只是一个属于Inbound类型的ChannelHandler。
HeadContext结点的作用就是作为头结点开始传递读写事件并调用unsafe进行实际的读写操作。比如Channel读完一次数据后,HeadContext的channelReadComplete()方法会被调用。然后继续执行如下的调用流程:readIfAutoRead() -> channel.read() -> pipeline.read() -> HeadContext.read() -> unsafe.beginRead() -> 再次注册读事件。所以Channel读完一次数据后,会继续向Selector注册读事件。这样只要Channel活跃就可以连续不断地读取数据,然后数据又会通过ChannelPipeline传递到HeadContext结点。
TailContext结点的作用是通过让方法体为空来终止大部分事件的传播,它的exceptionCaugh()方法和channelRead()方法分别会发出告警日志以及释放到达该结点的对象。
public class DefaultChannelPipeline implements ChannelPipeline { //ChannelPipeline的头结点 final AbstractChannelHandlerContext head; //ChannelPipeline的尾结点 final AbstractChannelHandlerContext tail; ... @Override public final ChannelPipeline fireChannelRead(Object msg) { AbstractChannelHandlerContext.invokeChannelRead(head, msg); return this; } final class HeadContext extends AbstractChannelHandlerContext implements ChannelOutboundHandler, ChannelInboundHandler { private final Unsafe unsafe; HeadContext(DefaultChannelPipeline pipeline) { super(pipeline, null, HEAD_NAME, false, true); unsafe = pipeline.channel().unsafe(); setAddComplete(); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { //调用AbstractChannelHandlerContext的fireChannelRead()方法 ctx.fireChannelRead(msg); } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ctx.fireChannelReadComplete(); readIfIsAutoRead(); } private void readIfIsAutoRead() { if (channel.config().isAutoRead()) { channel.read(); } } } final class TailContext extends AbstractChannelHandlerContext implements ChannelInboundHandler { TailContext(DefaultChannelPipeline pipeline) { super(pipeline, null, TAIL_NAME, true, false); setAddComplete(); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { onUnhandledInboundMessage(msg); } ... } //Called once a message hit the end of the ChannelPipeline without been handled by the user in ChannelInboundHandler#channelRead(ChannelHandlerContext, Object). //This method is responsible to call ReferenceCountUtil#release(Object) on the given msg at some point. protected void onUnhandledInboundMessage(Object msg) { try { logger.debug("Discarded inbound message {} that reached at the tail of the pipeline. " + "Please check your pipeline configuration.", msg); } finally { ReferenceCountUtil.release(msg); } } }
(6)Inbound事件的传播总结
一般用户自定义的ChannelInboundHandler都继承自ChannelInboundHandlerAdapter。如果用户代码没有覆盖ChannelInboundHandlerAdapter的channelXXX()方法,那么Inbound事件会从HeadContext开始遍历ChannelPipeline的双向链表进行传播,并默认情况下传播到TailContext结点。
如果用户代码覆盖了ChannelInboundHandlerAdapter的channelXXX()方法,那么事件传播就会在当前结点结束。所以如果此时这个ChannelHandler又忘记了手动释放业务对象ByteBuf,则可能会造成内存泄露,而SimpleChannelInboundHandler则可以帮用户自动释放业务对象。
如果用户代码调用了ChannelHandlerContext的fireXXX()方法来传播事件,那么该事件就从当前结点开始往下传播。
public class ChannelInboundHandlerAdapter extends ChannelHandlerAdapter implements ChannelInboundHandler { //Calls ChannelHandlerContext#fireChannelRegistered() to forward to the next ChannelInboundHandler in the ChannelPipeline. @Override public void channelRegistered(ChannelHandlerContext ctx) throws Exception { ctx.fireChannelRegistered(); } //Calls ChannelHandlerContext#fireChannelUnregistered() to forward to the next ChannelInboundHandler in the ChannelPipeline. @Override public void channelUnregistered(ChannelHandlerContext ctx) throws Exception { ctx.fireChannelUnregistered(); } //Calls ChannelHandlerContext#fireChannelActive() to forward to the next ChannelInboundHandler in the ChannelPipeline. @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { ctx.fireChannelActive(); } //Calls ChannelHandlerContext#fireChannelInactive() to forward to the next ChannelInboundHandler in the ChannelPipeline. @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { ctx.fireChannelInactive(); } //Calls ChannelHandlerContext#fireChannelRead(Object) to forward to the next ChannelInboundHandler in the ChannelPipeline. @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ctx.fireChannelRead(msg); } //Calls ChannelHandlerContext#fireChannelReadComplete() to forward to the next ChannelInboundHandler in the ChannelPipeline. @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ctx.fireChannelReadComplete(); } //Calls ChannelHandlerContext#fireUserEventTriggered(Object) to forward to the next ChannelInboundHandler in the ChannelPipeline. @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { ctx.fireUserEventTriggered(evt); } //Calls ChannelHandlerContext#fireChannelWritabilityChanged() to forward to the next ChannelInboundHandler in the ChannelPipeline. @Override public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception { ctx.fireChannelWritabilityChanged(); } //Calls ChannelHandlerContext#fireExceptionCaught(Throwable) to forward to the next ChannelHandler in the ChannelPipeline. @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.fireExceptionCaught(cause); } }
12.Outbound事件的传播
(1)触发Outbound事件传播的入口
(2)Outbound事件传播的源码
(3)总结
(1)触发Outbound事件传播的入口
在消息推送系统中,可能会有如下代码,意思是根据用户ID获得对应的Channel,然后向用户推送消息。
Channel channel = ChannelManager.getChannel(userId); channel.writeAndFlush(response);
(2)Outbound事件传播的源码
如果通过Channel来传播Outbound事件,则是从TailContext开始传播的。
和Inbound事件一样,Netty为了保证程序的高效执行,所有核心操作都要在Reactor线程中处理。如果业务线程调用了Channel的方法,那么Netty会将该操作封装成一个Task任务添加到任务队列中,随后在Reactor线程的事件循环中执行。
findContextOutbound()方法找Outbound结点的过程和findContextInbound()方法找Inbound结点类似,需要反向遍历ChannelPipeline中的双向链表,一直遍历到第一个Outbound结点HeadCountext。
如果用户的ChannelHandler覆盖了Outbound类型的方法,但没有把事件在方法中继续传播下去,那么会导致该事件的传播中断。
最后一个Inbound结点是TailContext,最后一个Outbound结点是HeadContext,而数据最终会落到HeadContext的write()方法上。
下面是channel.writeAndFlush()方法的源码:
public interface ChannelOutboundInvoker { ... //Shortcut for call #write(Object) and #flush(). ChannelFuture writeAndFlush(Object msg); ... } public abstract class AbstractChannel extends DefaultAttributeMap implements Channel { private final Channel parent; private final ChannelId id; private final Unsafe unsafe; private final DefaultChannelPipeline pipeline; ... protected AbstractChannel(Channel parent) { this.parent = parent; id = newId(); unsafe = newUnsafe(); pipeline = newChannelPipeline(); } protected DefaultChannelPipeline newChannelPipeline() { return new DefaultChannelPipeline(this); } @Override public ChannelFuture writeAndFlush(Object msg) { return pipeline.writeAndFlush(msg); } ... } public class DefaultChannelPipeline implements ChannelPipeline { final AbstractChannelHandlerContext head; final AbstractChannelHandlerContext tail; private final Channel channel; protected DefaultChannelPipeline(Channel channel) { this.channel = ObjectUtil.checkNotNull(channel, "channel"); tail = new TailContext(this); head = new HeadContext(this); head.next = tail; tail.prev = head; } @Override public final ChannelFuture writeAndFlush(Object msg) { //从TailContext开始传播 //但TailContext没有重写writeAndFlush()方法 //所以会调用AbstractChannelHandlerContext的writeAndFlush()方法 return tail.writeAndFlush(msg); } ... } abstract class AbstractChannelHandlerContext extends DefaultAttributeMap implements ChannelHandlerContext, ResourceLeakHint { volatile AbstractChannelHandlerContext next; volatile AbstractChannelHandlerContext prev; ... @Override public ChannelFuture writeAndFlush(Object msg) { return writeAndFlush(msg, newPromise()); } @Override public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) { if (msg == null) throw new NullPointerException("msg"); if (!validatePromise(promise, true)) { ReferenceCountUtil.release(msg); return promise; } write(msg, true, promise); return promise; } private void write(Object msg, boolean flush, ChannelPromise promise) { //反向遍历链表进行查找 AbstractChannelHandlerContext next = findContextOutbound(); final Object m = pipeline.touch(msg, next); EventExecutor executor = next.executor(); //最终都会由Reactor线程处理Channel的数据读写 if (executor.inEventLoop()) { if (flush) { //调用结点的invokeWriteAndFlush()方法 next.invokeWriteAndFlush(m, promise); } else { next.invokeWrite(m, promise); } } else { AbstractWriteTask task; if (flush) { task = WriteAndFlushTask.newInstance(next, m, promise); } else { task = WriteTask.newInstance(next, m, promise); } safeExecute(executor, task, promise, m); } } private void invokeWriteAndFlush(Object msg, ChannelPromise promise) { if (invokeHandler()) { //逐个调用ChannelHandler结点的write()方法,但前提是当前ChannelHandler可以往下传 //即write()方法在最后也像ChannelOutboundHandlerAdapter那样,调用了ctx.write()往下传播 invokeWrite0(msg, promise); //逐个调用ChannelHandler结点的flush()方法,但前提是当前ChannelHandler可以往下传 //即flush()方法在最后也像ChannelOutboundHandlerAdapter那样,调用了ctx.flush()往下传播 invokeFlush0(); } else { writeAndFlush(msg, promise); } } private void invokeWrite0(Object msg, ChannelPromise promise) { try { //逐个调用,最终回到HeadContext的write()方法 ((ChannelOutboundHandler) handler()).write(this, msg, promise); } catch (Throwable t) { notifyOutboundHandlerException(t, promise); } } private void invokeFlush0() { try { //逐个调用,最终回到HeadContext的flush()方法 ((ChannelOutboundHandler) handler()).flush(this); } catch (Throwable t) { notifyHandlerException(t); } } ... } public class DefaultChannelPipeline implements ChannelPipeline { ... final class HeadContext extends AbstractChannelHandlerContext implements ChannelOutboundHandler, ChannelInboundHandler { private final Unsafe unsafe; HeadContext(DefaultChannelPipeline pipeline) { super(pipeline, null, HEAD_NAME, false, true); unsafe = pipeline.channel().unsafe(); setAddComplete(); } ... @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { unsafe.write(msg, promise); } @Override public void flush(ChannelHandlerContext ctx) throws Exception { unsafe.flush(); } } ... } //Skeleton implementation of a ChannelOutboundHandler. This implementation just forwards each method call via the ChannelHandlerContext. public class ChannelOutboundHandlerAdapter extends ChannelHandlerAdapter implements ChannelOutboundHandler { //Calls ChannelHandlerContext#bind(SocketAddress, ChannelPromise) to forward to the next ChannelOutboundHandler in the ChannelPipeline. @Override public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception { ctx.bind(localAddress, promise); } //Calls ChannelHandlerContext#connect(SocketAddress, SocketAddress, ChannelPromise) to forward to the next ChannelOutboundHandler in the ChannelPipeline. @Override public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) throws Exception { ctx.connect(remoteAddress, localAddress, promise); } //Calls ChannelHandlerContext#disconnect(ChannelPromise) to forward to the next ChannelOutboundHandler in the ChannelPipeline. @Override public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { ctx.disconnect(promise); } //Calls ChannelHandlerContext#close(ChannelPromise) to forward to the next ChannelOutboundHandler in the ChannelPipeline. @Override public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { ctx.close(promise); } //Calls ChannelHandlerContext#deregister(ChannelPromise) to forward to the next ChannelOutboundHandler in the ChannelPipeline. @Override public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { ctx.deregister(promise); } //Calls ChannelHandlerContext#read() to forward to the next ChannelOutboundHandler in the ChannelPipeline. @Override public void read(ChannelHandlerContext ctx) throws Exception { ctx.read(); } //Calls ChannelHandlerContext#write(Object, ChannelPromise)} to forward to the next ChannelOutboundHandler in the ChannelPipeline. @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { ctx.write(msg, promise); } //Calls ChannelHandlerContext#flush() to forward to the next ChannelOutboundHandler in the ChannelPipeline. @Override public void flush(ChannelHandlerContext ctx) throws Exception { ctx.flush(); } }
(3)总结
Outbound事件的传播机制和Inbound事件的传播机制类似。但Outbound事件是从链表尾部开始向前传播,而Inbound事件是从链表头部开始向后传播。Outbound事件传播中的写数据,最终都会落到HeadContext结点中的unsafe进行处理。
13.ChannelPipeline中异常的传播
Inbound事件和Outbound事件在传播时发生异常都会调用notifyHandlerExecption()方法,该方法会按Inbound事件的传播顺序找每个结点的异常处理方法exceptionCaught()进行处理。
我们通常在自定义的ChannelHandler中实现一个处理异常的方法exceptionCaught(),统一处理ChannelPipeline过程中的所有异常。这个自定义ChannelHandler一般继承自ChannelDuplexHandler,表示该结点既是一个Inbound结点,又是一个Outbound结点。
如果我们在自定义的ChannelHandler中没有处理异常,由于ChannelHandler通常都继承了ChannelInboundHandlerAdapter,通过其默认实现的exceptionCaught()方法可知异常会一直往下传递,直到最后一个结点的异常处理方法exceptionCaught()中结束。因此如果异常处理方法exceptionCaught()在ChannelPipeline中间的结点实现,则该结点后面的ChannelHandler抛出的异常就没法处理了。所以一般会在ChannelHandler链表的末尾结点实现处理异常的方法exceptionCaught()。
需要注意的是:在任何结点中发生的异常都会向下一个结点进行传递。
public class DefaultChannelPipeline implements ChannelPipeline { ... @Override public final ChannelPipeline fireChannelRead(Object msg) { AbstractChannelHandlerContext.invokeChannelRead(head, msg); return this; } ... } abstract class AbstractChannelHandlerContext extends DefaultAttributeMap implements ChannelHandlerContext, ResourceLeakHint { ... @Override public ChannelHandlerContext fireChannelRead(final Object msg) { invokeChannelRead(findContextInbound(), msg); return this; } static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) { final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next); EventExecutor executor = next.executor(); if (executor.inEventLoop()) { next.invokeChannelRead(m); } else { executor.execute(new Runnable() { @Override public void run() { next.invokeChannelRead(m); } }); } } private void invokeChannelRead(Object msg) { if (invokeHandler()) { try { ((ChannelInboundHandler) handler()).channelRead(this, msg); } catch (Throwable t) { notifyHandlerException(t); } } else { fireChannelRead(msg); } } private void notifyHandlerException(Throwable cause) { if (inExceptionCaught(cause)) { if (logger.isWarnEnabled()) { logger.warn("...", cause); } return; } invokeExceptionCaught(cause); } private void invokeExceptionCaught(final Throwable cause) { if (invokeHandler()) { try { //调用ChannelHandler的exceptionCaught() handler().exceptionCaught(this, cause); } catch (Throwable error) { if (logger.isDebugEnabled()) { logger.debug("...", ThrowableUtil.stackTraceToString(error), cause); } else if (logger.isWarnEnabled()) { logger.warn("...", error, cause); } } } else { fireExceptionCaught(cause); } } @Override public ChannelHandlerContext fireExceptionCaught(final Throwable cause) { //调用下一个结点next的exceptionCaught()方法 invokeExceptionCaught(next, cause); return this; } static void invokeExceptionCaught(final AbstractChannelHandlerContext next, final Throwable cause) { ObjectUtil.checkNotNull(cause, "cause"); EventExecutor executor = next.executor(); if (executor.inEventLoop()) { next.invokeExceptionCaught(cause); } else { try { executor.execute(new Runnable() { @Override public void run() { next.invokeExceptionCaught(cause); } }); } catch (Throwable t) { if (logger.isWarnEnabled()) { logger.warn("Failed to submit an exceptionCaught() event.", t); logger.warn("The exceptionCaught() event that was failed to submit was:", cause); } } } } ... } public class ChannelInboundHandlerAdapter extends ChannelHandlerAdapter implements ChannelInboundHandler { ... //Calls ChannelHandlerContext#fireExceptionCaught(Throwable) to forward to the next ChannelHandler in the ChannelPipeline. @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.fireExceptionCaught(cause); } }
14.ChannelPipeline总结
(1)ChannelPipeline的初始化
(2)ChannelPipeline的数据结构
(3)ChannelHandler类型的判断
(4)ChannelPipeline的头尾结点
(5)Channel与Unsafe
(6)ChannelPipeline的事件传播机制
(1)ChannelPipeline的初始化
ChannelPipeline在服务端Channel和客户端Channel被创建时创建,创建ChannelPipeline的类是服务端Channel和客户端Channel的共同父类AbstractChannel。
(2)ChannelPipeline的数据结构
ChannelPipeline中的数据结构是双向链表结构,每一个结点都是一个ChannelHandlerContext对象。ChannelHandlerContext里包装了用户自定义的ChannelHandler,即前者会保存后者的引用到其成员变量handler中。ChannelHandlerContext中拥有ChannelPipeline和Channel的所有上下文信息。添加和删除ChannelHandler最终都是在ChannelPipeline的链表结构中添加和删除对应的ChannelHandlerContext结点。
(3)ChannelHandler类型的判断
在旧版Netty中,会使用instanceof关键字来判断ChannelHandler的类型,并使用两个成员变量inbound和outbound来标识。在新版Netty中,会使用一个16位的二进制数executionMask来表示ChannelHandler具体实现的事件类型,若实现则给对应的位标1。
(4)ChannelPipeline的头尾结点
创建ChannelPipeline时会默认添加两个结点:HeadContext结点和TailContext结点。HeadContext结点的作用是作为头结点,开始传播读写事件,并且通过它的unsafe变量实现具体的读写操作。TailContext结点的作用是起到终止事件传播(方法体为空)以及异常和对象未处理的告警。
(5)Channel与Unsafe
一个Channel对应一个Unsafe,Unsafe用于处理底层IO操作。NioServerSocketChannel对应NioMessageUnsafe,NioSocketChannel对应NioByteUnsafe。
(6)ChannelPipeline的事件传播机制
ChannelPipeline中的事件传播机制分为3种:Inbound事件的传播、Outbound事件的传播、异常事件的传播。
一.Inbound事件的传播
如果通过Channel的Pipeline触发这类事件(默认情况下),那么触发的规则是从head结点开始不断寻找下一个InboundHandler,最终落到tail结点。如果在当前ChannelHandlerContext上触发这类事件,那么事件只会从当前结点开始向下传播。
二.Outbound事件的传播
如果通过Channel的Pipeline触发这类事件(默认情况下),那么触发的规则是从tail结点开始不断寻找上一个InboundHandler,最终落到head结点。如果在当前ChannelHandlerContext上触发这类事件,那么事件只会从当前结点开始向上传播。
三.异常事件的传播
异常在ChannelPipeline中的双向链表传播时,无论Inbound结点还是Outbound结点,都是向下一个结点传播,直到tail结点为止。TailContext结点会打印这些异常信息,最佳实践是在ChannelPipeline的最后实现异常处理方法exceptionCaught()。
详细介绍后端技术栈的基础内容,包括但不限于:MySQL原理和优化、Redis原理和应用、JVM和G1原理和优化、RocketMQ原理应用及源码、Kafka原理应用及源码、ElasticSearch原理应用及源码、JUC源码、Netty源码、zk源码、Dubbo源码、Spring源码、Spring Boot源码、SCA源码、分布式锁源码、分布式事务、分库分表和TiDB、大型商品系统、大型订单系统等