Python 多线程 Step by Step
Python 在 CPU 密集运算的场景,多个线程并不能提高太多性能,而对于 I/O 阻塞的场景,可以使得运行效率获得几倍的提高。我们接下来会详细的分析一下。
我们先做一个可以用来测试的基准程序,这是一个比较无聊的计算程序,可以理解为是一个CPU 密集型的测试。当然你也可以换做找最大公约数、求质数或者读者自己的计算程序。
在写这部分内容的时候,我的代码是在 Jupyter 中执行的,这是一台 2012年秋天款式的 mac mini,2.3GHz Intel Core i7, 一个处理器4个核心,16 G 1600 MHz DDR3 内存。
测试时间,我们这里就简单的取一次值,对于 CPU 密集运算的情况来说,除非电脑有更加消耗资源的应用,一般差异不大,但是和其他比如磁盘、网络等相关的应用就不一定了。
# 计算一次 my_cal() 函数,获得耗用时间
>>> from time import time
>>> def my_cal(a):
... j = 0
... for i in range(a):
... j = j + i
... print(j)
... return j
>>> start = time()
>>> my_cal(1000000)
>>> end = time()
>>> print('cost time {:f} s'.format(end - start))
499999500000
cost time 0.106146 s
真实场景都要复杂得多,我们为了后面的例子,用一个 list 来提供数据源,使用上面的累加函数来计算 5 次。
>>> from time import time
>>> def my_cal(a):
... j = 0
... for i in range(a):
... j = j + i
... print(j)
... return j
>>> start = time()
>>> list_01 = [1000000, 1500000, 2000000, 2500000, 3000000]
>>> for i in list_01:
... my_cal(i)
>>> end = time()
>>> print('cost time {:f} s'.format(end - start))
499999500000
1124999250000
1999999000000
3124998750000
4499998500000
cost time 0.906054 s
当然我们可以写的更加简洁一些,用 map() 函数是最合适在这样的场景了。所以读者如果要测试的话,可以用上面循环的方式,也可以用下面 map() 的方式,看具体情况。整体上来说推荐 Python 风格的写法,能用 map() 的尽量就不要用 for 循环了。
>>> from time import time
>>> def my_cal(a):
... j = 0
... for i in range(a):
... j = j + i
... print(j)
... return j
>>> start = time()
>>> list_01 = [1000000, 1500000, 2000000, 2500000, 3000000]
>>> list_02 = list(map(my_cal, list_01))
>>> end = time()
>>> print('cost time {:f} s'.format(end - start))
499999500000
1124999250000
1999999000000
3124998750000
4499998500000
cost time 0.927783 s
我们先用简单的多线程的方式,来看看有没有提速的作用。
>>> from time import time
>>> import threading
>>> def my_cal(a):
... j = 0
... for i in range(a):
... j = j + i
... print(j)
... return j
>>> start = time()
>>> list_01 = [1000000, 1500000, 2000000, 2500000, 3000000]
>>> Threads = []
>>> for i in list_01:
... # 建立线程
... thread = threading.Thread(target=my_cal,args=(i,))
... thread.start()
... # 建立线程列表
... Threads.append(thread)
# 等待线程结束
>>> for thread in Threads:
... thread.join()
>>> end = time()
>>> print('\ncost time {:f} s'.format(end - start))
499999500000
1124999250000
1999999000000
3124998750000
4499998500000
cost time 1.074110 s
上面这样写,可以执行多个线程。但是如前面所言,因为实际上还是在不同的线程中轮流,对于 CPU 密集型的例子来说,多线程对性能提升是没有什么用处,还让管理线程占用多一点的资源。
注意,创建的 thread 只能执行一次 start() 方法,否则会报错: RuntimeError: threads can only be started once
thread 的 join() 方法阻塞当前线程,等待子线程执行完毕后自己再继续执行。如果没有 join() 的阻塞,我们会先看到 cost time 的计算,而实际上上面的线程还没有完成自己的任务。
多线程多 join 的情况下,依次执行各线程的join方法,前面一个结束了才能执行后面一个。
注意:thread 模块在 Python 3 中已被废弃,用 threading 模块代替。在 Python3 中不能再使用"thread" 模块。为了兼容性,Python3 将 thread 重命名为 "_thread"。
# 检查线程的状态
>>> from time import time
>>> import threading
>>> def my_cal(a):
... j = 0
... for i in range(a):
... j = j + i
... print(j)
... return j
>>> thread = threading.Thread(target=my_cal,args=(i,))
# 创建后的线程的状态
>>> print(thread.isAlive)
>>> thread.start()
# 线程启动后的状态
>>> print(thread.isAlive)
<bound method Thread.is_alive of <Thread(Thread-43, initial)>>
<bound method Thread.is_alive of <Thread(Thread-43, started 123145451360256)>>
4499998500000
多进程方式
对于 CPU 运算密集的场景,我们换做多进程的方式来看一下。
# 多进程方式
>>> from time import time
>>> from concurrent.futures import *
>>> def my_cal(a):
... j = 0
... for i in range(a):
... j = j + i
... print(j)
... return j
>>> list_01 = [1000000, 2000000, 1500000, 2500000, 3000000]
>>> start = time()
>>> pool = ProcessPoolExecutor(max_workers=10)
>>> list_02 = list(pool.map(my_cal, list_01))
>>> end = time()
>>> print('cost time {:f} s'.format(end - start))
499999500000
1124999250000
1999999000000
3124998750000
4499998500000
cost time 0.374396 s
我们可以看到速度提升了60%以上。我觉得这是提高性能最好和最简单的方法之一。
设置多少个 worker,一般是等于 cpu 核心数或者乘以二,服务器 Gunicorn worker 的数量从经验的角度一般配置 2 * core + 1, core指的核心数。
修改一下程序,用 timeit 来进行测试。(注意,请在Jupyter中运行该程序)
# 多进程方式,测试 workers
from concurrent.futures import *
def my_cal(a):
j = 0
for i in range(a):
j = j + i
return j
def my_cal_main(workers):
list_01 = [1000000, 2000000, 1500000, 2500000, 3000000]
pool = ProcessPoolExecutor(workers)
list_02 = list(pool.map(my_cal, list_01))
for i in range(2,11):
print(i)
%timeit -n6 my_cal_main(i)
# 输出结果
2
6 loops, best of 3: 491 ms per loop
3
6 loops, best of 3: 443 ms per loop
4
6 loops, best of 3: 437 ms per loop
5
6 loops, best of 3: 352 ms per loop
6
6 loops, best of 3: 336 ms per loop
7
6 loops, best of 3: 341 ms per loop
8
6 loops, best of 3: 346 ms per loop
9
6 loops, best of 3: 346 ms per loop
10
6 loops, best of 3: 347 ms per loop
我们可以看到,worker 设为 6 的时候速度最快,再往上没有明显差别。这个还和应用场景有关,所以如果不想太复杂的化,设为 CPU 核心数比较适合。
下面的有三种方法可以获得 CPU 核心的数量:
# 获得 cpu 核心数量
>>> import multiprocessing
>>> pool = multiprocessing.Pool()
>>> print(pool._processes)
8
>>> print(multiprocessing.cpu_count())
8
>>> import psutil
>>> psutil.cpu_count()
8
摘自本人与同事所著《Python 机器学习实战》一书