1. multiprocessing
Python
实现多进程的模块最常用的是multiprocessing
,此外还有multiprocess、pathos、concurrent.futures、pp、parallel、pprocess
等模块。
1.1 multiprocessing.Process
multiprocessing.Process(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)
参数
-
group
: 为预留参数 -
target
:子进程要执行的目标函数 -
name
:线程名称 -
args、kwargs
:参数,args
必须是元组 -
deamon
为bool
值:表示是否为守护进程
实例
# coding=utf-8
import multiprocessing
import time
def run(a):
time.sleep(5)
print(a)
return a * a
if __name__ == '__main__':
p = multiprocessing.Process(target=run, args=(123456,))
p.start() # 运行进程实例
p.join() # 阻塞主进程,当子进程结束后,才会继续执行主进程
print(123)
1.2 multiprocessing.Pool
创建多个子进程最好是采用进程池 multiprocessing.Pool
multiprocessing.Pool(processes=None, initializer=None, initargs=(), maxtasksperchild=None)
参数
-
processes
:进程数量,如果processes
是None
那么使用os.cpu_count()
返回的数量 -
initializer
: 如果initializer
不是None
,那么每一个工作进程在开始的时候会调用initializer(*initargs)
-
maxtasksperchild
:工作进程退出之前可以完成的任务数,完成后用一个新的工作进程来替代原进程,来让闲置的资源被释放。maxtasksperchild
默认是None
,意味着只要Pool
存在工作进程就会一直存活 -
context
: 用在制定工作进程启动时的上下文,一般使用multiprocessing.Pool()
或者一个context
对象的Pool()
方法来创建一个池,两种方法都适当的设置了context
创建子进程的几种方式
-
apply()
:同步阻塞执行,上一个子进程结束后才能进行下一个子进程(不推荐) -
apply_async()
:异步非阻塞执行,每个子进程都是异步执行的(并行)(推荐) -
map()
:同步阻塞 -
map_async()
:异步非阻塞 -
imap()
:内存不够用可以采用此种方式,速度慢于map()
-
imap_unordered
:imap()
的无序版本(不会按照调用顺序返回,而是按照结束顺序返回),返回迭代器实例
1.2.1 apply
同步阻塞执行,上一个子进程结束后才能进行下一个子进程
apply(func, args=(), kwds={}, callback=None, error_callback=None)
1.2.2 apply_async
异步非阻塞执行,每个子进程都是异步执行的(并行),异步执行指的是一批子进程并行执行,且子进程完成一个,就新开始一个,而不必等待同一批其他进程完成
# callback 回调,error_back 错误回调
apply_async(func, args=(), kwds={}, callback=None, error_callback=None)
示例
# coding=utf-8
import multiprocessing
def callback(result):
"""回调函数"""
with open("result.txt", "a+", encoding="utf-8") as f:
f.write(str(result) + "\n")
def run(num):
return num * num
if __name__ == '__main__':
pool = multiprocessing.Pool(6)
for i in range(1000):
pool.apply_async(run, args=(i,), callback=callback)
# # 如有多个参数,可传一个 iterable
# pool.apply_async(run, args=([i, 123, 456]), callback=callback)
pool.close()
pool.join()
1.2.3 map
若子进程有返回值,且需集中处理,建议采用此种方式(但是它是同步阻塞的):
# iterable 可迭代类型,将 iterable 中每个元素作为参数应用到 func 函数中,返回 list
map(func, iterable, chunksize=None)
1.2.4 map_async
map
的异步非阻塞版本,返回 MapResult
实例,使用 get()
方法,获取结果(list
方法):
map_async(func, iterable, chunksize=None, callback=None, error_callback=None)
apply_async 与 map_async 对比
# coding=utf-8
import multiprocessing
import time
def run(a):
return a * a
data = []
def my_callback(result):
data.append(result)
if __name__ == '__main__':
st = time.time()
pool = multiprocessing.Pool(6)
# 总耗时:0.4497215747833252
future = pool.map_async(run, range(20000))
print(future.get()) # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
# # 总耗时:3.019148111343384
# for i in range(20000):
# pool.apply_async(run, args=(i,), callback=my_callback)
#
# print(data)
pool.close()
pool.join()
print(f"总耗时:{time.time() - st}")
结论
-
map_async
比apply_async
速度快 - 若想统一处理结果,
map_async
比apply_async
更方便
1.2.5 imap 和 imap_unordered
内存不够可以采用 imap
方式,map
的迭代器版本,返回迭代器实例,速度远慢于 map
,但是堆内存需求小。
imap_unordered
为 imap
的无序版本
imap(func, iterable, chunksize=1)
imap_unordered(func, iterable, chunksize=1)
实例:
# coding=utf-8
import multiprocessing
import time
def run(a):
return a * a
data = []
def my_callback(result):
data.append(result)
if __name__ == '__main__':
st = time.time()
pool = multiprocessing.Pool(6)
# # 总耗时:0.4497215747833252
# future = pool.map_async(run, range(20000))
# print(future.get()) # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
# # 总耗时:3.019148111343384
# for i in range(20000):
# pool.apply_async(run, args=(i,), callback=my_callback)
#
# print(data)
future = pool.imap(run, range(20000)) # 总耗时:4.171960115432739
print(future)
for i in future:
print(i)
pool.close()
pool.join()
print(f"总耗时:{time.time() - st}") # 总耗时:0.4497215747833252
1.2.6 starmap 和 starmap_async
starmap
可以使子进程活动接收多个参数,而 map
只能接收一个参数:
# 子进程活动 func允许包含多个参数,也即iterable的每个元素也是iterable(其每个元素作为func的参数),返回结果组成的 list
starmap(func, iterable, chunksize=None)
# 异步并行版本,返回 MapResult 实例,get() 方法可以获取结果组成的 list
starmap_async(func, iterable, chunksize=None, callback=None, error_callback=None)
# 使用方式
pool.starmap_async(f, ((a0, b0), (a1, b1), ...)).get()
1.3 进程间通信(数据共享)
每个进程是相互独立的,内存无法共享,实现进程间数据共享的方式有:
-
multiprocessing.Value(typecode_or_type, *args, lock=True)
:共享单个数据,共享内存 -
multiprocessing.Array(typecode_or_type, size_or_initializer, *, lock=True)
:共享数组,共享内存 -
multiprocessing.Manager()
:共享进程,支持多种数据结构的数据共享
Manager
支持的类型有:list,dict,Namespace,Lock,RLock,Semaphore,BoundedSemaphore,Condition,Event,Queue,Value
和Array
,不仅可以在本地进程间共享,甚至可以在多客户端实现网络共享,不过 Manager
占用资源较大。
1、共享 dict
:
# coding=utf-8
# 多个进程将数据添加到字典 dd 中
import multiprocessing
def run(d, k, v):
d[k] = v
if __name__ == '__main__':
pool = multiprocessing.Pool(6)
manager = multiprocessing.Manager()
dd = manager.dict()
for i in range(20):
future = pool.apply_async(run, args=(dd, i, i * i))
pool.close()
pool.join()
print(dict(dd))
# 运行结果
{0: 0, 1: 1, 2: 4, 3: 9, 4: 16, 5: 25, 6: 36, 8: 64, 7: 49, 9: 81, 10: 100, 11: 121, 12: 144, 13: 169, 14: 196, 15: 225, 16: 256, 17: 289, 18: 324, 19: 361}
2、管理队列,并让不同的进程可以访问它:
import multiprocessing
def worker(name, que):
que.put("%d is done" % name)
if __name__ == '__main__':
pool = multiprocessing.Pool(processes=3)
m = multiprocessing.Manager()
q = m.Queue()
for i in range(20):
pool.apply_async(worker, (i, q))
pool.close()
pool.join()
# coding=utf-8
import multiprocessing
def write(name, que):
que.put("%d is done" % name)
print(f'{name} write done!')
def read(que):
while not que.empty():
val = que.get(True)
print('read===>: ', val)
# while True:
# if not que.empty():
# val = que.get(True)
# print('read===>: ', val)
if __name__ == '__main__':
pool = multiprocessing.Pool(processes=3)
m = multiprocessing.Manager()
q = m.Queue()
for i in range(20):
pool.apply_async(write, (i, q))
p1 = multiprocessing.Process(target=read, args=(q,))
p1.start()
p1.join()
pool.close()
pool.join()
注意:在操作共享对象元素时,除了赋值操作,其他的方法都作用在共享对象的拷贝上,并不会对共享对象生效。比如:
dic['k'] = []; dic['k'].append(x)
,将不会修改dic
的内容
1.4 进程间通信(数据传递)
- 队列
-
multiprocessing.Queue(maxsize=0)
:建立共享的队列实例 -
multiprocessing.JoinableQueue(maxsize=0)
:建立可阻塞的队列实例
-
- 管道
-
multiprocessing.Pipe(duplex=True)
:建立一对管道对象,用于在两个进程之间传递数据
-
2. concurrent.futures 模块
concurrent.futures
是 3.2
中引入的新模块,它为异步执行可调用对象提供了高层接口,分为两类:
-
ThreadPoolExecutor
:多线程编程 -
ProcessPoolExecutor
:多进程编程
两者实现了同样的接口,这些接口由抽象类 Executor
定义;这个模块提供了两大类型:
-
Executor
:执行器,用于管理工作池 -
Future
:管理工作计算出的结果
2.1 concurrent.futures.Executor 类
提供了一系列方法,可以用于异步执行调用,定义的方法有:
# 调用对象执行,fn(*args, **kwargs),返回 Future 对象,可用 future.result() 获取执行结果
submit(fn, *args, **kwargs)
# 异步执行 func,并支持多次并发调用,返回一个迭代器
# timeout 秒数可以是浮点数或者整数,如果设置为 None 或者不指定,则不限制等待时间
# ProcessPoolExecutor 这个方法将 iterables 划分为多块,作为独立的任务提交到进程池(不是 1)可显著提升性能,ThreadPoolExecutor,chunksize 不起作用
map(func, *iterables, timeout=None, chunksize=1)
# 告诉当执行器 executor 在当前所有等待的 future 对象运行完毕后,应该释放执行器用到的所有资源
# True 会等待所有 future 执行完毕,且 executor 的资源都释放完会才会返回,False 会立即返回,executor 的资源会在 future 执行完后释放
shutdown(wait=True)
2.2 ThreadPoolExecutor
ThreadPoolExecutor + requests 并发执行
# coding=utf-8
import time
import requests
from concurrent.futures import ThreadPoolExecutor, as_completed
def fetch(req_url):
r = requests.get(req_url)
return r.json()['args']['a']
if __name__ == '__main__':
start = time.time()
numbers = range(12)
url = 'http://httpbin.org/get?a={}'
# submit() 方式
with ThreadPoolExecutor(max_workers=3) as executor:
# task_list = [executor.submit(fetch(url.format(n))) for n in range(12)]
task_list = [executor.submit(fetch, url.format(n)) for n in range(12)]
for future in as_completed(task_list):
print(future.result())
# data = future.result() # 总耗时:2.903249740600586
# print(data)
## map() 方式
# with ThreadPoolExecutor(max_workers=3) as executor:
# future = executor.map(fetch, (url.format(n) for n in range(12)))
#
# for result in future:
# print(result)
print(f'总耗时:{time.time() - start}') # 总耗时:2.630300760269165
实测
submit
未按顺序返回结果
2.3 ProcessPoolExecutor
ProcessPoolExecutor
使用进程池来异步执行调用,适合计算密集型任务,方法参数:
concurrent.futures.ProcessPoolExecutor(max_workers=None, mp_context=None, initializer=None, initargs=())
示例:
# coding=utf-8
from concurrent.futures import ProcessPoolExecutor, as_completed
def fib(n):
if n <= 2:
return 1
return fib(n - 1) + fib(n - 2)
if __name__ == '__main__':
numbers = range(20)
with ProcessPoolExecutor(max_workers=3) as executor:
# # map 方式
# for num, result in zip(numbers, executor.map(fib, numbers)):
# print(f"{num}====>{result}")
# submit 方式
work_dict = {executor.submit(fib, i): i for i in numbers}
for future in as_completed(work_dict):
num = work_dict[future]
try:
data = future.result()
except Exception as e:
print(e)
else:
print(f"fib({num} = {data})")
2.4 Future 类
Future
类封装了可调用对象的异步执行,由 Executor.submit()
产生,有如下方法:
-
cancel()
:尝试取消调用,如果该调用正在执行中,无法取消,本方法返回False
,其他情况下调用会被取消,并返回True
;只有当任务提交了还没执行才可以通过这种方式取消 -
cancelled()
: 如果调用已经被成功取消,返回True
-
running()
:如果调用正在执行,无法被取消,则返回True
-
done()
:如果调用成功被取消或者已经执行完毕,返回 True -
result(timeout=None)
: 返回调用的返回值。如果调用还没有完成,则最多等待timeout
秒。如果timeout
秒之后还没有完成,抛出concurrent.futures.TimeoutError``。timeout
可以为整数或者浮点数。如果不指定或者为None,则不限制等待时间。如果
future 在完成之前被取消了,会抛出CancelledError
异常,如果调用抛出异常,这个方法会抛出同样的异常。同时它也会阻塞直到任务完成,获取被取消 -
exception(timeout=None)
:返回被调用抛出的异常,如果调用还没有执行完毕,则最多等待timeout
秒。如果timeout
秒之后还没有完成,抛出concurrent.futures.TimeoutError
。timeout
可以为整数或者浮点数。如果不指定或者为None
,则不限制等待时间。
如果future
在完成之前被取消了,会抛出CancelledError
异常,如果调用完成并且没有抛出异常,返回None
-
add_done_callback(fn)
:为future
附加可调用对象fn
,当future
运行完毕或者被取消时,它会被用作fn
的唯一参数,并调用fn
。可调用对象按照添加顺序依次调用,并且总是在添加时所处进程的一个线程内调用它。如果该可调用对象抛出了属于Exception
子类的异常,它会被记录并忽略。如果它抛出了属于BaseException
子类的异常,该行为未定义。
如果future
已经完成或者已经取消,fn
会被立即调用
通过 add_done_callack() 获取返回值和捕获异常
concurrent.futuresthread.ProcessPoolExecutor
线程池中的 worker
引发异常的时候,并不会直接向上抛起异常,而是需要主线程通过调用 concurrent.futures.Future.exception(timeout=None)
方法主动获取 worker
的异常:
# coding=utf-8
from concurrent.futures import ProcessPoolExecutor, as_completed
def fib(n):
if n <= 2:
return 1
return fib(n - 1) + fib(n - 2)
def call_back(future):
"""
回调(可获取多进程返回值、错误)
:param future: future 对象
:return:
"""
# 获取错误信息
worker_exception = future.exception()
if worker_exception:
print(worker_exception)
# 获取返回值
print(future.result())
def test(n):
if n % 2 == 0:
n / 0 # 发生异常
return n * 2
if __name__ == '__main__':
numbers = range(20)
with ProcessPoolExecutor(max_workers=3) as executor:
# # map 方式
# for num, result in zip(numbers, executor.map(fib, numbers)):
# print(f"{num}====>{result}")
# submit 方式
# work_dict = {executor.submit(fib, i): i for i in numbers}
# for future in as_completed(work_dict):
# num = work_dict[future]
# try:
# data = future.result()
# except Exception as e:
# print(e)
# else:
# print(f"fib({num} = {data})")
# 其他方法
for i in numbers:
executor.submit(test, i).add_done_callback(call_back)
实测
map()
方式提交的会触发异常,submit()
方式需要通过add_done_callback()
主动捕获异常!
参考文章
https://blog.csdn.net/jpch89/article/details/87643972
https://blog.csdn.net/makingLJ/article/details/98084973
3. 实例
3.1 多进程(池)向同一文件写入数据
# coding=utf-8
"""
Function:回调函数解决多进程向同一文件写入数据
"""
import multiprocessing
def callback(result):
"""回调函数"""
with open("result.txt", "a+", encoding="utf-8") as f:
f.write(str(result) + "\n")
def run(num):
return num * num
if __name__ == '__main__':
pool = multiprocessing.Pool(6)
for i in range(1000):
pool.apply_async(run, args=(i,), callback=callback)
pool.close()
pool.join()