[关闭]
@phper 2016-12-20T15:24:38.000000Z 字数 5448 阅读 5236

swoole深入学习 4. process

swoole


swoole-1.7.2增加了一个进程管理模块,用来替代PHP的pcntl扩展。pcntl是php新增的一个多进程扩展,用来实现多进程,但是有很多不完善的地方,swoole 就完善了这些地方,而且使得使用非常简单。

创建一个多进程

swoole创建多进程很简单:new Swoole\Process('callback_function') 就可以了。

比如我要同时创建10个进程,就for 循环10次就可以了。

  1. for($i=0; $i<=10 ; $i++){
  2. $process = new Swoole\Process('callback_function');
  3. $pid = $process->start();
  4. echo PHP_EOL . $pid;//
  5. }

非常简单。

来个具体的例子,我们对比下普通的模式和多进程模式的时间速度区别:

多进程下并发3次,每次循环1亿次。和普通模式下。

  1. <?php
  2. echo time() .PHP_EOL ;
  3. $worker_num = 3;//创建的进程数
  4. for ($i=0; $i < $worker_num ; $i++) {
  5. $process = new Swoole\Process('callback_function');
  6. $pid = $process->start();
  7. }
  8. function callback_function($worker) {
  9. for($i=0;$i<100000000;$i++){}
  10. echo time() .PHP_EOL;
  11. }

看下打印的时间, 算最大的时间,总共用了8秒。

  1. swoole git:(master) php test.php
  2. 1481791259 #开始时间,没有被阻塞。
  3. swoole git:(master)
  4. 1481791267 #
  5. 1481791267
  6. 1481791267

具体的资源占用:

  1. $ time php test.php
  2. 0.04s user
  3. 0.02s system
  4. 50% cpu
  5. 0.129 total

再看下普通循环模式:

  1. <?php
  2. echo time() .PHP_EOL;
  3. for($i=0;$i<100000000;$i++){}
  4. for($i=0;$i<100000000;$i++){}
  5. for($i=0;$i<100000000;$i++){}
  6. echo time() .PHP_EOL;

看下打印时间,用了16秒。

  1. 1481791591
  2. 1481791607

具体的资源占用:

  1. time php test.php
  2. 15.09s user
  3. 0.11s system
  4. 96% cpu
  5. 15.720 total

进程间的通信

如果是非常简单的多进程执行任务,那么进程间就不需要通讯了,实际情况下,很多业务是需要通讯的,比如,发邮件,如果自进程发送失败了,那么是要通知主进程的等等。

再看进程间通信之前,先看下 Swoole\Process 的几个参数:

Swoole\Process(mixed $function, $redirect_stdin_stdout = false, $create_pipe = true);

它有三个参数:

$function:子进程创建成功后要执行的函数

$redirect_stdin_stdout:重定向子进程的标准输入和输出。 设置为true,则在进程内echo将不是打印屏幕,而是写入到管道,读取键盘输入将变为从管道中读取数据。 默认为false,阻塞读取。

$create_pipe:是否创建管道,启用$redirect_stdin_stdout后,此选项将忽略用户参数,强制为true 如果子进程内没有进程间通信,可以设置为false。

swoole_process进程间支持3种通信方式:

管道通信

管道通信是swoole_process默认的一种通信方式。当然我们也可以在实例化的时候通过参数来设定:

$process = new Swoole\Process('callback_function', false, true);

这样就创建了一个管道通信进程。我们打印下$process这个对象的值:

  1. var_dump($process)
  2. object(swoole_process)#1 (3) {
  3. ["pipe"]=>
  4. int(2)
  5. ["callback"]=>
  6. string(26) "callback_function"
  7. ["pid"]=>
  8. int(4333)
  9. }

里面有个字段pipe是管道id,还有一个pid是进程id,所以:

每次创建一个进程后,就会随之创建一个管道,主进程想和哪一个进程通信,就向那个进程的管道写入/读取数据。

管道有2个方法,分别来写入数据,和读取数据。

write()  和  read() 

来个例子,看下如何通过管道通信

  1. <?php
  2. /**
  3. * process 管道通信
  4. * User: yangyi
  5. * Date: 2016/12/9
  6. * Time: 17:10
  7. */
  8. //进程数量
  9. $worker_num = 2;
  10. $workers = [];
  11. for ($i = 0; $i < $worker_num; $i++) {
  12. $process = new Swoole\Process('callback_function', false);
  13. $pid = $process->start();
  14. //将每一个进程的句柄存起来
  15. $workers[$pid] = $process;
  16. }
  17. // 主进程,通过管道给子进程发送数据
  18. foreach ($workers as $pid => $process) {
  19. //向子进程管道里写内容:$process->write($data);
  20. $process->write("hello worker[$pid]\n");
  21. //从子进程管道里面读取信息:$process->read();
  22. echo "From Worker: ".$process->read();
  23. }
  24. //子进程执行的回调函数
  25. function callback_function($worker){
  26. //从主进程管道中读取
  27. $recv = $worker->read();
  28. echo PHP_EOL. "From Master: $recv\n";
  29. //向主进程管道中写入数据
  30. $worker->write("hello master , this pipe is ". $worker->pipe ."; this pid is ".$worker->pid."\n");
  31. $worker->exit(0);
  32. }

