进程间通讯测试
from multiprocessing import Process,Pipe,Queue
def read_pipe(output,input):
output_p,input_p = output,input
while True:
try:
output_p.recv()
except EOFError:
break
def write_pipe(cout,input_p):
for i in range(cout):
input_p.send(i)
def reader_queue(queue):
while True:
msg = queue.get()
if msg == ‘DONE‘:
break
def write_queue(count,queue):
for i in range(count):
queue.put(i)
queue.put(‘DONE‘)
if __name__ == ‘__main__‘:
#
# print(‘test for pipe‘)
#
# for i in [10,10**2,10**3]:
# output_p,input_p = Pipe()
#
# reader_p = Process(target=read_pipe,args=(output_p,input_p))
#
# reader_p.start()
#
#
#
# _start = time.time()
#
# print(_start)
#
# write_pipe(i,input_p)
#
# reader_p.join()
# print("sending %s numbers to pipe() took %s seconds" %(i,time.time()-_start))
print(‘test for queue‘)
for i in [10 ** 3, 10 ** 4, 10 ** 5]:
queue = Queue()
reader_p = Process(target=reader_queue, args=(queue,))
reader_p.start()
_start = time.time()
write_queue(i, queue)
reader_p.join()
print("sending %s numbers to queue() took %s seconds" % (i, time.time() - _start))进程间通信
def func(val):
for i in range(10):
time.sleep(0.1)
#必须控制锁
with val.get_lock():
val.value += 1
if __name__ == ‘__main__‘:
v = Value(‘i‘,0)
process_list = [Process(target=func,args=(v,)) for i in range(10)]
for j in process_list:
j.start()
for i in process_list:
i.join()
print(v.value)
使用multiprocessing 克服GIL缺陷-- 进程间通讯