[关闭]
@adamhand 2019-04-06T10:21:46.000000Z 字数 19051 阅读 1136

netty源码阅读--ChannelPipeline


在 Netty 中每个 Channel 都有且仅有一个 ChannelPipeline 与之对应, 它们的组成关系如下:



一个 Channel 包含了一个 ChannelPipeline, 而 ChannelPipeline 中又维护了一个由 ChannelHandlerContext 组成的双向链表。这个链表的头是 HeadContext, 链表的尾是 TailContext, 并且每个 ChannelHandlerContext 中又关联着一个 ChannelHandler。

ChannelPipeline 实例化过程

在服务端和客户端启动的时候分析过,创建Channel需要使用父类的构造方法,而在调用AbstractChannel的构造方法的时候,会给channel绑定一个ChannelPipeline,这个构造函数如下所示:

  1. protected AbstractChannel(Channel parent) {
  2. this.parent = parent;
  3. unsafe = newUnsafe();
  4. pipeline = new DefaultChannelPipeline(this);
  5. }

顺着DefaultChannelPipeline往下看,它的构造函数如下:

  1. public DefaultChannelPipeline(AbstractChannel channel) {
  2. if (channel == null) {
  3. throw new NullPointerException("channel");
  4. }
  5. this.channel = channel;
  6. tail = new TailContext(this);
  7. head = new HeadContext(this);
  8. head.next = tail;
  9. tail.prev = head;
  10. }

在 DefaultChannelPipeline 的构造方法中, 将传入的 channel 赋值给字段 this.channel, 接着又实例化了两个特殊的字段: tail 与 head,构成一个双向链表。其实在 DefaultChannelPipeline 中, 就是维护了一个以 AbstractChannelHandlerContext 为节点的双向链表。

head 和 tail 的类层次结构如下:


TailContext 的继承层次结构


HeadContext 的继承层次结构

从类层次结构图中可以很清楚地看到, head 实现了 ChannelInboundHandler, 而 tail 实现了 ChannelOutboundHandler 接口, 并且它们都实现了 ChannelHandlerContext 接口, 因此可以说 head 和 tail 即是一个 ChannelHandler, 又是一个 ChannelHandlerContext.

HeadContext和TailContext的构造器如下:

  1. HeadContext(DefaultChannelPipeline pipeline) {
  2. super(pipeline, null, HEAD_NAME, false, true);
  3. unsafe = pipeline.channel().unsafe();
  4. }
  5. TailContext(DefaultChannelPipeline pipeline) {
  6. super(pipeline, null, TAIL_NAME, true, false);
  7. }

HeadContext调用了父类 AbstractChannelHandlerContext 的构造器, 并传入参数 inbound = false, outbound = true。

TailContext 的构造器与 HeadContext 的相反, 它调用了父类 AbstractChannelHandlerContext 的构造器, 并传入参数 inbound = true, outbound = false。

即 header 是一个 outboundHandler, 而 tail 是一个inboundHandler。

ChannelInitializer 的添加

最开始的时候 ChannelPipeline 中含有两个 ChannelHandlerContext(同时也是 ChannelHandler), 但是这个 Pipeline并不能实现什么特殊的功能, 因为还没有给它添加自定义的 ChannelHandler。

通常来说, 在初始化 Bootstrap时会添加自定义的 ChannelHandler, 比如 EchoClient:

  1. handler(new ChannelInitializer<SocketChannel>() {
  2. @Override
  3. public void initChannel(SocketChannel ch) throws Exception {
  4. ChannelPipeline p = ch.pipeline();
  5. if (sslCtx != null) {
  6. p.addLast(sslCtx.newHandler(ch.alloc(), HOST, PORT));
  7. }
  8. //p.addLast(new LoggingHandler(LogLevel.INFO));
  9. p.addLast(new EchoClientHandler());
  10. }
  11. })

ChannelInitializer 实现了 ChannelHandler, 顺着BootStrap.connect()方法往下走,可以发现hander()被添加到ChannelPipeline中的时机,调用链为:

  1. Bootstrap.connect() ->
  2. Bootstrap.doConnect() ->
  3. AbstractBootstrap.initAndRegister() ->
  4. Bootstrap.init()

具体代码块为:

  1. void init(Channel channel) throws Exception {
  2. ChannelPipeline p = channel.pipeline();
  3. p.addLast(handler());
  4. ...
  5. }

这里的handler()就是刚才初始化的hander(new Channelinitializer())。因此这里就是将 ChannelInitializer 插入到了 Pipeline 的末端。

