@lishuhuakai
2016-11-15T14:40:18.000000Z
字数 5788
阅读 1674
这一次的版本我们在第一版的基础上添加了一些有意思的东西,增强了我们的功能.
代码的实现里有一些很有意思的地方,下载限速和上传限速就是其中一例,这些东西听起来非常高大上,可是如果你真的做了的话,会发现这些东西真的也就是这样.
限速的原理很简单,如果发现下载速度过快,那么我就休息一下,不下那么快,这样速度就降下来的.上传同理.
SpeedBarrier
/* 下载或者上传速度限制器 */class SpeedBarrier : boost::noncopyable{public:void StartTimer(){start_ = Timestamp::now();}void limitSpeed(int64_t maxSpeed, size_t bytesTransed);static int64_t maxDownloadSpeed; /* 最大的下载速度 */static int64_t maxUploadSpeed; /* 最大的上传速度 */private:void nanoSleep(int64_t microSeconds);private:Timestamp start_; /* 开始的时间 */};
如何使用这个速度限制器呢?很简单,首先,我们要执行StartTimer()函数开始计时.我们可以看到上面的实现代码,就是获取当前的时间.
然后传输一点数据之后,然后调用limitSpeed函数,将允许的最大传输速度和你已经发送的数据量作为参数传递过去,然后就可以了:
void SpeedBarrier::limitSpeed(int64_t maxSpeed, size_t bytesTransed) {/* 开始限制速度 */Timestamp now = Timestamp::now();int64_t timePassed = now.microSecondsSinceEpoch() - start_.microSecondsSinceEpoch();int64_t speedNow = bytesTransed / (static_cast<double>(timePassed) / Timestamp::kMicroSecondsPerSecond); /* 求出每秒钟传送的字节数 */if (speedNow > maxSpeed) {/* 然后我们就必须休眠 */int64_t diff = speedNow - maxSpeed; /* 求出两者之间的差值 */int64_t sleepMicroSeconds = (diff / static_cast<double>(maxSpeed)) * timePassed;nanoSleep(sleepMicroSeconds);//usleep(sleepMicroSeconds); // usleep要求参数小于1000000,也就是1秒,这是不现实的.}start_ = Timestamp::now(); /* 重新计时 */}
如果传输的速度超过了,那么就要休眠对应的时间.
这也是一个很有意思的功能,以前以为断点续传一定是一个高大上的功能,实现了之后发现一文不值.
断点续传的实现简单粗暴,首先,客户端发送rest命令,传递需要续传的位置,然后传输文件的时候,将文件读写指针指向那个位置,然后开始发送.
void CmdHandle::REST() {/* 这次要实现的一个比较有意思的功能是断点续传 */resumePoint_ = str2longlong(argv_.c_str()); /* 记录断点 */Reply("%d we will transfer the file from the position we got!\r\n", FTP_RESTOK);}void CmdHandle::RETR() {Connection conn = GetConnect(); /* 获得连接 */int fd = open(argv_.c_str(), O_RDONLY, NULL); /* 打开文件 */if (fd < 0) {Reply("%d Open local file fail.\r\n", FTP_FILEFAIL);return;}/* 开始传送文件 */char text[1024] = { 0 };struct stat fileInfo;size_t size;{FileRDLock lock(fd); /* 读锁,如果不能读的话,会一直阻塞 */fstat(fd, &fileInfo);if (!S_ISREG(fileInfo.st_mode)) { /* 判断是否为普通文件,设备文件不能下载 */Reply("%d It is not a regular file.\r\n", FTP_FILEFAIL);return;}if (mode_ == binary)sprintf(text, "Opening BINARY mode data connection for %s (%lld bytes).",argv_.c_str(), (long long)fileInfo.st_size);elsesprintf(text, "Opening ASCII mode data connection for %s (%lld bytes).",argv_.c_str(), (long long)fileInfo.st_size);Reply("%d %s\r\n", FTP_DATACONN, text);// 读取文件内容,写入套接字size = fileInfo.st_size;if (resumePoint_ != 0) {Lseek(fd, resumePoint_, SEEK_SET); /* 重定位文件 */size -= resumePoint_; /* 需要传送的字节的数目 */}barrier_.StartTimer(); /* 开始计时 */while (size > 0) {int sended = sendfile(conn->GetFd(), fd, NULL, bytesPerTime); /* 每次发送一点 */if (sended == -1) {break;}barrier_.limitSpeed(SpeedBarrier::maxDownloadSpeed, sended); /* 开始限速 */size -= sended;}}if (size == 0)Reply("%d Transfer complete.\r\n", FTP_TRANSFEROK);else {Reply("%d Transfer failed.\r\n", FTP_BADSENDNET); /* 连接关闭,放弃传输 */}utility::Close(fd); /* 关闭文件 */resumePoint_ = 0; /* 文件已经传送完毕了,要将断点复原 */}
abort命令整个程序真正有点难度的是abort命令.
客户端正在下载文件,突然用户点了取消,客户端给你发送紧急消息怎么弄?如果你不屏蔽SIGPIPE消息的话,你的程序多半会挂掉.如果你不处理客户端发送的紧急消息的话,你下一次接收命令的时候应该会碰到一堆的\377之类的东西,parse没弄好的话,分分钟将你服务端搞死.
SIGPIPE消息最近被SIGPIPE消息坑了很久.所以立志要一次性解决它.
SIGPIPE消息的由来对一个对端已经关闭的socket调用两次write, 第二次将会生成SIGPIPE信号, 该信号默认结束进程.
具体的分析可以结合TCP的”四次握手”关闭. TCP是全双工的信道,可以看作两条单工信道, TCP连接两端的两个端点各负责一条. 当对端调用close时, 虽然本意是关闭整个两条信道, 但本端只是收到FIN包. 按照TCP协议的语义, 表示对端只是关闭了其所负责的那一条单工信道, 仍然可以继续接收数据. 也就是说, 因为TCP协议的限制, 一个端点无法获知对端已经完全关闭.

对一个已经收到FIN包的socket调用read方法, 如果接收缓冲已空, 则返回0, 这就是常说的表示连接关闭. 但第一次对其调用write方法时, 如果发送缓冲没问题, 会返回正确写入(发送). 但发送的报文会导致对端发送RST报文, 因为对端的socket已经调用了close, 完全关闭, 既不发送, 也不接收数据. 所以, 第二次调用write方法(假设在收到RST之后), 会生成SIGPIPE信号, 导致进程退出.
那么,我们如何屏蔽SIGPIPE消息?
SIGPIPE消息屏蔽方法对于单进程而言,有下面几种方法可以尝试一下:
signal函数有意思的是,这种方法在我的电脑上完全行不通,我不知道是不是都是这样,但是还是在这里贴一下:
signal(SIGPIPE, SIG_IGN); /* 忽略掉SIGPIPE消息 */
signal函数是很老的东西,它由ISO C定义,由于ISO C不涉及多线程,进程组等,所以它对信号的定义非常模糊,以致于对Unix系统而言几乎毫无用处.所以说,不推荐系统提供的signal函数.
如果你实在要使用signal的话,stevents老爷子用sigaction.函数给我们重新实现了一遍signal函数,我们比较推荐这个版本.这里也顺带在这里贴一下:
void unix_error(const char *msg) /* unix-style error */{fprintf(stderr, "%s: %s\n", msg, strerror(errno));exit(0);}typedef void handler_t(int);handler_t *Signal(int signum, handler_t *handler) /* 用于注册信号处理函数 */{struct sigaction action, old_action;action.sa_handler = handler; /* 信号处理函数 */sigemptyset(&action.sa_mask); /* block sigs of type being handled */action.sa_flags = SA_RESTART; /* 如果可能的话,重启系统调用 */if (sigaction(signum, &action, &old_action) < 0)unix_error("Signal error");return (old_action.sa_handler);}
我测试一下这个函数,对其他的信号貌似都很管用,在我的电脑上对SIGPIPE消息却没什么用处,真是奇怪.
另外的方法也是有的,除了signal方法,其实我们也可以使用信号集的方法.
我这里给一个函数:
void BlockSigno(int signo) { /* 阻塞掉某个信号 */sigset_t signal_mask;sigemptyset(&signal_mask); /* 初始化信号集,并清除signal_mask中的所有信号 */sigaddset(&signal_mask, signo); /* 将signo添加到信号集中 */sigprocmask(SIG_BLOCK, &signal_mask, NULL); /* 这个进程屏蔽掉signo信号 */}
然后调用:
BlockSigno(SIGPIPE);
即可.如果想具体了解这几个函数,可以去看APUE.
SIGPIPE屏蔽方法其实线程的屏蔽方法和单进程差不太多,不行,你可以看:
void BlockSigno(int signo){sigset_t signal_mask;sigemptyset(&signal_mask);sigaddset(&signal_mask, signo);pthread_sigmask(SIG_BLOCK, &signal_mask, NULL);}
然后我们调用:
BlockSigno(SIGPIPE);
即可.当然,上面的代码都做了简化,没有错误处理,你自己可以添加.
首先,我们要注册SIGURG消息.注册这个消息有一点很有意思,那就是我们一般要添加这样的一段代码:
/*-* 这条命令还是很有必要的,因为系统可不知道cmdfd_所属的进程,所以它也不知道应该向谁发送SIGURG信号.* 而设置了fd所属的进程之后,系统一旦检测到了客户端发来了紧急数据,就会立马通知该进程.否则的话* 该进程是收不到SIGURG信号的.*/fcntl(fd, F_SETOWN, getpid());
我这里SIGURG处理很随便,直接丢掉:
void CmdHandle::HandleUrg() { /* 处理紧急数据 *//*-* 一般而言,对于正在传输中的数据连接,客户端会给服务器发送紧急数据,当然,我这里也没有处理,直接丢弃.* 发送的紧急数据是什么呢? \377\364 \377\362之类的.* \377\364 \377\362就是telnet协议中规定的操作,我翻译一下(操作序列):'IAC' 'Interrupt Process' 'IAC' 'Data Mark'* 其中:(1) \364表示操作:'Interrupt Process',即实施telnet的The function IP。含义:ftp客户机告诉你这个FTP服务器,* 赶快放下你现在手头的事情,马上处理我的事件(我有紧急数据到来)。* (2) \362表示操作:'Data Mark',这个字节是ftp客户机以TCP的紧急模式发送的一个字节。含义:即:其后的数据必须立即读取。* (3) \377即IAC,是telnet中的转义字节(即:255),每一个telnet操作(如:\364、\362)都必须以IAC开始。* 如何处理? 因为f2是通过TCP紧急模式发送的一个字节而已。你只要将字节f2(即telnet操作:\362)丢弃即可以了。以上仅供你参考.*/char cmd[256] = { 0 };int errorno = 0;buffer_.readFd(cmdfd_, &errorno);strncpy(cmd, buffer_.peek(), buffer_.readableBytes()); /* 调试的时候你可以查看收到了什么东西 */if (errorno == 0 && false == buffer_.getLine(cmd, sizeof(cmd))) /* 没有出错 */buffer_.retrieveAll(); /* 丢弃掉紧急数据 */printf("recv urg!\n");}