PHP 并发多进程任务

<?php
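/*
 * Requirement: copy the rows of the master table into the spare table using
 * several child processes running in parallel.
 * Children are created with pcntl_fork() and reaped by the parent with the
 * pcntl wait functions.
 */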

include_once "./sql/Medoo.php";

use Medoo\Medoo;  //medoo框架 轻量级的PHP数据库框架, 提高开发效率

/**
 * Copies rows from the master table to the spare table in fixed-size chunks,
 * handing each chunk to a forked child process.
 *
 * Children are started in batches of at most $forkNums. The parent waits for
 * every child of a batch to finish before starting the next batch.
 */
class Master
{
    private $forkNums = 5; // maximum number of children running at the same time
    private $pids = array(); // PIDs of the children forked in the current batch
    private $startRow = 0; // row offset of the next chunk to hand out
    private $batchSize = 100; // number of rows copied by one child
    private $time = 2; // pause between two batches, in seconds
    private $masterTableName = 'master'; // source table
    private $prepareTableName = 'spare'; // destination table

    /**
     * Opens a new Medoo connection.
     *
     * Logs the error and terminates the current process if the configuration
     * cannot be loaded or the connection cannot be created.
     *
     * @param string $type 'M' for the master database, anything else for the spare database
     * @return Medoo
     */
    private function getSqlObj($type = 'M')
    {
        try {
            // Only load the configuration that is actually needed.
            $configFile = $type === 'M' ? '../config/masterDb.php' : '../config/prepareDb.php';
            $configData = include $configFile;
            if (!is_array($configData)) {
                // include returns false for a missing file; fail with a readable message
                throw new RuntimeException('invalid database config: ' . $configFile);
            }
            return new Medoo($configData);
        } catch (Exception $e) {
            $this->logWrite('error', $e->getMessage());
            exit(1);
        }
    }

    /**
     * Appends one timestamped line to ../logs/info.log or ../logs/error.log.
     *
     * @param string $type 'info' selects info.log, anything else selects error.log
     * @param string $msg  message text
     * @return int|false bytes written, or false on failure
     */
    private function logWrite($type, $msg)
    {
        date_default_timezone_set('PRC');
        $fileName = $type === 'info' ? '../logs/info.log' : '../logs/error.log';
        $time = 'startTime:' . date('Y-m-d H:i:s') . " ";
        // LOCK_EX keeps lines from different processes from interleaving.
        return file_put_contents($fileName, $time . $msg . PHP_EOL, FILE_APPEND | LOCK_EX);
    }

    /**
     * Blocks until every child recorded in $this->pids has exited, logging any
     * child that did not exit cleanly, then clears the list.
     */
    private function waitForChildren()
    {
        foreach ($this->pids as $pid) {
            $status = 0;
            if (pcntl_waitpid($pid, $status) === -1) {
                $this->logWrite('error', 'waitpid failed for child ' . $pid);
                continue;
            }
            if (!pcntl_wifexited($status) || pcntl_wexitstatus($status) !== 0) {
                $this->logWrite('error', 'child ' . $pid . ' did not exit cleanly');
            }
        }
        $this->pids = array();
    }

    /**
     * Child-process body: copies one chunk and terminates the process.
     * Never returns to the caller.
     *
     * @param int $index position of this child inside its batch (used for output only)
     */
    private function runChild($index)
    {
        try {
            // Every child opens its own connections; sockets must not be shared across fork.
            // NOTE(review): there is no ORDER clause, so chunk boundaries rely on the
            // database returning rows in a stable order — confirm, or order by the primary key.
            $rows = $this->getSqlObj('M')->select(
                $this->masterTableName,
                "*",
                ['LIMIT' => [$this->startRow, $this->batchSize]]
            );

            // Nothing to insert if the chunk is empty (e.g. rows were deleted after counting).
            if (!empty($rows)) {
                $this->getSqlObj('P')->insert($this->prepareTableName, $rows);
            }

            echo "父进程ID: ", posix_getppid(), " 进程ID : ", posix_getpid(), " {$index} \r\n";
            $this->logWrite('info', "父进程ID: " . posix_getppid() . " 子进程ID : " . posix_getpid());
            exit(0);
        } catch (Exception $e) {
            $this->logWrite('error', $e->getMessage());
            exit(1);
        }
    }

    /**
     * Entry point: counts the source rows, splits them into chunks of
     * $batchSize rows and copies the chunks with at most $forkNums children
     * at a time.
     */
    public function sendTask()
    {
        try {
            $sqlObj = $this->getSqlObj('M');
            // Count the same table the children read from.
            $total = (int) $sqlObj->count($this->masterTableName, "*");
            // Release the parent's connection before forking so children do not inherit it.
            $sqlObj = null;

            $needNumber = (int) ceil($total / $this->batchSize); // chunks still to be handed out
            $this->logWrite('info', "=======开始任务======本次需要执行任务数: " . $needNumber);
            while ($needNumber > 0) {
                $this->logWrite('info', "=======开始任务======: " . $needNumber);
                $actualNumber = min($needNumber, $this->forkNums);
                $this->logWrite('info', "=======并发任务数======: " . $actualNumber);

                $this->pids = array();
                for ($i = 0; $i < $actualNumber; ++$i) {
                    $pid = pcntl_fork();
                    if ($pid === -1) {
                        $this->logWrite('error', 'fork error');
                        // Do not leave already started children behind as zombies.
                        $this->waitForChildren();
                        echo 'fork error';
                        exit(1);
                    }
                    if ($pid === 0) {
                        $this->runChild($i); // terminates the child process
                    }

                    // Parent: remember the child and advance to the next chunk.
                    $this->pids[$i] = $pid;
                    $this->startRow += $this->batchSize;
                    $needNumber--;
                }

                // Wait for the whole batch so no more than $forkNums children ever run at once.
                $this->waitForChildren();
                if ($needNumber > 0) {
                    sleep($this->time);
                }
            }
        } catch (Exception $e) {
            $this->logWrite('error', $e->getMessage());
            exit(1);
        }
    }
}

// Script entry point: build the coordinator and start the copy job.
$master = new Master();
$master->sendTask();

参考文章：https://www.cnblogs.com/jkko123/p/6294602.html

上一篇:实验3:GVRP的配置


下一篇:链路聚合、VLAN路由、GVRP