进入addLast()方法,一路追踪,进入下面的addLast()方法:

  1. public ChannelPipeline addLast(EventExecutorGroup group, final String name, ChannelHandler handler) {
  2. synchronized (this) {
  3. checkDuplicateName(name);
  4. AbstractChannelHandlerContext newCtx = new DefaultChannelHandlerContext(this, group, name, handler);
  5. addLast0(name, newCtx);
  6. }
  7. return this;
  8. }

上面的 addLast 方法中, 首先检查这个 ChannelHandler 的名字是否是重复的, 如果不重复的话, 则为这个 Handler 创建一个对应的 DefaultChannelHandlerContext 实例, 并与之关联起来(Context 中有一个 handler 属性保存着对应的 Handler 实例)。判断此 Handler 是否重名的方法很简单: Netty 中有一个 name2ctx Map 字段, key 是 handler 的名字, 而 value 则是 handler 本身。因此通过如下代码就可以判断一个 handler 是否重名了:

  1. private void checkDuplicateName(String name) {
  2. if (name2ctx.containsKey(name)) {
  3. throw new IllegalArgumentException("Duplicate handler name: " + name);
  4. }
  5. }

为了添加一个 handler 到 pipeline 中, 必须把此 handler 包装成 ChannelHandlerContext. 因此在上面的代码中我们可以看到新实例化了一个 newCtx 对象, 并将 handler 作为参数传递到构造方法中。下面是DefaultChannelHandlerContext的构造器:

  1. DefaultChannelHandlerContext(
  2. DefaultChannelPipeline pipeline, EventExecutorGroup group, String name, ChannelHandler handler) {
  3. super(pipeline, group, name, isInbound(handler), isOutbound(handler));
  4. if (handler == null) {
  5. throw new NullPointerException("handler");
  6. }
  7. this.handler = handler;
  8. }

其中调用的isInbound()和isOutbound()方法如下所示:

  1. private static boolean isInbound(ChannelHandler handler) {
  2. return handler instanceof ChannelInboundHandler;
  3. }
  4. private static boolean isOutbound(ChannelHandler handler) {
  5. return handler instanceof ChannelOutboundHandler;
  6. }

当一个 handler 实现了 ChannelInboundHandler 接口, 则 isInbound 返回真; 相似地, 当一个 handler 实现了 ChannelOutboundHandler 接口, 则 isOutbound 就返回真。

而这两个 boolean 变量会传递到父类 AbstractChannelHandlerContext 中, 并初始化父类的两个字段: inbound 与 outbound。

想知道这里的 ChannelInitializer 所对应的 DefaultChannelHandlerContext 的 inbound 与 inbound 字段分别是什么,只需要看一下ChannelInitializer 实现了哪个接口就行了,下面是ChannelInitializer 的继承关系图:



可以清楚地看到, ChannelInitializer 仅仅实现了 ChannelInboundHandler 接口, 因此这里实例化的 DefaultChannelHandlerContext 的 inbound = true, outbound = false。

当创建好 Context 后, 就将这个 Context 插入到 Pipeline 的双向链表中:

  1. private void addLast0(final String name, AbstractChannelHandlerContext newCtx) {
  2. checkMultiplicity(newCtx);
  3. AbstractChannelHandlerContext prev = tail.prev;
  4. newCtx.prev = prev;
  5. newCtx.next = tail;
  6. prev.next = newCtx;
  7. tail.prev = newCtx;
  8. name2ctx.put(name, newCtx);
  9. callHandlerAdded(newCtx);
  10. }

此时 Pipeline 的结构如下图所示:



自定义 ChannelHandler 的添加过程

下面看看一下自定义的 ChannelHandler 是如何插入到 Pipeline 中的。

在前面client初始化章节已经分析了Channel 的注册过程,如下:

完整的调用链如下:

  1. BootStrap.connect() ->
  2. BootStrap.doConnect() ->
  3. AbstractBootstrap.initAndRegister() ->
  4. MultithreadEventLoopGroup.register() ->
  5. SingleThreadEventLoopGroup.register() ->
  6. SingleThreadEventLoop.register() ->
  7. AbstractChannel.AbstractUnsafe.register() ->
  8. AbstractChannel.AbstractUnsafe.register0() ->
  9. AbstractNioChannel.doRegister() ->
  10. AbstractSelectableChannel.register() ->
  11. SelectorImpl.register() ->
  12. WindowsSelectorImpl.implRegister()

