一、进程的创建-multiprocessing
multiprocessing模块就是跨平台版本的多进程模块,提供了一个Process类来代表一个进程对象,这个对象可以理解为一个独立的进程,可以执行另外的事情 1、Process语法结构如下:- target: 如果传递了函数的引用,可以任务这个子进程就执行这里的代码
- args: 给target指定的函数传递的参数,以元祖的方式传递
- kwargs: 给target指定的函数传递命名参数
- name: 给进程敲定一个名字,可以不设定
- group: 指定进程组,大多数情况下用不到
- start(): 启动子进程实例(创建子进程)
- is_alive(): 判断进程子进程是否还在活着
- join(timeout): 是否等待子进程执行结束,或等待多少秒
- terminate(): 不管任务是否完成,立即终止子进程
- name :当前进程的别名,默认为Process -N,N为从1开始递增的整数
- pid: 当前进程的pid(进程号)
#coding : utf-8
import multiprocessing
import time
def test1():
while True:
print(f"--------test1---------")
time.sleep(1)
def test2():
while True:
print(f"--------test2--------")
time.sleep(1)
def main():
#创建进程
p1 = multiprocessing.Process(target=test1)
p2 = multiprocessing.Process(target=test2)
#启动进程
p1.start()
p2.start()
#等待进程执行结束
p1.join()
if __name__ == '__main__':
main()
说明:
创建子进程时,只需要传入一个执行函数和函数的参数,创建一个Process实例,用start()方法启动
操作系统的调度算法:
时间片轮转
优先级调度
二、获取进程的Pid
#coding : utf-8
import multiprocessing
import os
import time
def test1():
while True:
print(f"--------in 子进程 pid = {os.getpid()},父进程 pid = {os.getppid()}---------")
time.sleep(1)
def test2():
while True:
print(f"--------in 子进程 pid = {os.getpid()},父进程 pid = {os.getppid()}--------")
time.sleep(1)
def main():
print(f"--------in 主进程 pid = {os.getpid()}")
p1 = multiprocessing.Process(target=test1)
p2 = multiprocessing.Process(target=test2)
p1.start()
p2.start()
if __name__ == '__main__':
main()
运行结果:
--------in 主进程 pid = 13882
--------in 子进程 pid = 13884,父进程 pid = 13882---------
--------in 子进程 pid = 13885,父进程 pid = 13882--------
--------in 子进程 pid = 13884,父进程 pid = 13882---------
--------in 子进程 pid = 13885,父进程 pid = 13882--------
--------in 子进程 pid = 13884,父进程 pid = 13882---------
结论:
1、进程与线程一样,进程执行没有顺序
2、主进程等待子进程死掉再死
查看进程linux命令
top --按照占用资源的进程倒序排列
htop --查看几核处理器下运行的进程
ps -aux --查看当前系统进程
多进程多任务:主进程死了之后,子进程能够继续执行。
多线程多任务:主线程死了之后,子线程必死。
三、给进程传递参数
#coding : utf-8
import multiprocessing
import os
import time
def test1(a,b,c,*args,**kwargs):
print(a)
print(b)
print(c)
print(args)
print(kwargs)
def main():
p1 = multiprocessing.Process(target=test1, args=(11,22,33,44,55,66), kwargs={"mm":99})
p1.start()
if __name__ == '__main__':
main()
执行结果:
11
22
33
(44, 55, 66)
{'mm': 99}
四、通过队列完成多进程间通信
不同平台(Mac, Win等)对多进程的的处理不一样,multiprocessing支持三种启动进程的方法。这些方法有:
- spawn
- fork
- forkserver
先来一起查看一下源代码声明:当你的系统为Win32时,默认使用spawn方法启动进程,如果不是,则按系统调用get_context(method='fork/spawn/forkserver')方法来获取上下文对象,并声明进程启动方法,或者调用set_start_method(method="fork"),此方法不能被多次调用,具体看以下例子:
if sys.platform != "win32":
@overload
def get_context(method: None = ...) -> DefaultContext: ...
@overload
def get_context(method: Literal["spawn"]) -> SpawnContext: ...
@overload
def get_context(method: Literal["fork"]) -> ForkContext: ...
@overload
def get_context(method: Literal["forkserver"]) -> ForkServerContext: ...
@overload
def get_context(method: str) -> BaseContext: ...
else:
@overload
def get_context(method: None = ...) -> DefaultContext: ...
@overload
def get_context(method: Literal["spawn"]) -> SpawnContext: ...
@overload
def get_context(method: str) -> BaseContext: ...
如:你的操作系统为MacOS,启动多进程并想要相互通信时,报错:FileNotFoundError: [Errno 2] No such file or directory,在主模块中调用set_start_method()或get_context()解决
Traceback (most recent call last):
File "/Library/Frameworks/Python.framework/Versions/3.9/lib/python3.9/multiprocessing/forkserver.py", line 274, in main
code = _serve_one(child_r, fds,
File "/Library/Frameworks/Python.framework/Versions/3.9/lib/python3.9/multiprocessing/forkserver.py", line 313, in _serve_one
code = spawn._main(child_r, parent_sentinel)
File "/Library/Frameworks/Python.framework/Versions/3.9/lib/python3.9/multiprocessing/spawn.py", line 126, in _main
self = reduction.pickle.load(from_parent)
File "/Library/Frameworks/Python.framework/Versions/3.9/lib/python3.9/multiprocessing/synchronize.py", line 110, in __setstate__
self._semlock = _multiprocessing.SemLock._rebuild(*state)
FileNotFoundError: [Errno 2] No such file or directory
解决方案1:
# coding : utf-8
import multiprocessing
import time
def down_from_web(q):
"""下载数据"""
#模拟从网上下载的数据
datas = [11, 22, 33, 44]
#向队列当中写入数据
for temp in datas:
q.put(temp)
print(f"---下载器已经下载了数据并且存入到队列中")
def analysis_data(q):
"""数据处理"""
waiting_analysis_data = list()
#从队列中获取数据
while True:
data = q.get()
waiting_analysis_data.append(data)
if q.empty():
break
#模拟数据处理
print(waiting_analysis_data)
def main():
multiprocessing.set_start_method('fork')
# 创建一个队列
q = multiprocessing.Queue()
#创建多个进程,将队列的引用当做实参进行传递数据
q1 = multiprocessing.Process(target=down_from_web, args=(q, ))
q2 = multiprocessing.Process(target=analysis_data, args=(q, ))
q1.start()
q1.join()
q2.start()
# q2.terminate()
if __name__ == "__main__":
main()
解决方案2:
# coding : utf-8
import multiprocessing
import time
def down_from_web(q):
"""下载数据"""
#模拟从网上下载的数据
datas = [11, 22, 33, 44]
#向队列当中写入数据
for temp in datas:
q.put(temp)
print(f"---下载器已经下载了数据并且存入到队列中")
def analysis_data(q):
"""数据处理"""
waiting_analysis_data = list()
#从队列中获取数据
while True:
data = q.get()
waiting_analysis_data.append(data)
if q.empty():
break
#模拟数据处理
print(waiting_analysis_data)
def main():
ctx = multiprocessing.get_context('fork')
# 创建一个队列
q = ctx.Queue()
#创建多个进程,将队列的引用当做实参进行传递数据
q1 = ctx.Process(target=down_from_web, args=(q, ))
q2 = ctx.Process(target=analysis_data, args=(q, ))
q1.start()
q1.join()
q2.start()
# q2.terminate()
if __name__ == "__main__":
main()