Python多进程教程

Python多进程教程


Python2.6版本中新添了multiprocessing模块。它最初由Jesse Noller和Richard Oudkerk定义在PEP 371中。就像你能通过threading模块衍生线程一样,multiprocessing 模块允许你衍生进程。这里用到的思想:因为你现在能衍生进程,所以你能够避免使用全局解释器锁(GIL),并且充分利用机器的多个处理器。

多进程包也包含一些根本不在threading 模块中的API。比如:有一个灵活的Pool类能让你在多个输入下并行化地执行函数。我们将在后面的小节讲解Pool类。我们将以multiprocessing模块的Process类开始讲解。

开始学习multiprocessing模块

Process这个类和threading模块中的Thread类很像。让我们创建一系列调用相同函数的进程,并且看看它是如何工作的。


  1. import os 
  2.  
  3. from multiprocessing import Process 
  4.  
  5. def doubler(number): 
  6.  
  7.     ""
  8.  
  9.     A doubling function that can be used by a process 
  10.  
  11.     ""
  12.  
  13.     result = number * 2 
  14.  
  15.     proc = os.getpid() 
  16.  
  17.     print('{0} doubled to {1} by process id: {2}'.format( 
  18.  
  19.         number, result, proc)) 
  20.  
  21. if __name__ == '__main__'
  22.  
  23.     numbers = [5, 10, 15, 20, 25] 
  24.  
  25.     procs = [] 
  26.  
  27.     for index, number in enumerate(numbers): 
  28.  
  29.         proc = Process(target=doubler, args=(number,)) 
  30.  
  31.         procs.append(proc) 
  32.  
  33.         proc.start() 
  34.  
  35.     for proc in procs: 
  36.  
  37.         proc.join() 

对于上面的例子,我们导入Process类、创建一个叫doubler的函数。在函数中,我们将传入的数字乘上2。我们也用Python的os模块来获取当前进程的ID(pid)。这个ID将告诉我们哪个进程正在调用doubler函数。然后,在下面的代码块中,我们实例化了一系列的Process类并且启动它们。最后一个循环只是调用每个进程的join()方法,该方法告诉Python等待进程直到它结束。如果你需要结束一个进程,你可以调用它的terminate()方法。

当你运行上面的代码,你应该看到和下面类似的输出结果:


  1. 5 doubled to 10 by process id: 10468 
  2.  
  3. 10 doubled to 20 by process id: 10469 
  4.  
  5. 15 doubled to 30 by process id: 10470 
  6.  
  7. 20 doubled to 40 by process id: 10471 
  8.  
  9. 25 doubled to 50 by process id: 10472 

有时候,你最好给你的进程取一个易于理解的名字 。幸运的是,Process类确实允许你访问同样的进程。让我们来看看如下例子:


  1. import os 
  2.  
  3. from multiprocessing import Process, current_process 
  4.  
  5. def doubler(number): 
  6.  
  7.     ""
  8.  
  9.     A doubling function that can be used by a process 
  10.  
  11.     ""
  12.  
  13.     result = number * 2 
  14.  
  15.     proc_name = current_process().name 
  16.  
  17.     print('{0} doubled to {1} by: {2}'.format( 
  18.  
  19.         number, result, proc_name)) 
  20.  
  21. if __name__ == '__main__'
  22.  
  23.     numbers = [5, 10, 15, 20, 25] 
  24.  
  25.     procs = [] 
  26.  
  27.     proc = Process(target=doubler, args=(5,)) 
  28.  
  29.     for index, number in enumerate(numbers): 
  30.  
  31.         proc = Process(target=doubler, args=(number,)) 
  32.  
  33.         procs.append(proc) 
  34.  
  35.         proc.start() 
  36.  
  37.     proc = Process(target=doubler, name='Test', args=(2,)) 
  38.  
  39.     proc.start() 
  40.  
  41.     procs.append(proc) 
  42.  
  43.     for proc in procs: 
  44.  
  45.         proc.join() 

这一次,我们多导入了current_process。current_process基本上和threading模块的current_thread是类似的东西。我们用它来获取正在调用我们的函数的线程的名字。你将注意到我们没有给前面的5个进程设置名字。然后我们将第6个进程的名字设置为“Test”。

让我们看看我们将得到什么样的输出结果:


  1. 5 doubled to 10 by: Process-2 
  2.  
  3. 10 doubled to 20 by: Process-3 
  4.  
  5. 15 doubled to 30 by: Process-4 
  6.  
  7. 20 doubled to 40 by: Process-5 
  8.  
  9. 25 doubled to 50 by: Process-6 
  10.  
  11. 2 doubled to 4 by: Test 

输出结果说明:默认情况下,multiprocessing模块给每个进程分配了一个编号,而该编号被用来组成进程的名字的一部分。当然,如果我们给定了名字的话,并不会有编号被添加到名字中。

multiprocessing模块支持锁,它和threading模块做的方式一样。你需要做的只是导入Lock,获取它,做一些事,释放它。


  1. from multiprocessing import Process, Lock 
  2.  
  3. def printer(item, lock): 
  4.  
  5.     ""
  6.  
  7.     Prints out the item that was passed in 
  8.  
  9.     ""
  10.  
  11.     lock.acquire() 
  12.  
  13.     try: 
  14.  
  15.         print(item) 
  16.  
  17.     finally: 
  18.  
  19.         lock.release() 
  20.  
  21. if __name__ == '__main__'
  22.  
  23.     lock = Lock() 
  24.  
  25.     items = ['tango''foxtrot', 10] 
  26.  
  27.     for item in items: 
  28.  
  29.         p = Process(target=printer, args=(item, lock)) 
  30.  
  31.         p.start() 

我们在这里创建了一个简单的用于打印函数,你输入什么,它就输出什么。为了避免线程之间互相阻塞,我们使用Lock对象。代码循环列表中的三个项并为它们各自都创建一个进程。每一个进程都将调用我们的函数,并且每次遍历到的那一项作为参数传入函数。因为我们现在使用了锁,所以队列中下一个进程将一直阻塞,直到之前的进程释放锁。

日志

为进程创建日志与为线程创建日志有一些不同。它们存在不同是因为Python的logging包不使用共享锁的进程,因此有可能以来自不同进程的信息作为结束的标志。让我们试着给前面的例子添加基本的日志。代码如下:


  1. import logging 
  2.  
  3. import multiprocessing 
  4.  
  5. from multiprocessing import Process, Lock 
  6.  
  7. def printer(item, lock): 
  8.  
  9.     ""
  10.  
  11.     Prints out the item that was passed in 
  12.  
  13.     ""
  14.  
  15.     lock.acquire() 
  16.  
  17.     try: 
  18.  
  19.         print(item) 
  20.  
  21.     finally: 
  22.  
  23.         lock.release() 
  24.  
  25. if __name__ == '__main__'
  26.  
  27.     lock = Lock() 
  28.  
  29.     items = ['tango''foxtrot', 10] 
  30.  
  31.     multiprocessing.log_to_stderr() 
  32.  
  33.     logger = multiprocessing.get_logger() 
  34.  
  35.     logger.setLevel(logging.INFO) 
  36.  
  37.     for item in items: 
  38.  
  39.         p = Process(target=printer, args=(item, lock)) 
  40.  
  41.         p.start() 

最简单的添加日志的方法通过推送它到stderr实现。我们能通过调用thelog_to_stderr() 函数来实现该方法。然后我们调用get_logger 函数获得一个logger实例,并将它的日志等级设为INFO。之后的代码是相同的。需要提示下这里我并没有调用join()方法。取而代之的:当它退出,父线程将自动调用join()方法。

当你这么做了,你应该得到类似下面的输出:


  1. [INFO/Process-1] child process calling self.run() 
  2.  
  3. tango 
  4.  
  5. [INFO/Process-1] process shutting down 
  6.  
  7. [INFO/Process-1] process exiting with exitcode 0 
  8.  
  9. [INFO/Process-2] child process calling self.run() 
  10.  
  11. [INFO/MainProcess] process shutting down 
  12.  
  13. foxtrot 
  14.  
  15. [INFO/Process-2] process shutting down 
  16.  
  17. [INFO/Process-3] child process calling self.run() 
  18.  
  19. [INFO/Process-2] process exiting with exitcode 0 
  20.  
  21. 10 
  22.  
  23. [INFO/MainProcess] calling join() for process Process-3 
  24.  
  25. [INFO/Process-3] process shutting down 
  26.  
  27. [INFO/Process-3] process exiting with exitcode 0 
  28.  
  29. [INFO/MainProcess] calling join() for process Process-2 

现在如果你想要保存日志到硬盘中,那么这件事就显得有些棘手。你能在Python的logging Cookbook阅读一些有关那类话题。

Pool类

Pool类被用来代表一个工作进程池。它有让你将任务转移到工作进程的方法。让我们看下面一个非常简单的例子。


  1. from multiprocessing import Pool 
  2.  
  3. def doubler(number): 
  4.  
  5.     return number * 2 
  6.  
  7. if __name__ == '__main__'
  8.  
  9.     numbers = [5, 10, 20] 
  10.  
  11.     pool = Pool(processes=3) 
  12.  
  13.     print(pool.map(doubler, numbers)) 

基本上执行上述代码之后,一个Pool的实例被创建,并且该实例创建了3个工作进程。然后我们使用map 方法将一个函数和一个可迭代对象映射到每个进程。最后我们打印出这个例子的结果:[10, 20, 40]。

你也能通过apply_async方法获得池中进程的运行结果:


  1. from multiprocessing import Pool 
  2.  
  3. def doubler(number): 
  4.  
  5.     return number * 2 
  6.  
  7. if __name__ == '__main__'
  8.  
  9.     pool = Pool(processes=3) 
  10.  
  11.     result = pool.apply_async(doubler, (25,)) 
  12.  
  13.     print(result.get(timeout=1)) 

我们上面做的事实际上就是请求进程的运行结果。那就是get函数的用途。它尝试去获取我们的结果。你能够注意到我们设置了timeout,这是为了预防我们调用的函数发生异常的情况。毕竟我们不想要它被无限期地阻塞。

进程通信

当遇到进程间通信的情况,multiprocessing 模块提供了两个主要的方法:Queues 和 Pipes。Queue 实现上既是线程安全的也是进程安全的。让我们看一个相当简单的并且基于 Queue的例子。代码来自于我的文章(threading articles)。


  1. from multiprocessing import Process, Queue 
  2.  
  3. sentinel = -1 
  4.  
  5. def creator(data, q): 
  6.  
  7.     ""
  8.  
  9.     Creates data to be consumed and waits for the consumer 
  10.  
  11.     to finish processing 
  12.  
  13.     ""
  14.  
  15.     print('Creating data and putting it on the queue'
  16.  
  17.     for item in data: 
  18.  
  19.         q.put(item) 
  20.  
  21. def my_consumer(q): 
  22.  
  23.     ""
  24.  
  25.     Consumes some data and works on it 
  26.  
  27.     In this caseall it does is double the input 
  28.  
  29.     ""
  30.  
  31.     while True
  32.  
  33.         data = q.get() 
  34.  
  35.         print('data found to be processed: {}'.format(data)) 
  36.  
  37.         processed = data * 2 
  38.  
  39.         print(processed) 
  40.  
  41.         if data is sentinel: 
  42.  
  43.             break 
  44.  
  45. if __name__ == '__main__'
  46.  
  47.     q = Queue() 
  48.  
  49.     data = [5, 10, 13, -1] 
  50.  
  51.     process_one = Process(target=creator, args=(data, q)) 
  52.  
  53.     process_two = Process(target=my_consumer, args=(q,)) 
  54.  
  55.     process_one.start() 
  56.  
  57.     process_two.start() 
  58.  
  59.     q.close() 
  60.  
  61.     q.join_thread() 
  62.  
  63.     process_one.join() 
  64.  
  65.     process_two.join() 

在这里我们只需要导入Queue和Process。Queue用来创建数据和添加数据到队列中,Process用来消耗数据并执行它。通过使用Queue的put()和get()方法,我们就能添加数据到Queue、从Queue获取数据。代码的最后一块只是创建了Queue 对象以及两个Process对象,并且运行它们。你能注意到我们在进程对象上调用join()方法,而不是在Queue本身上调用。

总结

我们这里有大量的资料。你已经学习如何使用multiprocessing模块指定不变的函数、使用Queues在进程间通信、给进程命名等很多事。在Python文档中也有很多本文没有接触到的知识点,因此也务必深入了解下文档。与此同时,你现在知道如何用Python利用你电脑所有的处理能力了!


作者:佚名

来源:51CTO

上一篇:SAP Commerce Cloud OAuth 实现介绍


下一篇:《Python爬虫开发与项目实战》——1.4 进程和线程