自定义 ChannelHandler 的添加过程, 发生在 AbstractChanel.AbstractUnsafe.register0 中, 在这个方法中调用了 pipeline.fireChannelRegistered() 方法, 其实现如下:

  1. public ChannelPipeline fireChannelRegistered() {
  2. head.fireChannelRegistered();
  3. return this;
  4. }

上面方法的逻辑就是调用head.fireChannelRegistered()方法。这里需要注意的是,head.fireXXX 的调用形式, 是 Netty 中 Pipeline 传递事件的常用方式。进入AbstractChannelHandlerContext.fireChannelRegistered,它的逻辑如下:

  1. public ChannelHandlerContext fireChannelRegistered() {
  2. final AbstractChannelHandlerContext next = findContextInbound();
  3. EventExecutor executor = next.executor();
  4. if (executor.inEventLoop()) {
  5. next.invokeChannelRegistered();
  6. } else {
  7. executor.execute(new OneTimeTask() {
  8. @Override
  9. public void run() {
  10. next.invokeChannelRegistered();
  11. }
  12. });
  13. }
  14. return this;
  15. }

这个方法的第一句是调用 findContextInbound 获取一个 Context,进入这个方法看一下,它的逻辑如下:

  1. private AbstractChannelHandlerContext findContextInbound() {
  2. AbstractChannelHandlerContext ctx = this;
  3. do {
  4. ctx = ctx.next;
  5. } while (!ctx.inbound);
  6. return ctx;
  7. }

可以看到,这个方法会遍历Pipeline 的双向链表, 然后找到第一个属性 inbound 为 true 的 ChannelHandlerContext 实例。

由于ChannelInitializer 实现了 ChannelInboudHandler, 因此它所对应的 ChannelHandlerContext 的 inbound 属性就是 true, 因此这里返回就是 ChannelInitializer 实例所对应的 ChannelHandlerContext. 即:



接着,会调用它的invokeChannelRegistered方法:

  1. private void invokeChannelRegistered() {
  2. try {
  3. ((ChannelInboundHandler) handler()).channelRegistered(this);
  4. } catch (Throwable t) {
  5. notifyHandlerException(t);
  6. }
  7. }

由于每个 ChannelHandler 都与一个 ChannelHandlerContext 关联, 并且可以通过 ChannelHandlerContext 获取到对应的 ChannelHandler。因此这里 handler() 返回的, 其实就是一开始实例化的 ChannelInitializer 对象, 并接着调用了 ChannelInitializer.channelRegistered 方法。逻辑如下:

  1. public final void channelRegistered(ChannelHandlerContext ctx) throws Exception {
  2. initChannel((C) ctx.channel());
  3. ctx.pipeline().remove(this);
  4. ctx.fireChannelRegistered();
  5. }

这里的initChannel 方法就是在初始化 Bootstrap 时, 调用 handler 方法传入的匿名内部类所实现的方法:

  1. .handler(new ChannelInitializer<SocketChannel>() {
  2. @Override
  3. public void initChannel(SocketChannel ch) throws Exception {
  4. ChannelPipeline p = ch.pipeline();
  5. p.addLast(new EchoClientHandler());
  6. }
  7. });

因此当调用了这个方法后, 自定义的 ChannelHandler 就插入到 Pipeline 了, 此时的 Pipeline 如下图所示:



当添加了自定义的 ChannelHandler 后, 会删除 ChannelInitializer 这个 ChannelHandler, 即 "ctx.pipeline().remove(this)", 因此最后的 Pipeline 如下:



ChannelHandler 的名字

上面说了,DefaultChannelPipeline有几个重载的addLast()方法,下面通过addLast()方法看一下ChannelHandlerContext的命名规则。下面的addLast()方法的第一个参数name就是ChannelHandlerContext的名字,或者说是channel的名字。

  1. public ChannelPipeline addLast(String name, ChannelHandler handler) {
  2. return addLast(null, name, handler);
  3. }

它会调用下面的重载方法:

  1. public ChannelPipeline addLast(EventExecutorGroup group, final String name, ChannelHandler handler) {
  2. synchronized (this) {
  3. checkDuplicateName(name);
  4. AbstractChannelHandlerContext newCtx = new DefaultChannelHandlerContext(this, group, name, handler);
  5. addLast0(name, newCtx);
  6. }
  7. return this;
  8. }

