[关闭]
@FunC 2018-05-15T19:24:41.000000Z 字数 8631 阅读 1858

UNP CH06

unix


I/O 复用:select 和 poll 函数

I/O 复用典型使用场合:
* 处理多个描述符时(如交互式输入和网络套接字)
* 一个用户处理多个套接字
* 一个TCP服务器既要处理监听套接字,又要处理已连接套接字时。
* 一个服务器既要处理TCP,又要处理UDP时
* 一个服务器要处理多个服务或多个协议

I/O复用并非只限于网络编程

五种I/O模型

  1. 阻塞式I/O

    发起调用后,在成功返回前都是阻塞的。

  2. 非阻塞式I/O

    必须立即返回,若未准备好泽返回一个错误。应用程序轮询(polling)内核,这种做法耗费大量CPU时间

  3. I/O复用(select和poll)

    进程阻塞在 select 系统调用中,但select可以处理不同情况的多个描述符(后面提到)

  4. 信号驱动式I/O

    利用上一章提到的 sigaction 系统调用安装一个信号处理函数。安装完后进程不阻塞,继续执行。
    收到SIGIO信号后,可以在信号处理函数中读取数据报,也可以立即通知主循环来读取数据报。

  5. 异步I/O(POSIX的aio_系列函数)

    真正意义上的异步,系统调用完全不阻塞,在数据复制到进程后信号才发出。罕见

简单比较

可见,前四种模型的第二阶段都是一样的,阻塞于recvfrom调用,区别在于第一阶段。

select 函数

该函数允许进程指示内核对哪些描述符(的读、写或异常)感兴趣以及最长等待多久。任何条件满足则select唤醒进程:

  1. #include <sys/select.h>
  2. #include <sys/time.h>
  3. int select(int maxfdpl, fd_set *readset, fd_set *writeset, fd_set *exceptset, const struct timeval *timeout);
  4. /* 返回:若有就绪描述符则为其数目,若超时则为0,出错为-1 */

第一个参数maxfdp1指定待测试的描述符个数,值为待测试的最大描述符+1。描述符[0..maxfdp1-1]都会被测试。(设计该参数的目的在于提高效率,避免测试全部1024个描述符)

参数2~4指定我们让内核测试读、写和异常的描述符集合。描述符集通常用整数数组表示,使用位(bit)来标记。具体实现细节与应用程序无关,可以通过一下四个宏来操作:

  1. void FD_ZERO(fd_set *fdset);
  2. void FD_SET(int fd, fd_set *fdset);
  3. void FD_CLR(int fd, fd_set *fdset);
  4. void FD_ISSET(int fd, fd_set *fdset);

如果我们对readset, writeset和exceptset中任何一个条件不感兴趣,可以设置为空指针(全部为空时则可实现一个精度为微秒的定时器)。

注意这三个参数都是 值-结果 参数,也就是函数会修改其中的值,如果相应的描述符已经就绪,函数会将相应的位置0。因此,每次重新调用select函数时,需要再次把所有描述符集内所有关心的位置1。

因为是 值-结果 参数,参数的内容会被修改,因此需要自行记录好关心的描述符,并自行即使清除使用完毕的描述符。

最后一个参数是最大等待时长,结构如下:

  1. struct timeval{
  2. long tv_sec; /* 秒 */
  3. long tv_usec; /* 微秒 */
  4. }

有三种可能:
1. 永远等下去,知道有描述符准备好再返回:设置为空指针
2. 等一段固定时间
3. 根本不等待,检查后立即返回,即轮询(polling)。将秒和微秒都设置为0

同时参数中的const限定词表示它不会被select所修改

描述符的就绪条件

我们需要进一步明确说明具体的就绪条件是什么(即什么时候可以读、写或者异常):

需要注意到,当某个套接字上发生错误时,select会将其标志为既可读又可写。

str_cli 函数(修订版)

