PHP多进程系列笔记(包含原生多进程和Swoole)

PHP多进程系列笔记(一)

 

  1.   本系列文章将向大家讲解 pcntl_*系列函数,从而更深入的理解进程相关知识。
  2.   PCNTL在PHP中进程控制支持默认是关闭的。您需要使用 --enable-pcntl 配置选项重新编译PHP的 CGI或CLI版本以打开进程控制支持。
  3.   Note: 此扩展在 Windows 平台上不可用。

pcntl_fork

  1.   int pcntl_fork ( void )
  2.    

用于创建子进程。成功时,在父进程执行线程内返回产生的子进程的PID,在子进程执行线程内返回0。失败时,在父进程上下文返回-1,不会创建子进程,并且会引发一个PHP错误。

  1.   fork.php
  2.   <?php
  3.   $pid = pcntl_fork();
  4.   if($pid == -1){
  5.   //错误处理:创建子进程失败时返回-1.
  6.   die( 'could not fork' );
  7.   }elseif($pid){
  8.   //父进程会得到子进程号,所以这里是父进程执行的逻辑
  9.   $id = getmypid();
  10.   echo "Parent process,pid {$id}, child pid {$pid}\n";
  11.   }else{
  12.   //子进程得到的$pid为0, 所以这里是子进程执行的逻辑
  13.   $id = getmypid();
  14.   echo "Child process,pid {$id}\n";
  15.   sleep(10);
  16.   }

命令行运行:

  1.   $ php fork.php
  2.   Parent process,pid 98, child pid 99
  3.   Child process,pid 99

该例里父进程还没有来得及等子进程运行完毕就自动退出了,子进程由 init进程接管。通过 ps-ef|grep php 看到子进程还在运行:

  1.   [root@9355490fe5da /]# ps -ef | grep php
  2.   root 105 1 0 16:46 pts/0 00:00:00 php fork.php
  3.   root 107 27 0 16:46 pts/1 00:00:00 grep php

子进程成为孤立进程,ppid(父进程id)变成1了。如果在父进程里也加个 sleep(5),你会看到子进程ppid本来是大于1的,后来就变成1了。

注:如果是docker环境,孤立进程的ppid可能是0。

pcntl_wait

pcntl_wait()函数用来让父进程等待子进程退出,默认情况下会阻塞主进程。

阻塞模式

紧接着上面的例子,如果想等子进程运行结束后父进程再退出,该怎么办?那就用到 pcntl_wait了。

	int pcntl_wait ( int &$status [, int $options = 0 ] )

该函数阻塞当前进程,只到当前进程的一个子进程退出或者收到一个结束当前进程的信号。 我们修改代码:

  1.   <?php
  2.   $pid = pcntl_fork();
  3.   if($pid == -1){
  4.   exit("fork fail");
  5.   }elseif($pid){
  6.   $id = getmypid();
  7.   echo "Parent process,pid {$id}, child pid {$pid}\n";
  8.   pcntl_wait($status);
  9.   //pcntl_waitpid($pid, $status);
  10.   }else{
  11.   $id = getmypid();
  12.   echo "Child process,pid {$id}\n";
  13.   sleep(10);
  14.   }

此时再次运行程序,父进程就会一直等待子进程运行结束然后退出。 pcntl_waitpid()和 pcntl_wait()功能相同。前者第一个参数支持指定pid参数,当指定-1作为 pid的值等同于后者。

	int pcntl_waitpid ( int $pid , int &$status [, int $options = 0 ] )

当已知子进程pid的时候,可以使用 pcntl_waitpid()。 这两个函数返回退出的子进程进程号(>1),发生错误时返回-1,如果提供了 WNOHANG 作为option(wait3可用的系统)并且没有可用子进程时返回0。 返回值为退出的子进程进程号时,想了解如何退出,可以通过 $status状态码反应。

非阻塞模式

pcntl_wait()默认情况下会阻塞主进程,直到子进程执行完毕才继续往下运行。如果设置最后一个参数为常量 WNOHANG,那么就不会阻塞主进程,而是继续执行后续代码, 此时 pcntl_waitpid 就会返回0。 示例:

  1.   <?php
  2.   $pid = pcntl_fork();
  3.   if($pid == -1){
  4.   exit("fork fail");
  5.   }elseif($pid){
  6.   $id = getmypid();
  7.   echo "Parent process,pid {$id}, child pid {$pid}\n";
  8.   while(1){
  9.   $res = pcntl_wait($status, WNOHANG);
  10.   //$res = pcntl_waitpid($pid, $status, WNOHANG);
  11.   if ($res == -1 || $res > 0){
  12.   sleep(10);//此处为了方便看效果,实际不需要
  13.   break;
  14.   }
  15.   }
  16.   }else{
  17.   $id = getmypid();
  18.   echo "Child process,pid {$id}\n";
  19.   sleep(2);
  20.   }

