[关闭]
@zhangever 2020-02-25T02:09:30.000000Z 字数 16350 阅读 2062

抽丝剥茧NIO

nio


从JDK NIO文档可以看到,Java将其划分成了三大块:Channel,Buffer以及Selector。
此处输入图片的描述
Channel抽象了IO设备(如网络/文件);Buffer封装了对数据的缓冲存储,Selector则是提供了一种可以以单线程非阻塞的方式,来处理多个连接。
下面我们围绕这几个核心概念,来认识一下NIO.

一 基础概念

要搞清楚网络IO,我们需要理解几个概念。这里以linux为例。

1.1 缓冲区

内核缓冲区:

OS会为每个网络连接(channel)分配一个发送缓冲区以及接收缓冲区(缓冲区大小通过tcp内核参数指定,详见下文)。所以一台服务器能支持多少个连接,必要前提就是内存必须够大。

网卡缓冲区:

类似内核缓冲区,硬件设备也有缓冲区. 对于网卡来说,也分为两个:发送缓冲区以及接收缓冲区。

发送数据的时候,应用通过系统调用把数据写入到内核缓冲区,驱动程序再把数据从内核缓冲区写到网卡缓冲区,最后网卡把数据从网卡缓冲区发到网络上去。在tcp协议栈中,当内核收到对方的ack后,才会把数据从内核缓冲区中删除
接收数据的时候,网卡把数据先放入网卡的缓冲区,然后通过驱动程序把网卡缓冲区的数据copy到内核缓冲区,同时通过中断通知应用读取数据。

1.2 数据流

一条典型的数据流如下:

Created with Raphaël 2.1.2app bufferos bufferether buffernetwork
  1. 1. 应用构建一条消息,通过系统调用write,写入到内核缓冲区
  2. 2. 网卡驱动程序把内核缓冲区的数据copy到网卡缓冲区中
  3. 3. 最后网卡把网卡缓冲区的消息发到网络上

对于基于jvm的应用来说,如果应用是在堆中产生的消息,还会额外多一次堆内内存到堆外内存的copy

Created with Raphaël 2.1.2app heap bufferapp heap-off bufferos bufferether buffernetwork

1.3 关键的tcp内核参数

SO_BACKLOG:

服务端TCP栈在处理客户端connect请求的过程中,会维护A/B两个队列. 服务端在跟客户端进行连接握手的时候,
1. 首先会收到客户端的SYN时(第一次握手),然后向客户端发送SYN ACK确认(第二次握手),TCP内核模块把客户端连接加入到A队列中,
2. 然后服务器接收到客户端发送的ACK时(第三次握手),TCP内核模块把客户端连接从A队列移动到B队列,连接完成,
3. 这时,应用程序的accept调用会返回。也就是说accept从B队列中取出完成了三次握手的连接。
B队列的长度就是SO_BACKLOG,当B队列长度已经等于SO_BACKLOG时,新的连接将会被TCP内核拒绝。
需要注意的是, 有一个内核参数net.core.somaxconn是B队列长度的最大值(默认长度为128),可见实际的B队列长度应该=min(SO_BACKLOG, somaxconn)
所以,如果backlog过小,可能会出现accept速度跟不上,A、B队列满了,导致新的客户端无法连接。要注意的是,backlog对应用支持的连接数并无影响,backlog影响的只是还没有被accept取出的连接(连接数很大程度上取决于服务器的内存大小
服务端处理客户端连接建立的过程, 可参考占小狼的博文

缓冲区参数:

TCP内核为每个连接分配的缓冲区大小默认分别为这两个内核参数值:net.ipv4.tcp_rmem/net.ipv4.tcp_wmem.

在Linux下,我们可以通过如下指令查看:

  1. # cat /proc/sys/net/ipv4/tcp_rmem
  2. 4096 87380 6291456

以上三个值分别为最小值/默认值/最大值.
其中默认值以及最大值又分别给net.core.rmem_default以及net.core.rmem_max覆盖.
应用程序可通过SO_RCVBUF/SO_SNDBUF来分别修改接收/发送缓冲区大小(但不能超过内核指定的最大值以及最小值)

1.4 问题

  1. 在Java IO操作中,为何使用堆外内存(heap-off)会比堆内存高效?

    一般来说,申请堆内存比申请堆外内存更快。
    但是,如上所述,当发生IO操作的时候,数据需从堆内复制到堆外,再把数据从用户态复制到核心态,相当于做了两次copy;而使用堆外内存的话,就只需要做一次copy(从用户态到核心态的copy).
    具体代码可参看java.net.SocketOutputStream对应的native代码

  2. 为何数据在经过IO的时候,需要两次copy?不能直接把堆内存数据直接传到内核么?

    简单来说,read/write等系统调用,需要传入buffer的地址。然而heapBuffer的话,由于GC的存在,地址会发生移动而heap-off不会. 更详细解释可参考知乎R大的解释.

  3. 服务端对客户端的连接,设置接收缓冲区大小为10(socketChannel.setOption(StandardSocketOptions.SO_RCVBUF, 10)),然后客户端发送长度为100的字符串过来,结果会如何?

    服务端将会正常的接收到所有字符.
    这个设置实际上并没有产生预期结果.
    在上述语句执行前, 该channel的接收缓冲区大小为net.core.rmem_default(此值优先)或者net.ipv4.tcp_rmem的第二个值;
    执行后,由于设置的值小于内核允许的最小值(net.ipv4.tcp_rmem的第一个值),最终该channel的接收缓冲区大小设置为内核允许的最小值.在本人机器上,该值为1024.

  4. 续4, 客户端发送长度为1500的字符串过来,已经超过了设置的接收缓冲区大小,结果会如何?会有消息丢失吗?

    服务端会触发2次可读事件,第一次读了1024个字符,第二次读了476个字符,且消息不会丢失.
    前面我们在说缓冲区的时候说到,除了有内核缓冲区,还会有一个硬件设备的缓冲区(这里是网卡的缓冲区).

  5. 为什么需要三次握手?三次握手解决啥问题?
    客户端和服务端通信前要进行连接,“3次握手”的作用就是双方都能明确自己和对方的收、发能力是正常的。
    第一次握手:客户端发送网络包,服务端收到了。这样服务端就能得出结论:客户端的发送能力、服务端的接收能力是正常的。
    第二次握手:服务端发包,客户端收到了。这样客户端就能得出结论:服务端的接收、发送能力,客户端的接收、发送能力是正常的。
    从客户端的视角来看,我接到了服务端发送过来的响应数据包,说明服务端接收到了我在第一次握手时发送的网络包,并且成功发送了响应数据包,这就说明,服务端的接收、发送能力正常。而另一方面,我收到了服务端的响应数据包,说明我第一次发送的网络包成功到达服务端,这样,我自己的发送和接收能力也是正常的。
    第三次握手:客户端发包,服务端收到了。这样服务端就能得出结论:客户端的接收、发送能力,服务端的发送、接收能力是正常的。
    第一、二次握手后,服务端并不知道客户端的接收能力以及自己的发送能力是否正常。而在第三次握手时,服务端收到了客户端对第二次握手作的回应。从服务端的角度,我在第二次握手时的响应数据发送出去了,客户端接收到了。所以,我的发送能力是正常的。而客户端的接收能力也是正常的。
    经历了上面的三次握手过程,客户端和服务端都确认了自己的接收、发送能力是正常的。之后就可以正常通信了。

三次握手
1. client->server: sync
2. server->client:sync+ack
3. client->server:ack

四次挥手
1. client->server: fin
2. server->client: ack
3. server->client: fin
4. client->server: ack

syn flood攻击

最基本的DoS攻击就是利用合理的服务请求来占用过多的服务资源,从而使合法用户无法得到服务的响应。syn flood属于Dos攻击的一种。

如果恶意的向某个服务器端口发送大量的SYN包,则可以使服务器打开大量的半开连接,分配TCB(Transmission Control Block), 从而消耗大量的服务器资源,同时也使得正常的连接请求无法被相应。当开放了一个TCP端口后,该端口就处于Listening状态,不停地监视发到该端口的Syn报文,一 旦接收到Client发来的Syn报文,就需要为该请求分配一个TCB,通常一个TCB至少需要280个字节,在某些操作系统中TCB甚至需要1300个字节,并返回一个SYN ACK命令,立即转为SYN-RECEIVED即半开连接状态。系统会为此耗尽资源

syn flood攻击

最基本的DoS攻击就是利用合理的服务请求来占用过多的服务资源,从而使合法用户无法得到服务的响应。syn flood属于Dos攻击的一种。

如果恶意的向某个服务器端口发送大量的SYN包,则可以使服务器打开大量的半开连接,分配TCB(Transmission Control Block), 从而消耗大量的服务器资源,同时也使得正常的连接请求无法被相应。当开放了一个TCP端口后,该端口就处于Listening状态,不停地监视发到该端口的Syn报文,一 旦接收到Client发来的Syn报文,就需要为该请求分配一个TCB,通常一个TCB至少需要280个字节,在某些操作系统中TCB甚至需要1300个字节,并返回一个SYN ACK命令,立即转为SYN-RECEIVED即半开连接状态。系统会为此耗尽资源
问题3/4可在3.1小节的NIO简单模式中验证.

二 NIO的底层实现机制

高性能的非阻塞IO的实现,依赖一个叫Selector的牛逼货,该组件也称IO多路复用器,多个网络连接可共用一个selector,从而实现单线程处理多个客户端连接的目的(传统的阻塞式IO,通常是一个连接需要一个线程去处理).

selector的机制如下:

  1. 系统在内核中创建一个selector, 对于不同客户端的连接(也就是NIO中的socketChannel),都可以被注册到同一个selector上. selector通过数组或者链表结构维护这众多的channel(在内核中表现为文件句柄).
  2. 当channel状态发生变化的时候(例如接收缓冲区收到数据会触发可读事件,发送缓冲区空闲会触发可写事件等),该channel会被selector打上ready的标记.
  3. 当应用程序调用selector.select()方法的时候,selector会轮询其所管辖的channel,把就绪的channel放到selectedKeys中.
  4. 应用遍历selectedKeys,并对每个channel的事件进行处理.

selector的底层实现有三种方式,分别使用select/poll/epoll系统调用.在内核2.6+的Linux上, Java使用的是epoll.
下面是三种实现的对比

实现 说明 性能
select 需要在用户态跟核心态之间相互copy大量文件句柄,文件句柄采用数组结构,数组大小跟内核参数有关 一般来说,少于1024个句柄下,性能优异;但由于采用盲轮询,句柄越多性能越差
poll 跟select没有本质差别,不同之处在于,文件句柄采用的是链表结构,理论上没限制数量 跟select一致
epoll 采用事件回调机制,只copy有效的文件句柄 性能优越,但如果侦听的连接数不多的话(例如少于1024),性能反而没有select/poll高

下面给三段伪代码说明上述三种机制,摘自知乎蓝形参的回复:

1 简单粗暴模式(非阻塞忙轮询):
所谓的忙,就是说这个机制永远在盲目的轮询

  1. while true {
  2. for channel in channels[] {
  3. if channel has data
  4. handle(channel)
  5. }
  6. }

2 select/poll模式:
首先通过select进入阻塞.当有IO事件的时候,从阻塞态中醒来.
是有目的的轮询,复杂度为O(n)

  1. while true {
  2. select(channels[])
  3. for channel in channels[] {
  4. if channel has data
  5. handle(channel)
  6. }
  7. }

3 epoll模式:
首先也是通过select进入阻塞.当有IO事件的时候,从阻塞态中醒来,并返回有IO事件发生的channel.
复杂度为O(1)

  1. while true {
  2. selected_channels[] = select(channels[])
  3. for channel in selected_channels[] {
  4. handle(channel)
  5. }
  6. }

最后再提一下epoll的LT模式以及ET模式

LT模式:level-trigger,水平触发模式,当某channel处于某种状态下(例如可读或者可写),如果关注了该事件,那么每次调用select都会返回该channel.
ET模式:edge-trigger,边缘触发模式,某channel只有在发生状态变化的时候,才会在select的时候返回该channel.
JDK的nio使用LT模式,而netty使用ET模式.
所以, 在JDK的原生NIO接口中, 我们可以在响应读事件的时候读一次就跑,反正如果没读完下次select会继续读.但是在netty中, 响应读事件的时候必须要一次读光缓冲区的内容.

三 经典的NIO使用模式

Talk is cheap, show the code.
下面从简单到复杂介绍两种nio的使用模式.

3.1 简单模式

下面的代码展示了一个使用了NIO的服务端应用. 目的是了解一下NIO的相关API.

  1. package nio.demo;
  2. import java.io.IOException;
  3. import java.net.InetSocketAddress;
  4. import java.net.SocketOptions;
  5. import java.net.StandardSocketOptions;
  6. import java.nio.ByteBuffer;
  7. import java.nio.channels.*;
  8. import java.util.Iterator;
  9. /**
  10. * Created by ever on 2017/11/24.
  11. */
  12. public class ServerDemo {
  13. private static final int BUF_SIZE = 1024;
  14. private static final int PORT = 8070;
  15. private static final int TIMEOUT = 3000;
  16. public static void main(String[] args) throws IOException, InterruptedException {
  17. selector();
  18. }
  19. /**
  20. * 初始化selector以及ServerSocketChannel
  21. * @throws IOException
  22. * @throws InterruptedException
  23. */
  24. private static void selector() throws IOException, InterruptedException {
  25. ServerSocketChannel ssc = ServerSocketChannel.open();
  26. Selector selector = Selector.open();
  27. ssc.socket().bind(new InetSocketAddress(PORT));
  28. ssc.configureBlocking(false);
  29. ssc.register(selector, SelectionKey.OP_ACCEPT);
  30. // 开始轮询直至天荒地老
  31. while (true) {
  32. if (selector.select(TIMEOUT) == 0) {
  33. System.out.println("No io events found");
  34. continue;
  35. }
  36. Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
  37. while (iter.hasNext()) {
  38. SelectionKey key = iter.next();
  39. if (key.isAcceptable()) {
  40. handleAccept(key);
  41. }
  42. if (key.isWritable() && key.isValid()) {
  43. handleWrite(key);
  44. }
  45. if (key.isReadable()) {
  46. handleRead(key);
  47. }
  48. iter.remove();
  49. }
  50. }
  51. }
  52. private static void handleAccept(SelectionKey key) throws IOException {
  53. System.out.println("Handle accept event");
  54. ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
  55. SocketChannel sc = ssc.accept();
  56. sc.configureBlocking(false);
  57. //设置输入缓冲区的大小, 参看1.4小节问题3/4
  58. sc.setOption(StandardSocketOptions.SO_RCVBUF, 10);
  59. sc.register(key.selector(), SelectionKey.OP_READ);
  60. }
  61. /**
  62. * @param key
  63. * @throws IOException
  64. * @throws InterruptedException
  65. */
  66. private static void handleWrite(SelectionKey key) throws IOException, InterruptedException {
  67. System.out.println("Handle write event");
  68. SocketChannel sc = (SocketChannel) key.channel();
  69. ByteBuffer buffer = (ByteBuffer)key.attachment();
  70. if (buffer == null) {
  71. buffer = ByteBuffer.allocate(BUF_SIZE);
  72. buffer.put("return from server".getBytes());
  73. buffer.flip();
  74. key.attach(buffer);
  75. }
  76. int writes = sc.write(buffer);
  77. System.out.println("write bytes:" + writes);
  78. System.out.println("remain:" + buffer.remaining());
  79. if (buffer.remaining() == 0) {
  80. key.attach(null);
  81. key.interestOps(SelectionKey.OP_READ);
  82. }
  83. }
  84. private static void handleRead(SelectionKey key) throws IOException {
  85. System.out.println("Handle read event");
  86. SocketChannel sc = (SocketChannel) key.channel();
  87. ByteBuffer buffer = ByteBuffer.allocate(BUF_SIZE);
  88. long bytesRead = sc.read(buffer);
  89. System.out.println("read:" + bytesRead);
  90. if (bytesRead == -1) {
  91. System.out.println("Peer closed");
  92. sc.close();
  93. System.exit(0);
  94. }
  95. // 输出内容
  96. if (bytesRead > 0) {
  97. buffer.flip();
  98. while (buffer.hasRemaining()) {
  99. System.out.print((char) buffer.get());
  100. }
  101. }
  102. // won't have any effect this time, and will take effect after next select() invoked
  103. key.interestOps(SelectionKey.OP_WRITE|SelectionKey.OP_READ);
  104. }
  105. }

3.1.1 问题

  1. 假设在handleWrite方法中往客户端写入超过内核输出缓冲区大小的字节(假设内核的输出缓冲区大小为1024),会有何现象?

    这时会触发多次OP_WRITE事件,直至buffer中的数据全部写入内核缓冲区.

  2. 如果在handleAccept中,为新连接注册OP_CONNECT事件, 会有何后果?

    服务端将会陷入空转,select方法的timeout参数看似失效, 马上返回0,从而进入第40行,不断输出"===", 同时CPU飙升至100%.
    隔壁家老司机老王分析后认为是jdk跟内核关于OP_CONNECT事件的翻译机制出了问题.内核认为OP_CONNECT是可写事件,所以每次调用select都会马上返回;然而jdk这时候只认OP_CONNECT.
    可以理解为JDK的一个bug, 在netty中对该事件会做特殊处理:High CPU usage with NioEventLoop,相关处理代码

  3. 如果注释掉49行,也就是不处理accept事件,那么这个客户端的连接处于什么状态呢?能正常读写么?

    由于已经触发了OP_ACCEPT事件,说明TCP的三次握手已经完成,连接已经建立(见1.3 SO_BACKLOG内核参数的解释).但是由于服务端对这个连接没有注册读写事件,实际上并不能有效跟客户端通信.
    同时,由于没有完成accept操作,这个连接始终留在b队列中,从而反复触发OP_ACCEPT事件.

3.2 Reactor模式

这里主要参考Doug Lee在讲解NIO API时关于Reactor模型的描述.
事实上这个模式也应用于很多开源框架中,例如Netty

3.2.1 单线程Reactor模式

用单条线程来处理IO事件以及业务逻辑.

  1. public class Reactor implements Runnable {
  2. final Selector selector;
  3. final ServerSocketChannel serverSocket;
  4. Reactor(int port) throws IOException {
  5. selector = Selector.open();
  6. serverSocket = ServerSocketChannel.open();
  7. serverSocket.socket().bind(
  8. new InetSocketAddress(port));
  9. serverSocket.configureBlocking(false);
  10. SelectionKey sk =
  11. serverSocket.register(selector,
  12. SelectionKey.OP_ACCEPT);
  13. sk.attach(new Acceptor());
  14. }
  15. public void run() { // normally in a new Thread
  16. try {
  17. while (!Thread.interrupted()) {
  18. selector.select();
  19. Set selected = selector.selectedKeys();
  20. Iterator it = selected.iterator();
  21. while (it.hasNext())
  22. dispatch((SelectionKey) (it.next()));
  23. selected.clear();
  24. }
  25. } catch (IOException ex) { /* ... */ }
  26. }
  27. void dispatch(SelectionKey k) {
  28. Runnable r = (Runnable) (k.attachment());
  29. if (r != null)
  30. r.run();
  31. }
  32. class Acceptor implements Runnable { // inner
  33. public void run() {
  34. try {
  35. SocketChannel channel = serverSocket.accept();
  36. if (channel != null)
  37. new Handler(selector, channel);
  38. } catch (IOException ex) { /* ... */ }
  39. }
  40. }
  41. }
  42. final class Handler implements Runnable {
  43. private static final int MAXIN = 1024;
  44. private static final int MAXOUT = 1024;
  45. final SocketChannel socket;
  46. final SelectionKey sk;
  47. ByteBuffer input = ByteBuffer.allocate(MAXIN);
  48. ByteBuffer output = ByteBuffer.allocate(MAXOUT);
  49. static final int READING = 0, SENDING = 1;
  50. int state = READING;
  51. Handler(Selector sel, SocketChannel c)
  52. throws IOException {
  53. socket = c;
  54. c.configureBlocking(false);
  55. // Optionally try first read now
  56. sk = socket.register(sel, 0);
  57. sk.attach(this);
  58. sk.interestOps(SelectionKey.OP_READ);
  59. sel.wakeup();
  60. }
  61. boolean inputIsComplete() { /* ... */
  62. return true;
  63. }
  64. boolean outputIsComplete() { /* ... */
  65. return true;
  66. }
  67. void process() { /* ... */ }
  68. public void run() {
  69. try {
  70. if (state == READING) read();
  71. else if (state == SENDING) send();
  72. } catch (IOException ex) { /* ... */ }
  73. }
  74. void read() throws IOException {
  75. socket.read(input);
  76. if (inputIsComplete()) {
  77. process();
  78. state = SENDING;
  79. // Normally also do first write now
  80. sk.interestOps(SelectionKey.OP_WRITE);
  81. }
  82. }
  83. void send() throws IOException {
  84. socket.write(output);
  85. if (outputIsComplete()) sk.cancel();
  86. }
  87. }

单线程Reactor模型示意图如下:
此处输入图片的描述
主要流程:
1. Reactor启动ServerSocket ss以及Selector, 并把ss对应的selectionKey绑定Acceptor作为ss上事件的处理器.
2. Reactor在run方法里面进入无限循环,轮询Selector上的IO事件. 当有事件发生的时候,就通过dispatch分发出去
3. Acceptor负责处理ss的accept事件,并产生跟客户端的socket连接 s,同时把s注册到Selector上(关注READ事件,同时也关联了一个处理读写的Handler)

3.2.2 多线程Reactor模型

单线程模式下, IO线程既要处理IO事件也要处理业务逻辑. 当业务逻辑比较复杂序号比较大的耗时的时候,就会严重影响系统的吞吐量.
因而,IO线程跟业务逻辑线程分离是一个自然而然的设计.
此处输入图片的描述
如上图, 这里是用一个线程处理IO,然后通过线程池来处理业务逻辑.

3.2.3 主从Reactor模型

主从Reactor模型是一种非常流行的高性能NIO模型.在多线程Reactor模型的基础上, 分裂成2个Reactor,其中mainReactor主要用于处理客户端连接事件,subReactor主要处理已建立连接后的客户端读写事件.
此处输入图片的描述

四 Netty的IO以及线程模型

Netty的线程模型比较灵活,通过配置可以实现第三节所说的单线程/多线程模型,而官方推荐的是3.2.3中描述的主从Reactor模型,但又有所区别:
此处输入图片的描述
其中, mainReactor叫Boss, subReactor叫Worker.

下图从更贴近代码的角度看这个IO模型(原谅我做一次伸手党,下面是来自占小狼简书的配图):
此处输入图片的描述

这块占小狼对Netty的相关分析相当到位,自感无法超越,大家请移玉步.

这里主要以学习Netty时的思考问题为主.
首先,给出一个经典的服务端代码:

  1. public class EchoServer {
  2. private final int port;
  3. public EchoServer(int port) {
  4. this.port = port;
  5. }
  6. public void run() throws Exception {
  7. // Configure the server.
  8. // 1. 创建EventLoopGroup, 每个group有若干个EventLoop,每个EventLoop有一个selector
  9. EventLoopGroup bossGroup = new NioEventLoopGroup(); // (1)
  10. EventLoopGroup workerGroup = new NioEventLoopGroup();
  11. try {
  12. ServerBootstrap b = new ServerBootstrap(); // (2)
  13. b.group(bossGroup, workerGroup)
  14. .channel(NioServerSocketChannel.class) // (3)
  15. .option(ChannelOption.SO_BACKLOG, 100)
  16. .handler(new LoggingHandler(LogLevel.INFO))
  17. .childHandler(new ChannelInitializer<SocketChannel>() { // (4)
  18. @Override
  19. public void initChannel(SocketChannel ch) throws Exception {
  20. ch.pipeline().addLast(
  21. new EchoServerHandler());
  22. }
  23. });
  24. // Start the server.
  25. ChannelFuture f = b.bind(port).sync(); // (5)
  26. // Wait until the server socket is closed.
  27. f.channel().closeFuture().sync();
  28. } finally {
  29. // Shut down all event loops to terminate all threads.
  30. bossGroup.shutdownGracefully();
  31. workerGroup.shutdownGracefully();
  32. }
  33. }
  34. public static void main(String[] args) throws Exception {
  35. int port;
  36. if (args.length > 0) {
  37. port = Integer.parseInt(args[0]);
  38. } else {
  39. port = 8090;
  40. }
  41. new EchoServer(port).run();
  42. }
  43. }
  44. class EchoServerHandler extends ChannelInboundHandlerAdapter {
  45. private static final Logger logger = Logger.getLogger(
  46. EchoServerHandler.class.getName());
  47. @Override
  48. public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
  49. logger.log(Level.WARNING, "channelRead:" + msg);
  50. ctx.writeAndFlush(msg);
  51. }
  52. @Override
  53. public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
  54. logger.log(Level.WARNING, "channelReadComplete");
  55. }
  56. @Override
  57. public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
  58. // Close the connection when an exception is raised.
  59. logger.log(Level.WARNING, "Unexpected exception from downstream.", cause);
  60. ctx.close();
  61. }
  62. }

问题:
1.BossGroup跟WorkerGroup都是同一种类型(NioEventLoopGroup),那他们的角色分别是啥? 有啥讲究呢?

前者用于处理新客户端连接事件,后者用于处理建立好连接后的channel的IO事件.
默认情况下,一个NioEventLoopGroup会创建cpu核心数*2个EventLoop, 而我们知道每个EventLoop就有一个Selector.
如果你的服务端应用只绑定一个ip以及一个端口的话(99%的应用都是这种情况吧?),那么BossGroup里面只有一个EventLoop/Selector是起作用的.所以,建议BossGroup创建的时候指定只需要1个EventLoop:EventLoopGroup bossGroup = new NioEventLoopGroup(1);

2.上面的代码都看不到NIO的Selector,Channel,它们在哪?

每个NioEventLoop包括一个Selector,一个taskQueue(tq),而它本身又继承了Executor接口(从而拥有execute方法,但要注意一个NioEventLoop只有一条worker线程),同时还持有一个Executor成员e以及一个Thread成员t(这就是所谓的worker线程).
在ServerBootStrap的bind方法中,会创建一条NioServerSocketChannel并注册到bossGroup的一个EventLoop的Selector上.

3.什么时候关注OP_ACCEPT事件的呢?

4.1 ServerBootStrap.bind会通过initAndRegister创建一条NIOServerSocketChannel(在构造函数中会初始化其pipeline), 并初始化该channel(通过ChannelInitializer创建一个ServerBootstrapAcceptor)
4.2 然后是selector的注册. 在boss eventLoopGroup中找到一个EventLoop, 执行register操作, 并最终在unsafe对象中生成registerTask.
4.3 registerTask通过eventLoop.execute提交到任务列表中并执行. 这时候eventLoop的线程t会创建并进入到无限循环的run方法中.
4.4 在registerTask中, 给channel注册到eventLoop中,但op位为0:
selectionKey = javaChannel().register(eventLoop().selector, 0, this),
之后,
pipeline.fireChannelRegistered();
pipeline.fireChannelActive();

