[关闭]
@lishuhuakai 2016-11-04T01:27:24.000000Z 字数 2838 阅读 1628

一起来写web server 05 -- 多线程进阶版本


这个版本的web server比第4版稍微做了一点改进,那就是由主线程统一接收连接,然后连接的处理由子线程来完成.因此,这里就引入了条件变量以及同步互斥的问题.

同步机制

muduo库中有一个关于同步机制的封装,我这里就直接采用了.我这里来介绍一下这个封装吧.

下面是Conditon这个类的代码:

  1. class Condition : noncopyable
  2. {
  3. private:
  4. MutexLock& mutex_; /* 之前的锁的一个引用 */
  5. pthread_cond_t pcond_; /* 系统定义的条件变量的类型 */
  6. ... ...
  7. }

这个类的构造函数用于初始化同步变量:

  1. explicit Condition(MutexLock& mutex)
  2. : mutex_(mutex)
  3. {
  4. pthread_cond_init(&pcond_, NULL); /* 初始化同步变量 */
  5. }

析构函数就销毁掉同步变量:

  1. ~Condition()
  2. {
  3. pthread_cond_destroy(&pcond_); /* 销毁条件变量 */
  4. }

等待某个条件:

  1. void wait()
  2. {
  3. MutexLock::UnassignGuard ug(mutex_);
  4. pthread_cond_wait(&pcond_, mutex_.getPthreadMutex()); /* 等待Mutex */
  5. }

通知单个线程:

  1. void notify()
  2. {
  3. pthread_cond_signal(&pcond_); /* 唤醒一个线程 */
  4. }

条件变量只有一种正确的使用方式,几乎不可能用错,对于wait端:
1. 必须与mutex一起使用,该布尔表达式的读写需受此mutex保护.
2. 在mutex已经上锁的时候才能调用wait().
3. 把判断布尔条件和wait()放到while循环中.
写成代码是这个样子的:

  1. MutexLock mutex;
  2. Condition cond(mutex);
  3. std::deque<int> queue;
  4. int dequeue() {
  5. MutexLockGuard lock(mutex); /* 加锁 */
  6. while (queue.empty()) {
  7. cond.wait();
  8. }
  9. assert(!queue.empty());
  10. int top = queue.front();
  11. queue.pop_front();
  12. return top;
  13. }

对于sinal/broadcast端:
1. 不一定要在mutex已经上锁的情况下调用signal(理论上).
2. 在signal之前一般要修改布尔表达式.
3. 修改布尔表达式通常要用mutex保护.
4. 注意区分signalbroadcast:"broadcast"通常用于表明状态变化,而signal表示资源可用.
写成代码是:

  1. void enqueue(int x)
  2. {
  3. MutexLockGuard lock(mutex); // 加锁
  4. queue.push_back(x);
  5. cond.signal(); // 可以移出临界区之外
  6. }

以上引自linux多线程服务端编程.

我来谈一下我的理解:

cond中之所以需要mutex,是因为在执行到

  1. while (condition) {
  2. cond.wait();
  3. }

时,需要将cond中持有的mutex解锁.一旦接收到signal,它需要重新抢夺这个mutex,抢到了,才能从wait函数中返回.

为什么cond.wait()要放入while循环中呢?一方面是因为spurious wakeup,之所以会有这个东西,是速度的考量,一般来说,即使没有spurious wakeup,你也要这么写代码,举个栗子.

在生产者消费者模型之中,消费者1获得锁,发现queue为空,wait,消费者2获得锁,发现queue为空,wait,生产者3获得锁,将生产的产品放入queue,调用signal,并且释放了mutex,t1,t2被唤醒,可以预见的是,这两者只会有一个获得锁,消费完这个产品,然后另一个获得锁,发现为空,还是得继续等待,这就是while的由来,当然,至于signal为什么会唤醒多个线程,man手册上就是这么说的.

我们的代码

  1. ```cpp
  2. /*-
  3. * 线程池的加强版本.主要是主线程统一接收连接,其余都是工作者线程,这里的布局非常类似于一个生产者.
  4. * 多个消费者.
  5. */
  6. #define MAXNCLI 100
  7. MutexLock mutex; /* 全局的锁 */
  8. Condition cond(mutex); /* 全局的条件变量 */
  9. int clifd[MAXNCLI], iget, iput;
  10. int main(int argc, char *argv[])
  11. {
  12. int listenfd = Open_listenfd(8080); /* 8080号端口监听 */
  13. signal(SIGPIPE, SIG_IGN);
  14. pthread_t tids[10];
  15. void* thread_main(void *);
  16. for (int i = 0; i < 10; ++i) {
  17. int *arg = (int *)Malloc(sizeof(int));
  18. *arg = i;
  19. Pthread_create(&tids[i], NULL, thread_main, (void *)arg);
  20. }
  21. struct sockaddr cliaddr; /* 用于存储对方的ip信息 */
  22. socklen_t clilen;
  23. for (; ; ) {
  24. int connfd = Accept(listenfd, &cliaddr, &clilen);
  25. {
  26. MutexLockGuard lock(mutex); /* 加锁 */
  27. clifd[iput] = connfd; /* 涉及到对共享变量的修改,要加锁 */
  28. if (++iput == MAXNCLI) iput = 0;
  29. if (iput == iget) unix_error("clifd is not big enough!\n");
  30. }
  31. cond.notify(); /* 通知一个线程有数据啦! */
  32. }
  33. return 0;
  34. }

线程的代码是这样的:

  1. void*
  2. thread_main(void *arg)
  3. {
  4. int connfd;
  5. printf("thread %d starting\n", *(int *)arg);
  6. Free(arg);
  7. for ( ; ;) {
  8. {
  9. MutexLockGuard lock(mutex); /* 加锁 */
  10. while (iget == iput) { /* 没有新的连接到来 */
  11. /*-
  12. * 代码必须用while循环来等待条件变量,原因是spurious wakeup
  13. */
  14. cond.wait(); /* 这一步会原子地unlock mutex并进入等待,wait执行完毕会自动重新加锁 */
  15. }
  16. connfd = clifd[iget]; /* 获得连接套接字 */
  17. if (++iget == MAXNCLI) iget = 0;
  18. }
  19. doit(connfd);
  20. close(connfd);
  21. }
  22. }

总结

这个版本在原来的版本上增加了同步互斥操作,在某种程度上增加了难度.

具体代码还是看这里吧!:https://github.com/lishuhuakai/Spweb

添加新批注
在作者公开此批注前,只有你和作者可见。
回复批注