day10 --- multiprocess (multiprocessing)

multiprocess
Queue / Pipe only pass data between processes.
Manager implements sharing of data between processes, i.e. multiple processes can modify the same piece of data.
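A minimal sketch of the difference (the key names below are only illustrative): an ordinary object sent through a Queue is pickled and copied, so a change made by the child is not visible in the parent's original object, while a Manager dict is a proxy to a single shared object, so the child's change is visible to the parent.

from multiprocessing import Process, Queue, Manager

def worker_copy(q):
    d = q.get()           # the child receives a pickled copy of the dict
    d['child'] = 'added'  # modifies only the child's copy

def worker_shared(d):
    d['child'] = 'added'  # modifies the one shared dict through the proxy

if __name__ == '__main__':
    # Queue / Pipe: data is passed (copied), not shared
    q = Queue()
    original = {'parent': 'here'}
    q.put(original)
    p = Process(target=worker_copy, args=(q,))
    p.start(); p.join()
    print(original)  # still {'parent': 'here'}; the child's change is not visible

    # Manager: data is shared, so multiple processes modify the same dict
    with Manager() as manager:
        shared = manager.dict({'parent': 'here'})
        p = Process(target=worker_shared, args=(shared,))
        p.start(); p.join()
        print(dict(shared))  # {'parent': 'here', 'child': 'added'}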
 

The process module: multiprocessing

#!/usr/bin/env python
# -*- coding: utf-8 -*-
from multiprocessing import Process

def run(name):
    print('my name is %s' % name)

if __name__ == '__main__':
    p = Process(target=run, args=('lilei',))  # create a Process instance
    p.start()
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Author:DCC
from multiprocessing import Process
import os

def info(title):
    print(title)
    print('module name:', __name__)
    print('parent process:', os.getppid())
    print('process id:', os.getpid())
    print("\n\n")

def f(name):
    info('\033[31;1mfunction f\033[0m')
    print('hello', name)

if __name__ == '__main__':
    info('\033[32;1mmain process line\033[0m')
    p = Process(target=f, args=('bob',))
    p.start()

Each child process can also start its own threads:

import multiprocessing
import time, threading

def thread_run():
    print(threading.get_ident())  # print the current thread's identifier

def run(name):
    time.sleep(2)
    print("hello", name)
    t = threading.Thread(target=thread_run,)  # start a thread inside this child process
    t.start()

if __name__ == "__main__":
    for i in range(5):
        p = multiprocessing.Process(target=run, args=("bob %s" % i,))
        p.start()

Getting process IDs: the parent-child relationship

from multiprocessing import Process
import os

def info(title):  # info() prints the parent and child process IDs
    print(title)
    print('module name:', __name__)
    print('parent process id:', os.getppid())  # print the parent's process ID
    print('child process id:', os.getpid())    # print the current (child) process ID

def f(name):
    info('\033[31;1m called from child process function f \033[0m')  # print f()'s parent and own process IDs
    print('hello ', name)

if __name__ == '__main__':
    info('\033[32;1m main process \033[0m')  # the main program prints its own parent/child IDs
    p = Process(target=f, args=('hanmeimei',))  # the main program starts a child process, which prints its IDs
    p.start()
    p.join()

Inter-process communication: Queue and Pipe

from multiprocessing import Queue, Process

def f(cq):
    print('in child before cq.put:', cq.qsize())  # check whether the queue already holds data before the child puts
    cq.put(['my', 'name', 'is', ['lilei', 'xixi']])  # put an element onto the queue

if __name__ == '__main__':
    mq = Queue()  # create a process queue instance
    mq.put('from main')  # put an element onto the queue
    p = Process(target=f, args=(mq,))  # create a child process and pass mq to it
    p.start()  # start the child
    p.join()  # wait for the child to finish
    print('', mq.get_nowait())  # get an element from the queue
    print('', mq.get_nowait())
from multiprocessing import Process, Pipe

def f(conn):
    conn.send("from child1")  # send data
    conn.send("from child2")  # send data
    print('client recv:', conn.recv())  # receive data
    conn.close()

if __name__ == '__main__':
    a_conn, b_conn = Pipe()  # create a pipe; it returns the two connection ends
    p = Process(target=f, args=(b_conn,))  # child process runs f() with one end of the pipe
    p.start()
    print(a_conn.recv())
    print(a_conn.recv())
    a_conn.send('from parent')  # the parent sends data
    p.join()

Sharing data between processes: Manager

from multiprocessing import Process, Manager
import os

def run(d, l):
    d[os.getpid()] = os.getpid()  # use the current child's pid as both key and value
    l.append(os.getpid())
    print(d, l)

if __name__ == '__main__':
    with Manager() as manager:
        d = manager.dict()  # a dict shared through the Manager
        l = manager.list()  # a list shared through the Manager
        p_list = []  # empty list to collect the Process instances
        for i in range(10):  # start several child processes
            p = Process(target=run, args=(d, l))  # each child runs run() with the shared d and l
            p.start()
            p_list.append(p)  # keep the Process instance
        for r in p_list:  # loop over the child processes
            r.join()  # wait for each child to finish
        print(d)  # print the final dict
        print(l)  # print the final list

Process synchronization

Without using the lock, output from the different processes is liable to get all mixed up.

from multiprocessing import Process, Lock

def f(l, i):
    l.acquire()  # take the lock so only one process prints at a time
    try:
        print('hello world', i)
    finally:
        l.release()  # always release the lock

if __name__ == '__main__':
    lock = Lock()
    for num in range(10):
        Process(target=f, args=(lock, num)).start()
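For contrast, a minimal sketch (not part of the original example) that drops the lock; with several processes writing to stdout at once, their lines may interleave:

from multiprocessing import Process

def f_nolock(i):
    # no lock: several processes may write to stdout at the same time,
    # so their output can get mixed up
    print('hello world', i)

if __name__ == '__main__':
    for num in range(10):
        Process(target=f_nolock, args=(num,)).start()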

Process pool

A process pool maintains a set of worker processes internally. When you submit work, a process is taken from the pool; if no process in the pool is free, the program waits until one becomes available.

A process pool offers two submission methods (a short sketch contrasting them follows the list):

  • apply
  • apply_async
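
A minimal sketch of the difference, assuming a Foo like the one used in the examples below: apply blocks until the result is ready and returns it directly, while apply_async returns immediately with an AsyncResult whose .get() fetches the result later (optionally firing a callback in the parent).

from multiprocessing import Pool
import time

def Foo(i):
    time.sleep(1)
    return i + 100

if __name__ == '__main__':
    with Pool(2) as pool:
        r1 = pool.apply(Foo, (1,))        # blocks about 1s, then returns 101 directly
        print('apply result:', r1)
        ar = pool.apply_async(Foo, (2,))  # returns at once with an AsyncResult
        print('apply_async returned, result ready?', ar.ready())
        print('apply_async result:', ar.get())  # .get() blocks until the worker finishes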
A first example submits ten tasks with apply_async and a Bar callback:

from multiprocessing import Process, Pool
import time

def Foo(i):
    time.sleep(2)
    return i + 100

def Bar(arg):
    print('-->exec done:', arg)

pool = Pool(5)
for i in range(10):
    pool.apply_async(func=Foo, args=(i,), callback=Bar)
    # pool.apply(func=Foo, args=(i,))
print('end')
pool.close()
pool.join()  # wait for the pool's processes to finish before exiting; if this line is commented out, the program exits right away
A fuller version that guards the main block, calls freeze_support, and prints the worker pids:

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Author:DCC
from multiprocessing import Process, Pool, freeze_support
import time
import os

def Foo(i):
    print("in process", os.getpid())
    time.sleep(2)
    return i + 100

def Bar(arg):
    print('-->exec done:', arg, os.getpid())

if __name__ == "__main__":
    freeze_support()
    pool = Pool(3)
    # pool = Pool(processes=5)
    print("main process id", os.getpid())
    for i in range(10):
        pool.apply_async(func=Foo, args=(i,), callback=Bar)  # callback: Bar runs in the parent once Foo has finished
        # pool.apply(func=Foo, args=(i,))
    print('end')
    pool.close()  # close() must be called first, then join()
    pool.join()  # wait for the pool's processes to finish; if this line is commented out, the program exits right away