该示例里只有一个子进程,看不出来非阻塞的好处,我们修改一下:

  1.   <?php
  2.   $child_pids = [];
  3.   for($i=0;$i<3; $i++){
  4.   $pid = pcntl_fork();
  5.   if($pid == -1){
  6.   exit("fork fail");
  7.   }elseif($pid){
  8.   $child_pids[] = $pid;
  9.   $id = getmypid();
  10.   echo time()." Parent process,pid {$id}, child pid {$pid}\n";
  11.   }else{
  12.   $id = getmypid();
  13.   $rand = rand(1,3);
  14.   echo time()." Child process,pid {$id},sleep $rand\n";
  15.   sleep($rand); //#1 故意设置时间不一样
  16.   exit();//#2 子进程需要exit,防止子进程也进入for循环
  17.   }
  18.   }
  19.   while(count($child_pids)){
  20.   foreach ($child_pids as $key => $pid) {
  21.   // $res = pcntl_wait($status, WNOHANG);
  22.   $res = pcntl_waitpid($pid, $status, WNOHANG);//#3
  23.   if ($res == -1 || $res > 0){
  24.   echo time()." Child process exit,pid {$pid}\n";
  25.   unset($child_pids[$key]);
  26.   }else{
  27.   // echo time()." Wait End,pid {$pid}\n"; //#4
  28.   }
  29.   }
  30.   }
  31.    
  32.   #3处首先先去掉 WNOHANG参数,运行:
  33.   $ php fork.1.php
  34.   1528637334 Parent process,pid 6600, child pid 6601
  35.   1528637334 Child process,pid 6601,sleep 2
  36.   1528637334 Parent process,pid 6600, child pid 6602
  37.   1528637334 Child process,pid 6602,sleep 2
  38.   1528637334 Parent process,pid 6600, child pid 6603
  39.   1528637334 Child process,pid 6603,sleep 1
  40.   1528637336 Child process exit,pid 6601
  41.   1528637336 Child process exit,pid 6602
  42.   1528637336 Child process exit,pid 6603

我们看到,6603号进程运行时间最短,但是是最后回收。我们再加上 WNOHANG参数,运行:

  1.   $ php fork.1.php
  2.   1528637511 Parent process,pid 6695, child pid 6696
  3.   1528637511 Child process,pid 6696,sleep 2
  4.   1528637511 Parent process,pid 6695, child pid 6697
  5.   1528637511 Child process,pid 6697,sleep 1
  6.   1528637511 Parent process,pid 6695, child pid 6698
  7.   1528637511 Child process,pid 6698,sleep 3
  8.   1528637512 Child process exit,pid 6697
  9.   1528637513 Child process exit,pid 6696
  10.   1528637514 Child process exit,pid 6698

6697进程最先回收!说明确实是异步非阻塞的。感兴趣的朋友还可以开启 #4处代码,未使用 WNOHANG参数的时候,里面的代码是不会运行的。

注意: #2处需要注意子进程需要exit,防止子进程也进入for循环。如果没有 exit(),最终创建的子进程不只3个。

  • 检测status函数

在 pcntl_wait和 pcntl_waitpid两个函数中的 $status中存了子进程的状态信息,这个参数可以用于 pcntl_wifexited、 pcntl_wifstopped、 pcntl_wifsignaled、 pcntl_wexitstatus、 pcntl_wtermsig、 pcntl_wstopsig、 pcntl_waitpid这些函数。