在第五章中的str_cli函数在阻塞于fgets调用时,如果发生了某些事件,将无法得知。有了select函数后,我们就可以改为阻塞与select调用。这意味着我们既可以等待标准输入可读,也可以等待套接字可读:

修改后的版本如下:

  1. #include "unp.h"
  2. void str_cli(FILE *fp, int sockfd)
  3. {
  4. int maxfdp1;
  5. fd_set rset;
  6. char sendline[MAXLINE], recvline[MAXLINE];
  7. FD_ZERO(&rset); /* 描述符集初始化 */
  8. for ( ; ; ) {
  9. FD_SET(fileno(fp), &rset);
  10. FD_SET(sockfd, &rset);
  11. maxfdp1 = max(fileno(fp), sockfd) + 1;
  12. Select(maxfdp1, &rset, NULL, NULL, NULL);
  13. // 分别检查两种情况是否可读,若可读则采取相应操作
  14. if (FD_ISSET(sockfd, &rset)) { /* socket is readable */
  15. if (Readline(sockfd, recvline, MAXLINE) == 0)
  16. err_quit("str_cli: server terminated prematurely");
  17. Fputs(recvline, stdout);
  18. }
  19. if (FD_ISSET(fileno(fp), &rset)) { /* input is readable */
  20. // 注意此处在收到EOF时便结束,但是存在某些问题(见下文)
  21. if (Fgets(sendline, MAXLINE, fp) == NULL)
  22. return; /* all done */
  23. Writen(sockfd, sendline, strlen(sendline));
  24. }
  25. }
  26. }

批量输入

上面的修订版代码在交互式输入的场景下没有太大问题,但在Unix的shell环境下,重定向标准输入和标准输入是轻而易举的事,所以我们能很容易地以批量的形式运行client。这时就会出现一个问题:

可见,我们在发送请求9(也就是EOF)的时候,根据代码,这时fgets返回NULL,str_cli函数返回至main函数,随后终止。然而这时我们才刚收到应答2。

也就是说,请求发送完毕后应该只关闭写(请求)的一半,然后等待全部应答都收到后再关闭剩下的读的一半。这点可以通过接下来要说的shutdown函数来实现。

同样的情况出现在使用了缓冲机制的函数中,如stdio等。原因是stdio缓冲区还有数据待消费,但select不知道,它只关心read系统调用有无数据可读,这时就出现了错误。

shutdown函数

通常终止网络连接使用close函数,然而close函数有两个限制:
1. close只会把描述符的引用计数-1,而套接字仅在引用计数变为0时才会被关闭。如果使用shutdown则可以不理会引用计数,直接激发TCP的正常连接终止序列。
2. TCP是全双工的,当我们完成了数据发送,发送FIN后,对面可能仍有数据需要发送,这时就需要调用shutdown进行半关闭:

  1. #include <sys/socket.h>
  2. int shutdown(int sockfd, int howto);
  3. /* 返回:若成功则为0,若出错则为-1 */

其行为取决于howto参数的值:
* SHUT_RD:关闭连接读的这一半。这样由该套接字接受的来自对端的任何数据都会被确认(ACK),然后悄然丢弃
* SHUT_WR:关闭连接写的这一半,对于TCP套接字,这成为半关闭。当前留在套接字发送缓冲区的数据将被发送掉,然后跟着TCP的正常连接终止序列(FIN)。
* SHUT_RDWR:相当于调用两次shutdown分别关闭读写两部分。

str_cli函数(再修订版)

