[PHP] 使用 pcntl 库实现PHP多进程

最近因项目需要同步大量数据(数据量在 3000 万条左右),因此想到开启多进程来处理。下面是完整的处理代码,基于 Laravel 5.1 框架。
这段代码已经在实际环境中验证过,类似场景只需简单修改即可使用。

    /**
     * ******数据同步脚本
     *
     * @author yedonghai
     */
    namespace App\Console\Commands;

    use DB;
    use Illuminate\Console\Command;
    use App\Services\ZzcService;

    /**
     * Artisan command that walks the user id space in fixed-size blocks and
     * hands each block to a forked child process (pcntl extension).
     *
     * Work is organised in batches: up to $workers children are forked, each
     * one processes a half-open id range of $block ids, and the parent waits
     * for the whole batch to finish before starting the next one.
     */
    class ZzcSyncCommand extends Command
    {
        /**
         * The name and signature of the console command.
         *
         * @var string
         */
        protected $signature = 'zzcsync:data';

        /**
         * The console command description.
         *
         * @var string
         */
        protected $description = 'sync zzc apply info';

        /**
         * Execute the console command.
         *
         * Forks the workers batch by batch until the id range [0, $userNum)
         * has been covered. The process terminates with exit status 1 if a
         * fork fails.
         *
         * @return mixed
         */
        public function handle()
        {
            $userNum = 6500000; // upper bound (exclusive) of the user ids to scan
            $workers = 30;      // maximum number of children per batch
            $block = 50000;     // number of ids handled by one child

            // Number of ids covered by one complete batch of workers.
            $batchSpan = $workers * $block;

            // $flag is the first user id of the current batch.
            for ($flag = 0; $flag < $userNum; $flag += $batchSpan) {
                $processIds = [];

                for ($i = 0; $i < $workers; $i++) {
                    $minUserId = ($block * $i) + $flag;
                    $maxUserId = $minUserId + $block;

                    // The last batch may need fewer than $workers children.
                    if ($minUserId >= $userNum) {
                        break;
                    }

                    $pid = pcntl_fork();

                    if ($pid === -1) {
                        // Children forked earlier in this batch keep running
                        // on their own; report the failure through a
                        // non-zero exit status.
                        echo "fork failed : {$i} \r\n";
                        exit(1);
                    }

                    if ($pid === 0) {
                        // Child: process one block and terminate here so it
                        // never falls through into the parent's loops. This
                        // method runs no query before forking, so the query
                        // inside _userReport() is the first one issued.
                        $this->_userReport($minUserId, $maxUserId);
                        exit(0);
                    }

                    // Parent: remember the child so it can be reaped below.
                    $processIds[$i] = $pid;
                }

                // Block until every child of this batch has exited. A blocking
                // wait is used on purpose: polling with WNOHANG in a tight
                // loop keeps the parent spinning on the CPU for as long as
                // the children are working.
                while (count($processIds) > 0) {
                    $exitedPid = pcntl_waitpid(-1, $status);

                    if ($exitedPid === -1) {
                        if (pcntl_get_last_error() === PCNTL_EINTR) {
                            // Interrupted by a signal; children may still be
                            // running, so wait again.
                            continue;
                        }

                        // No child left to wait for: nothing more to track.
                        $processIds = [];
                        break;
                    }

                    $key = array_search($exitedPid, $processIds, true);
                    if ($key !== false) {
                        unset($processIds[$key]);
                    }
                }
            }
        }

        /**
         * Child-process worker: submits the users of one id range.
         *
         * Selects users with id in [$minUserId, $maxUserId) whose
         * user_credits.audit_limit is greater than 0, skips every user that
         * already has a row in zzc_apply, and passes a JSON payload for each
         * remaining user to ZzcService::createNezha().
         *
         * @param integer  $minUserId  lower bound of the id range (inclusive)
         * @param integer  $maxUserId  upper bound of the id range (exclusive)
         *
         * @return void
         */
        private function _userReport($minUserId, $maxUserId)
        {
            $users = DB::table('users')->leftJoin('user_credits', 'user_credits.user_id', '=', 'users.id')
                ->select('users.id', 'users.user_name as mobile', 'users.id_number as pid', 'users.truename as name')
                ->where('user_credits.audit_limit', '>', 0)
                ->where('users.id', '>=', $minUserId)
                ->where('users.id', '<', $maxUserId)
                ->get();

            foreach ($users as $userObj) {

                // Skip users that already have a zzc_apply row.
                $userExist = DB::table('zzc_apply')->where('user_id', $userObj->id)->first();
                if (!empty((array)$userExist)) {
                    continue;
                }

                // Key order is kept stable so the encoded JSON is unchanged.
                $userArr = [
                    'loan_type' => '消费贷',
                    'loan_term' => '3',
                    'loan_purpose' => '购物',
                    'applicant' => [
                        'name' => $userObj->name,
                        'pid' => $userObj->pid,
                        'mobile' => $userObj->mobile,
                    ],
                ];

                $user = json_encode($userArr);
                $zzcService = new ZzcService();
                $zzcService->createNezha($user);
            }
        }

    }
上一篇:Objective-C:随机的读取文件中的内容


下一篇:Windows Phone 应用程序的全球化