[关闭]
@lishuhuakai 2016-11-15T22:40:18.000000Z 字数 5788 阅读 1447

一起来写ftp server 03 -- 加强版本


这一次的版本我们在第一版的基础上添加了一些有意思的东西,增强了我们的功能.

下载限速和上传限速

代码的实现里有一些很有意思的地方,下载限速和上传限速就是其中一例,这些东西听起来非常高大上,可是如果你真的做了的话,会发现这些东西真的也就是这样.

限速的原理很简单,如果发现下载速度过快,那么我就休息一下,不下那么快,这样速度就降下来的.上传同理.

SpeedBarrier

  1. /* 下载或者上传速度限制器 */
  2. class SpeedBarrier : boost::noncopyable
  3. {
  4. public:
  5. void StartTimer()
  6. {
  7. start_ = Timestamp::now();
  8. }
  9. void limitSpeed(int64_t maxSpeed, size_t bytesTransed);
  10. static int64_t maxDownloadSpeed; /* 最大的下载速度 */
  11. static int64_t maxUploadSpeed; /* 最大的上传速度 */
  12. private:
  13. void nanoSleep(int64_t microSeconds);
  14. private:
  15. Timestamp start_; /* 开始的时间 */
  16. };

如何使用这个速度限制器呢?很简单,首先,我们要执行StartTimer()函数开始计时.我们可以看到上面的实现代码,就是获取当前的时间.

然后传输一点数据之后,然后调用limitSpeed函数,将允许的最大传输速度和你已经发送的数据量作为参数传递过去,然后就可以了:

  1. void SpeedBarrier::limitSpeed(int64_t maxSpeed, size_t bytesTransed) {
  2. /* 开始限制速度 */
  3. Timestamp now = Timestamp::now();
  4. int64_t timePassed = now.microSecondsSinceEpoch() - start_.microSecondsSinceEpoch();
  5. int64_t speedNow = bytesTransed / (static_cast<double>(timePassed) / Timestamp::kMicroSecondsPerSecond); /* 求出每秒钟传送的字节数 */
  6. if (speedNow > maxSpeed) {
  7. /* 然后我们就必须休眠 */
  8. int64_t diff = speedNow - maxSpeed; /* 求出两者之间的差值 */
  9. int64_t sleepMicroSeconds = (diff / static_cast<double>(maxSpeed)) * timePassed;
  10. nanoSleep(sleepMicroSeconds);
  11. //usleep(sleepMicroSeconds); // usleep要求参数小于1000000,也就是1秒,这是不现实的.
  12. }
  13. start_ = Timestamp::now(); /* 重新计时 */
  14. }

如果传输的速度超过了,那么就要休眠对应的时间.

断点续传

这也是一个很有意思的功能,以前以为断点续传一定是一个高大上的功能,实现了之后发现一文不值.

断点续传的实现简单粗暴,首先,客户端发送rest命令,传递需要续传的位置,然后传输文件的时候,将文件读写指针指向那个位置,然后开始发送.

  1. void CmdHandle::REST() {
  2. /* 这次要实现的一个比较有意思的功能是断点续传 */
  3. resumePoint_ = str2longlong(argv_.c_str()); /* 记录断点 */
  4. Reply("%d we will transfer the file from the position we got!\r\n", FTP_RESTOK);
  5. }
  6. void CmdHandle::RETR() {
  7. Connection conn = GetConnect(); /* 获得连接 */
  8. int fd = open(argv_.c_str(), O_RDONLY, NULL); /* 打开文件 */
  9. if (fd < 0) {
  10. Reply("%d Open local file fail.\r\n", FTP_FILEFAIL);
  11. return;
  12. }
  13. /* 开始传送文件 */
  14. char text[1024] = { 0 };
  15. struct stat fileInfo;
  16. size_t size;
  17. {
  18. FileRDLock lock(fd); /* 读锁,如果不能读的话,会一直阻塞 */
  19. fstat(fd, &fileInfo);
  20. if (!S_ISREG(fileInfo.st_mode)) { /* 判断是否为普通文件,设备文件不能下载 */
  21. Reply("%d It is not a regular file.\r\n", FTP_FILEFAIL);
  22. return;
  23. }
  24. if (mode_ == binary)
  25. sprintf(text, "Opening BINARY mode data connection for %s (%lld bytes).",
  26. argv_.c_str(), (long long)fileInfo.st_size);
  27. else
  28. sprintf(text, "Opening ASCII mode data connection for %s (%lld bytes).",
  29. argv_.c_str(), (long long)fileInfo.st_size);
  30. Reply("%d %s\r\n", FTP_DATACONN, text);
  31. // 读取文件内容,写入套接字
  32. size = fileInfo.st_size;
  33. if (resumePoint_ != 0) {
  34. Lseek(fd, resumePoint_, SEEK_SET); /* 重定位文件 */
  35. size -= resumePoint_; /* 需要传送的字节的数目 */
  36. }
  37. barrier_.StartTimer(); /* 开始计时 */
  38. while (size > 0) {
  39. int sended = sendfile(conn->GetFd(), fd, NULL, bytesPerTime); /* 每次发送一点 */
  40. if (sended == -1) {
  41. break;
  42. }
  43. barrier_.limitSpeed(SpeedBarrier::maxDownloadSpeed, sended); /* 开始限速 */
  44. size -= sended;
  45. }
  46. }
  47. if (size == 0)
  48. Reply("%d Transfer complete.\r\n", FTP_TRANSFEROK);
  49. else {
  50. Reply("%d Transfer failed.\r\n", FTP_BADSENDNET); /* 连接关闭,放弃传输 */
  51. }
  52. utility::Close(fd); /* 关闭文件 */
  53. resumePoint_ = 0; /* 文件已经传送完毕了,要将断点复原 */
  54. }

