@adamhand
2019-03-30T09:51:08.000000Z
字数 9193
阅读 1220
netty 4.1
服务端启动的关键代码如下:
// Configure the server.EventLoopGroup bossGroup = new NioEventLoopGroup(1);EventLoopGroup workerGroup = new NioEventLoopGroup();final EchoServerHandler serverHandler = new EchoServerHandler();try {ServerBootstrap b = new ServerBootstrap();b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG, 100).handler(new LoggingHandler(LogLevel.INFO)).childHandler(new ChannelInitializer<SocketChannel>() {@Overridepublic void initChannel(SocketChannel ch) throws Exception {ChannelPipeline p = ch.pipeline();if (sslCtx != null) {p.addLast(sslCtx.newHandler(ch.alloc()));}//p.addLast(new LoggingHandler(LogLevel.INFO));p.addLast(serverHandler);}});// Start the server.ChannelFuture f = b.bind(PORT).sync();// Wait until the server socket is closed.f.channel().closeFuture().sync();} finally {// Shut down all event loops to terminate all threads.bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();}
和客户端差不多,服务端启动的时候也是做了下面几件事:
client端初始化channel的时候类型为NioSocketChannel,而此处服务端的类型为NioServerSocketChannel。
和客户端一样,.channel()其实是AbstractBootstrap类中的函数,它返回的是一个ReflectiveChannelFactory类的ChannelFactory,ReflectiveChannelFactory的构造函数关键代码如下,它最主要的作用就是将constructor的值保存,而clazz就是从.channel()中传进去的NioServerSocketChannel。此时Channel并没有初始化,而是等到bind函数的时候才初始化的。
public ReflectiveChannelFactory(Class<? extends T> clazz) {...this.constructor = clazz.getConstructor();...}
还是先看一下NioServerSocketChannel的继承关系图:
沿着bind()函数往下走,一路找到AbstractBootstrap.doBind()函数,它的关键逻辑如下:
private ChannelFuture doBind(final SocketAddress localAddress) {final ChannelFuture regFuture = initAndRegister();final Channel channel = regFuture.channel();...ChannelPromise promise = channel.newPromise();doBind0(regFuture, channel, localAddress, promise);...}
接下来看一下initAndRegister()函数,它的关键逻辑如下:
final ChannelFuture initAndRegister() {Channel channel = null;channel = channelFactory.newChannel();init(channel);...ChannelFuture regFuture = config().group().register(channel);...}
此处的channelFactory就是上一节中的ReflectiveChannelFactory,它的newChannel()方法如下,根据上一小节的分析,此处创建的是NioServerSocketChannel对象,调用的是NioServerSocketChannel的构造方法。
public T newChannel() {...return constructor.newInstance();...}
NioServerSocketChannel的构造方法如下:
public NioServerSocketChannel() {this(newSocket(DEFAULT_SELECTOR_PROVIDER));}
和client初始化时一样,此处也是经过了多个构造方法的调用,并且调用了父类的构造方法,下面直接看一下这些构造方法:
public NioServerSocketChannel(ServerSocketChannel channel) {super(null, channel, SelectionKey.OP_ACCEPT);config = new NioServerSocketChannelConfig(this, javaChannel().socket());}protected AbstractNioMessageChannel(Channel parent, SelectableChannel ch, int readInterestOp) {super(parent, ch, readInterestOp);}protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {super(parent);this.ch = ch;this.readInterestOp = readInterestOp;...ch.configureBlocking(false);...}protected AbstractChannel(Channel parent) {this.parent = parent;id = newId();unsafe = newUnsafe();pipeline = newChannelPipeline();}
调用NioServerSocketChannel()方法之后首先会进入newSocket()函数,之后会调用AbstractChannel()、AbstractNioChannel()、AbstractNioMessageChannel()和NioServerSocketChannel()完成初始化。先看一下newSocket()方法。
private static ServerSocketChannel newSocket(SelectorProvider provider) {..return provider.openServerSocketChannel();...}
可以看到,这里调用openServerSocketChannel()方法打开一个channel。如果顺着这个方法往下看,会发现最终绑定到了一个FileDescriptor。和client初始化时类似。这时,一个NioServerSocketChannel就创建完成了。
回到initAndRegister()方法,接下来会调用init()方法对channel进行初始化。追踪这个方法,进入ServerBootstrap.init(),关键代码如下:
void init(Channel channel) throws Exception {...ChannelPipeline p = channel.pipeline();final EventLoopGroup currentChildGroup = childGroup;final ChannelHandler currentChildHandler = childHandler;final Entry<ChannelOption<?>, Object>[] currentChildOptions;final Entry<AttributeKey<?>, Object>[] currentChildAttrs;synchronized (childOptions) {currentChildOptions = childOptions.entrySet().toArray(newOptionArray(0));}...p.addLast(new ChannelInitializer<Channel>() {@Overridepublic void initChannel(final Channel ch) throws Exception {final ChannelPipeline pipeline = ch.pipeline();ChannelHandler handler = config.handler();if (handler != null) {pipeline.addLast(handler);}ch.eventLoop().execute(new Runnable() {@Overridepublic void run() {pipeline.addLast(new ServerBootstrapAcceptor(ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));}});}});}
这个方法主要是为channel绑定了一个ChannelPipeline,并且为这个ChannelPipeline添加了一个 ChannelInitializer, 而这个 ChannelInitializer 中添加了一个 ServerBootstrapAcceptor handler。下面是关注一下ServerBootstrapAcceptor handler类。
先看一下这个类的构造方法:
ServerBootstrapAcceptor(...) {...enableAutoReadTask = new Runnable() {@Overridepublic void run() {channel.config().setAutoRead(true);}};}
这里起了一个线程,将AutoRead设置为true。看这个构造方法的注释得知,这个线程的作用是用来处理线程(EventLoop)接受connection失败的情况。当某个EventLoop接受connection失败时,会将AutoRead设置为false并切换至另一个EventLoop,这个EventLoop会将AutoRead设置为true尝试接受connection。
ServerBootstrapAcceptor 中重写了 channelRead 方法, 其主要代码如下:
public void channelRead(ChannelHandlerContext ctx, Object msg) {final Channel child = (Channel) msg;child.pipeline().addLast(childHandler);...childGroup.register(child).addListener(...);}
ServerBootstrapAcceptor 中的 childGroup 是构造此对象是传入的 currentChildGroup, 即workerGroup, 而 Channel 是一个 NioSocketChannel 的实例, 因此这里的 childGroup.register 就是将 workerGroup 中的某个 EventLoop 和 NioSocketChannel 关联了。
那么现在的问题是, ServerBootstrapAcceptor.channelRead 方法是怎么被调用的呢? 其实当一个 client 连接到 server 时, Java 底层的 NIO ServerSocketChannel 会有一个 SelectionKey.OP_ACCEPT 就绪事件, 接着就会调用到 NioServerSocketChannel.doReadMessages:
protected int doReadMessages(List<Object> buf) throws Exception {SocketChannel ch = javaChannel().accept();buf.add(new NioSocketChannel(this, ch));return 1;}
在 doReadMessages 中, 通过 javaChannel().accept() 获取到客户端新连接的 SocketChannel, 接着就实例化一个 NioSocketChannel, 并且传入 NioServerSocketChannel 对象(即 this), 由此可知, 创建的这个 NioSocketChannel 的父 Channel 就是 NioServerSocketChannel 实例 。
接下来就经由 Netty 的 ChannelPipeline 机制, 将读取事件逐级发送到各个 handler 中, 于是就会触发前面提到的 ServerBootstrapAcceptor.channelRead 方法。
回到initAndRegister()方法,当init()方法调用完之后,会调用register()方法对channel进行注册。这个注册的过程和client启动的时候差不多,就不再赘述了。
回到AbstractBootstrap.doBind()方法,当channel注册完之后,会调用doBind0()方法。按照doBind0()方法走下去,会走到AbstractChannel.bind()方法,继续走下去,会走到DefaultChannelPipeline.bind(),分别如下:
public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {return pipeline.bind(localAddress, promise);}
public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {return tail.bind(localAddress, promise);}
服务器端的 handler 的添加过程和客户端的有点区别, 和 EventLoopGroup 一样, 服务器端的 handler 也有两个, 一个是通过 handler() 方法设置 handler 字段, 另一个是通过 childHandler() 设置 childHandler 字段。通过前面的 bossGroup 和 workerGroup 的分析, 其实在这里可以大胆地猜测: handler 字段与 accept 过程有关, 即这个 handler 负责处理客户端的连接请求; 而 childHandler 就是负责和客户端的连接的 IO 交互。
回到init()函数:
void init(Channel channel) throws Exception {...ChannelPipeline p = channel.pipeline();final EventLoopGroup currentChildGroup = childGroup;final ChannelHandler currentChildHandler = childHandler;final Entry<ChannelOption<?>, Object>[] currentChildOptions;final Entry<AttributeKey<?>, Object>[] currentChildAttrs;synchronized (childOptions) {currentChildOptions = childOptions.entrySet().toArray(newOptionArray(0));}...p.addLast(new ChannelInitializer<Channel>() {@Overridepublic void initChannel(final Channel ch) throws Exception {final ChannelPipeline pipeline = ch.pipeline();ChannelHandler handler = config.handler();if (handler != null) {pipeline.addLast(handler);}ch.eventLoop().execute(new Runnable() {@Overridepublic void run() {pipeline.addLast(new ServerBootstrapAcceptor(ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));}});}});}
上面代码的 initChannel 方法中, 首先通过 handler() 方法获取一个 handler, 如果获取的 handler 不为空,则添加到 pipeline 中。 然后接着, 添加了一个 ServerBootstrapAcceptor 实例。 这里 handler() 方法返回的就是在服务器端的启动代码中设置的:
b.group(bossGroup, workerGroup)....handler(new LoggingHandler(LogLevel.INFO))
那么这个时候, pipeline 中的 handler 情况如下:
根据分析客户端的经验, 当 channel 绑定到 eventLoop 后(在这里是 NioServerSocketChannel 绑定到 bossGroup)中时, 会在 pipeline 中发出 fireChannelRegistered 事件, 接着就会触发 ChannelInitializer.initChannel 方法的调用。
因此在绑定完成后, 此时的 pipeline 的内如如下:
前面在分析 bossGroup 和 workerGroup 时, 已经知道了在 ServerBootstrapAcceptor.channelRead 中会为新建的 Channel 设置 handler 并注册到一个 eventLoop 中, 即:
@Override@SuppressWarnings("unchecked")public void channelRead(ChannelHandlerContext ctx, Object msg) {final Channel child = (Channel) msg;child.pipeline().addLast(childHandler);...childGroup.register(child).addListener(...);}
而这里的 childHandler 就是在服务器端启动代码中设置的 handler:
b.group(bossGroup, workerGroup)....childHandler(new ChannelInitializer<SocketChannel>() {@Overridepublic void initChannel(SocketChannel ch) throws Exception {ChannelPipeline p = ch.pipeline();if (sslCtx != null) {p.addLast(sslCtx.newHandler(ch.alloc()));}//p.addLast(new LoggingHandler(LogLevel.INFO));p.addLast(new EchoServerHandler());}});
当这个客户端连接 Channel 注册后, 就会触发 ChannelInitializer.initChannel 方法的调用, 此后的客户端连接的 ChannelPipeline 状态如下:
最后总结一下服务器端的 handler 与 childHandler 的区别与联系:
下面用一幅图来总结一下服务器端的 handler 添加流程: