Python多线程与多进程浅析之二

Python 多线程 Step by Step

Python 在 CPU 密集运算的场景,多个线程并不能提高太多性能,而对于 I/O 阻塞的场景,可以使得运行效率获得几倍的提高。我们接下来会详细的分析一下。

我们先做一个可以用来测试的基准程序,这是一个比较无聊的计算程序,可以理解为是一个CPU 密集型的测试。当然你也可以换做找最大公约数、求质数或者读者自己的计算程序。

在写这部分内容的时候,我的代码是在 Jupyter 中执行的,这是一台 2012年秋天款式的 mac mini,2.3GHz Intel Core i7, 一个处理器4个核心,16 G 1600 MHz DDR3 内存。

测试时间,我们这里就简单的取一次值,对于 CPU 密集运算的情况来说,除非电脑有更加消耗资源的应用,一般差异不大,但是和其他比如磁盘、网络等相关的应用就不一定了。

# 计算一次 my_cal() 函数,获得耗用时间
>>> from time import time
>>> def my_cal(a):
...     j = 0
...     for i in range(a):
...         j = j + i
...     print(j)
...     return j

>>> start = time()
>>> my_cal(1000000)
>>> end = time()
>>> print('cost time {:f} s'.format(end - start))
499999500000 
cost time 0.106146 s

真实场景都要复杂得多,我们为了后面的例子,用一个 list 来提供数据源,使用上面的累加函数来计算 5 次。

>>> from time import time

>>> def my_cal(a):
...     j = 0
...     for i in range(a):
...         j = j + i
...     print(j)
...     return j

>>> start = time()
>>> list_01 = [1000000, 1500000, 2000000, 2500000, 3000000]

>>> for i in list_01:
...    my_cal(i)
    
>>> end = time()
>>> print('cost time {:f} s'.format(end - start))  
499999500000
1124999250000
1999999000000
3124998750000
4499998500000
cost time 0.906054 s

当然我们可以写的更加简洁一些,用 map() 函数是最合适在这样的场景了。所以读者如果要测试的话,可以用上面循环的方式,也可以用下面 map() 的方式,看具体情况。整体上来说推荐 Python 风格的写法,能用 map() 的尽量就不要用 for 循环了。

>>> from time import time

>>> def my_cal(a):
...     j = 0
...     for i in range(a):
...         j = j + i
...     print(j)
...     return j

>>> start = time()

>>> list_01 = [1000000, 1500000, 2000000, 2500000, 3000000]
>>> list_02 = list(map(my_cal, list_01))
        
>>> end = time()
>>> print('cost time {:f} s'.format(end - start))
499999500000
1124999250000
1999999000000
3124998750000
4499998500000
cost time 0.927783 s

我们先用简单的多线程的方式,来看看有没有提速的作用。

>>> from time import time
>>> import threading

>>> def my_cal(a):
...     j = 0
...     for i in range(a):
...         j = j + i
...     print(j)
...     return j

>>> start = time()
>>> list_01 = [1000000, 1500000, 2000000, 2500000, 3000000]
>>> Threads = []

>>> for i in list_01:
...     # 建立线程
...     thread = threading.Thread(target=my_cal,args=(i,))
...     thread.start()
...     # 建立线程列表
...     Threads.append(thread)

# 等待线程结束
>>> for thread in Threads:
...     thread.join()
       
>>> end = time()
>>> print('\ncost time {:f} s'.format(end - start))   
499999500000
1124999250000
1999999000000
3124998750000
4499998500000

cost time 1.074110 s

上面这样写,可以执行多个线程。但是如前面所言,因为实际上还是在不同的线程中轮流,对于 CPU 密集型的例子来说,多线程对性能提升是没有什么用处,还让管理线程占用多一点的资源。

注意,创建的 thread 只能执行一次 start() 方法,否则会报错: RuntimeError: threads can only be started once
thread 的 join() 方法阻塞当前线程,等待子线程执行完毕后自己再继续执行。如果没有 join() 的阻塞,我们会先看到 cost time 的计算,而实际上上面的线程还没有完成自己的任务。

多线程多 join 的情况下,依次执行各线程的join方法,前面一个结束了才能执行后面一个。

注意:thread 模块在 Python 3 中已被废弃,用 threading 模块代替。在 Python3 中不能再使用"thread" 模块。为了兼容性,Python3 将 thread 重命名为 "_thread"。

# 检查线程的状态
>>> from time import time
>>> import threading

>>> def my_cal(a):
...     j = 0
...     for i in range(a):
...         j = j + i
...     print(j)
...     return j

>>> thread = threading.Thread(target=my_cal,args=(i,))
# 创建后的线程的状态
>>> print(thread.isAlive)
>>> thread.start()
# 线程启动后的状态
>>> print(thread.isAlive)   
<bound method Thread.is_alive of <Thread(Thread-43, initial)>>
<bound method Thread.is_alive of <Thread(Thread-43, started 123145451360256)>>
4499998500000

多进程方式

对于 CPU 运算密集的场景,我们换做多进程的方式来看一下。

# 多进程方式
>>> from time import time
>>> from concurrent.futures import *

>>> def my_cal(a):
...     j = 0
...     for i in range(a):
...         j = j + i
...     print(j)
...     return j

>>> list_01 = [1000000, 2000000, 1500000, 2500000, 3000000]
>>> start = time()
>>> pool = ProcessPoolExecutor(max_workers=10)
>>> list_02 = list(pool.map(my_cal, list_01))
>>> end = time()

>>> print('cost time {:f} s'.format(end - start)) 
499999500000
1124999250000
1999999000000
3124998750000
4499998500000
cost time 0.374396 s

我们可以看到速度提升了60%以上。我觉得这是提高性能最好和最简单的方法之一。

设置多少个 worker,一般是等于 cpu 核心数或者乘以二,服务器 Gunicorn worker 的数量从经验的角度一般配置 2 * core + 1, core指的核心数。

修改一下程序,用 timeit 来进行测试。(注意,请在Jupyter中运行该程序)

# 多进程方式,测试 workers

from concurrent.futures import *

def my_cal(a):
    j = 0
    for i in range(a):
        j = j + i
    return j

def my_cal_main(workers):

    list_01 = [1000000, 2000000, 1500000, 2500000, 3000000]
    pool = ProcessPoolExecutor(workers)
    list_02 = list(pool.map(my_cal, list_01))

for i in range(2,11):
    print(i)
%timeit -n6 my_cal_main(i)

# 输出结果
2
6 loops, best of 3: 491 ms per loop
3
6 loops, best of 3: 443 ms per loop
4
6 loops, best of 3: 437 ms per loop
5
6 loops, best of 3: 352 ms per loop
6
6 loops, best of 3: 336 ms per loop
7
6 loops, best of 3: 341 ms per loop
8
6 loops, best of 3: 346 ms per loop
9
6 loops, best of 3: 346 ms per loop
10
6 loops, best of 3: 347 ms per loop

我们可以看到,worker 设为 6 的时候速度最快,再往上没有明显差别。这个还和应用场景有关,所以如果不想太复杂的化,设为 CPU 核心数比较适合。

下面的有三种方法可以获得 CPU 核心的数量:

# 获得 cpu 核心数量
>>> import multiprocessing
>>> pool = multiprocessing.Pool()
>>> print(pool._processes)
8
>>> print(multiprocessing.cpu_count())
8

>>> import psutil
>>> psutil.cpu_count()
8

摘自本人与同事所著《Python 机器学习实战》一书

上一篇:【重磅推荐】科技达人必须收听的微信号,值得收藏!


下一篇:高并发Web服务的演变——节约系统内存和CPU