这次修订将解决过早关闭套接字,并避免了对缓冲区带来的问题。

  1. #include "unp.h"
  2. void str_cli(FILE *fp, int sockfd)
  3. {
  4. int maxfdp1, stdineof;
  5. fd_set rset;
  6. char buf[MAXLINE];
  7. int n;
  8. stdineof = 0;
  9. FD_ZERO(&rset);
  10. for ( ; ; ) {
  11. // 若stdineof标志为0,则用select检查标准输入
  12. if (stdineof == 0)
  13. FD_SET(fileno(fp), &rset);
  14. FD_SET(sockfd, &rset);
  15. maxfdp1 = max(fileno(fp), sockfd) + 1;
  16. Select(maxfdp1, &rset, NULL, NULL, NULL);
  17. if (FD_ISSET(sockfd, &rset)) { /* socket is readable */
  18. // 改用read调用,避免缓冲区的影响
  19. if ( (n = Read(sockfd, buf, MAXLINE)) == 0) {
  20. // 套接字上读到EOF时,如果标准输入已经读入EOF,则是正常退出
  21. if (stdineof == 1)
  22. return; /* normal termination */
  23. else
  24. err_quit("str_cli: server terminated prematurely");
  25. }
  26. Write(fileno(stdout), buf, n);
  27. }
  28. if (FD_ISSET(fileno(fp), &rset)) { /* input is readable */
  29. if ( (n = Read(fileno(fp), buf, MAXLINE)) == 0) {
  30. // 标准输入读到EOF时,将stdineof标志置1
  31. stdineof = 1;
  32. // 半关闭
  33. Shutdown(sockfd, SHUT_WR); /* send FIN */
  34. // 清理rset
  35. FD_CLR(fileno(fp), &rset);
  36. // 进入下轮循环
  37. continue;
  38. }
  39. Writen(sockfd, buf, n);
  40. }
  41. }
  42. }

TCP回射服务器程序(修订版)

上面我们用select处理了标准输入描述符和套接字描述符。既然select可以处理多个描述符,那先前为每一个新的连接都fork一个新的子进程的做法,则能改为由单一进程配合select来代替。

上面提到过,select的参数(如rset)是 值-结果 参数,也就是说其内容会发生变化,因此我们需要另外维护一个数组,来方便我们维护当前的连接。这个数组的大小是FD_SETSIZE和内核允许本进程打开的最大描述符书的较小者。

下面给出实现
初始化部分:

  1. /* 初始化 */
  2. #include "unp.h"
  3. int main(int argc, char **argv)
  4. {
  5. int i, maxi, maxfd, listenfd, connfd, sockfd;
  6. int nready, client[FD_SETSIZE];
  7. ssize_t n;
  8. fd_set rset, allset;
  9. char buf[MAXLINE];
  10. socklen_t clilen;
  11. struct sockaddr_in cliaddr, servaddr;
  12. listenfd = Socket(AF_INET, SOCK_STREAM, 0);
  13. bzero(&servaddr, sizeof(servaddr));
  14. servaddr.sin_family = AF_INET;
  15. servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
  16. servaddr.sin_port = htons(SERV_PORT);
  17. Bind(listenfd, (SA *) &servaddr, sizeof(servaddr));
  18. Listen(listenfd, LISTENQ);
  19. maxfd = listenfd; /* initialize */
  20. maxi = -1; /* 已连接的套接字数组client[]中的最大下标 */
  21. for (i = 0; i < FD_SETSIZE; i++)
  22. client[i] = -1; /* -1 代表该位置可用 */
  23. FD_ZERO(&allset);
  24. FD_SET(listenfd, &allset);
  25. /* end 初始化 */
  26. /* 循环 */
  27. for ( ; ; ) {
  28. rset = allset; /* structure assignment */
  29. nready = Select(maxfd+1, &rset, NULL, NULL, NULL);
  30. if (FD_ISSET(listenfd, &rset)) { /* new client connection */
  31. clilen = sizeof(cliaddr);
  32. connfd = Accept(listenfd, (SA *) &cliaddr, &clilen);
  33. #ifdef NOTDEF
  34. printf("new client: %s, port %d\n",
  35. Inet_ntop(AF_INET, &cliaddr.sin_addr, 4, NULL),
  36. ntohs(cliaddr.sin_port));
  37. #endif
  38. for (i = 0; i < FD_SETSIZE; i++)
  39. if (client[i] < 0) {
  40. client[i] = connfd; /* 记录新的描述符 */
  41. break;
  42. }
  43. if (i == FD_SETSIZE)
  44. err_quit("too many clients");
  45. FD_SET(connfd, &allset); /* 将新的描述符加入集合中 */
  46. if (connfd > maxfd)
  47. maxfd = connfd; /* for select */
  48. if (i > maxi)
  49. maxi = i; /* 更新最大下标 */
  50. if (--nready <= 0)
  51. continue; /* 如果没有其他可读描述符,则直接进入下轮循环 */
  52. }
  53. for (i = 0; i <= maxi; i++) { /* check all clients for data */
  54. if ( (sockfd = client[i]) < 0)
  55. continue;
  56. if (FD_ISSET(sockfd, &rset)) {
  57. if ( (n = Read(sockfd, buf, MAXLINE)) == 0) {
  58. /* connection closed by client */
  59. Close(sockfd);
  60. FD_CLR(sockfd, &allset);
  61. client[i] = -1;
  62. } else
  63. Writen(sockfd, buf, n);
  64. if (--nready <= 0)
  65. break; /* no more readable descriptors */
  66. }
  67. }
  68. }
  69. }
  70. /* end 循环 */