可以看到,在添加一个 handler 之前, 需要调用 checkDuplicateName 方法来确定此 handler 的名字是否和已添加的 handler 的名字重复。

  1. private void checkDuplicateName(String name) {
  2. if (name2ctx.containsKey(name)) {
  3. throw new IllegalArgumentException("Duplicate handler name: " + name);
  4. }
  5. }

Netty 判断一个 handler 的名字是否重复的依据很简单: DefaultChannelPipeline 中有一个 类型为 Map 的 name2ctx 字段, 它的 key 是一个 handler 的名字, 而 value 则是这个 handler 所对应的 ChannelHandlerContext. 每当新添加一个 handler 时, 就会 put 到 name2ctx 中。因此检查 name2ctx 中是否包含这个 name 即可。

当没有重名的 handler 时, 就为这个 handler 生成一个关联的 DefaultChannelHandlerContext 对象, 然后就将 name 和 newCtx 作为 key-value 对 放到 name2Ctx 中。

自动生成 handler 的名字

还有一个重载的addLast()方法,如果不指定channel的名字而调用这个方法,就会自动生成一个名字,方法如下:

  1. public ChannelPipeline addLast(ChannelHandler... handlers) {
  2. return addLast(null, handlers);
  3. }

addLast()方法会调用generateName()方法生成一个名字。

  1. private String generateName(ChannelHandler handler) {
  2. WeakHashMap<Class<?>, String> cache = nameCaches[(int) (Thread.currentThread().getId() % nameCaches.length)];
  3. Class<?> handlerType = handler.getClass();
  4. String name;
  5. synchronized (cache) {
  6. name = cache.get(handlerType);
  7. if (name == null) {
  8. name = generateName0(handlerType);
  9. cache.put(handlerType, name);
  10. }
  11. }
  12. synchronized (this) {
  13. // It's not very likely for a user to put more than one handler of the same type, but make sure to avoid
  14. // any name conflicts. Note that we don't cache the names generated here.
  15. if (name2ctx.containsKey(name)) {
  16. String baseName = name.substring(0, name.length() - 1); // Strip the trailing '0'.
  17. for (int i = 1;; i ++) {
  18. String newName = baseName + i;
  19. if (!name2ctx.containsKey(newName)) {
  20. name = newName;
  21. break;
  22. }
  23. }
  24. }
  25. }
  26. return name;
  27. }

generateName()方法又会调用generateName0()方法产生一个 handler 的名字。自动生成的名字的规则很简单, 就是 handler 的简单类名加上 "#0", 因此 EchoClientHandler 的名字就是 "EchoClientHandler#0"。

  1. private static String generateName0(Class<?> handlerType) {
  2. return StringUtil.simpleClassName(handlerType) + "#0";
  3. }

Pipeline 的事件传输机制

根据前面的描述,可以知道,AbstractChannelHandlerContext 中有 inbound 和 outbound 两个 boolean 变量, 分别用于标识 Context 所对应的 handler 的类型, 即:

  1. ChannelHandlerContext.fireChannelRegistered()
  2. ChannelHandlerContext.fireChannelActive()
  3. ChannelHandlerContext.fireChannelRead(Object)
  4. ChannelHandlerContext.fireChannelReadComplete()
  5. ChannelHandlerContext.fireExceptionCaught(Throwable)
  6. ChannelHandlerContext.fireUserEventTriggered(Object)
  7. ChannelHandlerContext.fireChannelWritabilityChanged()
  8. ChannelHandlerContext.fireChannelInactive()
  9. ChannelHandlerContext.fireChannelUnregistered()

Oubound 事件传输方法有:

  1. ChannelHandlerContext.bind(SocketAddress, ChannelPromise)
  2. ChannelHandlerContext.connect(SocketAddress, SocketAddress, ChannelPromise)
  3. ChannelHandlerContext.write(Object, ChannelPromise)
  4. ChannelHandlerContext.flush()
  5. ChannelHandlerContext.read()
  6. ChannelHandlerContext.disconnect(ChannelPromise)
  7. ChannelHandlerContext.close(ChannelPromise)

需要注意的是, 如果捕获了一个事件, 并且想让这个事件继续传递下去, 那么需要调用 Context 相应的传播方法。例如:

  1. public class MyInboundHandler extends ChannelInboundHandlerAdapter {
  2. @Override
  3. public void channelActive(ChannelHandlerContext ctx) {
  4. System.out.println("Connected!");
  5. ctx.fireChannelActive();
  6. }
  7. }
  8. public clas MyOutboundHandler extends ChannelOutboundHandlerAdapter {
  9. @Override
  10. public void close(ChannelHandlerContext ctx, ChannelPromise promise) {
  11. System.out.println("Closing ..");
  12. ctx.close(promise);
  13. }
  14. }

