@FunC
2018-05-15T19:24:41.000000Z
字数 8631
阅读 1858
unix
I/O 复用典型使用场合:
* 处理多个描述符时(如交互式输入和网络套接字)
* 一个用户处理多个套接字
* 一个TCP服务器既要处理监听套接字,又要处理已连接套接字时。
* 一个服务器既要处理TCP,又要处理UDP时
* 一个服务器要处理多个服务或多个协议
I/O复用并非只限于网络编程
阻塞式I/O
发起调用后,在成功返回前都是阻塞的。
非阻塞式I/O
必须立即返回,若未准备好泽返回一个错误。应用程序轮询(polling)内核,这种做法耗费大量CPU时间
I/O复用(select和poll)
进程阻塞在 select 系统调用中,但select可以处理不同情况的多个描述符(后面提到)
信号驱动式I/O
利用上一章提到的 sigaction
系统调用安装一个信号处理函数。安装完后进程不阻塞,继续执行。
收到SIGIO信号后,可以在信号处理函数中读取数据报,也可以立即通知主循环来读取数据报。
异步I/O(POSIX的aio_系列函数)
真正意义上的异步,系统调用完全不阻塞,在数据复制到进程后信号才发出。罕见
可见,前四种模型的第二阶段都是一样的,阻塞于recvfrom调用,区别在于第一阶段。
该函数允许进程指示内核对哪些描述符(的读、写或异常)感兴趣以及最长等待多久。任何条件满足则select唤醒进程:
#include <sys/select.h>
#include <sys/time.h>
int select(int maxfdpl, fd_set *readset, fd_set *writeset, fd_set *exceptset, const struct timeval *timeout);
/* 返回:若有就绪描述符则为其数目,若超时则为0,出错为-1 */
第一个参数maxfdp1指定待测试的描述符个数,值为待测试的最大描述符+1。描述符[0..maxfdp1-1]都会被测试。(设计该参数的目的在于提高效率,避免测试全部1024个描述符)
参数2~4指定我们让内核测试读、写和异常的描述符集合。描述符集通常用整数数组表示,使用位(bit)来标记。具体实现细节与应用程序无关,可以通过一下四个宏来操作:
void FD_ZERO(fd_set *fdset);
void FD_SET(int fd, fd_set *fdset);
void FD_CLR(int fd, fd_set *fdset);
void FD_ISSET(int fd, fd_set *fdset);
如果我们对readset, writeset和exceptset中任何一个条件不感兴趣,可以设置为空指针(全部为空时则可实现一个精度为微秒的定时器)。
注意这三个参数都是 值-结果 参数,也就是函数会修改其中的值,如果相应的描述符已经就绪,函数会将相应的位置0。因此,每次重新调用select函数时,需要再次把所有描述符集内所有关心的位置1。
因为是 值-结果 参数,参数的内容会被修改,因此需要自行记录好关心的描述符,并自行即使清除使用完毕的描述符。
最后一个参数是最大等待时长,结构如下:
struct timeval{
long tv_sec; /* 秒 */
long tv_usec; /* 微秒 */
}
有三种可能:
1. 永远等下去,知道有描述符准备好再返回:设置为空指针
2. 等一段固定时间
3. 根本不等待,检查后立即返回,即轮询(polling)。将秒和微秒都设置为0
同时参数中的const限定词表示它不会被select所修改
我们需要进一步明确说明具体的就绪条件是什么(即什么时候可以读、写或者异常):
需要注意到,当某个套接字上发生错误时,select会将其标志为既可读又可写。
在第五章中的str_cli函数在阻塞于fgets调用时,如果发生了某些事件,将无法得知。有了select函数后,我们就可以改为阻塞与select调用。这意味着我们既可以等待标准输入可读,也可以等待套接字可读:
修改后的版本如下:
#include "unp.h"
void str_cli(FILE *fp, int sockfd)
{
int maxfdp1;
fd_set rset;
char sendline[MAXLINE], recvline[MAXLINE];
FD_ZERO(&rset); /* 描述符集初始化 */
for ( ; ; ) {
FD_SET(fileno(fp), &rset);
FD_SET(sockfd, &rset);
maxfdp1 = max(fileno(fp), sockfd) + 1;
Select(maxfdp1, &rset, NULL, NULL, NULL);
// 分别检查两种情况是否可读,若可读则采取相应操作
if (FD_ISSET(sockfd, &rset)) { /* socket is readable */
if (Readline(sockfd, recvline, MAXLINE) == 0)
err_quit("str_cli: server terminated prematurely");
Fputs(recvline, stdout);
}
if (FD_ISSET(fileno(fp), &rset)) { /* input is readable */
// 注意此处在收到EOF时便结束,但是存在某些问题(见下文)
if (Fgets(sendline, MAXLINE, fp) == NULL)
return; /* all done */
Writen(sockfd, sendline, strlen(sendline));
}
}
}
上面的修订版代码在交互式输入的场景下没有太大问题,但在Unix的shell环境下,重定向标准输入和标准输入是轻而易举的事,所以我们能很容易地以批量的形式运行client。这时就会出现一个问题:
可见,我们在发送请求9(也就是EOF)的时候,根据代码,这时fgets返回NULL,str_cli函数返回至main函数,随后终止。然而这时我们才刚收到应答2。
也就是说,请求发送完毕后应该只关闭写(请求)的一半,然后等待全部应答都收到后再关闭剩下的读的一半。这点可以通过接下来要说的shutdown函数来实现。
同样的情况出现在使用了缓冲机制的函数中,如stdio等。原因是stdio缓冲区还有数据待消费,但select不知道,它只关心read系统调用有无数据可读,这时就出现了错误。
通常终止网络连接使用close函数,然而close函数有两个限制:
1. close只会把描述符的引用计数-1,而套接字仅在引用计数变为0时才会被关闭。如果使用shutdown则可以不理会引用计数,直接激发TCP的正常连接终止序列。
2. TCP是全双工的,当我们完成了数据发送,发送FIN后,对面可能仍有数据需要发送,这时就需要调用shutdown进行半关闭:
#include <sys/socket.h>
int shutdown(int sockfd, int howto);
/* 返回:若成功则为0,若出错则为-1 */
其行为取决于howto参数的值:
* SHUT_RD:关闭连接读的这一半。这样由该套接字接受的来自对端的任何数据都会被确认(ACK),然后悄然丢弃
* SHUT_WR:关闭连接写的这一半,对于TCP套接字,这成为半关闭。当前留在套接字发送缓冲区的数据将被发送掉,然后跟着TCP的正常连接终止序列(FIN)。
* SHUT_RDWR:相当于调用两次shutdown分别关闭读写两部分。
这次修订将解决过早关闭套接字,并避免了对缓冲区带来的问题。
#include "unp.h"
void str_cli(FILE *fp, int sockfd)
{
int maxfdp1, stdineof;
fd_set rset;
char buf[MAXLINE];
int n;
stdineof = 0;
FD_ZERO(&rset);
for ( ; ; ) {
// 若stdineof标志为0,则用select检查标准输入
if (stdineof == 0)
FD_SET(fileno(fp), &rset);
FD_SET(sockfd, &rset);
maxfdp1 = max(fileno(fp), sockfd) + 1;
Select(maxfdp1, &rset, NULL, NULL, NULL);
if (FD_ISSET(sockfd, &rset)) { /* socket is readable */
// 改用read调用,避免缓冲区的影响
if ( (n = Read(sockfd, buf, MAXLINE)) == 0) {
// 套接字上读到EOF时,如果标准输入已经读入EOF,则是正常退出
if (stdineof == 1)
return; /* normal termination */
else
err_quit("str_cli: server terminated prematurely");
}
Write(fileno(stdout), buf, n);
}
if (FD_ISSET(fileno(fp), &rset)) { /* input is readable */
if ( (n = Read(fileno(fp), buf, MAXLINE)) == 0) {
// 标准输入读到EOF时,将stdineof标志置1
stdineof = 1;
// 半关闭
Shutdown(sockfd, SHUT_WR); /* send FIN */
// 清理rset
FD_CLR(fileno(fp), &rset);
// 进入下轮循环
continue;
}
Writen(sockfd, buf, n);
}
}
}
上面我们用select处理了标准输入描述符和套接字描述符。既然select可以处理多个描述符,那先前为每一个新的连接都fork一个新的子进程的做法,则能改为由单一进程配合select来代替。
上面提到过,select的参数(如rset)是 值-结果 参数,也就是说其内容会发生变化,因此我们需要另外维护一个数组,来方便我们维护当前的连接。这个数组的大小是FD_SETSIZE和内核允许本进程打开的最大描述符书的较小者。
下面给出实现
初始化部分:
/* 初始化 */
#include "unp.h"
int main(int argc, char **argv)
{
int i, maxi, maxfd, listenfd, connfd, sockfd;
int nready, client[FD_SETSIZE];
ssize_t n;
fd_set rset, allset;
char buf[MAXLINE];
socklen_t clilen;
struct sockaddr_in cliaddr, servaddr;
listenfd = Socket(AF_INET, SOCK_STREAM, 0);
bzero(&servaddr, sizeof(servaddr));
servaddr.sin_family = AF_INET;
servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
servaddr.sin_port = htons(SERV_PORT);
Bind(listenfd, (SA *) &servaddr, sizeof(servaddr));
Listen(listenfd, LISTENQ);
maxfd = listenfd; /* initialize */
maxi = -1; /* 已连接的套接字数组client[]中的最大下标 */
for (i = 0; i < FD_SETSIZE; i++)
client[i] = -1; /* -1 代表该位置可用 */
FD_ZERO(&allset);
FD_SET(listenfd, &allset);
/* end 初始化 */
/* 循环 */
for ( ; ; ) {
rset = allset; /* structure assignment */
nready = Select(maxfd+1, &rset, NULL, NULL, NULL);
if (FD_ISSET(listenfd, &rset)) { /* new client connection */
clilen = sizeof(cliaddr);
connfd = Accept(listenfd, (SA *) &cliaddr, &clilen);
#ifdef NOTDEF
printf("new client: %s, port %d\n",
Inet_ntop(AF_INET, &cliaddr.sin_addr, 4, NULL),
ntohs(cliaddr.sin_port));
#endif
for (i = 0; i < FD_SETSIZE; i++)
if (client[i] < 0) {
client[i] = connfd; /* 记录新的描述符 */
break;
}
if (i == FD_SETSIZE)
err_quit("too many clients");
FD_SET(connfd, &allset); /* 将新的描述符加入集合中 */
if (connfd > maxfd)
maxfd = connfd; /* for select */
if (i > maxi)
maxi = i; /* 更新最大下标 */
if (--nready <= 0)
continue; /* 如果没有其他可读描述符,则直接进入下轮循环 */
}
for (i = 0; i <= maxi; i++) { /* check all clients for data */
if ( (sockfd = client[i]) < 0)
continue;
if (FD_ISSET(sockfd, &rset)) {
if ( (n = Read(sockfd, buf, MAXLINE)) == 0) {
/* connection closed by client */
Close(sockfd);
FD_CLR(sockfd, &allset);
client[i] = -1;
} else
Writen(sockfd, buf, n);
if (--nready <= 0)
break; /* no more readable descriptors */
}
}
}
}
/* end 循环 */
当一个服务器在处理多个客户时,它绝对不能阻塞于只与单个客户相关的某个函数调用。否则可能导致服务器被刮起,拒绝为其他客户提供服务。
可能的解决办法:
1. 使用非阻塞式I/O
2. 让每个客户由单独的控制线程提供服务(但要注意资源耗尽攻击)
3. 对I/O操作设置一个超时
#include <sys/select.h>
#include <signal.h>
#include <time.h>
int pselect(int maxfdp1, fd_set *readset, fd_set *writeset, fs_set *exceptset, const struct timespec *timeout, const sigset_t *sigmask);
/* 返回:返回就绪描述符数目,超时为0,出错为-1 */
pselect类似于select,不过有两个不同:
1. pselect使用的timespec结构,精度是纳秒:
struct timespec {
time_t tv_sec;
long tv_nsec;
}
考虑以下情况:
if (intr_flag) {
handle_intr();
}
// if SIGINT emit from here
if ( (nready = select( ... )) < 0) {
if (errno = EINTR) {
if (intr_flag)
handle_intr();
}
...
}
如果SIGINT信号在检查intr_flag后,调用select前,那么信号将丢失,同时select将永远阻塞。
有了pselect后:
sigset_t newmask, oldmask, zeromask;
sigemptyset(&zeromask);
sigemptyset(&newmask);
sigaddset(&newmask, SIGINT);
sigprocmask(SIG_BLOCK, &newmask, &oldmask); /* block SIGINT */
if (intr_flag) {
handle_intr();
}
if ( (nready = pselect( ..., &zeromask )) < 0) {
// after pselect, SIG came back
if (errno = EINTR) {
if (intr_flag)
handle_intr();
}
...
}
pselect函数在返回后,进程的信号掩码又被重置为调用pselect之前的值。
Poll提供的功能与select类似,不过在处理流设备时,它能提供额外的信息。
#include <poll.h>
int poll(struct pollfd *fdarray, unsigned long nfds, int timeout);
/* 返回:返回就绪的描述符数目,超时为0,出错为-1 */
```c
第一个参数是指向一个结构数组第一个元素的指针。每个数组元素都是一个pollfd结构,用于指定测试某个给定描述符fd的条件:
<div class="md-section-divider"></div>
```c
struct pollfd {
int fd; /* 想检查的fd */
short events; /* 关心的events */
short revents; /* 发生的事件 */
}
也就是说以fd为单位来检查,区别于select以不同的状态的fd集合为单位。同时使用多了一个变量来保存返回结果,免去使用 值-结果 参数重新使用时需要重置的麻烦。
下面是用于指定events标志和revents标志的一些常值:
(注意其中POLLIN和POLLOUT都是更早出现的,常值保留是为了向后兼容)
而timeout参数则指定poll函数返回前等待多长时间。他是一个指定应等待毫秒数的值:
在select的使用中,我们需要手动记录描述符集合的数目。而在poll中,通知内核数组的长度则成了调用者的责任,内核不再需要知道类似fd_set的固定大小的数据类型。
我们现在用poll替代select重写TCP回射服务器程序(关键部分已注释):
#include "unp.h"
#include <limits.h> /* for OPEN_MAX */
int main(int argc, char **argv)
{
// ... 省略初始化部分
client[0].fd = listenfd; /* pollfd 的第一个fd是监听套接字 */
client[0].events = POLLRDNORM;
for (i = 1; i < OPEN_MAX; i++)
client[i].fd = -1; /* 设为-1代表该位置可用 */
maxi = 0; /* max index into client[] array */
/* 初始化结束 */
/* 使用poll部分 */
for ( ; ; ) {
nready = Poll(client, maxi+1, INFTIM);
if (client[0].revents & POLLRDNORM) { /* 有新的连接 */
clilen = sizeof(cliaddr);
connfd = Accept(listenfd, (SA *) &cliaddr, &clilen);
#ifdef NOTDEF
printf("new client: %s\n", Sock_ntop((SA *) &cliaddr, clilen));
#endif
for (i = 1; i < OPEN_MAX; i++)
if (client[i].fd < 0) {
client[i].fd = connfd; /* 找出可用的位置,并记录fd */
break;
}
if (i == OPEN_MAX)
err_quit("too many clients");
client[i].events = POLLRDNORM;
if (i > maxi)
maxi = i; /* max index in client[] array */
if (--nready <= 0)
continue; /* 当前没有可用描述符,等待下一次poll返回 */
}
for (i = 1; i <= maxi; i++) { /* 检查已连接的客户有无新数据 */
if ( (sockfd = client[i].fd) < 0)
continue;
if (client[i].revents & (POLLRDNORM | POLLERR)) {
if ( (n = read(sockfd, buf, MAXLINE)) < 0) {
if (errno == ECONNRESET) {
#ifdef NOTDEF
printf("client[%d] aborted connection\n", i);
#endif
Close(sockfd);
client[i].fd = -1; /* 出错则关闭连接并置-1 */
} else
err_sys("read error");
} else if (n == 0) {
#ifdef NOTDEF
printf("client[%d] closed connection\n", i);
#endif
Close(sockfd);
client[i].fd = -1;
} else
Writen(sockfd, buf, n);
if (--nready <= 0)
break; /* no more readable descriptors */
}
}
}
}