代码片段:

  1.   while(1){
  2.   $res = pcntl_wait($status);
  3.   if ($res == -1 || $res > 0){
  4.   if(!pcntl_wifexited($status)){
  5.   //进程非正常退出
  6.   echo "service exit unusally; pid is $pid\n";
  7.   }else{
  8.   //获取进程终端的退出状态码;
  9.   $code = pcntl_wexitstatus($status);
  10.   echo "service exit code: $code;pid is $pid \n";
  11.   }
  12.   if(pcntl_wifsignaled($status)){
  13.   //不是通过接受信号中断
  14.   echo "service term not by signal;pid is $pid \n";
  15.   }else{
  16.   $signal = pcntl_wtermsig($status);
  17.   echo "service term by signal $signal;pid is $pid\n";
  18.   }
  19.   if(pcntl_wifstopped($status)){
  20.   echo "service stop not unusally;pid is $pid \n";
  21.   }else{
  22.   $signal = pcntl_wstopsig($status);
  23.   echo "service stop by signal $signal;pid is $pid\n";
  24.   }
  25.   break;
  26.   }
  • 参考

1、php多进程 防止出现僵尸进程 https://www.cnblogs.com/jkko123/p/6351615.html?utmsource=itdadao&utmmedium=referral

2、PCNTL函数族--PHP多进程编程 (转) https://www.cnblogs.com/zox2011/archive/2013/02/19/2917448.html

PHP多进程系列笔记(二)

  1.   作者:飞鸿影~
  2.    
  3.   出处:http://52fhy.cnblogs.com/
  4.    

僵尸(zombie)进程

这里说下僵尸进程:

僵尸进程是指的父进程已经退出,而该进程dead之后没有进程接受,就成为僵尸进程(zombie)进程。任何进程在退出前(使用exit退出) 都会变成僵尸进程(用于保存进程的状态等信息),然后由init进程接管。如果不及时回收僵尸进程,那么它在系统中就会占用一个进程表项,如果这种僵尸进程过多,最后系统就没有可以用的进程表项,于是也无法再运行其它的程序。

通过如下命令查看是否有僵尸进程,如果有,类似下面这样:

  1.   $ ps -A -o stat,ppid,pid,cmd | grep -e '^[Zz]'
  2.   Z+ 282 283 [php] <defunct>
  • 如果子进程还没有结束时,父进程就结束了,那么init进程会自动接手这个子进程,进行回收。
  • 如果父进程是循环,又没有安装SIGCHLD信号处理函数调用wait或waitpid()等待子进程结束。那么子进程结束后,没有回收,就产生僵尸进程了。

示例: fork_zombie.php

  1.   <?php
  2.    
  3.   $pid = pcntl_fork();
  4.   if($pid == -1){
  5.   exit("fork fail");
  6.   }elseif($pid){
  7.   $id = getmypid();
  8.   echo "Parent process,pid {$id}, child pid {$pid}\n";
  9.    
  10.   while(1){sleep(3);} //#1
  11.   }else{
  12.   $id = getmypid();
  13.   echo "Child process,pid {$id}\n";
  14.   sleep(2);
  15.   exit();
  16.   }

命令行里运行程序,然后新终端查看:

  1.   $ ps -A -o stat,ppid,pid,cmd | grep -e '^[Zz]'
  2.   Z+ 7252 7253 [php] <defunct>

出现了一个僵尸进程。这时候就算手动结束脚本程序也无法关闭这个僵尸子进程了。需要使用kill -9关闭。

pcntl_signal

bool pcntl_signal ( int $signo , callback $handler [, bool $restart_syscalls = true ] )

该函数为signo指定的信号安装一个新的信号处理器。

  • 安装SIGCHLD信号

上一节里,我们讲到僵尸进程产生的原因:

如果父进程是循环,又没有安装SIGCHLD信号处理函数调用wait或waitpid()等待子进程结束。那么子进程结束后,没有回收,就产生僵尸进程了。

本小节我们通过安装SIGCHLD信号处理函数来解决僵尸进程问题。示例:

  1.   <?php
  2.    
  3.   //表示每执行一条低级指令,就检查一次信号,如果检测到注册的信号,就调用其信号处理器
  4.   declare(ticks = 1);
  5.    
  6.   //安装SIGCHLD信号
  7.   pcntl_signal(SIGCHLD, function(){
  8.   echo "SIGCHLD \r\n";
  9.   pcntl_wait($status);
  10.   }); //#2
  11.    
  12.   $pid = pcntl_fork();
  13.   if($pid == -1){
  14.   exit("fork fail");
  15.   }elseif($pid){
  16.   $id = getmypid();
  17.   echo "Parent process,pid {$id}, child pid {$pid}\n";
  18.    
  19.   //先sleep一下,否则代码一直循环,无法处理信号接收
  20.   while(1){sleep(3);} //#1
  21.   }else{
  22.   $id = getmypid();
  23.   echo "Child process,pid {$id}\n";
  24.   sleep(2);
  25.   exit();
  26.   }

第一次注释掉#1和#2处的代码,父进程提前结束,子进程被init进程接手,所以没有产生僵尸进程。 第二次我们注释掉#2处的代码,开启#1处的代码,即父进程是个死循环,又没有回收子进程,就产生僵尸进程了。 第三次我们开启#1处和#2处的代码,父进程由于安装了信号处理,并调用wait函数等待子进程结束,所以也没有产生僵尸进程。

  1.   对子进程的结束不感兴趣
  2.   如果父进程不关心子进程什么时候结束,那么可以用pcntl_signal(SIGCHLD, SIG_IGN)通知内核,自己对子进程的结束不感兴趣,那么子进程结束后,内核会回收,并不再给父进程发送信号。这样我们就不写子进程退出的处理函数了。

说明:

如果去掉declare( ticks = 1 );无法响应信号。因php的信号处理函数是基于ticks来实现的,而不是注册到真正系统底层的信号处理函数中。
  • 安装其他信号

我们可以在主进程安装更多信号,例如:

  1.   <?php
  2.   declare( ticks = 1 );
  3.    
  4.   //信号处理函数
  5.   function sig_handler ( $signo )
  6.   {
  7.    
  8.   switch ( $signo ) {
  9.   case SIGTERM :
  10.   // 处理SIGTERM信号
  11.   exit;
  12.   break;
  13.   case SIGHUP :
  14.   //处理SIGHUP信号
  15.   break;
  16.   case SIGUSR1 :
  17.   echo "Caught SIGUSR1...\n" ;
  18.   break;
  19.   default:
  20.   // 处理所有其他信号
  21.   }
  22.    
  23.   }
  24.    
  25.   echo "Installing signal handler...\n" ;
  26.    
  27.   //安装信号处理器
  28.   pcntl_signal ( SIGTERM , "sig_handler" );
  29.   pcntl_signal ( SIGHUP , "sig_handler" );
  30.   pcntl_signal ( SIGUSR1 , "sig_handler" );
  31.    
  32.   echo "Generating signal SIGTERM to self...\n" ;
  33.    
  34.   //向当前进程发送SIGUSR1信号
  35.   posix_kill ( posix_getpid (), SIGUSR1 );
  36.    
  37.   echo "Done\n"

注:通过 kill -l 可以看到Linux下所有的信号常量。

  1.   $ kill -l
  2.   1) SIGHUP 2) SIGINT 3) SIGQUIT 4) SIGILL 5) SIGTRAP
  3.   6) SIGABRT 7) SIGBUS 8) SIGFPE 9) SIGKILL 10) SIGUSR1
  4.   11) SIGSEGV 12) SIGUSR2 13) SIGPIPE 14) SIGALRM 15) SIGTERM
  5.   16) SIGSTKFLT 17) SIGCHLD 18) SIGCONT 19) SIGSTOP 20) SIGTSTP
  6.   21) SIGTTIN 22) SIGTTOU 23) SIGURG 24) SIGXCPU 25) SIGXFSZ
  7.   26) SIGVTALRM 27) SIGPROF 28) SIGWINCH 29) SIGIO 30) SIGPWR
  8.   31) SIGSYS 34) SIGRTMIN 35) SIGRTMIN+1 36) SIGRTMIN+2 37) SIGRTMIN+3
  9.   38) SIGRTMIN+4 39) SIGRTMIN+5 40) SIGRTMIN+6 41) SIGRTMIN+7 42) SIGRTMIN+8
  10.   43) SIGRTMIN+9 44) SIGRTMIN+10 45) SIGRTMIN+11 46) SIGRTMIN+12 47) SIGRTMIN+13
  11.   48) SIGRTMIN+14 49) SIGRTMIN+15 50) SIGRTMAX-14 51) SIGRTMAX-13 52) SIGRTMAX-12
  12.   53) SIGRTMAX-11 54) SIGRTMAX-10 55) SIGRTMAX-9 56) SIGRTMAX-8 57) SIGRTMAX-7
  13.   58) SIGRTMAX-6 59) SIGRTMAX-5 60) SIGRTMAX-4 61) SIGRTMAX-3 62) SIGRTMAX-2
  14.   63) SIGRTMAX-1 64) SIGRTMAX
  15.    