第一句触发ChannelInitializer.channelRegistered方法,最终把ServerBootstrapAcceptor加入到pipeline中.
第二句触发handler.read方法, 最终在doBeginRead中完成对OP_ACCEPT事件的关注.

4.某个连接的多次数据接收,是否总是在同一个线程中执行?

上面说到,某个连接是注册到workerGroup中的某个EventLoop的Selector的,该Selector的所有io事件都由该EventLoop的线程池处理,而在netty 4.x中, 这个线程池只有一个线程(SingleThreadEventLoop).
故某连接的所有io事件,都由同一个线程处理.

5.write&Flush 的线程处理模式?

netty中ctx.write有两种情况:
1. io线程(且要是该socket对应的EventLoop的io线程)直接调用ctx.write,这时候会直接写到ChannelOutBoundBuffer里面.
2. 如果是非IO线程调用ctx.write,那么会把返回消息封装成一个task扔到EventLoop的taskQueue里面.然后在后面的循环中, 返回消息会给io线程写到ChannelOutBoundBuffer里面.
如果是writeAndFlush, 那么相当于write完后再调用一次flush. flush的作用是把ChannelOutBoundBuffer里面的数据真正写入到Tcp内核缓冲区并通过网卡发送出去.

6.多个业务线程同时往客户端进行写操作,是否会出现包缠绕?例如线程A,B,C都同时写1个1M的返回包,是否会出现客户端收到 a1 b1 a2 b2 c1 ...