上面的例子中, MyInboundHandler 收到了一个 channelActive 事件, 它在处理后, 如果希望将事件继续传播下去, 那么需要接着调用 ctx.fireChannelActive()。

Outbound 操作(outbound operations of a channel)

Outbound 事件都是请求事件(request event), 即请求某件事情的发生, 然后通过 Outbound 事件进行通知。Outbound 事件的传播方向是 tail -> customContext -> head。

接下来以 connect 事件为例, 分析一下 Outbound 事件的传播机制。
首先, 当用户调用了 Bootstrap.connect 方法时, 就会触发一个 Connect 请求事件, 此调用会触发如下调用链:

  1. Bootstrap.connect ->
  2. Bootstrap.doConnect ->
  3. Bootstrap.doConnect0 ->
  4. AbstractChannel.connect ->
  5. DefaultChannelPipeline.connect()

DefaultChannelPipeline.connect()实现如下:

  1. public ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
  2. return tail.connect(remoteAddress, promise);
  3. }

可以看到, 当 outbound 事件(这里是 connect 事件)传递到 Pipeline 后, 它其实是以 tail 为起点开始传播的
而 tail.connect 其实调用的是 AbstractChannelHandlerContext.connect 方法:

  1. public ChannelFuture connect(
  2. final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
  3. ...
  4. final AbstractChannelHandlerContext next = findContextOutbound();
  5. EventExecutor executor = next.executor();
  6. ...
  7. next.invokeConnect(remoteAddress, localAddress, promise);
  8. ...
  9. return promise;
  10. }

indContextOutbound()的作用是以当前 Context 为起点, 向 Pipeline 中的 Context 双向链表的前端寻找第一个 outbound 属性为真的 Context(即关联着 ChannelOutboundHandler 的 Context), 然后返回。
它的实现如下:

  1. private AbstractChannelHandlerContext findContextOutbound() {
  2. AbstractChannelHandlerContext ctx = this;
  3. do {
  4. ctx = ctx.prev;
  5. } while (!ctx.outbound);
  6. return ctx;
  7. }

当找到了一个 outbound 的 Context 后, 就调用它的 invokeConnect 方法, 这个方法中会调用 Context 所关联着的 ChannelHandler 的 connect 方法:

  1. private void invokeConnect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
  2. try {
  3. ((ChannelOutboundHandler) handler()).connect(this, remoteAddress, localAddress, promise);
  4. } catch (Throwable t) {
  5. notifyOutboundHandlerException(t, promise);
  6. }
  7. }

如果用户没有重写 ChannelHandler 的 connect 方法, 那么会调用 ChannelOutboundHandlerAdapter 所实现的方法:

  1. @Override
  2. public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress,
  3. SocketAddress localAddress, ChannelPromise promise) throws Exception {
  4. ctx.connect(remoteAddress, localAddress, promise);
  5. }

之后,会形成一个调用循环:

  1. Context.connect -> Connect.findContextOutbound -> next.invokeConnect -> handler.connect -> Context.connect

这样的循环中, 直到 connect 事件传递到DefaultChannelPipeline 的双向链表的头节点, 即 head 中. head 实现了 ChannelOutboundHandler, 因此它的 outbound 属性是 true.

因为 head 本身既是一个 ChannelHandlerContext, 又实现了 ChannelOutboundHandler 接口, 因此当 connect 消息传递到 head 后, 会将消息转递到对应的 ChannelHandler 中处理, 而恰好, head 的 handler() 返回的就是 head 本身:

  1. @Override
  2. public ChannelHandler handler() {
  3. return this;
  4. }

因此最终 connect 事件是在 head 中处理的. head 的 connect 事件处理方法如下:

  1. @Override
  2. public void connect(
  3. ChannelHandlerContext ctx,
  4. SocketAddress remoteAddress, SocketAddress localAddress,
  5. ChannelPromise promise) throws Exception {
  6. unsafe.connect(remoteAddress, localAddress, promise);
  7. }

到这里, 整个 Connect 请求事件就结束了.下面以一幅图来描述一个整个 Connect 请求事件的处理过程:



Inbound 事件

Inbound 事件是一个通知事件, 即某件事已经发生了, 然后通过 Inbound 事件进行通知. Inbound 通常发生在 Channel 的状态的改变或 IO 事件就绪.