运行看下效果:

  1. $ php process_pipe.php
  2. From Master: hello worker[6759]
  3. From Worker: hello master , this pipe is 4; this pid is 6759
  4. From Master: hello worker[6760]
  5. From Worker: hello master , this pipe is 6; this pid is 6760

注意 主进程和子进程中的 read() 不能一开始都读,得写反。不然管道中没数据,就会阻塞住了。

所以可以参考下面的图:

流程图

所以一般的顺序是:

master-->write()

work-->read()

work-->write()

master-->read()

这样才能有序的使用通道,才不会被阻塞。而且是一对一的,write 2 次,也要read 2次,先write先read。

第二个参数 $redirect_stdin_stdout 说,设置为 true ,子进程会将 echo 写入到主管道。我把上面的代码改一下,看下输出结果是啥。

$process = new Swoole\Process('callback_function', true, true);

紧紧改动了这一行,再看下:

  1. //进程数量
  2. $worker_num = 2;
  3. $workers = [];
  4. for ($i = 0; $i < $worker_num; $i++) {
  5. $process = new Swoole\Process('callback_function', true);
  6. $pid = $process->start();
  7. //将每一个进程的句柄存起来
  8. $workers[$pid] = $process;
  9. }
  10. // 主进程,通过管道给子进程发送数据
  11. foreach ($workers as $pid => $process) {
  12. //向子进程管道里写内容:$process->write($data);
  13. $process->write("hello worker[$pid]\n");
  14. //从子进程管道里面读取信息:$process->read();
  15. echo "From Worker: ".$process->read();
  16. }
  17. //子进程执行的回调函数
  18. function callback_function($worker){
  19. //从主进程管道中读取
  20. $recv = $worker->read();
  21. //这个echo 相当于在往master管道里写数据。write('From Master: hello worker[9251]')
  22. echo "From Master: $recv\n";
  23. //第二次写, 但是主进程没有第二次read(),所以没有被读到。
  24. $worker->write("hello master , this pipe is ". $worker->pipe ."; this pid is ".$worker->pid."\n");
  25. $worker->exit(0);
  26. }

运行下输出结果为:

  1. $ php test.php
  2. From Worker: From Master: hello worker[9251]
  3. From Worker: From Master: hello worker[9252]

来分析下,因为$redirect_stdin_stdout为true,所以子进程中echo的内容就到了主管道里面,而不是打印在屏幕上,所以,主进程从管道里读到的内容,就是子进程中echo的内容。
也就造成了上面的输出结果。

那么如何使子进程中的第二个write,能被主进程读到呢?很简单,在主进程中在 read() 一次就可以了:

  1. // 主进程,通过管道给子进程发送数据
  2. foreach ($workers as $pid => $process) {
  3. //向子进程管道里写内容:$process->write($data);
  4. $process->write("hello worker[$pid]\n");
  5. //从子进程管道里面读取信息:$process->read();
  6. echo "From Worker: ".$process->read();
  7. //第二次读
  8. echo "From Worker: ".$process->read();
  9. }

再看下打印结果:

  1. From Worker: From Master: hello worker[9328]
  2. From Worker: hello master , this pipe is 4; this pid is 9328
  3. From Worker: From Master: hello worker[9329]
  4. From Worker: hello master , this pipe is 6; this pid is 9329

消息队列

swoole进程通信还有第二种方式就是“消息队列”,这个消息队列其实就是Linux系统里面的msgqueue

swoole提供了2个方法,来实现消息队列的通信。

pop() 和 push() 

要使用队列,必须在start方法前使用useQueue

  1. <?php
  2. $workers = [];
  3. $worker_num = 2;
  4. for($i = 0; $i < $worker_num; $i++)
  5. {
  6. $process = new Swoole\Process('callback_function',false,false);
  7. $process->useQueue();
  8. $pid = $process->start();
  9. $workers[$pid] = $process;
  10. }
  11. foreach($workers as $pid => $process)
  12. {
  13. //给子进程发布消息
  14. $process->push("hello worker[$pid]\n");
  15. }
  16. function callback_function($worker)
  17. {
  18. //接受来自主进程的消息
  19. $recv = $worker->pop();
  20. echo "From Master: $recv\n";
  21. $worker->exit(0);
  22. }
  23. //等待消息停止
  24. for($i = 0; $i < $worker_num; $i++)
  25. {
  26. $ret = Swoole\Process::wait();
  27. $pid = $ret['pid'];
  28. unset($workers[$pid]);
  29. echo "Worker Exit, PID=".$pid.PHP_EOL;
  30. }

运行下:

  1. $ php process_msg.php
  2. From Master: hello worker[18219]
  3. From Master: hello worker[18218]
  4. Worker Exit, PID=18218
  5. Worker Exit, PID=18219

消息队列,依赖于linux系统ipcs,所以,如果空间满了,可能会阻塞。或者出现其他一些异常,具体可以参考swoole内核设置:https://wiki.swoole.com/wiki/page/p-server/sysctl.html

与消息队列相关的几个系统命令:

查看消息队列

ipcs -q

ipcrm 删除消息队列

ipcrm -q MessageId

批量删除所有的队列

ipcs -q | sed "$ d; 1,2d" |  awk '{ print "Removing " $2; system("ipcrm -q " $2) }'
添加新批注
在作者公开此批注前,只有你和作者可见。
回复批注