Python并行实例

任务

def single():
    """Baseline task: sum sqrt(i) for i in 1..N-1 in one process and one thread.

    Reads the module-level constant ``N`` and returns the accumulated sum.
    """
    total = 0
    for value in range(1, N):
        total = total + math.sqrt(value)
    return total

结论

  • Python多线程受GIL(全局解释器锁)限制,对CPU密集型任务无法利用多核
  • Python多进程可以利用多核
  • Numpy速度远超并行的Python代码
  • twisted的线程池同样是Python线程,无法利用多核

实现

import math
import multiprocessing
import threading
import time
import timeit

import numpy as np
from twisted.internet import reactor

# Upper bound (exclusive) of the benchmark: every method sums sqrt(i), 1 <= i < N.
N = 10000000


def _sqrt_sum(beg, end):
    """Return the sum of math.sqrt(i) for i in range(beg, end)."""
    s = 0
    for i in range(beg, end):
        s += math.sqrt(i)
    return s


def _chunk_bounds(parts):
    """Split range(1, N) into exactly ``parts`` contiguous (beg, end) pairs.

    Bounds are clamped to [1, N] so the chunks together cover the same
    integers as ``single()`` even when N is not divisible by ``parts``;
    trailing chunks may be empty in that case.
    """
    per = math.ceil(N / parts)
    return [(max(1, i * per), min(N, (i + 1) * per)) for i in range(parts)]


def single():
    """Single-process, single-thread baseline: sum sqrt(i) for i in 1..N-1."""
    s = 0
    for i in range(1, N):
        s += math.sqrt(i)
    return s


def useThread():
    """Compute the same sum with 4 threads and return it.

    Each thread writes its partial sum into its own list slot, so no two
    threads ever update the same variable (a shared ``total += s`` is a
    read-modify-write that is not atomic, even under the GIL).
    """
    thread_count = 4
    partial_sums = [0] * thread_count

    def go(slot, beg, end):
        partial_sums[slot] = _sqrt_sum(beg, end)

    thread_list = []
    for slot, (beg, end) in enumerate(_chunk_bounds(thread_count)):
        th = threading.Thread(target=go, args=(slot, beg, end))
        thread_list.append(th)
        th.start()
    for th in thread_list:
        th.join()
    return sum(partial_sums)


def _process_worker(q, beg, end):
    """Process entry point: put the partial sum for range(beg, end) on ``q``.

    Defined at module level so it can be pickled when the start method
    is ``spawn``.
    """
    q.put(_sqrt_sum(beg, end))


def useMultiprocess():
    """Compute the same sum with 4 worker processes and return it."""
    process_count = 4
    q = multiprocessing.Queue()
    process_list = []
    for beg, end in _chunk_bounds(process_count):
        p = multiprocessing.Process(target=_process_worker, args=(q, beg, end))
        process_list.append(p)
        p.start()
    # Collect exactly one result per worker BEFORE joining: joining a process
    # that still has queued data to flush can deadlock, and a non-blocking
    # drain could silently miss results.
    total_sum = 0
    for _ in process_list:
        total_sum += q.get()
    for p in process_list:
        p.join()
    return total_sum


def _twisted_process_work(q):
    """Child-process body for ``useTwisted``.

    Runs the computation on the reactor's thread pool, stops the reactor
    once every chunk has reported back, and puts the total on ``q``.
    Module-level so it can be pickled under the ``spawn`` start method.
    """
    thread_count = 4
    total_sum = 0
    ok_count = 0

    def accumulate(s):
        # Runs in the reactor thread (scheduled via callFromThread), so the
        # counters are only ever touched from one thread.
        nonlocal total_sum
        nonlocal ok_count
        total_sum += s
        ok_count += 1
        if ok_count == thread_count:
            reactor.stop()

    def go(beg, end):
        # Runs in a pool thread; hands the partial sum back to the reactor.
        reactor.callFromThread(accumulate, _sqrt_sum(beg, end))

    reactor.suggestThreadPoolSize(thread_count)
    for beg, end in _chunk_bounds(thread_count):
        reactor.callInThread(go, beg, end)
    reactor.run()
    q.put(total_sum)


def useTwisted():
    """Compute the sum on a Twisted thread pool inside a child process.

    The reactor is a per-process singleton, so each invocation runs (and
    stops) its reactor in a fresh process and ships the result back
    through a queue.
    """
    q = multiprocessing.Queue()
    p = multiprocessing.Process(target=_twisted_process_work, args=(q,))
    p.start()
    # Read the result before joining (see the multiprocessing guidelines on
    # joining processes that use queues).
    result = q.get()
    p.join()
    return result


def useTwisted2():
    """Schedule the computation on this process's reactor thread pool.

    Only queues the work and returns None; the work is carried out once
    the reactor runs (``main`` starts it at the very end). When all chunks
    of this invocation are done, the elapsed time and the total are printed.
    The reactor is not stopped here.
    """
    total_sum = 0
    thread_count = 4
    ok_count = 0
    beg_time = time.time()

    def accumulate(s):
        nonlocal total_sum
        nonlocal ok_count
        total_sum += s
        ok_count += 1
        if ok_count == thread_count:
            print(time.time() - beg_time, "value", total_sum)

    def go(beg, end):
        reactor.callFromThread(accumulate, _sqrt_sum(beg, end))

    reactor.suggestThreadPoolSize(thread_count)
    for beg, end in _chunk_bounds(thread_count):
        reactor.callInThread(go, beg, end)


def useNumpy():
    """Compute the same sum (i in 1..N-1) with vectorized NumPy operations."""
    a = np.arange(1, N)
    return np.sum(np.sqrt(a))


def main():
    """Run every implementation once for its result and 10 more times for timing."""
    for method in (single, useThread, useMultiprocess, useNumpy, useTwisted, useTwisted2):
        print(method.__name__, "result", method(), "time", timeit.timeit(method, number=10))
    # Runs the work queued by useTwisted2; nothing stops this reactor, so the
    # script keeps running here until it is interrupted.
    reactor.run()


if __name__ == '__main__':
    main()

twisted无法利用多核

from twisted.internet import threads, reactor
import time
import math
import timeit

# Reference point for the elapsed times printed by go().
beg_time = time.time()


def go():
    """CPU-bound task: sum sqrt(n) for n in 1..10000000, printing start/finish."""
    print("go start")
    total = 0
    for n in range(1, 10000001):
        total += math.sqrt(n)
    print("go over", time.time() - beg_time)


reactor.suggestThreadPoolSize(8)
# Time one run in the main thread as the single-threaded reference.
print(timeit.timeit(go, number=1))
# Queue ten runs on the reactor's thread pool for comparison.
for _ in range(10):
    reactor.callInThread(go)
# The reactor is never stopped, so the script runs until it is interrupted.
reactor.run()
上一篇:pt-query-digest 使用说明


下一篇:【POJ】2142 The Balance 数论(扩展欧几里得算法)