由上面第五个问题可知, 不会.
A,B,C这些业务线程实际上在写数据的时候,是各自把数据封装成一个task扔到eventLoop的任务队列里面, 然后后面的循环中(不一定是下一次哦), io线程依次取出每一个任务(串行)写到channelOutboundBuffer中, 然后有flush调用的时候返回给客户端.
尽管1M的包有点大, 但它们拆包的时候也是同一个包按顺序发送, 不至于出现乱序, 这个是由TCP的机制来保证的.

下面的代码是netty对write操作的处理(AbstractChannelHandlerContext)

  1. private void write(Object msg, boolean flush, ChannelPromise promise) {
  2. AbstractChannelHandlerContext next = findContextOutbound();
  3. EventExecutor executor = next.executor();
  4. if (executor.inEventLoop()) {
  5. next.invokeWrite(msg, promise);
  6. if (flush) {
  7. next.invokeFlush();
  8. }
  9. } else {
  10. AbstractWriteTask task;
  11. if (flush) {
  12. task = WriteAndFlushTask.newInstance(next, msg, promise);
  13. } else {
  14. task = WriteTask.newInstance(next, msg, promise);
  15. }
  16. safeExecute(executor, task, promise, msg);
  17. }
  18. }

7.Netty如何处理TCP粘包/拆包的问题的?

一般可通过三种方式:
3.1 固定长度. 通讯双方约定好每个消息的长度.
3.2 分隔符.例如回车换行或者约定好的特定分隔符.
3.3 消息头+消息体,其中消息头指定消息的长度.
Netty对于上述的三种方案都有现成的编解码器. 而实际中应用最广泛的是第三种.

五 Netty最佳实践

1. 避免在业务线程中触碰netty

netty应该作为一个框架级的基础通讯组件, 在业务层中不应该出现netty的相关操作.比如写操作, 因为IO线程是异步的, 你往channel写的东西,很可能在真正序列化之前,已经给业务线程又修改的面目全非了.

六 结束语

本文在快塑网CTO王在祥(江湖人称老王)指导下完成,谨表感激. 在写作本文的过程中也多次跟唯品会架构部的梁哥交流讨教, 同时在拜读唯品会另一大神白衣大大的文章中获益良多,占小狼简书上的源码分析也很到位, 一并感谢.

添加新批注
在作者公开此批注前,只有你和作者可见。
回复批注