拒绝服务型攻击

当一个服务器在处理多个客户时,它绝对不能阻塞于只与单个客户相关的某个函数调用。否则可能导致服务器被刮起,拒绝为其他客户提供服务。
可能的解决办法:
1. 使用非阻塞式I/O
2. 让每个客户由单独的控制线程提供服务(但要注意资源耗尽攻击)
3. 对I/O操作设置一个超时

pselect函数

  1. #include <sys/select.h>
  2. #include <signal.h>
  3. #include <time.h>
  4. int pselect(int maxfdp1, fd_set *readset, fd_set *writeset, fs_set *exceptset, const struct timespec *timeout, const sigset_t *sigmask);
  5. /* 返回:返回就绪描述符数目,超时为0,出错为-1 */

pselect类似于select,不过有两个不同:
1. pselect使用的timespec结构,精度是纳秒:

  1. struct timespec {
  2. time_t tv_sec;
  3. long tv_nsec;
  4. }
  1. pselect函数增加了第六个参数:一个指向信号掩码的指针。该参数允许程序先禁止递交某些信号,调用pselect后重新设置信号掩码。

考虑以下情况:

  1. if (intr_flag) {
  2. handle_intr();
  3. }
  4. // if SIGINT emit from here
  5. if ( (nready = select( ... )) < 0) {
  6. if (errno = EINTR) {
  7. if (intr_flag)
  8. handle_intr();
  9. }
  10. ...
  11. }

如果SIGINT信号在检查intr_flag后,调用select前,那么信号将丢失,同时select将永远阻塞。
有了pselect后:

  1. sigset_t newmask, oldmask, zeromask;
  2. sigemptyset(&zeromask);
  3. sigemptyset(&newmask);
  4. sigaddset(&newmask, SIGINT);
  5. sigprocmask(SIG_BLOCK, &newmask, &oldmask); /* block SIGINT */
  6. if (intr_flag) {
  7. handle_intr();
  8. }
  9. if ( (nready = pselect( ..., &zeromask )) < 0) {
  10. // after pselect, SIG came back
  11. if (errno = EINTR) {
  12. if (intr_flag)
  13. handle_intr();
  14. }
  15. ...
  16. }

pselect函数在返回后,进程的信号掩码又被重置为调用pselect之前的值。

poll函数

