<?php
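/*
 * Business requirement: PHP starts 5 child processes to synchronize data between the master table and the spare table.
 * Functions: pcntl_fork() creates a child process; pcntl_wait() reaps child processes.
 */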
include_once "./sql/Medoo.php";
use Medoo\Medoo; //medoo框架 轻量级的PHP数据库框架, 提高开发效率
/**
 * Copies rows from the master table into the spare table using forked workers.
 *
 * The source table is split into pages of $offset rows. Up to $forkNums child
 * processes are forked per batch and each child copies exactly one page.
 * The parent waits for every child of a batch before starting the next batch.
 *
 * Uses the pcntl and posix functions, so it is meant to be run from the CLI.
 */
class Master
{
    /** Maximum number of child processes forked per batch. */
    private $forkNums = 5;

    /** PIDs of the children forked in the current batch, reaped by waitForChildren(). */
    private $pids = [];

    /** Row offset of the next page to hand out (despite its name, used as the SQL offset). */
    private $limit = 0;

    /** Number of rows per page (despite its name, used as the SQL row count). */
    private $offset = 100;

    /** Seconds to pause between two batches. */
    private $time = 2;

    /** Source table. */
    private $masterTableName = 'master';

    /** Destination table. */
    private $prepareTableName = 'spare';

    /**
     * Builds a Medoo client for the master ('M') or the spare (anything else) database.
     *
     * On failure the error is logged and the current process exits with status 1.
     *
     * @param string $type 'M' for the master database, any other value for the spare one
     * @return Medoo
     */
    private function getSqlObj($type = 'M')
    {
        try {
            // Load only the configuration that is actually needed.
            $configFile = $type == 'M' ? '../config/masterDb.php' : '../config/prepareDb.php';
            $configData = include $configFile;
            if (!is_array($configData)) {
                // include returns false when the file is missing or unreadable.
                throw new RuntimeException('invalid database config: ' . $configFile);
            }
            return new Medoo($configData);
        } catch (Exception $e) {
            $this->logWirte('error', $e->getMessage());
            exit(1);
        }
    }

    /**
     * Appends one timestamped line to ../logs/info.log or ../logs/error.log.
     *
     * @param string $type 'info' selects the info log, any other value the error log
     * @param string $msg  message text
     * @return int|false result of file_put_contents()
     */
    private function logWirte($type, $msg)
    {
        date_default_timezone_set('PRC');
        $fileName = $type == 'info' ? '../logs/info.log' : '../logs/error.log';
        $time = 'startTime:' . date('Y-m-d H:i:s') . " ";
        return file_put_contents($fileName, $time . $msg . PHP_EOL, FILE_APPEND);
    }

    /**
     * Child-side work: copies the page starting at the current $limit into the spare table.
     *
     * @param int $index position of this child inside its batch (only used for output)
     * @return void
     */
    private function copyPage($index)
    {
        $sqlObj = $this->getSqlObj('M');
        // NOTE(review): no ORDER clause is given, so page boundaries rely on the
        // database returning rows in a stable order - consider ordering by a key.
        $returndata = $sqlObj->select(
            $this->masterTableName,
            "*",
            ['LIMIT' => [$this->limit, $this->offset]]
        );
        // Skip the insert when the page turned out to be empty (or the query
        // failed), instead of handing an empty data set to insert().
        if (!empty($returndata)) {
            $prepareMysqlObj = $this->getSqlObj('P');
            $prepareMysqlObj->insert($this->prepareTableName, $returndata);
        }
        echo "父进程ID: ", posix_getppid(), " 进程ID : ", posix_getpid(), " {$index} \r\n";
        $this->logWirte('info', "父进程ID: " . posix_getppid() . " 子进程ID : " . posix_getpid());
    }

    /**
     * Blocks until every child recorded in $pids has terminated, then clears the list.
     *
     * Children that did not exit with status 0 are reported in the error log.
     *
     * @return void
     */
    private function waitForChildren()
    {
        foreach ($this->pids as $pid) {
            $status = 0;
            if (pcntl_waitpid($pid, $status) > 0
                && (!pcntl_wifexited($status) || pcntl_wexitstatus($status) !== 0)
            ) {
                $this->logWirte('error', 'child process ' . $pid . ' did not exit cleanly');
            }
        }
        $this->pids = [];
    }

    /**
     * Entry point: counts the source rows, then forks workers batch by batch
     * until every page has been handed to a child.
     *
     * @return void
     */
    public function sendTask()
    {
        try {
            $sqlObj = $this->getSqlObj('M');
            // Count the same table that the workers page through.
            $total = $sqlObj->count($this->masterTableName, "*");
            // Release the parent's client before forking so that children do not
            // inherit it; each child opens its own connections.
            $sqlObj = null;

            $needNumber = (int) ceil($total / $this->offset); // number of pages still to process
            $this->logWirte('info', "=======开始任务======本次需要执行任务数: " . $needNumber);

            while ($needNumber > 0) {
                $this->logWirte('info', "=======开始任务======: " . $needNumber);
                $actualNumber = min($needNumber, $this->forkNums);
                $this->logWirte('info', "=======并发任务数======: " . $actualNumber);

                $this->pids = [];
                for ($i = 0; $i < $actualNumber; ++$i) {
                    $pid = pcntl_fork();
                    if ($pid == -1) {
                        $this->logWirte('error', 'fork error');
                        // Do not leave already started workers behind as zombies.
                        $this->waitForChildren();
                        die('fork error');
                    }
                    if ($pid === 0) {
                        // Child: copy one page and terminate; it must never
                        // fall back into the forking loop.
                        $this->copyPage($i);
                        exit(0);
                    }
                    // Parent: remember the child and advance to the next page.
                    $this->pids[$i] = $pid;
                    $this->limit += $this->offset;
                    $needNumber--;
                }

                // Wait for the whole batch instead of polling once with WNOHANG.
                $this->waitForChildren();
                if ($needNumber > 0) {
                    sleep($this->time);
                }
            }
        } catch (Exception $e) {
            $this->logWirte('error', $e->getMessage());
            exit(1);
        }
    }
}
// Script entry point: build the synchronizer and start dispatching work.
(new Master())->sendTask();
// Reference article: https://www.cnblogs.com/jkko123/p/6294602.html