本文最初发布于 FLOYDHUB 博客,经原作者 Sumit Ghosh 授权由 InfoQ 中文站翻译并分享。
导读:线程和进程都是现在计算机领域比较时髦的用语。进程 (Process) 是计算机中已运行程序的实体。进程本身不会运行,是线程的容器。程序本身只是指令的集合,进程才是程序(那些指令) 的真正运行。若干进程有可能与同一个程序相关系,且每个进程皆可以同步(循序) 或不同步(平行) 的方式独立运行。进程为现今分时系统的基本运作单位。线程(thread),操作系统技术中的术语,是操作系统能够进行运算调度的最小单位。它被包含在进程之中,一条线程指的是进程中一个单一顺序的控制流,一个进程中可以并发多个线程,每条线程并行执行不同的任务。今天,我们翻译并分享 Sumit Ghosh 撰写的关于 Python 中的多进程与线程的方方面面,这些内容对于每个有志于成为数据科学家的从业者都是应知必会的内容。
每个数据科学项目迟早都会面临一个不可避免的挑战:速度。使用更大的数据集会导致处理速度变慢,因此,最终不得不考虑优化算法的运行时间。正如大多数人所知道的,并行化就是这种优化的必要步骤。Python 为并行化提供了两个内置库:multiprocessing(多进程)和 threading(线程)。在本文中,我们将探讨数据科学家如何在这两者之间进行选择,以及在选择时应记住哪些因素。
并行计算与数据科学
众所周知,数据科学是一门处理大量数据,并从中提取有用见解的科学。通常情况下,我们在数据上执行的操作很容易实现并行化,这意味着不同的处理可以在数据上一次运行一个操作,然后在最后将结果组合起来以得到完整的结果。
为了更好地理解并行性,让我们考虑一个真实世界的类比。假设你需要打扫家里的三个房间,你可以一个人包揽所有的事情:一个接一个地打扫房间,或者你也可以叫来两个人帮助你,你们每个人只打扫一个房间。在后一种方法中,每个人都并行地处理整个任务的一部分,从而减少完成了任务所需的总时间,这就是并行性。
在 Python 中,可以通过两种不同的方式实现并行处理:多进程和线程。
多进程和线程:理论
从根本上来说,多进程和线程是实现并行计算的两种方法,分别使用进程和线程作为处理代理。要了解这些方法的工作原理,我们就必须弄清楚什么是进程,什么是线程。
进程
进程是正在执行的计算机程序的实例。每个进程都有自己的内存空间,用于存储正在运行的指令,以及需要存储和访问用来执行的任何数据。
线程
线程是进程的组件,可以并行运行。一个进程可以有多个线程,它们共享相同的内存空间,即父进程的内存空间。这意味着要执行的代码以及程序中声明的所有变量,将由所有线程共享。
进程和线程(图:笔者与 Cburnett)
例如,让我们考虑一下你的计算机上正在运行的程序。你可能正在浏览器中阅读本文,浏览器可能打开了多个标签页。你还可能同时通过 Spotify 桌面应用收听音乐。浏览器和 Spotify 应用程序是不同的进程,它们中的每一个都可以使用多个进程或线程来实现并行性。浏览器中的不同标签页可能在不同的线程中运行。Spotify 可以在一个线程中播放音乐,在另一个线程中从互联网下载音乐,并使用第三个线程来显示 GUI。这就叫做多线程(multithreading)。多进程(多即个进程)也可以做到这一点。事实上,大多数像 Chrome 和 Firefox 这样的现代浏览器使用的是多进程而不是多线程来处理多个标签。
技术细节
- 一个进程的所有线程都位于同一个内存空间中,而进程有各自独立的内存空间。
- 与进程相比,线程更轻量级,并且开销更低。生成进程比生成线程要慢一些。
- 在线程之间共享对象更容易,因为它们共享的是相同的内存空间。为了在进程之间实现同样的效果,我们必须使用一些类似 IPC(inter-process communication,进程间通信)模型,通常是由操作系统提供的。
并行计算的陷阱
在程序中引入并行性并不总是一个正和博弈;有一些需要注意的陷阱。最重要的陷阱如下:
- 竞态条件: 正如我们已经讨论过的,线程具有共享的内存空间,因此它们可以访问共享变量。当多个线程试图通过同时更改同一个变量时,就会出现竞态条件。线程调度程序可以在线程之间任意切换,因此,我们无法知晓线程将试图更改数据的顺序。这可能导致两个线程中的任何一个出现不正确的行为,特别是如果线程决定基于变量的值执行某些操作时。为了防止这种情况的发生,可以在修改变量的代码段周围放置互斥锁,这样,一次只能有一个线程可以写入变量。
- 饥饿: 当线程在更长的时间内被拒绝访问特定资源时,就会发生饥饿,因此,整个程序速度就会变慢。这可能是涉及不良的线程调度算法的意外副作用。
- 死锁: 过度使用互斥锁也有一个缺点,它可能会在程序中引入死锁。死锁是一个线程等待另一个线程将锁释放,但另一个线程需要一个资源来完成第一个线程所持有的锁。这样,两个线程都会停止,程序也随之停止。死锁可以被看作是饥饿的一种极端情况。要避免这种情况,我们必须小心不要引入太多相互依赖的锁。
- 活锁: 活锁是指线程在循环中继续运行,但没有任何进展。这也是由于涉及不良和互斥锁的使用不当造成的。
Python 中的多进程和线程
全局解释锁
当涉及到 Python 时,有一些奇怪的地方需要记住。我们知道,线程共享相同的内存空间,因此必须采取特殊的预防措施,以便两个线程不会写入相同的内存位置。CPython 解释器使用一种名为 GIL
的机制或全局解释锁来处理这个问题。
摘自 Python 的官方 wiki :
在 CPython 中, 全局解释锁 (Global Interpreter Lock, GIL )是一个互斥锁,用来保护对 Python 对象的访问,防止多个线程同时执行 Python 字节码。这种锁是必要的,主要是因为 CPython 的内存管理不是线程安全的。
请查看这个幻灯片来了解 Python GIL
的详细信息: Understanding the Python GIL
GIL
完成了它的工作,但是也付出了代价。它有效地序列化了解释器级别的指令。它的工作原理如下:任何线程要执行任何函数,都必须获取全局锁。一次只能有一个线程可以获得全局锁,这意味着 解释器最终会串行地运行指令 。这种设计使内存管理做到线程安全,但结果是,它根本不能利用多个 CPU 内核。在单核 CPU 中(这正是设计师在开发 CPython 时所考虑的),这并不是什么大问题。但是,如果使用多核 CPU 的话,那么这个全局锁最终将会成为一个瓶颈了。
如果你的程序在其他地方存在更严重的瓶颈,例如在网络、IO、或者用户交互方面,那么全局锁这个瓶颈就变得无关紧要了。在这些情况下,线程化是一种完全有效的并行化方法。但对于计算密集型(CPU bound)的程序,线程化最终会使程序变慢。让我们通过一些用例来探讨这个问题。
线程用例
GUI 程序始终使用线程来使应用程序作出响应。例如,在文本编辑程序中,一个线程负责记录用户输入,另一个线程负责显示文本,第三个线程负责拼写检查,等等。在这里,程序必须等待用户交互,这是最大的瓶颈。使用多进程并不会使程序变得更快。
线程处理的另一个用例是 IO 密集型(IO bound)或网络密集型的程序,比如 Web Scraper 。在这种情况下,多个线程可以负责并行抓取多个 Web 网页。线程必须从互联网上下载网页,这将是最大的瓶颈,因此线程对于这种情况来说是一个完美的解决方案。网络密集型的 Web 服务器的工作方式类似:对于它们这种情况,多进程并不比线程有任何优势。另一个相关的例子是 TensorFlow ,它使用线程池(thread pool)来并行地转换数据。
多进程的用例
在程序是计算密集型的,且不需要进行任何 IO 或用户交互的情况下,那么多进程就比线程处理更为出色。例如,任何只处理数字的程序都将从多进程中获得巨大的加速;事实上,线程化处理可能会降低它的运行速度。一个有趣的实际例子是 Pytorch Dataloader ,它使用多个子进程将数据加载到 GPU 中。
在 Python 中的并行化
Python 为并行化方法提供了两个同名的库: multiprocessing
和 threading
。尽管它们之间存在根本的不同,但这两个库提供了非常相似的 API(从 Python 3.7 开始)。让我们看看它们的实际应用。
复制代码
importthreading importrandom from functoolsimportreduce deffunc(number): random_list = random.sample(range(1000000), number) returnreduce(lambda x, y: x*y, random_list) number =50000 thread1 = threading.Thread(target=func,args=(number,)) thread2 = threading.Thread(target=func,args=(number,)) thread1.start() thread2.start() thread1.join() thread2.join()
你可以看到,我创建一个函数 func
,它创建了一个随机数列表,然后按顺序将其中的所有元素相乘。如果项目数量足够大,比如 5 万或 10 万,这可能是一个相当繁重的过程。
然后,我创建了两个线程,它们将执行相同的函数。线程对象有一个异步启动线程的 start
方法。如果我们想等待它们终止并返回,就必须调用 join
方法,这就是我们这段代码所做的事情。
正如你所见,在后台将一个新线程转化为一个任务的 API 非常简单。很棒的是,用于多进程的 API 也几乎完全相同;让我们来看一下。
复制代码
importmultiprocessing importrandom from functoolsimportreduce deffunc(number): random_list = random.sample(range(1000000), number) returnreduce(lambda x, y: x*y, random_list) number =50000 process1 = multiprocessing.Process(target=func,args=(number,)) process2 = multiprocessing.Process(target=func,args=(number,)) process1.start() process2.start() process1.join() process2.join()
代码就是这样的,只需将 multiprocessing.Process
与 threading.Thread
进行交换。 你使用多进程实现了完全相同的程序。
很显然,你可以用它做更多的事情,但这已不在本文的范畴之内,因此我们将不再赘述。如果有兴趣了解更多相关信息,请查看 threading — Thread-based parallelism 。
基准
现在我们已经了解了实现并行化的代码是什么样子的,让我们回到性能问题上来。正如我们之前所指出的,线程处理不适合计算密集型任务。在这种情况下,它最终会成为瓶颈。我们可以使用一些简单的基准来验证这一点。
首先,让我们看看上面所展示的代码示例中线程和多进程的比较。请记住,此任务不涉及任何类型的 IO,因此它是纯计算密集型的任务。
让我们来看看 IO 密集型的任务的类似基准测试。例如,下面的函数:
复制代码
import requests def func(number): url ='http://example.com/' foriinrange(number): response = requests.get(url) withopen('example.com.txt','w')asoutput: output.write(response.text)
这个函数的作用就是只获取一个网页,并将其保存到本地文件中,如此循环多次。虽然没有什么用,但是很直接,因此非常适合用来演示。让我们看一下基准测试。
从这两张图表中可以注意到以下几点:
- 在这两种情况下,单个进程比单个线程花费更多的执行时间。显然,进程的开销比线程更大。
- 对于计算密集型的任务,多个进程的性能要比多个线程的性能要好得多。然而,当我们使用 8x 并行化时,这种差异就变得不那么明显了。由于我的笔记本的 CPU 是四核的,因此最多可以有四个进程有效地使用多个内核。因此,当我使用更多进程时,它就不能很好地进行扩展。但是,它的性能仍然比线程要好很多,因为线程根本就不能利用多核。
- 对于 IO 密集型的任务,那么 CPU 就不是瓶颈了。因此,GIL 的常见限制在这里并不适用,而多进程也没有什么优势。不仅如此,线程的轻量级开销实际上使它们比多进程更快,而且线程的性能始终优于多进程。
区别、优点和缺点
join
从所有这些讨论中,我们可以得出以下结论:
- 线程应该用于涉及 IO 或用户交互的程序。
- 多进程应该用于计算密集型程序。
站在数据科学家的角度来看
典型的数据处理管道可以分为以下几个步骤:
- 读取㽜数据并存储到主存储器或 GPU 中。
- 使用 CPU 或 GPU 进行计算。
- 将挖掘出的信息存储在数据库或磁盘中。
让我们探索一下如何在这些任务中引入并行性,以便加快它们的运行速度。
步骤 1 涉及从磁盘读取数据,因此显然磁盘 IO 将成为这一步骤的瓶颈。正如我们已经讨论过的,线程是并行化这种操作的最佳选择。类似地,步骤 3 也是引入线程的理想候选步骤。
但是,步骤 2 包括了涉及 CPU 或 GPU 的计算。如果它是一个基于 CPU 的任务,那么使用线程就没有用;相反,我们必须进行多进程。只有这样,我们才能充分利用 CPU 的多核并实现并行性。如果它是基于 GPU 的任务,由于 GPU 已经在硬件级别上实现了大规模并行化架构,使用正确的接口(库和驱动程序)与 GPU 交互应该会解决其余的问题。
现在你可能会想,“恐怕我的数据管道看起来有点不同啊;我有一些任务并不完全适合这个通用框架啊。”不过,你应该能够观察到此处用来决定线程和多进程之间的关系。你应该考虑的因素包括:
- 你的任务是否具有任何形式的 IO?
- IO 是否为程序的瓶颈?
- 你的任务是否依赖于 CPU 的大量计算?
考虑到这些因素,再加上上面提到的要点,你应该能够作出自己的决定了。另外,请记住,你不必在整个程序中,使用单一形式的并行化。你应该为程序的不同部分使用其中一种或另一种形式的并行化,以适合该特定部分的为准。
现在,我们来看一下数据科学家可能面临的两个示例场景,以及如何使用并行计算来加速它们。
场景:下载电子邮件
假设你想分析你自己的创业公司收件箱里的所有电子邮件,并了解趋势:谁是发送频率最高的发件人,在电子邮件中出现的最常见的关键词是什么,一周中的哪一天或者一天中的哪个时段收到的电子邮件最多,等等。当然,这个项目的第一步是将电子邮件下载到你的电脑上。
首先,让我们按顺序执行,不使用任何并行化。下面是要使用的代码,它应该很容易理解。有一个函数 download_emails
,它将电子邮件的 ID 列表作为输入,并按顺序下载它们。这会将此函数与一次 100 封电子邮件列表的 ID 一起调用。
复制代码
import imaplib importtime IMAP_SERVER ='imap.gmail.com' USERNAME ='username@gmail.com' PASSWORD ='password' def download_emails(ids): client = imaplib.IMAP4_SSL(IMAP_SERVER) client.login(USERNAME, PASSWORD) client.select() foriinids: print(f'Downloading mail id: {i.decode()}') _, data = client.fetch(i,'(RFC822)') withopen(f'emails/{i.decode()}.eml','wb')asf: f.write(data0) client.close() print(f'Downloaded {len(ids)} mails!') start=time.time() client = imaplib.IMAP4_SSL(IMAP_SERVER) client.login(USERNAME, PASSWORD) client.select() _, ids = client.search(None,'ALL') ids = ids[0].split() ids = ids[:100] client.close() download_emails(ids) print('Time:',time.time() -start)
Time taken :: 35.65300488471985
seconds.
现在,让我们在这个任务中引入一些并行化能力,以加快速度。在开始编写代码之前,我们必须在线程和多进程之间作出决定。正如你到目前为止所了解的那样,当涉及到 IO 为瓶颈的任务时,线程就是最佳选择。手边的任务显然属于这一类,因为它是通过互联网访问 IMAP 服务器。因此我们将使用 threading
(线程)。
我们将要使用的大部分代码与我们在顺序情况下使用的代码是相同的。唯一的区别是,我们将 100 封电子邮件的 ID 列表拆分为 10 个较小的块,每一块包含 10 个 ID,然后创建 10 个线程并使用不同的块调用函数 download_emails
。我使用 Python 标准库中的 concurrent.futures.ThreadPoolExecutor
类进行线程化处理。
复制代码
importimaplib importtime fromconcurrent.futuresimportThreadPoolExecutor IMAP_SERVER ='imap.gmail.com' USERNAME ='username@gmail.com' PASSWORD ='password' defdownload_emails(ids): client = imaplib.IMAP4_SSL(IMAP_SERVER) client.login(USERNAME, PASSWORD) client.select() foriinids: print(f'Downloading mail id:{i.decode()}') _, data = client.fetch(i,'(RFC822)') withopen(f'emails/{i.decode()}.eml','wb')asf: f.write(data0) client.close() print(f'Downloaded{len(ids)}mails!') start = time.time() client = imaplib.IMAP4_SSL(IMAP_SERVER) client.login(USERNAME, PASSWORD) client.select() _, ids = client.search(None,'ALL') ids = ids[0].split() ids = ids[:100] client.close() number_of_chunks =10 chunk_size =10 executor = ThreadPoolExecutor(max_workers=number_of_chunks) futures = [] foriinrange(number_of_chunks): chunk = ids[i*chunk_size:(i+1)*chunk_size] futures.append(executor.submit(download_emails, chunk)) forfutureinconcurrent.futures.as_completed(futures): pass print('Time:', time.time() - start)
Time taken :: 9.841094255447388
seconds.
正如你所看到的,线程化大大加快了执行速度。
场景:使用 Scikit-Learn 进行分类
假设你有一个分类问题,想为此使用随机森林(random forest)分类器。因为它是一种标准的、众所周知的机器学习方法,所以我们不打算“ 重新发明* ”,只使用 sklearn.ensemble.RandomForestClassifier
。
下面的代码段用于演示目的。我使用辅助函数 sklearn.datasets.make_classification
创建了一个分类数据集,然后在此基础上训练了一个 RandomForestClassifier
。此外,我正在对代码中做核心工作的部分进行计时,以对模型进行拟合。
复制代码
fromsklearn.ensembleimportRandomForestClassifier fromsklearnimportdatasets importtime X, y = datasets.make_classification(n_samples=10000, n_features=50, n_informative=20, n_classes=10) start =time.time() model = RandomForestClassifier(n_estimators=500) model.fit(X, y) print('Time:',time.time()-start)
Time taken :: 34.17733192443848
seconds.
现在,我们来看看如何减少这个算法的运行时间。我们知道这个算法在一定程度上实行并行化,但什么样的并行化才是合适的呢?它没有任何 IO 瓶颈;相反,这是一项非常耗费 CPU 的任务。因此,多进程将是合理的选择。
幸运的是, sklearn
已经在这个算法中实现了多进程,我们不必从头开始编写。正如你在下面的代码中所看到的那样,我们只需提供一个参数 n_jobs
,即它应该使用的进程数量,来启用多进程。
复制代码
fromsklearn.ensembleimportRandomForestClassifier fromsklearnimportdatasets importtime X, y = datasets.make_classification(n_samples=10000, n_features=50, n_informative=20, n_classes=10) start =time.time() model = RandomForestClassifier(n_estimators=500, n_jobs=4) model.fit(X, y) print('Time:',time.time()-start)
Time taken :: 14.576200723648071
seconds.
正如预期的那样,多进程使其运行速度提高了很多。
结论
大多数(如果不是所有的话)数据科学项目将会看到并行计算速度大幅提高。事实上,许多流行的数据科学库已经内置了并行性, 你只需启用它即可 。因此,在尝试自己实现它之前,请先查看正在使用的库的文档,并检查它是否支持并行性(顺便说一句,我强烈建议你查看 dask )。如果没有的话,希望本文能够帮助你自己来实现并行性。