一 进程对象其他方法
pid号:当前进程的进程号
一台计算机上面运行着很多进程,那么计算机是如何区分并管理这些进程服务端呢?
计算机会给每一个运行的进程分配一个 PID号
如何查看
? window 电脑:
? 进入 cmd 输入 tasklist 即可查看
? tasklist |findstr PID号 查看具体的进程
? mac 电脑:
? 进入终端之后输入 ps aux
? ps aux | grep PID号 查看具体的进程
"""
一台计算机上面运行着很多进程,那么计算机是如何区分并管理这些进程服务端的呢?
计算机会给每一个运行的进程分配一个PID号
如何查看
windows电脑
进入cmd输入tasklist即可查看
tasklist |findstr PID查看具体的进程
mac电脑
进入终端之后输入ps aux
ps aux|grep PID号 查看具体的进程
"""
from multiprocessing import Process, current_process
current_process().pid # 查看当前进程的进程号
import os
os.getpid() # 查看当前进程进程号
os.getppid() # 查看当前进程的父进程进程号
p.terminate() # 杀死当前进程
# 是告诉操作系统帮你去杀死当前进程 但是需要一定的时间 而代码的运行速度极快
time.sleep(0.1)
print(p.is_alive()) # 判断当前进程是否存活
? 案例:
import time
import os
def task():
print(‘子进程start‘)
main_process_pid = os.getppid()
print(‘当前进程的父进程号:‘, main_process_pid)
time.sleep(3)
print(‘子进程 over‘)
if __name__ == ‘__main__‘:
p = Process(target=task)
p.start()
# 是告诉操作系统帮你去杀死当前进程 但是需要一定的时间 而代码的运行速度极快
time.sleep(4)
p.terminate() # 杀死进程
#判断当前进程是否存活
print(p.is_alive()) #False
print(‘主‘)
二 僵尸进程与孤儿进程(了解)
? 当你开设了子进程之后 该进程死后不会立刻释放占用的进程号。是因为需要让父进程能够查看到它开设的子进程的一些基本信息 占用的pid号 运行时间。这是操作系统的机制。
1、僵尸进程
? 僵尸进程是当子进程比父进程先结束,而父进程又没有回收子进程,释放子进程占用的资源,此时子进程将成为一个僵尸进程。
? 也就是,只要创建子进程,就会存在僵尸进程(不管父进程是否及时回收pid 号),此时的进程 pid 号还是被占用的,当过多的僵尸进程存在时,就会占用大量的 pid 号,而 pid 号是有限的,所以需要及时获取子进程的状态信息。
? 僵尸进程是有害的
产生僵尸进程的程序:
#coding:utf-8
from multiprocessing import Process
import time,os
def run():
print(‘子‘,os.getpid())
if __name__ == ‘__main__‘:
p=Process(target=run)
p.start()
print(‘主‘,os.getpid())
time.sleep(100)
等待父进程正常结束后会调用wait/waitpid去回收僵尸进程
如果父进程永远不会结束,那么该僵尸进程就会一直存在,僵尸进程过多,就是有害的
解决方法一:杀死父进程
解决方法二:对开启的子进程应该记得使用join,join会回收僵尸进程
# coding:utf-8
from multiprocessing import Process
import time, os
def task():
print(‘%s is running‘ % os.getpid())
time.sleep(3)
if __name__ == ‘__main__‘:
p = Process(target=task)
p.start()
p.join() # 等待进程p结束后,join函数内部会发送系统调用wait,去告诉操作系统回收掉进程p的id号
#子进程被回收后,主进程查看子进程的进程号是可以看到的
print(p.pid) #67389
print(‘主‘)
‘‘‘
子进程被回收后,主进程查看子进程的进程号是可以看到的
原因是:
p.join()是像操作系统发送请求,告知操作系统p的id号不需要再占用了,回收就可以了,
子进程的PID号被操作系统回收后,此 PID 就没有意义了,但是主进程是子进程的父亲,
还是可以看到子进程的pid的号的
‘‘‘
解决方法三:参考博客:http://blog.csdn.net/u010571844/article/details/50419798
2、孤儿进程
? 当父进程意外结束时(非正常结束),子进程PID不会被回收,这时候,子进程就变成了孤儿进程,操作系统相当于‘孤儿院‘,会调用 init()来专门管理孤儿进程回收相关的资源。孤儿进程是无害的,可以将子进程变为守护进程,当主进程结束后,子进程跟着结束。
三 守护进程
? 主进程将创建的子进程变成守护进程。
? 守护进程:主进程结束,子进程跟着结束。
from multiprocessing import Process
import time
def task(name):
print(‘%s子进程正在活着‘% name)
time.sleep(3)
print(‘%s子进程正在死亡‘ % name)
if __name__ == ‘__main__‘:
p = Process(target=task,args=(‘qq‘,))
# p = Process(target=task,kwargs={‘name‘:‘qq‘})
p.daemon = True # 将进程p设置成守护进程 这一句一定要放在start方法上面才有效否则会直接报错
p.start()
print(‘主程序结束了‘)
四 互斥锁
? 多个进程操作同一份数据的时候,会出现数据错乱的问题
? 针对上述问题,解决方式就是加锁处理:
? 将并发编程串行,牺牲效率但是保证了数据的安全
买票案例:
# coding:utf-8
from multiprocessing import Process
import json
import random
import time
#查票
def search(name):
#文件操作读取票数
with open(‘data.json‘, ‘r‘,encoding=‘utf-8‘)as f:
dic = json.load(f)
return dic
# 字典取值不要用[]的形式 推荐使用get 你写的代码打死都不能报错!!!
#买票
def buy(name):
#1 用户先查看票再买票
dic = search(name)
print(‘用户%s查看余票:%s‘%(name, dic.get(‘ticket_num‘)))
#模拟网络延迟
time.sleep(random.randint(1, 3))
#买票,即修改后台数据
if dic[‘ticket_num‘] > 0:
#可以买票
dic[‘ticket_num‘] -= 1
with open(‘data.json‘, ‘w‘, encoding=‘utf-8‘)as f:
json.dump(dic, f)
print(f‘用户{name}买票成功‘)
else:
print(f‘无票,用户{name}购买失败‘)
#整合上面两个函数
def run(name):
search(name)
buy(name)
if __name__ == ‘__main__‘:
for index in range(1, 5):
p = Process(target=run, args=(index,))
p.start()
# 结果展示
‘‘‘
用户1查看余票:1
用户2查看余票:1
用户3查看余票:1
用户4查看余票:1
用户4买票成功
用户1买票成功
用户3买票成功
用户2买票成功
‘‘‘
可以看到只有一张票,但是所有用户都买到票了,有问题。原因是:当所有进程操作数据时,会造成数据错误。是因为进程之间存在竞争关系,相互之间数据不共享,导致所有进程同一时间都可以操作数据
进阶版:加互斥锁处理,加锁的意义是:同一时间只会有一个进程运行。
from multiprocessing import Process, Lock
import json
import time
import random
# 查票
def search(i):
# 文件操作读取票数
with open(‘data.json‘,‘r‘,encoding=‘utf8‘) as f:
dic = json.load(f)
print(‘用户%s查询余票:%s‘%(i, dic.get(‘ticket_num‘)))
# 字典取值不要用[]的形式 推荐使用get 你写的代码打死都不能报错!!!
# 买票 1.先查 2.再买
def buy(i):
# 先查票
with open(‘data.json‘,‘r‘,encoding=‘utf8‘) as f:
dic = json.load(f)
# 模拟网络延迟
time.sleep(random.randint(1,3))
# 判断当前是否有票
if dic.get(‘ticket_num‘) > 0:
# 修改数据库 买票
dic[‘ticket_num‘] -= 1
# 写入数据库
with open(‘data.json‘,‘w‘,encoding=‘utf8‘) as f:
json.dump(dic,f)
print(‘用户%s买票成功‘%i)
else:
print(‘用户%s买票失败‘%i)
# 整合上面两个函数
def run(i, mutex):
search(i)
# 给买票环节加锁处理
# 抢锁
mutex.acquire()
buy(i)
# 释放锁
mutex.release()
if __name__ == ‘__main__‘:
# 在主进程中生成一把锁 让所有的子进程抢 谁先抢到谁先买票
mutex = Lock()
for i in range(1,6):
p = Process(target=run, args=(i, mutex))
p.start()
"""
#结果展示
‘‘‘
用户1查询余票:1
用户2查询余票:1
用户3查询余票:1
用户4查询余票:1
用户5查询余票:1
用户1买票成功
用户2买票失败
用户3买票失败
用户4买票失败
用户5买票失败
‘‘‘
扩展 行锁 表锁
注意:
1.锁不要轻易的使用,容易造成死锁现象(我们写代码一般不会用到,都是内部封装好的)
2.锁只在处理数据的部分加来保证数据安全(只在争抢数据的环节加锁处理即可)
? 加锁处理的目的,就是解决进程之间竞争关系,混乱的操作后台数据。
? 互斥锁:
? 优点:
? 解决了进程之间竞争关系,同一时间只能有一个进程运行,保证了数据安全
? 缺点:
? 将并行变成了串行,牺牲了速度,效率低
所以需要找一种还是并发(保证运行效率),同时也可以保证安全正确的修改后台数据,即多个进程同时共享内存数据,这便是 IPC 通信机制:队列和管道。
五 进程之间通信
1、队列
? 队列和管道都是将数据存放于内存中的,通过队列和管道可以实现进程之间的数据传递。
? 队列:是基于管道+锁实现的
1 Queue([maxsize]):创建共享的进程队列,Queue是多进程安全的队列,可以使用Queue实现多进程之间的数据传递。
参数:maxsize是队列中允许最大项数,省略则无大小限制。
? 队列 Queue模块使用
# coding:utf-8
from multiprocessing import Queue
#创建一个队列
q = Queue(5) #括号内可以传数字,表示生成的队列对打可以同时存放的数据量
#往队列中存数据
q.put(111)
q.put(222)
q.put(333)
q.put(444)
q.put(555)
# q.put(666) #阻塞
# q.put(666) #原因是,队列可存数5 个,当队列中数据满了时,再往里存时会阻塞,不会报错
print(q.full()) #True #判断当前队列是否满了
print(q.empty()) #False #判断当前队列是否为空了
‘‘‘
存取数据,存时为了更好的取
‘‘‘
#去队列中取数据
v1 = q.get()
v2 = q.get()
v3 = q.get()
v4 = q.get()
v5 = q.get()
print(q.empty()) #True
# v6 = q.get() #阻塞,队列中没有数据可以取了,所以它会原地等待队列中数据,不会报错
# v6 = q.get_nowait() #报错,队列中没有数据时,不会等待,直接报错queue.Empty
# v6 = q.get(timeout=3) #没有数据之后,原地等待获取,三秒之后队列中还没有数据,再报错queue.Empty
#抛出异常方式
try:
v6 = q.get(timeout=3)
print(v6)
except Exception as e:
print(‘一滴都没有了‘)
"""
q.full()
q.empty()
q.get_nowait()
在多进程的情况下是不精确
"""
2、IPC 机制
? IPC 通信机制:进程之间使用队列或管道传递数据
from multiprocessing import Queue, Process
"""
研究思路
1.主进程跟子进程借助于队列通信
2.子进程跟子进程借助于队列通信
"""
def producer(q):
q.put(‘我是23号技师 很高兴为您服务‘)
def consumer(q):
print(q.get())
if __name__ == ‘__main__‘:
q = Queue()
p1 = Process(target=producer,args=(q,))
p2 = Process(target=consumer,args=(q,))
p1.start()
p2.start()
3、生产者与消费者模型
在并发编程中使用生产者和消费者模式能够解决绝大多数并发问题。该模式通过平衡生产线程和消费线程的工作能力来提高程序的整体处理数据的速度。
#生产者消费者模型总结:
#程序中有两类角色
一类负责生产数据(生产者)
一类负责处理数据(消费者)
#引入生产者消费者模型为了解决的问题是:
平衡生产者与消费者之间的工作能力,从而提高程序整体处理数据的速度
#如何实现:
生产者<-->队列<——>消费者
#生产者消费者模型实现类程序的解耦和
? 生产这与消费者模式是一种IPC 机制案例:
"""
生产者:生产/制造东西的
消费者:消费/处理东西的
该模型除了上述两个之外还需要一个媒介
生活中的例子做包子的将包子做好后放在蒸笼(媒介)里面,买包子的取蒸笼里面拿
厨师做菜做完之后用盘子装着给你消费者端过去
生产者和消费者之间不是直接做交互的,而是借助于媒介做交互
生产者(做包子的) + 消息队列(蒸笼) + 消费者(吃包子的)
"""
简单版:
# coding:utf-8
from multiprocessing import Process, Queue
import time
import random
def producer(name, food, q):
for i in range(1, 4):
data = f‘{name}生产了{food}{i}‘
#模拟延迟
time.sleep(random.randint(1,3))
print(data)
q.put(data)
def consumer(name, q):
#消费者吃完所有的食物
while True:
food = q.get() #没有数据时就会卡主
#模拟延迟
time.sleep(random.randint(1, 3))
print(f‘{name}吃了{food}‘)
if __name__ == ‘__main__‘:
q = Queue()
p1 = Process(target=producer, args=(‘egon‘, ‘包子‘, q))
p2 = Process(target=producer, args=(‘tank‘, ‘鸡腿‘, q))
c1 = Process(target=consumer, args=(‘jason‘, q))
p1.start()
p2.start()
c1.start()
# 结果展示
‘‘‘
egon生产了包子1
tank生产了鸡腿1
jason吃了egon生产了包子1
tank生产了鸡腿2
egon生产了包子2
tank生产了鸡腿3
jason吃了tank生产了鸡腿1
egon生产了包子3
jason吃了egon生产了包子2
jason吃了tank生产了鸡腿2
jason吃了tank生产了鸡腿3
jason吃了egon生产了包子3
此时进程会阻塞住
‘‘‘
此时进程已经阻塞住了,原因是消费者运用循环在队列中取数据,当队列中没有数据时,就是卡住,导致是主进程永远不会结束。
? 解决问题思路:
? 可以判断队列中的数据是否为空,如果为空,即退出循环,但是由于在多进程下,q.full( ) 、q.empty( )、q.get_nowait( ) 不精确,所以不能使用。
? 让生产者在生产完毕后,往队列中再发一个结束信号,这样消费者在接收到结束信号后就可以break出死循环。
? 进阶版:
当队列中数据被取完时,消费者进程就没有必要存在了,可以将消费者进程变为守护进程,当主进程结束后(即队列中没有数据时),消费者进程跟着结束。
? ps:结束信号None,不一定要由生产者发,主进程里同样也可以发,但主进程需要等生产者结束后才应该发送该信号
from multiprocessing import Process, Queue
import time
import random
def producer(name,food,q):
for i in range(5):
data = ‘%s生产了%s%s‘%(name,food,i)
# 模拟延迟
time.sleep(random.randint(1,3))
print(data)
# 将数据放入 队列中
q.put(data)
def consumer(name,q):
# 消费者胃口很大 光盘行动
while True:
food = q.get() # 没有数据就会卡住
# 判断当前是否有结束的标识
# if food is None:break
time.sleep(random.randint(1,3))
print(‘%s吃了%s‘%(name,food))
if __name__ == ‘__main__‘:
q = Queue()
p1 = Process(target=producer,args=(‘大厨egon‘,‘包子‘,q))
p2 = Process(target=producer,args=(‘马叉虫tank‘,‘泔水‘,q))
c1 = Process(target=consumer,args=(‘春哥‘,q))
c2 = Process(target=consumer,args=(‘新哥‘,q))
p1.start()
p2.start()
# 将消费者设置成守护进程
c1.daemon = True
c2.daemon = True
c1.start()
c2.start()
p1.join()
p2.join()
# 等待生产者生产完毕之后 往队列中添加特定的结束符号
q.put(None) # 肯定在所有生产者生产的数据的末尾
q.put(None) # 肯定在所有生产者生产的数据的末尾
解决这种问题思路只是是发送结束信号而已,有另外一种队列提供了这种机制,就是JoinableQueue([maxsize]):这就像是一个Queue对象,但队列允许项目(数据)的使用者通知生成者,项目已经被成功处理。通知进程是使用共享的信号和条件变量来实现的。
? 使用 JoinableQueue 模块生成队列对象,内部自带计数器。每当存入一个数时,计数器+1 ,取走一个数时,计数器 -1,方便用于多进程下,判断队列中是否有数据。即:
JoinableQueue的实例q的方法:
q.task_done():使用者使用此方法发出信号,表示q.get()的返回项目已经被处理。如果调用此方法的次数大于从队列中删除项目的数量,将引发ValueError异常
q.join():生产者调用此方法进行阻塞,直到队列中所有的项目均被处理。阻塞将持续到队列中的每个项目均调用q.task_done()方法为止
? 终极版:
from multiprocessing import Process, Queue, JoinableQueue
import time
import random
def producer(name,food,q):
for i in range(5):
data = ‘%s生产了%s%s‘%(name,food,i)
# 模拟延迟
time.sleep(random.randint(1,3))
print(data)
# 将数据放入 队列中
q.put(data)
def consumer(name,q):
# 消费者胃口很大 光盘行动
while True:
food = q.get() # 没有数据就会卡住
# 判断当前是否有结束的标识
# if food is None:break
time.sleep(random.randint(1,3))
print(‘%s吃了%s‘%(name,food))
q.task_done() # 告诉队列你已经从里面取出了一个数据并且处理完毕了
if __name__ == ‘__main__‘:
# q = Queue()
q = JoinableQueue()
p1 = Process(target=producer,args=(‘大厨egon‘,‘包子‘,q))
p2 = Process(target=producer,args=(‘马叉虫tank‘,‘泔水‘,q))
c1 = Process(target=consumer,args=(‘春哥‘,q))
c2 = Process(target=consumer,args=(‘新哥‘,q))
p1.start()
p2.start()
# 将消费者设置成守护进程
c1.daemon = True
c2.daemon = True
c1.start()
c2.start()
p1.join()
p2.join()
# 等待生产者生产完毕之后 往队列中添加特定的结束符号
# q.put(None) # 肯定在所有生产者生产的数据的末尾
# q.put(None) # 肯定在所有生产者生产的数据的末尾
q.join() # 等待队列中所有的数据被取完再执行往下执行代码
"""
JoinableQueue 每当你往该队列中存入数据的时候 内部会有一个计数器+1
没当你调用task_done的时候 计数器-1
q.join() 当计数器为0的时候 才往后运行
"""
# 只要q.join执行完毕 说明消费者已经处理完数据了 消费者就没有存在的必要了