ticks相关

  1.   PHP的 ticks=1 表示每执行1行PHP代码就回调此函数(指的pcntl_signal_dispatch)。实际上大部分时间都没有信号产生,但ticks的函数一直会执行。如果一个服务器程序1秒中接收1000次请求,平均每个请求要执行1000行PHP代码。那么PHP的pcntl_signal,就带来了额外的 1000 * 1000,也就是100万次空的函数调用。这样会浪费大量的CPU资源。
  2.   (摘自:韩天峰(Rango)的博客 » PHP官方的pcntl_signal性能极差
  3.   http://rango.swoole.com/archives/364)

pcntl_signal_dispatch的作用就是查看是否收到了信号需要处理,如果有信号的话,就调用相应的信号处理函数。

所以上述问题比较好的做法是去掉ticks,转而手动调用pcntl_signal_dispatch,在代码循环中自行处理信号。

我们把上一小节的例子改改,不使用ticks:

  1.   <?php
  2.    
  3.   //declare( ticks = 1 );
  4.    
  5.   //信号处理函数
  6.   function sig_handler ( $signo )
  7.   {
  8.    
  9.   switch ( $signo ) {
  10.   case SIGUSR1 :
  11.   echo "Caught SIGUSR1...\n" ;
  12.   break;
  13.   default:
  14.   // 处理所有其他信号
  15.   }
  16.    
  17.   }
  18.    
  19.   echo "Installing signal handler...\n" ;
  20.    
  21.   //安装信号处理器
  22.   pcntl_signal ( SIGUSR1 , "sig_handler" );
  23.    
  24.   echo "Generating signal SIGTERM to self...\n" ;
  25.    
  26.   //向当前进程发送SIGUSR1信号
  27.   posix_kill ( posix_getpid (), SIGUSR1 );
  28.   pcntl_signal_dispatch();
  29.    
  30.   echo "Done\n";

运行结果:

  1.   Installing signal handler...
  2.   Generating signal SIGTERM to self...
  3.   Caught SIGUSR1...
  4.   Done

相比每执行一条php语句都会调用 pcntl_signal_dispatch 一次,效率好多了。

pcntl_alarm

int pcntl_alarm ( int $seconds )

该函数创建一个计时器,在指定的秒数后向进程发送一个 SIGALRM 信号。每次对 pcntl_alarm() 的调用都会取消之前设置的alarm信号。注意不是定时器,只会运行一次。

下面是一个隔5秒发送一个SIGALRM信号,并由signal_handler函数获取,然后打印一个 SIGALRM 的例子:

  1.   <?php
  2.   declare(ticks = 1);
  3.    
  4.   //安装SIGALRM信号
  5.   pcntl_signal(SIGALRM, function(){
  6.   echo "SIGALRM\n";
  7.   pcntl_alarm(5); //再次调用,会重新发送一个SIGALRM信号
  8.   });
  9.   pcntl_alarm(5);//发送一个SIGALRM信号
  10.    
  11.   echo "run...\n";
  12.    
  13.   //死循环,否则进程会退出
  14.   while(1){sleep(1);}
  15.    

注:如果不想使用ticks,那么需要在主循环里主动增加pcntl_signal_dispatch()调用。

PHP多进程系列笔记(三)

多进程实例

本节讲解几个多进程的实例。
  • Master-Worker结构

下面例子实现了简单的多进程管理:

  1. 支持设置最大子进程数
  2. Master-Worker结构:Worker挂掉,Master进程会重新创建一个
  1.   <?php
  2.    
  3.   $pids = []; //存储子进程pid
  4.   $MAX_PROCESS = 3;//最大进程数
  5.    
  6.   $pid = pcntl_fork();
  7.   if($pid <0){
  8.   exit("fork fail\n");
  9.   }elseif($pid > 0){
  10.   exit;//父进程退出
  11.   }else{
  12.   // 从当前终端分离
  13.   if (posix_setsid() == -1) {
  14.   die("could not detach from terminal");
  15.   }
  16.    
  17.   $id = getmypid();
  18.   echo time()." Master process, pid {$id}\n";
  19.    
  20.   for($i=0; $i<$MAX_PROCESS;$i++){
  21.   start_worker_process();
  22.   }
  23.    
  24.   //Master进程等待子进程退出,必须是死循环
  25.   while(1){
  26.   foreach($pids as $pid){
  27.   if($pid){
  28.   $res = pcntl_waitpid($pid, $status, WNOHANG);
  29.   if ( $res == -1 || $res > 0 ){
  30.   echo time()." Worker process $pid exit, will start new... \n";
  31.   start_worker_process();
  32.   unset($pids[$pid]);
  33.   }
  34.   }
  35.   }
  36.   }
  37.   }
  38.    
  39.   /**
  40.   * 创建worker进程
  41.   */
  42.   function start_worker_process(){
  43.   global $pids;
  44.   $pid = pcntl_fork();
  45.   if($pid <0){
  46.   exit("fork fail\n");
  47.   }elseif($pid > 0){
  48.   $pids[$pid] = $pid;
  49.   // exit; //此处不可退出,否则Master进程就退出了
  50.   }else{
  51.   //实际代码
  52.   $id = getmypid();
  53.   $rand = rand(1,3);
  54.   echo time()." Worker process, pid {$id}. run $rand s\n";
  55.   while(1){
  56.   sleep($rand);
  57.   }
  58.   }
  59.   }

多进程Server

下面我们使用多进程实现一个tcp服务器,支持:

  1. 多进程处理客户端连接
  2. 子进程退出,Master进程会重新创建一个
  3. 支持事件回调
  1.   <?php
  2.    
  3.   class TcpServer{
  4.   const MAX_PROCESS = 3;//最大进程数
  5.   private $pids = []; //存储子进程pid
  6.   private $socket;
  7.    
  8.   public function __construct(){
  9.   $pid = pcntl_fork();
  10.   if($pid <0){
  11.   exit("fork fail\n");
  12.   }elseif($pid > 0){
  13.   exit;//父进程退出
  14.   } else{
  15.   // 从当前终端分离
  16.   if (posix_setsid() == -1) {
  17.   die("could not detach from terminal");
  18.   }
  19.    
  20.   umask(0);
  21.    
  22.   $id = getmypid();
  23.   echo time()." Master process, pid {$id}\n";
  24.    
  25.   //创建tcp server
  26.   $this->socket = stream_socket_server("tcp://0.0.0.0:9201", $errno, $errstr);
  27.   if(!$this->socket) exit("start server err: $errstr --- $errno");
  28.   }
  29.   }
  30.    
  31.   public function run(){
  32.   for($i=0; $i<self::MAX_PROCESS;$i++){
  33.   $this->start_worker_process();
  34.   }
  35.    
  36.   echo "waiting client...\n";
  37.    
  38.   //Master进程等待子进程退出,必须是死循环
  39.   while(1){
  40.   foreach($this->pids as $k=>$pid){
  41.   if($pid){
  42.   $res = pcntl_waitpid($pid, $status, WNOHANG);
  43.   if ( $res == -1 || $res > 0 ){
  44.   echo time()." Worker process $pid exit, will start new... \n";
  45.   $this->start_worker_process();
  46.   unset($this->pids[$k]);
  47.   }
  48.   }
  49.   }
  50.   sleep(1);//让出1s时间给CPU
  51.   }
  52.   }
  53.    
  54.   /**
  55.   * 创建worker进程,接受客户端连接
  56.   */
  57.   private function start_worker_process(){
  58.   $pid = pcntl_fork();
  59.   if($pid <0){
  60.   exit("fork fail\n");
  61.   }elseif($pid > 0){
  62.   $this->pids[] = $pid;
  63.   // exit; //此处不可退出,否则Master进程就退出了
  64.   }else{
  65.   $this->acceptClient();
  66.   }
  67.   }
  68.    
  69.   private function acceptClient()
  70.   {
  71.   //子进程一直等待客户端连接,不能退出
  72.   while(1){
  73.   $conn = stream_socket_accept($this->socket, -1);
  74.   if($this->onConnect) call_user_func($this->onConnect, $conn); //回调连接事件
  75.    
  76.   //开始循环读取消息
  77.   $recv = ''; //实际收到消息
  78.   $buffer = ''; //缓冲消息
  79.   while(1){
  80.   $buffer = fread($conn, 20);
  81.    
  82.   //没有收到正常消息
  83.   if($buffer === false || $buffer === ''){
  84.   if($this->onClose) call_user_func($this->onClose, $conn); //回调断开连接事件
  85.   break;//结束读取消息,等待下一个客户端连接
  86.   }
  87.    
  88.   $pos = strpos($buffer, "\n"); //消息结束符
  89.   if($pos === false){
  90.   $recv .= $buffer;
  91.   }else{
  92.   $recv .= trim(substr($buffer, 0, $pos+1));
  93.    
  94.   if($this->onMessage) call_user_func($this->onMessage, $conn, $recv); //回调收到消息事件
  95.    
  96.   //客户端强制关闭连接
  97.   if($recv == "quit"){
  98.   echo "client close conn\n";
  99.   fclose($conn);
  100.   break;
  101.   }
  102.    
  103.   $recv = ''; //清空消息,准备下一次接收
  104.   }
  105.   }
  106.   }
  107.   }
  108.    
  109.   function __destruct() {
  110.   @fclose($this->socket);
  111.   }
  112.   }
  113.    
  114.   $server = new TcpServer();
  115.    
  116.   $server->onConnect = function($conn){
  117.   echo "onConnect -- accepted " . stream_socket_get_name($conn,true) . "\n";
  118.   fwrite($conn,"conn success\n");
  119.   };
  120.    
  121.   $server->onMessage = function($conn,$msg){
  122.   echo "onMessage --" . $msg . "\n";
  123.   fwrite($conn,"received ".$msg."\n");
  124.   };
  125.    
  126.   $server->onClose = function($conn){
  127.   echo "onClose --" . stream_socket_get_name($conn,true) . "\n";
  128.   fwrite($conn,"onClose "."\n");
  129.   };
  130.    
  131.   $server->run();

运行:

  1.   $ php process_multi.server.php
  2.   1528734803 Master process, pid 9110
  3.   waiting client...

此时服务端已经变成守护进程了。新开终端,我们使用ps命令查看进程:

  1.   $ ps -ef | grep php
  2.   yjc 9110 1 0 00:33 ? 00:00:00 php process_multi.server.php
  3.   yjc 9111 9110 0 00:33 ? 00:00:00 php process_multi.server.php
  4.   yjc 9112 9110 0 00:33 ? 00:00:00 php process_multi.server.php
  5.   yjc 9113 9110 0 00:33 ? 00:00:00 php process_multi.server.php
  6.   yjc 9134 8589 0 00:35 pts/1 00:00:00 grep php

可以看到4个进程:1个主进程,3个子进程。使用kill命令结束子进程,主进程会重新拉起一个新的子进程。

然后我们使用telnet测试连接:

  1.   $ telnet 127.0.0.1 9201
  2.   Trying 127.0.0.1...
  3.   Connected to 127.0.0.1.
  4.   Escape character is '^]'.
  5.   conn success
  6.   hello server!
  7.   received hello server!
  8.   quit
  9.   received quit
  10.   Connection closed by foreign host.

PHP多进程系列笔记(四)

Posix常用函数

本节主要讲解Posix常用函数和进程池的概念,也会涉及到守护进程的知识。本节难度较低。
  • posix_kill

向指定pid进程发送信号。成功时返回 TRUE , 或者在失败时返回 FALSE 。

bool posix_kill ( int $pid , int $sig )

$sig=0,可以检测进程是否存在,不会发送信号。

示例:

  1.   //向当前进程发送SIGUSR1信号
  2.   posix_kill ( posix_getpid (), SIGUSR1 );

注:通过 kill -l 可以看到Linux下所有的信号常量。

posix_getpid 返回当前进程id。

posix_getppid 返回父进程id。

posix_setsid 设置新会话组长,脱离终端。成功时返回session id,失败返回 -1。写守护进程(Daemon) 用到该函数。下面引用Workerman源代码里的一段示例:

  1.   function daemonize(){
  2.   umask(0);
  3.   $pid = pcntl_fork();
  4.   if (-1 === $pid) {
  5.   die('fork fail');
  6.   } elseif ($pid > 0) {
  7.   exit(0);
  8.   }
  9.    
  10.   if (-1 === posix_setsid()) {
  11.   die("setsid fail");
  12.   }
  13.    
  14.   // Fork again avoid SVR4 system regain the control of terminal.
  15.   $pid = pcntl_fork();
  16.   if (-1 === $pid) {
  17.   die("fork fail");
  18.   } elseif (0 !== $pid) {
  19.   exit(0);
  20.   }
  21.   }

如果程序需要以守护进程的方式执行,在业务代码之前调用该函数即可。

进程池

什么是进程池? 其实是很简单的概念,就是预先创建一组子进程,当有新任务来时,系统通过调配该组进程中的某个子进程完成此任务。

前面几节的示例里我们都是使用这种方式,预先创建好进程,而不是动态创建。

引入《Linux高性能服务器编程》的一段话,描述动态创建进程的缺点:

动态创建进程(或线程)比较耗费时间,这将导致较慢的客户响应。 动态创建的子进程通常只用来为一个客户服务,这样导致了系统上产生大量的细微进程(或线程)。进程和线程间的切换将消耗大量CPU时间。 动态创建的子进程是当前进程的完整映像,当前进程必须谨慎的管理其分配的文件描述符和堆内存等系统资源,否则子进程可能复制这些资源,从而使系统的可用资源急剧下降,进而影响服务器的性能。 所以任何时候,建议预先创建好进程,也就是使用进程池的方式实现。

像我们熟知的php-fpm还支持最大创建多少个进程、初始创建多少个进程这种方式,大家感兴趣可以研究研究。

PHP多进程系列笔记(五)

swoole多进程

前面几节都是讲解pcntl扩展实现的多进程程序。本节给大家介绍swoole扩展的swoole_process模块。

swoole_process 是swoole提供的进程管理模块,用来替代PHP的pcntl扩展。

首先,确保安装的swoole版本大于1.7.2:

  1.   $ php --ri swoole
  2.    
  3.   swoole
  4.    
  5.   swoole support => enabled
  6.   Version => 1.10.1
注意:swoole_process在最新的1.8.0版本已经禁止在Web环境中使用了,所以也只能支持命令行。

swoole提供的多进程扩展基本功能和pcntl提供的一样,但swoole更易简单上手,并且提供了:

  1. 默认基于unixsock的进程间通信;
  2. 支持消息队列作为进程间通信;
  3. 基于signalfd和eventloop处理信号,几乎没有任何额外消耗;
  4. 高精度微秒定时器;
  5. 配合swoole_event模块,创建的PHP子进程可以异步的事件驱动模式
  • 基础方法
  1.   swoole_process::__construct
  2.   swoole_process->start
  3.   swoole_process->name
  4.   swoole_process->exec
  5.   swoole_process->close
  6.   swoole_process->exit
  7.   swoole_process::kill
  8.   swoole_process::wait
  9.   swoole_process::daemon
  10.   swoole_process::setAffinity
  • 管道通信
  1.   swoole_process->write
  2.   swoole_process->read
  3.   swoole_process->setTimeout
  4.   swoole_process->setBlocking
  • 消息队列通信
  1.   swoole_process->useQueue
  2.   swoole_process->statQueue
  3.   swoole_process->freeQueue
  4.   swoole_process->push
  5.   swoole_process->pop
  • 信号与定时器
  1.   swoole_process::signal
  2.   swoole_process::alarm

基础应用

  • 本例实现的是tcp server,特性:
  1. 多进程处理客户端连接
  2. 子进程退出,Master进程会重新创建一个
  3. 支持事件回调
  4. 主进程退出,子进程在干完手头活后退出
  1.   <?php
  2.    
  3.   class TcpServer{
  4.   const MAX_PROCESS = 3;//最大进程数
  5.   private $pids = []; //存储子进程pid
  6.   private $socket;
  7.   private $mpid;
  8.    
  9.   public function run(){
  10.   $process = new swoole_process(function(){
  11.   $this->mpid = $id = getmypid();
  12.   echo time()." Master process, pid {$id}\n";
  13.    
  14.   //创建tcp server
  15.   $this->socket = stream_socket_server("tcp://0.0.0.0:9201", $errno, $errstr);
  16.   if(!$this->socket) exit("start server err: $errstr --- $errno");
  17.    
  18.   for($i=0; $i<self::MAX_PROCESS;$i++){
  19.   $this->start_worker_process();
  20.   }
  21.    
  22.   echo "waiting client...\n";
  23.    
  24.   //Master进程等待子进程退出,必须是死循环
  25.   while(1){
  26.   foreach($this->pids as $k=>$pid){
  27.   if($pid){
  28.   $res = swoole_process::wait(false);
  29.   if ( $res ){
  30.   echo time()." Worker process $pid exit, will start new... \n";
  31.   $this->start_worker_process();
  32.   unset($this->pids[$k]);
  33.   }
  34.   }
  35.   }
  36.   sleep(1);//让出1s时间给CPU
  37.   }
  38.   }, false, false); //不启用管道通信
  39.   swoole_process::daemon(); //守护进程
  40.   $process->start();//注意:start之后的变量子进程里面是获取不到的
  41.   }
  42.    
  43.   /**
  44.   * 创建worker进程,接受客户端连接
  45.   */
  46.   private function start_worker_process(){
  47.   $process = new swoole_process(function(swoole_process $worker){
  48.   $this->acceptClient($worker);
  49.   }, false, false);
  50.   $pid = $process->start();
  51.   $this->pids[] = $pid;
  52.   }
  53.    
  54.   private function acceptClient(&$worker)
  55.   {
  56.   //子进程一直等待客户端连接,不能退出
  57.   while(1){
  58.    
  59.   $conn = stream_socket_accept($this->socket, -1);
  60.   if($this->onConnect) call_user_func($this->onConnect, $conn); //回调连接事件
  61.    
  62.   //开始循环读取消息
  63.   $recv = ''; //实际收到消息
  64.   $buffer = ''; //缓冲消息
  65.   while(1){
  66.   $this->checkMpid($worker);
  67.    
  68.   $buffer = fread($conn, 20);
  69.    
  70.   //没有收到正常消息
  71.   if($buffer === false || $buffer === ''){
  72.   if($this->onClose) call_user_func($this->onClose, $conn); //回调断开连接事件
  73.   break;//结束读取消息,等待下一个客户端连接
  74.   }
  75.    
  76.   $pos = strpos($buffer, "\n"); //消息结束符
  77.   if($pos === false){
  78.   $recv .= $buffer;
  79.   }else{
  80.   $recv .= trim(substr($buffer, 0, $pos+1));
  81.    
  82.   if($this->onMessage) call_user_func($this->onMessage, $conn, $recv); //回调收到消息事件
  83.    
  84.   //客户端强制关闭连接
  85.   if($recv == "quit"){
  86.   echo "client close conn\n";
  87.   fclose($conn);
  88.   break;
  89.   }
  90.    
  91.   $recv = ''; //清空消息,准备下一次接收
  92.   }
  93.   }
  94.   }
  95.   }
  96.    
  97.   //检查主进程是否存在,若不存在子进程在干完手头活后退出
  98.   public function checkMpid(&$worker){
  99.   if(!swoole_process::kill($this->mpid,0)){
  100.   $worker->exit();
  101.   // 这句提示,实际是看不到的.需要写到日志中
  102.   echo "Master process exited, I [{$worker['pid']}] also quit\n";
  103.   }
  104.   }
  105.    
  106.   function __destruct() {
  107.   @fclose($this->socket);
  108.   }
  109.   }
  110.    
  111.   $server = new TcpServer();
  112.    
  113.   $server->onConnect = function($conn){
  114.   echo "onConnect -- accepted " . stream_socket_get_name($conn,true) . "\n";
  115.   fwrite($conn,"conn success\n");
  116.   };
  117.    
  118.   $server->onMessage = function($conn,$msg){
  119.   echo "onMessage --" . $msg . "\n";
  120.   fwrite($conn,"received ".$msg."\n");
  121.   };
  122.    
  123.   $server->onClose = function($conn){
  124.   echo "onClose --" . stream_socket_get_name($conn,true) . "\n";
  125.   fwrite($conn,"onClose "."\n");
  126.   };
  127.    
  128.   $server->run();
  129.    

运行后可以使用telnet连接:

telnet 127.0.0.1 9201

由于设置了最大三个子进程,最多只能接受3个客户端连接。

进程间通信

前面讲解的例子里,主进程和子进程直接是没有直接的数据交互的。如果主进程需要得到的来自子进程的反馈,或者子进程接受来自主进程的数据,那么就需要进程间通信了。

swoole内置了管道通信和消息队列通信。

  • 管道通信

管道通信主要是数据传输:一个进程需要将数据发送给另外一个进程。

这个swoole封装后,使用非常简单:

  1.   <?php
  2.    
  3.   $workers = [];
  4.    
  5.   for ($i=0; $i<3; $i++) {
  6.   $process = new swoole_process(function(swoole_process $worker){
  7.   //子进程逻辑
  8.   $cmd = $worker->read();
  9.    
  10.   ob_start();
  11.   passthru($cmd);//执行外部程序并且显示未经处理的、原始输出,会直接打印输出。
  12.   $return = ob_get_clean() ? : ' ';
  13.   $return = trim($return).". worker pid:".$worker->pid."\n";
  14.    
  15.   // $worker->write($return);//写入数据到管道
  16.   echo $return;//写入数据到管道。注意:子进程里echo也是写入到管道
  17.   }, true); //第二个参数为true,启用管道通信
  18.   $pid = $process->start();
  19.   $workers[$pid] = $process;
  20.   }
  21.    
  22.   foreach($workers as $pid=>$worker){
  23.   $worker->write('whoami'); //通过管道发数据到子进程。管道是单向的:发出的数据必须由另一端读取。不能读取自己发出去的
  24.   $recv = $worker->read();//同步阻塞读取管道数据
  25.   echo "recv result: $recv";
  26.   }
  27.    
  28.   //回收子进程
  29.   while(count($workers)){
  30.   // echo time(). "\n";
  31.   foreach($workers as $pid=>$worker){
  32.   $ret = swoole_process::wait(false);
  33.   if($ret){
  34.   echo "worker exit: $pid\n";
  35.   unset($workers[$pid]);
  36.   }
  37.   }
  38.   }
  39.    
  40.    

运行:

  1.   $ php swoole_process_pipe.php
  2.   recv result: Linux
  3.   recv result: 2018年 06月 24日 星期日 16:18:01 CST
  4.   recv result: yjc
  5.   worker exit: 14519
  6.   worker exit: 14522
  7.   worker exit: 14525

注意点:

  1. 管道数据读取是同步阻塞的;上面的例子里如果子进程里再加一句$worker->read(),会一直阻塞。可以使用swoole_event_add将管道加入到事件循环中,变为异步模式。
  2. 子进程里的输出(例如echo)与write效果相同。
  3. 通过管道发数据到子进程。管道是单向的:发出的数据必须由另一端读取。不能读取自己发出去的。

这里额外讲解一下swoole_process::wait():

  1. swoole_process::wait()默认是阻塞的, swoole_process::wait(false)则是非阻塞的;
  2. swoole_process::wait()阻塞模式调用一次仅能回收一个子进程,非阻塞模式调用一次不一定能当前就能回收子进程;
  3. 如果不加swoole_process::wait(),主进程又是死循环,主进程退出后会变成僵尸进程。
  1.   ps -A -o stat,ppid,pid,cmd | grep -e '^[Zz]'可以查询僵尸进程。
  2.    
  • 消息队列通信

消息队列与管道有些不一样:消息队列是全局的,所有进程都可以发送、读取。你可以把它看做redis list结构。

消息队列更常见的用途是主进程分配任务,子进程消费执行。

  1.   <?php
  2.    
  3.   $workers = [];
  4.    
  5.   for ($i=0; $i<3; $i++) {
  6.   $process = new swoole_process(function(swoole_process $worker){
  7.   //子进程逻辑
  8.   sleep(1); //防止父进程还未往消息队列中加入内容直接退出
  9.   while($cmd = $worker->pop()){
  10.   // echo "recv from master: $cmd\n";
  11.    
  12.   ob_start();
  13.   passthru($cmd);//执行外部程序并且显示未经处理的、原始输出,会直接打印输出。
  14.   $return = ob_get_clean() ? : ' ';
  15.   $return = "res: ".trim($return).". worker pid: ".$worker->pid."\n";
  16.    
  17.   echo $return;
  18.   // sleep(1);
  19.   }
  20.    
  21.   $worker->exit(0);
  22.   }, false, false); //不创建管道
  23.    
  24.   $process->useQueue(1, 2 | swoole_process::IPC_NOWAIT); //使用消息队列
  25.   $pid = $process->start();
  26.   $workers[$pid] = $process;
  27.   }
  28.    
  29.   //由于所有进程是共享使用一个消息队列,所以只需向一个子进程发送消息即可
  30.   $worker = current($workers);
  31.   for ($i=0; $i<3; $i++) {
  32.   $worker->push('whoami'); //发送消息
  33.   }
  34.    
  35.    
  36.   //回收子进程
  37.   while(count($workers)){
  38.   foreach($workers as $pid=>$worker){
  39.   $ret = swoole_process::wait();
  40.   if($ret){
  41.   echo "worker exit: $pid\n";
  42.   unset($workers[$pid]);
  43.   }
  44.   }
  45.   }
  46.    

运行结果:

  1.   $ php swoole_process_quene.php
  2.   res: yjc. worker pid: 15885
  3.   res: yjc. worker pid: 15886
  4.   res: yjc. worker pid: 15887
  5.   worker exit: 15885
  6.   worker exit: 15886
  7.   worker exit: 15887
  8.    

注意点:

  1. 所有进程共享使用一个消息队列;
  2. 消息队列的读取操作是阻塞的,可以在useQueue的时候第2个参数mode改为2 | swoole_process::IPC_NOWAIT,则是异步的。mode仅仅设置为2是阻塞的,示例里去掉swoole_process::IPC_NOWAIT后读取消息的while会死循环。
  3. 子进程前面加了个sleep(1);,这是为了防止父进程还未往消息队列中加入内容直接退出。
  4. 子进程末尾也加了sleep,这是为了防止一个进程把所有消息都消费完了,实际应用需要去掉。
  • 信号与定时器 swoole_process::alarm支持微秒定时器:
  1.   <?php
  2.    
  3.   function ev_timer(){
  4.   static $i = 0;
  5.   echo "#{$i}\talarm\n";
  6.   $i++;
  7.   if ($i > 5) {
  8.   //清除定时器
  9.   swoole_process::alarm(-1);
  10.    
  11.   //退出进程
  12.   swoole_process::kill(getmypid());
  13.    
  14.   }
  15.   }
  16.    
  17.   //安装信号
  18.   swoole_process::signal(SIGALRM, 'ev_timer');
  19.    
  20.   //触发定时器信号:单位为微秒。如果为负数表示清除定时器
  21.   swoole_process::alarm(100 * 1000);//100ms
  22.    
  23.   echo getmypid()."\n"; //该句会顺序执行,后续无需使用while循环防止进程直接退出
  24.    

运行:

  1.   $ php swoole_process_alarm.php
  2.   13660
  3.   #0 alarm
  4.   #1 alarm
  5.   #2 alarm
  6.   #3 alarm
  7.   #4 alarm
  8.   #5 alarm

注:alarm不能和Swoole\Timer同时使用。

上一篇:PHP进程间通信


下一篇:PHP多进程系列笔(转)