Poll提供的功能与select类似,不过在处理流设备时,它能提供额外的信息。

  1. #include <poll.h>
  2. int poll(struct pollfd *fdarray, unsigned long nfds, int timeout);
  3. /* 返回:返回就绪的描述符数目,超时为0,出错为-1 */
  4. ```c
  5. 第一个参数是指向一个结构数组第一个元素的指针。每个数组元素都是一个pollfd结构,用于指定测试某个给定描述符fd的条件:
  6. <div class="md-section-divider"></div>
  7. ```c
  8. struct pollfd {
  9. int fd; /* 想检查的fd */
  10. short events; /* 关心的events */
  11. short revents; /* 发生的事件 */
  12. }

也就是说以fd为单位来检查,区别于select以不同的状态的fd集合为单位。同时使用多了一个变量来保存返回结果,免去使用 值-结果 参数重新使用时需要重置的麻烦。

下面是用于指定events标志和revents标志的一些常值:

(注意其中POLLIN和POLLOUT都是更早出现的,常值保留是为了向后兼容)

而timeout参数则指定poll函数返回前等待多长时间。他是一个指定应等待毫秒数的值:

在select的使用中,我们需要手动记录描述符集合的数目。而在poll中,通知内核数组的长度则成了调用者的责任,内核不再需要知道类似fd_set的固定大小的数据类型。

TCP回射服务器程序(再修订版)

我们现在用poll替代select重写TCP回射服务器程序(关键部分已注释):

  1. #include "unp.h"
  2. #include <limits.h> /* for OPEN_MAX */
  3. int main(int argc, char **argv)
  4. {
  5. // ... 省略初始化部分
  6. client[0].fd = listenfd; /* pollfd 的第一个fd是监听套接字 */
  7. client[0].events = POLLRDNORM;
  8. for (i = 1; i < OPEN_MAX; i++)
  9. client[i].fd = -1; /* 设为-1代表该位置可用 */
  10. maxi = 0; /* max index into client[] array */
  11. /* 初始化结束 */
  12. /* 使用poll部分 */
  13. for ( ; ; ) {
  14. nready = Poll(client, maxi+1, INFTIM);
  15. if (client[0].revents & POLLRDNORM) { /* 有新的连接 */
  16. clilen = sizeof(cliaddr);
  17. connfd = Accept(listenfd, (SA *) &cliaddr, &clilen);
  18. #ifdef NOTDEF
  19. printf("new client: %s\n", Sock_ntop((SA *) &cliaddr, clilen));
  20. #endif
  21. for (i = 1; i < OPEN_MAX; i++)
  22. if (client[i].fd < 0) {
  23. client[i].fd = connfd; /* 找出可用的位置,并记录fd */
  24. break;
  25. }
  26. if (i == OPEN_MAX)
  27. err_quit("too many clients");
  28. client[i].events = POLLRDNORM;
  29. if (i > maxi)
  30. maxi = i; /* max index in client[] array */
  31. if (--nready <= 0)
  32. continue; /* 当前没有可用描述符,等待下一次poll返回 */
  33. }
  34. for (i = 1; i <= maxi; i++) { /* 检查已连接的客户有无新数据 */
  35. if ( (sockfd = client[i].fd) < 0)
  36. continue;
  37. if (client[i].revents & (POLLRDNORM | POLLERR)) {
  38. if ( (n = read(sockfd, buf, MAXLINE)) < 0) {
  39. if (errno == ECONNRESET) {
  40. #ifdef NOTDEF
  41. printf("client[%d] aborted connection\n", i);
  42. #endif
  43. Close(sockfd);
  44. client[i].fd = -1; /* 出错则关闭连接并置-1 */
  45. } else
  46. err_sys("read error");
  47. } else if (n == 0) {
  48. #ifdef NOTDEF
  49. printf("client[%d] closed connection\n", i);
  50. #endif
  51. Close(sockfd);
  52. client[i].fd = -1;
  53. } else
  54. Writen(sockfd, buf, n);
  55. if (--nready <= 0)
  56. break; /* no more readable descriptors */
  57. }
  58. }
  59. }
  60. }
添加新批注
在作者公开此批注前,只有你和作者可见。
回复批注