Python-多进程

一、进程的创建-multiprocessing

multiprocessing模块就是跨平台版本的多进程模块,提供了一个Process类来代表一个进程对象,这个对象可以理解为一个独立的进程,可以执行另外的事情 1、Process语法结构如下:
  • target: 如果传递了函数的引用,可以任务这个子进程就执行这里的代码
  • args: 给target指定的函数传递的参数,以元祖的方式传递
  • kwargs: 给target指定的函数传递命名参数
  • name: 给进程敲定一个名字,可以不设定
  • group: 指定进程组,大多数情况下用不到
  2、Process创建的实例对象的常用方法:
  • start(): 启动子进程实例(创建子进程)
  • is_alive(): 判断进程子进程是否还在活着
  • join(timeout): 是否等待子进程执行结束,或等待多少秒
  • terminate(): 不管任务是否完成,立即终止子进程
  3、Process创建的实例对象的常用属性
  • name :当前进程的别名,默认为Process -N,N为从1开始递增的整数
  • pid: 当前进程的pid(进程号)
#coding : utf-8

import multiprocessing
import time


def test1():
    while True:
        print(f"--------test1---------")
        time.sleep(1)

def test2():
    while True:
        print(f"--------test2--------")
        time.sleep(1)

def main():
  	#创建进程
    p1 = multiprocessing.Process(target=test1)
    p2 = multiprocessing.Process(target=test2)
	
    #启动进程
    p1.start()
    p2.start()
    
    #等待进程执行结束
    p1.join()

if __name__ == '__main__':
    main()
说明: 创建子进程时,只需要传入一个执行函数和函数的参数,创建一个Process实例,用start()方法启动   操作系统的调度算法: 时间片轮转 优先级调度  

二、获取进程的Pid

#coding : utf-8

import multiprocessing
import os
import time


def test1():
    while True:
        print(f"--------in 子进程 pid = {os.getpid()},父进程 pid = {os.getppid()}---------")
        time.sleep(1)

def test2():
    while True:
        print(f"--------in 子进程 pid = {os.getpid()},父进程 pid = {os.getppid()}--------")
        time.sleep(1)

def main():
    print(f"--------in 主进程 pid = {os.getpid()}")
    p1 = multiprocessing.Process(target=test1)
    p2 = multiprocessing.Process(target=test2)

    p1.start()
    p2.start()

if __name__ == '__main__':
    main()
运行结果:
--------in 主进程 pid = 13882
--------in 子进程 pid = 13884,父进程 pid = 13882---------
--------in 子进程 pid = 13885,父进程 pid = 13882--------
--------in 子进程 pid = 13884,父进程 pid = 13882---------
--------in 子进程 pid = 13885,父进程 pid = 13882--------
--------in 子进程 pid = 13884,父进程 pid = 13882---------
结论: 1、进程与线程一样,进程执行没有顺序 2、主进程等待子进程死掉再死   查看进程linux命令
top --按照占用资源的进程倒序排列 
htop --查看几核处理器下运行的进程 
ps -aux --查看当前系统进程
  多进程多任务:主进程死了之后,子进程能够继续执行。 多线程多任务:主线程死了之后,子线程必死。  

三、给进程传递参数

#coding : utf-8

import multiprocessing
import os
import time

def test1(a,b,c,*args,**kwargs):
    print(a)
    print(b)
    print(c)
    print(args)
    print(kwargs)

def main():
    p1 = multiprocessing.Process(target=test1, args=(11,22,33,44,55,66), kwargs={"mm":99})
    p1.start()

if __name__ == '__main__':
    main()
执行结果:
11
22
33
(44, 55, 66)
{'mm': 99}

四、通过队列完成多进程间通信

不同平台(Mac, Win等)对多进程的的处理不一样,multiprocessing支持三种启动进程的方法。这些方法有:

  • spawn
  • fork
  • forkserver

先来一起查看一下源代码声明:当你的系统为Win32时,默认使用spawn方法启动进程,如果不是,则按系统调用get_context(method='fork/spawn/forkserver')方法来获取上下文对象,并声明进程启动方法,或者调用set_start_method(method="fork"),此方法不能被多次调用,具体看以下例子:

