内容简述:
- 1、multiprocess模块详解
1、multiprocess模块详解
Python的os模块封装了常见的系统调用,其中就包含 「 fork函数」,通过这个函数可以轻松的创建子进程,但是要注意一点,在Windows系统上是无法使用fork函数的,Python为我们提供了可跨平台的 multiprocess 模块。该模块提供了一个 Process类 来代表一个进程对象,用法和Thread非常相似。
① Process进程对象
创建一个进程的代码示例如下:
from multiprocessing import Process import os def show_msg(name): print("子进程运行中:name = %s , pid = %d " % (name, os.getpid())) if __name__ == '__main__': print("父进程 %d" % os.getpid()) p = Process(target=show_msg, args=('测试',)) print("开始执行子进程~") p.start() p.join() print("子进程执行完毕!") 复制代码
运行结果如下:
父进程 26539 开始执行子进程~ 子进程运行中:name = 测试 , pid = 26540 子进程执行完毕! 复制代码
Process构造函数:
Process(group=None, target=None, name=None, args=(), kwargs={}) 复制代码
参数详解:
- group :分组,很少用到
- target :调用对象,传入任务执行函数作为参数
- name :进程别名
- args :给调用对象以元组的形式提供参数,比如有两个参数args=(a,b),如果只有一个参数,要这样写args=(a,),不能把逗号漏掉,不然会被当做括号运算符使用!
- kwargs :调用对象的关键字参数字典
Process的常用函数:
- is_alive ():判断进程实例是否还在执行;
- join ([timeout]):是否等待进程实例执行结束,或等待多少秒;
- start ():启动进程实例(创建子进程);
-
run ():如果没有给定target参数,对这个对象调用start()方法时,
就将执行对象中的run()方法; - terminate ():不管任务是否完成,立即终止;
除了使用fork函数和上述操作创建进程的方式外,还可以自定义一个Process类,重写 __init__
和 run函数
即可,代码示例如下:
from multiprocessing import Process import os class MyProcess(Process): def __init__(self, name): Process.__init__(self) self.msg = name def run(self): print("子进程运行中:name = %s , pid = %d " % (self.msg, os.getpid())) if __name__ == '__main__': print("父进程 %d" % os.getpid()) p = MyProcess('测试') print("开始执行子进程~") p.start() p.join() print("子进程执行完毕!") 复制代码
运行结果如下:
父进程 26794 开始执行子进程~ 子进程运行中:name = 测试 , pid = 26795 子进程执行完毕! 复制代码
② Pool进程池
知道了如何创建进程,那么实现多进程有不是什么难事了,一个循环创建多个即可,但是有个问题,进程可是重量级别的程序,重复进程创建和销毁会造成一定的性能开销! Python为我们提供了一个 进程池对象Pool用来缓解进程重复关启带来的性能消耗问题 。在创建进程池的时候指定一个容量,如果接收到一个新任务,而池没满的话,会创建一个新的进程来执行这个任务,如果池满的话,任务则会进入等待状态,直到池中有进程结束,才会创建新的进程来执行这个任务。
Pool的构造函数:
Pool(processes=None, initializer=None, initargs=(),maxtasksperchild=None, context=None) 复制代码
一般只用到第一个参数,processes用于设置进程池的容量,即最多并发的进程数量,如果不写默认使用 os.cpu_count()
返回的值。
Pool常用函数详解:
-
apply (func, args=(), kwds={}):使用堵塞方式调用func,堵塞的意思是一个进程结束,
释放回进程池,下一个进程才可以开始,args为传递给func的参数列表,kwds为传递给
func的关键字参数列表,该方法在Python 2.3后就不建议使用了。 - apply_async (func, args=(), kwds={}, callback=None,error_callback=None) :使用非阻塞方式调用func,进程池进程最大数可以同时执行,还支持返回结果后进行回调。
- close ():关闭进程池,使其不再接受新的任务;
- terminate ():结束工作进程,不再处理未处理的任务,不管任务是否完成,立即终止;
- join ():主进程阻塞,等待⼦进程的退出,必须在close或terminate之后使用;
- map (func, iterable, chunksize=None):这里的map函数和Python内置的高阶函数map类似,只是这里的map方法是在进程池多进程并发进行的,接收一个函数和可迭代对象,把函数作用到每个元素,得到一个新的list返回。
最简单的进程池代码示例如下:
import multiprocessing as mp import time def func(msg): time.sleep(1) print(mp.current_process().name + " : " + msg) if __name__ == '__main__': pool = mp.Pool() for i in range(20): msg = "Do Something %d" % (i) pool.apply_async(func, (msg,)) pool.close() pool.join() print("子进程执行任务完毕!") 复制代码
运行结果如下:
ForkPoolWorker-4 : Do Something 3 ForkPoolWorker-2 : Do Something 1 ForkPoolWorker-1 : Do Something 0 ForkPoolWorker-3 : Do Something 2 ForkPoolWorker-5 : Do Something 4 ForkPoolWorker-6 : Do Something 5 ForkPoolWorker-7 : Do Something 6 ForkPoolWorker-8 : Do Something 7 ForkPoolWorker-2 : Do Something 9 ForkPoolWorker-4 : Do Something 8 ForkPoolWorker-1 : Do Something 11 ForkPoolWorker-7 : Do Something 12 ForkPoolWorker-5 : Do Something 13 ForkPoolWorker-6 : Do Something 14 ForkPoolWorker-3 : Do Something 10 ForkPoolWorker-8 : Do Something 15 ForkPoolWorker-6 : Do Something 19 ForkPoolWorker-1 : Do Something 17 ForkPoolWorker-5 : Do Something 18 ForkPoolWorker-7 : Do Something 16 子进程执行任务完毕! 复制代码
上面的输出结果顺序并没有按照循环中的顺序输出,可以利用 apply_async
的返回值是:被进程调用的函数的返回值,来规避,修改后的代码如下:
import multiprocessing as mp import time def func(msg): time.sleep(1) return mp.current_process().name + " : " + msg if __name__ == '__main__': pool = mp.Pool() results = [] for i in range(20): msg = "Do Something %d" % i results.append(pool.apply_async(func, (msg,))) pool.close() pool.join() for result in results: print(result.get()) print("子进程执行任务完毕!") 复制代码
运行结果如下:
ForkPoolWorker-1 : Do Something 0 ForkPoolWorker-2 : Do Something 1 ForkPoolWorker-3 : Do Something 2 ForkPoolWorker-4 : Do Something 3 ForkPoolWorker-5 : Do Something 4 ForkPoolWorker-7 : Do Something 6 ForkPoolWorker-6 : Do Something 5 ForkPoolWorker-8 : Do Something 7 ForkPoolWorker-1 : Do Something 8 ForkPoolWorker-2 : Do Something 9 ForkPoolWorker-4 : Do Something 11 ForkPoolWorker-3 : Do Something 10 ForkPoolWorker-7 : Do Something 12 ForkPoolWorker-8 : Do Something 13 ForkPoolWorker-5 : Do Something 14 ForkPoolWorker-6 : Do Something 15 ForkPoolWorker-1 : Do Something 16 ForkPoolWorker-2 : Do Something 17 ForkPoolWorker-4 : Do Something 18 ForkPoolWorker-3 : Do Something 19 子进程执行任务完毕! 复制代码
感觉还是有点模糊,通过一个多进程统计目录下文件的行数和字符个数的脚本来巩固,代码示例如下:
import multiprocessing as mp import time import os result_file = 'result.txt' # 统计结果写入文件名 # 获得路径下的文件列表 def get_files(path): file_list = [] for file in os.listdir(path): if file.endswith('py'): file_list.append(os.path.join(path, file)) return file_list # 统计每个文件中函数与字符数 def get_msg(path): with open(path, 'r', encoding='utf-8') as f: content = f.readlines() f.close() lines = len(content) char_count = 0 for i in content: char_count += len(i.strip("\n")) return lines, char_count, path # 将数据写入到文件中 def write_result(result_list): with open(result_file, 'a', encoding='utf-8') as f: for result in result_list: f.write(result[2] + " 行数:" + str(result[0]) + " 字符数:" + str(result[1]) + "\n") f.close() if __name__ == '__main__': start_time = time.time() file_list = get_files(os.getcwd()) pool = mp.Pool() result_list = pool.map(get_msg, file_list) pool.close() pool.join() write_result(result_list) print("处理完毕,用时:", time.time() - start_time) 复制代码
运行结果如下:
# 控制台输出 处理完毕,用时: 0.13662314414978027 # result.txt文件内容 /Users/jay/Project/Python/Book/Chapter 11/11_4.py 行数:33 字符数:621 /Users/jay/Project/Python/Book/Chapter 11/11_1.py 行数:32 字符数:578 /Users/jay/Project/Python/Book/Chapter 11/11_5.py 行数:52 字符数:1148 /Users/jay/Project/Python/Book/Chapter 11/11_13.py 行数:20 字符数:333 /Users/jay/Project/Python/Book/Chapter 11/11_16.py 行数:62 字符数:1320 /Users/jay/Project/Python/Book/Chapter 11/11_12.py 行数:23 字符数:410 /Users/jay/Project/Python/Book/Chapter 11/11_15.py 行数:48 字符数:1087 /Users/jay/Project/Python/Book/Chapter 11/11_8.py 行数:17 字符数:259 /Users/jay/Project/Python/Book/Chapter 11/11_11.py 行数:18 字符数:314 /Users/jay/Project/Python/Book/Chapter 11/11_10.py 行数:46 字符数:919 /Users/jay/Project/Python/Book/Chapter 11/11_14.py 行数:20 字符数:401 /Users/jay/Project/Python/Book/Chapter 11/11_9.py 行数:31 字符数:623 /Users/jay/Project/Python/Book/Chapter 11/11_2.py 行数:32 字符数:565 /Users/jay/Project/Python/Book/Chapter 11/11_6.py 行数:23 字符数:453 /Users/jay/Project/Python/Book/Chapter 11/11_7.py 行数:37 字符数:745 /Users/jay/Project/Python/Book/Chapter 11/11_3.py 行数:29 字符数:518 复制代码
③ 进程间共享数据
涉及到了多个进程,不可避免的要处理进程间数据交换问题,多进程不像多线程, 不同进程之间内存是不共享的 ,multiprocessing模块提供了四种进程间共享数据的方式: Queue , Value和Array , Manager.dict和pipe 。下面一一介绍这四种方式的具体用法。
- 1.Queue队列
多进程安全的队列,put方法用以插入数据到队列中,put方法有两个可选参数: blocked 和 timeout 。 若blocked为True (默认)且 timeout为正值 ,该方法 会阻塞timeout指定的时间 ,直到该队列有剩余的空间。如果超时,会抛出 Queue.Full 异常。如果blocked为False,但该Queue已满,会立即抛出Queue.Full异常,而get方法则从队列读取并且删除一个元素,参数规则同抛出的一场是Queue.Empty。另外Queue不止适用于进程通信,也适用于线程,顺道写一个比较单线程,多线程
和多进程的运行效率对比示例,具体代码如下:
import threading as td import multiprocessing as mp import time def do_something(queue): result = 0 for i in range(100000): result += i ** 2 queue.put(result) # 单线程 def normal(): result = 0 for _ in range(3): for i in range(100000): result += i ** 2 print("单线程处理结果:", result) # 多线程 def multi_threading(): q = mp.Queue() t1 = td.Thread(target=do_something, args=(q,)) t2 = td.Thread(target=do_something, args=(q,)) t3 = td.Thread(target=do_something, args=(q,)) t1.start() t2.start() t3.start() t1.join() t2.join() t3.join() print("多线程处理结果:", (q.get() + q.get() + q.get())) # 多进程 def multi_process(): q = mp.Queue() p1 = mp.Process(target=do_something, args=(q,)) p2 = mp.Process(target=do_something, args=(q,)) p3 = mp.Process(target=do_something, args=(q,)) p1.start() p2.start() p3.start() p1.join() p2.join() p3.join() print("多进程处理结果:", (q.get() + q.get() + q.get())) if __name__ == '__main__': start_time_1 = time.time() normal() start_time_2 = time.time() print("单线程处理耗时:", start_time_2 - start_time_1) multi_threading() start_time_3 = time.time() print("多线程处理耗时:", start_time_3 - start_time_2) multi_process() start_time_4 = time.time() print("多继承处理耗时:", start_time_4 - start_time_3) 复制代码
运行结果如下:
单线程处理结果: 999985000050000 单线程处理耗时: 0.10726284980773926 多线程处理结果: 999985000050000 多线程处理耗时: 0.13849401473999023 多进程处理结果: 999985000050000 多继承处理耗时: 0.041596174240112305 复制代码
从上面的结果可以明显看出在处理CPU密集型任何时,多进程更优。
- 2.Value和Array
两者是通过「 共享内存 」的方式来共享数据的, 前者 用于需要 共享单个值 , 后者 用于
共享多个值(数组),构造函数的 第一个元素 为 数据类型 , 第二个元素 为 值 。数据类型对照如表所示。
标记 | 数据类型 | 标记 | 数据类型 |
---|---|---|---|
'c' | ctypes.c_char | 'u' | ctypes.c_wchar |
'b' | ctypes.c_byte | 'B' | ctypes.c_ubyte |
'h' | ctypes.c_short | 'H' | ctypes.c_ushort |
'i' | ctypes.c_int | 'I' | ctypes.c_uint |
'l' | ctypes.c_long | 'L' | ctypes.c_ulong |
'f' | ctypes.c_float | 'd' | ctypes.c_double |
使用代码示例如下:
import multiprocessing as mp def do_something(num, arr): num.value += 1 for i in range(len(arr)): arr[i] = arr[i] * 2 if __name__ == '__main__': value = mp.Value('i', 1) array = mp.Array('i', range(5)) print("刚开始的值:", value.value, array[:]) # 创建进程1 p1 = mp.Process(target=do_something, args=(value, array)) p1.start() p1.join() print("进程1操作后的值:", value.value, array[:]) # 创建进程2 p2 = mp.Process(target=do_something, args=(value, array)) p2.start() p2.join() print("进程2操作后的值:", value.value, array[:]) 复制代码
运行结果如下:
刚开始的值: 1 [0, 1, 2, 3, 4] 进程1操作后的值: 2 [0, 2, 4, 6, 8] 进程2操作后的值: 3 [0, 4, 8, 12, 16] 复制代码
- 3.Manager
Python还为我们提供更加强大的数据共享类,支持更丰富的数据类型,比如Value、Array、dict、list、Lock、Semaphore等等,另外Manager还可以共享类的实例对象。有一点要注意: 进程间通信应该尽量避免使用共享数据的方式!
使用代码示例如下:
import multiprocessing as mp import os import time def do_something(dt): dt[os.getpid()] = int(time.time()) print(data_dict) if __name__ == '__main__': manager = mp.Manager() data_dict = manager.dict() for i in range(3): p=mp.Process(target=do_something,args=(data_dict,)) p.start() p.join() 复制代码
运行结果如下:
{5432: 1533200189} {5432: 1533200189, 5433: 1533200189} {5432: 1533200189, 5433: 1533200189, 5434: 1533200189} 复制代码
- 4.Pipe
管道, 简化版的Queue ,通过 Pipe()构造函数 可以创建一个 进程通信用的管道对象 , 默认双向 ,意味着使用管道只能同时开启两个进程!如果想设置 单向 ,可以添加参数「 duplex=False 」,双向即可发送也可接受,但是只允许前面的端口用于接收,后面的端口用于发送。管道对象发送和接收信息的函数依次为 send ()和 recv()。 使用代码示例如下 :
import multiprocessing as mp def p_1(p): p.send("你好啊!") print("P1-收到信息:", p.recv()) def p_2(p): print("P2-收到信息:", p.recv()) p.send("你也好啊!") if __name__ == '__main__': pipe = mp.Pipe() p1 = mp.Process(target=p_1, args=(pipe[0],)) p2 = mp.Process(target=p_2, args=(pipe[1],)) p1.start() p2.start() p1.join() p2.join() 复制代码
运行结果如下:
P2-收到信息: 你好啊! P1-收到信息: 你也好啊! 复制代码
如果本文对你有所帮助,欢迎留言,点赞。