Multiprocessing
Multiprocessing | 进程是什么
To be continued
Last Modified Date: 2022/1/2
总览
概念
进程是什么?
通俗地理解就是一个运行的程序或者软件,进程是操作系统资源分配的基本单位
一个程序至少有一个进程,一个进程至少有一个线程,多进程可以完成多任务。
工作中,任务数往往大于cpu的核数,即一定有一些任务正在执行,而另外一些任务在等待cpu进行执行,因此导致了有了不同的状态。
比如initial
,started
,stopped
等等。
创建子进程其实就是对主进程资源的拷贝。
主进程会等待所有的子进程执行完成程序再退出。
主进程会等待所有的子进程执行完成程序再退出
对象
创建一个子进程对象并命名为Subprocess
参数
Subprocess = multiprocessing.Process(group=None,
target=None,
name=None,
arg=(),
kwargs={},
*,
daemon=None)
参数 | 含义 |
---|---|
group | ? |
target | 子进程会调用的函数 |
name | 子进程的命名 |
args | 传入子进程的以元组为形式的参数 |
kwargs | 传入子进程的以字典为形式的参数 |
daemon | 是否为为主进程的守护线程 |
信息
信息 | 方法 |
---|---|
进程信息 |
multiprocessing.current_process() 子进程: Subprocess
|
进程id | 主进程:multiprocessing.parent_process().pid os.getppid() 子进程: multiprocessing.current_process().pid os.getpid()
|
进程状态 |
multiprocessing.current_process().is_alive() Subprocess.is_alive()
|
方法
方法 | 用途 |
---|---|
is_alive() |
判断进程是否还在alive状态: 如果是,返回 True ; 如果否,返回 False
|
join() |
阻塞调用该方法的进程,直到其结束,主进程再结束。 |
使用
进程如何使用?
创建子进程
此处举一个简单的例子: 创建2个分别名为A
和B
的子线程, 2个子线程在各自的时间限制内完成相应的工作。
import multiprocessing
import time
import datetime
def func1(startTime):
'''
:param startTime: func 1 start time
within 10s, print info per 2s.
:return:
'''
i = 0
while time.time() - startTime < 10.0:
print(datetime.datetime.now(), multiprocessing.current_process().name,
'is Alive:', multiprocessing.current_process().is_alive(), i+1)
i += 1
time.sleep(2)
def func2(startTime):
'''
:param startTime: fun 2 start time
within 30s, print info per 5s.
:return:
'''
i = 0
while time.time() - startTime < 30.0:
print(datetime.datetime.now(), multiprocessing.current_process().name,
'is Alive:', multiprocessing.current_process().is_alive(), i+1)
i += 1
time.sleep(5)
if __name__ == "__main__":
startTime = time.time()
ProA = multiprocessing.Process(target=func1, name='A', args=(startTime, ))
ProB = multiprocessing.Process(target=func2, name='B', args=(startTime, ))
print(datetime.datetime.now(), ProA.name, 'is Alive:', ProA.is_alive())
print(datetime.datetime.now(), ProB.name, 'is Alive:', ProB.is_alive())
print(ProA)
print(ProB)
ProA.start()
ProB.start()
print('*'*10, 'Since Now Start 2 Sub Processes')
print(datetime.datetime.now(), ProA.name, 'is Alive:', ProA.is_alive())
print(datetime.datetime.now(), ProB.name, 'is Alive:', ProB.is_alive())
print(ProA)
print(ProB)
###########################################################
2022-01-01 11:15:03.043149 A is Alive: False
2022-01-01 11:15:03.043205 B is Alive: False
<Process name='A' parent=71015 initial>
<Process name='B' parent=71015 initial>
********** Since Now Start 2 Sub Processes
2022-01-01 11:15:03.052383 A is Alive: True
2022-01-01 11:15:03.052439 B is Alive: True
<Process name='A' pid=71017 parent=71015 started>
<Process name='B' pid=71018 parent=71015 started>
2022-01-01 11:15:03.074236 B is Alive: True 1
2022-01-01 11:15:03.074280 A is Alive: True 1
2022-01-01 11:15:05.079339 A is Alive: True 2
2022-01-01 11:15:07.079945 A is Alive: True 3
2022-01-01 11:15:08.079214 B is Alive: True 2
2022-01-01 11:15:09.083995 A is Alive: True 4
2022-01-01 11:15:11.087975 A is Alive: True 5
2022-01-01 11:15:13.084359 B is Alive: True 3
2022-01-01 11:15:18.089441 B is Alive: True 4
2022-01-01 11:15:23.090834 B is Alive: True 5
2022-01-01 11:15:28.094172 B is Alive: True 6
Process finished with exit code 0
进程间
非共享
全局变量
进程间不共享全局变量。此处举一个简单的例子,全局变量gv
为一个列表,创建2个分别名为A
和B
的子线程, A
向列表gv
中添加元素,B
在A
结束后读取列表gv
import multiprocessing
import time
import datetime
# global variance gv
gv = []
# define target func for sub process
def func1():
for i in range(10):
gv.append(i)
time.sleep(0.1)
print(datetime.datetime.now(), multiprocessing.current_process().name, gv)
def func2():
print(datetime.datetime.now(), multiprocessing.current_process().name, gv)
if __name__ == "__main__":
ProA = multiprocessing.Process(target=func1, name='A')
ProB = multiprocessing.Process(target=func2, name='B')
print(datetime.datetime.now(), ProA.name, 'is Alive:', ProA.is_alive())
print(datetime.datetime.now(), ProB.name, 'is Alive:', ProB.is_alive())
ProA.start()
print('*'*10, 'Since Now Start Subprocess A')
print(datetime.datetime.now(), ProA.name, 'is Alive:', ProA.is_alive())
print(datetime.datetime.now(), ProB.name, 'is Alive:', ProB.is_alive())
ProA.join()
print(datetime.datetime.now(), ProA.name, 'is Alive:', ProA.is_alive())
print(datetime.datetime.now(), ProB.name, 'is Alive:', ProB.is_alive())
ProB.start()
print('*'*10, 'Since Now Start Subprocess B')
print(datetime.datetime.now(), ProA.name, 'is Alive:', ProA.is_alive())
print(datetime.datetime.now(), ProB.name, 'is Alive:', ProB.is_alive())
###########################################################
2022-01-01 13:25:55.476295 A is Alive: False
2022-01-01 13:25:55.476317 B is Alive: False
********** Since Now Start Subprocess A
2022-01-01 13:25:55.481328 A is Alive: True
2022-01-01 13:25:55.481381 B is Alive: False
2022-01-01 13:25:56.547771 A [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
2022-01-01 13:25:56.553970 A is Alive: False
2022-01-01 13:25:56.554019 B is Alive: False
********** Since Now Start Subprocess B
2022-01-01 13:25:56.555426 A is Alive: False
2022-01-01 13:25:56.555611 B is Alive: True
2022-01-01 13:25:56.588454 B []
Process finished with exit code 0
通信
队列
可以使用Queue
实现多进程之间的数据传递,Queue本身是一个消息队列程序。
from multiprocessing import Process, Queue, current_process
import datetime
def func1(q):
for i in range(5):
if not q.full():
print('Put %s to queue...' % i)
q.put(i)
else:
print(current_process().name, 'is Full', current_process())
def func2(q):
while True:
if not q.empty():
i = q.get(True)
print('Get %s from queue.' % i)
else:
break
if __name__=='__main__':
q = Queue(2)
ProA = Process(target=func1, name='A', args=(q,))
ProB = Process(target=func2, name='B', args=(q,))
ProA.daemon = True
ProB.daemon = True
print(datetime.datetime.now(), ProA.name, 'is Alive:', ProA.is_alive())
print(datetime.datetime.now(), ProB.name, 'is Alive:', ProB.is_alive())
ProA.start()
print('*' * 10, 'Since Now Start Subprocess A')
print(datetime.datetime.now(), ProA.name, 'is Alive:', ProA.is_alive())
print(datetime.datetime.now(), ProB.name, 'is Alive:', ProB.is_alive())
ProA.join()
print(datetime.datetime.now(), ProA.name, 'is Alive:', ProA.is_alive())
print(datetime.datetime.now(), ProB.name, 'is Alive:', ProB.is_alive())
ProB.start()
print('*' * 10, 'Since Now Start Subprocess B')
print(datetime.datetime.now(), ProA.name, 'is Alive:', ProA.is_alive())
print(datetime.datetime.now(), ProB.name, 'is Alive:', ProB.is_alive())
ProB.join()
print(datetime.datetime.now(), ProA.name, 'is Alive:', ProA.is_alive())
print(datetime.datetime.now(), ProB.name, 'is Alive:', ProB.is_alive())
###########################################################
2022-01-01 15:49:42.447990 A is Alive: False
2022-01-01 15:49:42.448040 B is Alive: False
********** Since Now Start Subprocess A
2022-01-01 15:49:42.449512 A is Alive: True
2022-01-01 15:49:42.449565 B is Alive: False
Put 0 to queue...
Put 1 to queue...
A is Full <Process name='A' parent=74585 started daemon>
A is Full <Process name='A' parent=74585 started daemon>
A is Full <Process name='A' parent=74585 started daemon>
2022-01-01 15:49:42.487308 A is Alive: False
2022-01-01 15:49:42.487346 B is Alive: False
********** Since Now Start Subprocess B
2022-01-01 15:49:42.488337 A is Alive: False
2022-01-01 15:49:42.488456 B is Alive: True
Get 0 from queue.
Get 1 from queue.
2022-01-01 15:49:42.516255 A is Alive: False
2022-01-01 15:49:42.516273 B is Alive: False
Process finished with exit code 0
如果此时把q = Queue(2)
改为q = Queue(10)
,则可以得到以下结果,
###########################################################
2022-01-01 15:53:25.808938 A is Alive: False
2022-01-01 15:53:25.809006 B is Alive: False
********** Since Now Start Subprocess A
2022-01-01 15:53:25.810559 A is Alive: True
2022-01-01 15:53:25.810614 B is Alive: False
Put 0 to queue...
Put 1 to queue...
Put 2 to queue...
Put 3 to queue...
Put 4 to queue...
2022-01-01 15:53:25.852826 A is Alive: False
2022-01-01 15:53:25.852879 B is Alive: False
********** Since Now Start Subprocess B
2022-01-01 15:53:25.854026 A is Alive: False
2022-01-01 15:53:25.854077 B is Alive: True
Get 0 from queue.
Get 1 from queue.
Get 2 from queue.
Get 3 from queue.
Get 4 from queue.
2022-01-01 15:53:25.903884 A is Alive: False
2022-01-01 15:53:25.903918 B is Alive: False
Process finished with exit code 0
如果要塞进
q
的msg数量比初始化Queue
时填入的数量多,应该以q.full()
作为一个判断条件限制填入q
的数量。
进程池
池子里面放的是进程,进程池会根据任务执行情况自动创建进程,而且尽量少创建进程,合理利用进程池中的进程完成多任务。
当需要创建的子进程数量不多时,可以直接利用multiprocess中的Process动态生成多个进程,但如果是上百甚至上千个目标,手动的去创建进程的工作量巨大,此时就可以用到multiprocess模块提供的Pool方法。
初始化Pool时,可以指定一个最大进程数,当有新的请求提到Pool中时,如果池还没有满,那么就会创建一个新的进程用来执行该请求,但如果池中的进程数已经达到指定的最大值,那么该请求就会等待,直到池中有进程结束,才会用之前的进程来执行新的任务。
使用进程池创建的进程是守护主进程的状态,默认自己通过Process创建的进程是不守护主进程的状态
通信
from multiprocessing import Manager, Pool, current_process, parent_process
import os
def func2(q):
print(current_process().name, os.getpid(), 'start')
for i in range(q.qsize()):
print("get from q:%s" % q.get(True))
def func1(q):
print(current_process().name, os.getpid(), 'start')
msg = ['A', 'B', 'C', 'D', 'E']
for i in msg:
print("put in p:%s" % i)
q.put(i)
if __name__ == "__main__":
print(current_process().name, os.getpid(), 'start')
q = Manager().Queue()
po = Pool()
po.apply(func1, (q,))
po.apply(func2, (q,))
po.close()
po.join()
print(current_process().name, os.getpid(), 'end')
###########################################################
MainProcess 80712 start
SpawnPoolWorker-2 80715 start
put in p:A
put in p:B
put in p:C
put in p:D
put in p:E
SpawnPoolWorker-6 80719 start
get from q:A
get from q:B
get from q:C
get from q:D
get from q:E
MainProcess 80712 end
Process finished with exit code 0
同步
进程池同步执行任务表示进程池中的进程在执行任务的时候一个执行完成另外一个才能执行,如果没有执行完会等待上一个进程执行。
在下面的示例中,进程池数量被限制在3,但总共需要调用func
5次。同一时间会开启3个进程,3个进程先后调用func
并且每个仅调用一次,不总是维持在3个进程,直到累计调用func
5次后同时结束3个进程。
from multiprocessing import Pool, current_process
import os, time, datetime
def func():
print(datetime.datetime.now(), current_process().name, current_process().pid, 'start;',
'current process count', len(psutil.pids()))
time.sleep(2)
print(datetime.datetime.now(), current_process().name, current_process().pid, 'end')
if __name__ == '__main__':
print(current_process().name, os.getpid(), 'start;', 'current process count', len(psutil.pids()))
pool = Pool(3)
for i in range(5):
pool.apply(func)
pool.close()
pool.join()
print(current_process().name, os.getpid(), 'end;', 'current process count', len(psutil.pids()))
###########################################################
MainProcess 82913 start; current process count 427
2022-01-02 13:34:59.638308 SpawnPoolWorker-1 82915 start; current process count 431
2022-01-02 13:35:01.643672 SpawnPoolWorker-1 82915 end
2022-01-02 13:35:01.646509 SpawnPoolWorker-2 82916 start; current process count 431
2022-01-02 13:35:03.652383 SpawnPoolWorker-2 82916 end
2022-01-02 13:35:03.654136 SpawnPoolWorker-3 82917 start; current process count 431
2022-01-02 13:35:05.657574 SpawnPoolWorker-3 82917 end
2022-01-02 13:35:05.658831 SpawnPoolWorker-1 82915 start; current process count 428
2022-01-02 13:35:07.662861 SpawnPoolWorker-1 82915 end
2022-01-02 13:35:07.664195 SpawnPoolWorker-2 82916 start; current process count 427
2022-01-02 13:35:09.670667 SpawnPoolWorker-2 82916 end
MainProcess 82913 end; current process count 424
Process finished with exit code 0
异步
进程池异步执行任务表示进程池中的进程同时执行任务,进程之间不会等待。
在下面的示例中,进程池进程数量被限制在3,但总共需要调用func
5次。同一时间会开启3个进程,每个调用func
一次,每结束1个的同时并开启一个,总是维持在3个进程,直到累计调用func
5次后同时结束3个进程。
from multiprocessing import Pool, current_process
import os, datetime, time, psutil
def func():
print(datetime.datetime.now(), current_process().name, current_process().pid, 'start;',
'current process count', len(psutil.pids()))
time.sleep(2)
print(datetime.datetime.now(), current_process().name, current_process().pid, 'end')
if __name__ == '__main__':
print(current_process().name, os.getpid(), 'start;', 'current process count', len(psutil.pids()))
pool = Pool(3)
for i in range(5):
pool.apply_async(func)
pool.close()
pool.join()
print(current_process().name, os.getpid(), 'end;', 'current process count', len(psutil.pids()))
###########################################################
MainProcess 82636 start; current process count 424
2022-01-02 13:18:42.792101 SpawnPoolWorker-1 82638 start; current process count 428
2022-01-02 13:18:42.802557 SpawnPoolWorker-2 82639 start; current process count 428
2022-01-02 13:18:42.803372 SpawnPoolWorker-3 82640 start; current process count 428
2022-01-02 13:18:44.797576 SpawnPoolWorker-1 82638 end
2022-01-02 13:18:44.798311 SpawnPoolWorker-1 82638 start; current process count 428
2022-01-02 13:18:44.807925 SpawnPoolWorker-2 82639 end
2022-01-02 13:18:44.808555 SpawnPoolWorker-3 82640 end
2022-01-02 13:18:44.808436 SpawnPoolWorker-2 82639 start; current process count 428
2022-01-02 13:18:46.802475 SpawnPoolWorker-1 82638 end
2022-01-02 13:18:46.813188 SpawnPoolWorker-2 82639 end
MainProcess 82636 end; current process count 425
Process finished with exit code 0
参考链接
写此文时有参考以下链接
#Python干货#Python网络编程——进程
queue — 一个同步的队列类
进程间
通信
队列
进程池
通信
同步、异步
Python之psutil,查看CPU、内存、网络等使用情况