abort命令

整个程序真正有点难度的是abort命令.

客户端正在下载文件,突然用户点了取消,客户端给你发送紧急消息怎么弄?如果你不屏蔽SIGPIPE消息的话,你的程序多半会挂掉.如果你不处理客户端发送的紧急消息的话,你下一次接收命令的时候应该会碰到一堆的\377之类的东西,parse没弄好的话,分分钟将你服务端搞死.

如何屏蔽SIGPIPE消息


最近被SIGPIPE消息坑了很久.所以立志要一次性解决它.

SIGPIPE消息的由来

对一个对端已经关闭的socket调用两次write, 第二次将会生成SIGPIPE信号, 该信号默认结束进程.

具体的分析可以结合TCP的”四次握手”关闭. TCP是全双工的信道,可以看作两条单工信道, TCP连接两端的两个端点各负责一条. 当对端调用close时, 虽然本意是关闭整个两条信道, 但本端只是收到FIN包. 按照TCP协议的语义, 表示对端只是关闭了其所负责的那一条单工信道, 仍然可以继续接收数据. 也就是说, 因为TCP协议的限制, 一个端点无法获知对端已经完全关闭.

示意图

对一个已经收到FIN包的socket调用read方法, 如果接收缓冲已空, 则返回0, 这就是常说的表示连接关闭. 但第一次对其调用write方法时, 如果发送缓冲没问题, 会返回正确写入(发送). 但发送的报文会导致对端发送RST报文, 因为对端的socket已经调用了close, 完全关闭, 既不发送, 也不接收数据. 所以, 第二次调用write方法(假设在收到RST之后), 会生成SIGPIPE信号, 导致进程退出.

那么,我们如何屏蔽SIGPIPE消息?

单进程的SIGPIPE消息屏蔽方法

对于单进程而言,有下面几种方法可以尝试一下:

使用signal函数

有意思的是,这种方法在我的电脑上完全行不通,我不知道是不是都是这样,但是还是在这里贴一下:

  1. signal(SIGPIPE, SIG_IGN); /* 忽略掉SIGPIPE消息 */

signal函数是很老的东西,它由ISO C定义,由于ISO C不涉及多线程,进程组等,所以它对信号的定义非常模糊,以致于对Unix系统而言几乎毫无用处.所以说,不推荐系统提供的signal函数.

如果你实在要使用signal的话,stevents老爷子用sigaction.函数给我们重新实现了一遍signal函数,我们比较推荐这个版本.这里也顺带在这里贴一下:

  1. void unix_error(const char *msg) /* unix-style error */
  2. {
  3. fprintf(stderr, "%s: %s\n", msg, strerror(errno));
  4. exit(0);
  5. }
  6. typedef void handler_t(int);
  7. handler_t *Signal(int signum, handler_t *handler) /* 用于注册信号处理函数 */
  8. {
  9. struct sigaction action, old_action;
  10. action.sa_handler = handler; /* 信号处理函数 */
  11. sigemptyset(&action.sa_mask); /* block sigs of type being handled */
  12. action.sa_flags = SA_RESTART; /* 如果可能的话,重启系统调用 */
  13. if (sigaction(signum, &action, &old_action) < 0)
  14. unix_error("Signal error");
  15. return (old_action.sa_handler);
  16. }