if sys.platform != "win32":
    @overload
    def get_context(method: None = ...) -> DefaultContext: ...
    @overload
    def get_context(method: Literal["spawn"]) -> SpawnContext: ...
    @overload
    def get_context(method: Literal["fork"]) -> ForkContext: ...
    @overload
    def get_context(method: Literal["forkserver"]) -> ForkServerContext: ...
    @overload
    def get_context(method: str) -> BaseContext: ...

else:
    @overload
    def get_context(method: None = ...) -> DefaultContext: ...
    @overload
    def get_context(method: Literal["spawn"]) -> SpawnContext: ...
    @overload
    def get_context(method: str) -> BaseContext: ...

如:你的操作系统为MacOS,启动多进程并想要相互通信时,报错:FileNotFoundError: [Errno 2] No such file or directory,在主模块中调用set_start_method()或get_context()解决

Traceback (most recent call last):
  File "/Library/Frameworks/Python.framework/Versions/3.9/lib/python3.9/multiprocessing/forkserver.py", line 274, in main
    code = _serve_one(child_r, fds,
  File "/Library/Frameworks/Python.framework/Versions/3.9/lib/python3.9/multiprocessing/forkserver.py", line 313, in _serve_one
    code = spawn._main(child_r, parent_sentinel)
  File "/Library/Frameworks/Python.framework/Versions/3.9/lib/python3.9/multiprocessing/spawn.py", line 126, in _main
    self = reduction.pickle.load(from_parent)
  File "/Library/Frameworks/Python.framework/Versions/3.9/lib/python3.9/multiprocessing/synchronize.py", line 110, in __setstate__
    self._semlock = _multiprocessing.SemLock._rebuild(*state)
FileNotFoundError: [Errno 2] No such file or directory

解决方案1:

# coding : utf-8
import multiprocessing
import time


def down_from_web(q):
    """下载数据"""
    #模拟从网上下载的数据
    datas = [11, 22, 33, 44]
    #向队列当中写入数据
    for temp in datas:
        q.put(temp)
    print(f"---下载器已经下载了数据并且存入到队列中")

def analysis_data(q):
    """数据处理"""
    waiting_analysis_data = list()
    #从队列中获取数据
    while True:
        data = q.get()
        waiting_analysis_data.append(data)
        if q.empty():
            break
    #模拟数据处理
    print(waiting_analysis_data)

def main():
    multiprocessing.set_start_method('fork')
    # 创建一个队列
    q = multiprocessing.Queue()
    #创建多个进程,将队列的引用当做实参进行传递数据
    q1 = multiprocessing.Process(target=down_from_web, args=(q, ))
    q2 = multiprocessing.Process(target=analysis_data, args=(q, ))

    q1.start()
    q1.join()
    q2.start()
    # q2.terminate()


if __name__ == "__main__":
    main()

解决方案2:

# coding : utf-8
import multiprocessing
import time


def down_from_web(q):
    """下载数据"""
    #模拟从网上下载的数据
    datas = [11, 22, 33, 44]
    #向队列当中写入数据
    for temp in datas:
        q.put(temp)
    print(f"---下载器已经下载了数据并且存入到队列中")

def analysis_data(q):
    """数据处理"""
    waiting_analysis_data = list()
    #从队列中获取数据
    while True:
        data = q.get()
        waiting_analysis_data.append(data)
        if q.empty():
            break
    #模拟数据处理
    print(waiting_analysis_data)

def main():
    ctx = multiprocessing.get_context('fork')
    # 创建一个队列
    q = ctx.Queue()
    #创建多个进程,将队列的引用当做实参进行传递数据
    q1 = ctx.Process(target=down_from_web, args=(q, ))
    q2 = ctx.Process(target=analysis_data, args=(q, ))

    q1.start()
    q1.join()
    q2.start()
    # q2.terminate()


if __name__ == "__main__":
    main()
上一篇:linux练习


下一篇:windows 解决端口占用