Inbound 的特点是它传播方向是 head -> customContext -> tail.

接着分析Outbound事件 Connect 事件后会发生什么 Inbound 事件。当 Connect 这个 Outbound 传播到 unsafe 后, 其实是在 AbstractNioUnsafe.connect 方法中进行处理的:

  1. @Override
  2. public final void connect(
  3. final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
  4. ...
  5. if (doConnect(remoteAddress, localAddress)) {
  6. fulfillConnectPromise(promise, wasActive);
  7. } else {
  8. ...
  9. }
  10. ...
  11. }

在 AbstractNioUnsafe.connect 中, 首先调用 doConnect 方法进行实际上的 Socket 连接, 当连接上后, 会调用 fulfillConnectPromise 方法:

  1. private void fulfillConnectPromise(ChannelPromise promise, boolean wasActive) {
  2. ...
  3. // Regardless if the connection attempt was cancelled, channelActive() event should be triggered,
  4. // because what happened is what happened.
  5. if (!wasActive && isActive()) {
  6. pipeline().fireChannelActive();
  7. }
  8. ...
  9. }

在 fulfillConnectPromise 中, 会通过调用 pipeline().fireChannelActive() 将通道激活的消息(即 Socket 连接成功)发送出去.

而这里, 当调用 pipeline.fireXXX 后, 就是 Inbound 事件的起点.因此当调用了 pipeline().fireChannelActive() 后, 就产生了一个 ChannelActive Inbound 事件, 可以从这里开始看看这个 Inbound 事件是怎么传播的。DefaultChannelPipeline.fireChannelActive()方法如下:

  1. public ChannelPipeline fireChannelActive() {
  2. head.fireChannelActive();
  3. if (channel.config().isAutoRead()) {
  4. channel.read();
  5. }
  6. return this;
  7. }

可以看到,在 fireChannelActive 方法中, 调用的是 head.fireChannelActive, 因此可以证明了, Inbound 事件在 Pipeline 中传输的起点是 head。继续往下看。

  1. public ChannelHandlerContext fireChannelActive() {
  2. final AbstractChannelHandlerContext next = findContextInbound();
  3. EventExecutor executor = next.executor();
  4. if (executor.inEventLoop()) {
  5. next.invokeChannelActive();
  6. } else {
  7. executor.execute(new OneTimeTask() {
  8. @Override
  9. public void run() {
  10. next.invokeChannelActive();
  11. }
  12. });
  13. }
  14. return this;
  15. }

这个地方就和Outbound事件类似了,

invokeChannelActive 方法如下:

  1. private void invokeChannelActive() {
  2. try {
  3. ((ChannelInboundHandler) handler()).channelActive(this);
  4. } catch (Throwable t) {
  5. notifyHandlerException(t);
  6. }
  7. }

这个方法和 Outbound 的对应方法(例如 invokeConnect) 如出一辙. 同 Outbound 一样, 如果用户没有重写 channelActive 方法, 那么会调用 ChannelInboundHandlerAdapter 的 channelActive 方法:

  1. public void channelActive(ChannelHandlerContext ctx) throws Exception {
  2. ctx.fireChannelActive();
  3. }

同样地, 在 ChannelInboundHandlerAdapter.channelActive 中,会产生事件调用循环:

  1. Context.fireChannelActive -> Connect.findContextInbound -> nextContext.invokeChannelActive -> nextHandler.channelActive -> nextContext.fireChannelActive

同理, tail 本身 既实现了 ChannelInboundHandler 接口, 又实现了 ChannelHandlerContext 接口, 因此当 channelActive 消息传递到 tail 后, 会将消息转递到对应的 ChannelHandler 中处理, 而恰好, tail 的 handler() 返回的就是 tail 本身:

  1. @Override
  2. public ChannelHandler handler() {
  3. return this;
  4. }

因此 channelActive Inbound 事件最终是在 tail 中处理的, 看一下它的处理方法:

  1. @Override
  2. public void channelActive(ChannelHandlerContext ctx) throws Exception { }

TailContext.channelActive 方法是空的. 如果查看 TailContext 的 Inbound 处理方法时, 会发现, 它们的实现都是空的. 可见, 如果是 Inbound, 当用户没有实现自定义的处理器时, 那么默认是不处理的.

用一幅图来总结一下 Inbound 的传输过程:



总结

对于 Outbound事件:

对于 Inbound 事件:

添加新批注
在作者公开此批注前,只有你和作者可见。
回复批注