我测试一下这个函数,对其他的信号貌似都很管用,在我的电脑上对SIGPIPE消息却没什么用处,真是奇怪.

信号集

另外的方法也是有的,除了signal方法,其实我们也可以使用信号集的方法.
我这里给一个函数:

  1. void BlockSigno(int signo) { /* 阻塞掉某个信号 */
  2. sigset_t signal_mask;
  3. sigemptyset(&signal_mask); /* 初始化信号集,并清除signal_mask中的所有信号 */
  4. sigaddset(&signal_mask, signo); /* 将signo添加到信号集中 */
  5. sigprocmask(SIG_BLOCK, &signal_mask, NULL); /* 这个进程屏蔽掉signo信号 */
  6. }

然后调用:

  1. BlockSigno(SIGPIPE);

即可.如果想具体了解这几个函数,可以去看APUE.

多线程的SIGPIPE屏蔽方法

其实线程的屏蔽方法和单进程差不太多,不行,你可以看:

  1. void BlockSigno(int signo)
  2. {
  3. sigset_t signal_mask;
  4. sigemptyset(&signal_mask);
  5. sigaddset(&signal_mask, signo);
  6. pthread_sigmask(SIG_BLOCK, &signal_mask, NULL);
  7. }

然后我们调用:

  1. BlockSigno(SIGPIPE);

即可.当然,上面的代码都做了简化,没有错误处理,你自己可以添加.

如何处理紧急消息

首先,我们要注册SIGURG消息.注册这个消息有一点很有意思,那就是我们一般要添加这样的一段代码:

  1. /*-
  2. * 这条命令还是很有必要的,因为系统可不知道cmdfd_所属的进程,所以它也不知道应该向谁发送SIGURG信号.
  3. * 而设置了fd所属的进程之后,系统一旦检测到了客户端发来了紧急数据,就会立马通知该进程.否则的话
  4. * 该进程是收不到SIGURG信号的.
  5. */
  6. fcntl(fd, F_SETOWN, getpid());

我这里SIGURG处理很随便,直接丢掉:

  1. void CmdHandle::HandleUrg() { /* 处理紧急数据 */
  2. /*-
  3. * 一般而言,对于正在传输中的数据连接,客户端会给服务器发送紧急数据,当然,我这里也没有处理,直接丢弃.
  4. * 发送的紧急数据是什么呢? \377\364 \377\362之类的.
  5. * \377\364 \377\362就是telnet协议中规定的操作,我翻译一下(操作序列):'IAC' 'Interrupt Process' 'IAC' 'Data Mark'
  6. * 其中:(1) \364表示操作:'Interrupt Process',即实施telnet的The function IP。含义:ftp客户机告诉你这个FTP服务器,
  7. * 赶快放下你现在手头的事情,马上处理我的事件(我有紧急数据到来)。
  8. * (2) \362表示操作:'Data Mark',这个字节是ftp客户机以TCP的紧急模式发送的一个字节。含义:即:其后的数据必须立即读取。
  9. * (3) \377即IAC,是telnet中的转义字节(即:255),每一个telnet操作(如:\364、\362)都必须以IAC开始。
  10. * 如何处理? 因为f2是通过TCP紧急模式发送的一个字节而已。你只要将字节f2(即telnet操作:\362)丢弃即可以了。以上仅供你参考.
  11. */
  12. char cmd[256] = { 0 };
  13. int errorno = 0;
  14. buffer_.readFd(cmdfd_, &errorno);
  15. strncpy(cmd, buffer_.peek(), buffer_.readableBytes()); /* 调试的时候你可以查看收到了什么东西 */
  16. if (errorno == 0 && false == buffer_.getLine(cmd, sizeof(cmd))) /* 没有出错 */
  17. buffer_.retrieveAll(); /* 丢弃掉紧急数据 */
  18. printf("recv urg!\n");
  19. }
添加新批注
在作者公开此批注前,只有你和作者可见。
回复批注