守护进程,锁,队列(生产者消费者模型)

一、守护进程

守护进程会随着主进程的代码结束之后在结束,而不是等待整个主进程结束(因为主进程要回收资源)

主进程的代码什么时候结束,守护进程就什么时候结束,和其他子进程执行进度无关

主进程会等待所有的子进程结束,是为了回收子进程的资源

注意:进程之间是互相独立的,主进程代码运行结束,守护进程随即终止

开启一个守护进程:在start一个进程之前设置 daemon = True

import time
from multiprocessing import Process
?
?
def func1():
    while True:         # 此进程会每隔1秒无限打印 is func1
        print(is func1)
        time.sleep(1)
?
?
if __name__ == __main__:
    p = Process(target=func1)
    p.daemon = True     # 表示p是一个守护进程
    p.start()
    time.sleep(3)       # 主进程代码只执行3秒,守护进程同样只执行3秒
    
# 输出
is func1
is func1
is func1

要求守护进程p1必须在p2进程执行结束之后才结束

import time
from multiprocessing import Process
?
?
def func1():
    while True:         # 此进程会每无限打印,每秒打印一次 is func1
        print(is func1)
        time.sleep(1)
?
?
def func2():
    for i in range(5):  # 此进程会执行5秒,每秒打印一次 is func2
        print(is fun2)
        time.sleep(1)
?
?
if __name__ == __main__:
    p1 = Process(target=func1)
    p1.daemon = True     # 表示p是一个守护进程
    p1.start()
    p2 = Process(target=func2)
    p2.start()
    p2.join()           # 主进程会等待 p2 进程结束后在结束
    
# 等待p2结束 ——> 主进程代码才结束 ——> 守护进程结束

二、进程同步—锁(multiprocessing . Lock)

进程之间数据不共享,但是共享同一套文件系统,所以访问同一个文件,或同一个打印终端是没问题的。

而共享带来的是竞争,竞争带来的结果就是错乱,如何控制,就是加锁处理。

加锁可以保证多个进程修改同一块数据时,同一时间只能有一个任务可以进行修改,即串行的修改,速度时慢了,但牺牲速度保证了数据安全。

锁非常细致,可以控制具体的某一句代码控制成同步的,其他的还是并发的

1、加锁的格式:Lock

from multiprocessing import Process, Lock
import time
?
?
def func(i, lock):
    lock.acquire()  # 拿钥匙
    print(f被锁起来的代码{i})
    time.sleep(1)   # 锁起来的代码执行时间
    lock.release()  # 还钥匙
?
?
    # 也可以只写成这样
    # 锁非常细致,可以控制具体的某一句代码控制成同步的,其他的还是并发的
    # 和with open 一样的,(推荐这样写,在执行里面的的代码时那钥匙开锁,执行完了之后还钥匙)
    # 代替acquire和release 并且在此基础上做一些异常处理,保证即便一个进程的代码出错退出了,也会归还钥匙
    
    """
    with lock:
        print(f‘被锁起来的代码{i}‘)
        time.sleep(1)  # 锁起来的代码执行时间
    """
?
?
if __name__ == __main__:
    lock = Lock()   # 生成锁
    for i in range(5):
        p = Process(target=func, args=(i, lock))
        p.start()
        
# 输出
被锁起来的代码0
被锁起来的代码1
被锁起来的代码3
被锁起来的代码2
被锁起来的代码4

多进程抢票例子不加锁:效率高,但是顺序容易错乱

只剩一张票却被多人买到,因为每有一人买票,买到了就会去修改票的数量,在这期间会有延迟,在同一时间又有一人来买票,但是票的数量还没改回来,造成了数据不安全同一张票被多人购买。

import json
import time
from multiprocessing import Process
?
?
def search(i):
    with open(ticket, encoding=utf-8, mode=r) as f:
        ticket = json.load(f)
        print(f{i}查:当前的票还剩余{ticket["count"]}张。)
?
# 查询还剩多少票,如果大于0就把数量减1后在写入文件,中间会有延迟
def buy_ticket(i):
    with open(ticket, encoding=utf-8, mode=r) as f:
        ticket = json.load(f)
        if ticket[count] > 0:
            ticket[count] -= 1
            print(f{i},买到票了。)
?
    time.sleep(0.1)         # 模拟买到票后修改票数量网络延迟
    with open(ticket, encoding=utf-8, mode=w) as f:
        json.dump(ticket, f)
?
?
def get_ticket(i):
    search(i)
    buy_ticket(i)
?
?
if __name__ == __main__:
    user = [(小杨,), (小红,), (鲍勃,), (艾伦,), (爱丽,)]
    for name in user:
        p = Process(target=get_ticket, args=name)
        p.start()
        
# 输出
小杨查:当前的票还剩余1张。
小杨,买到票了。
小红查:当前的票还剩余1张。
小红,买到票了。
鲍勃查:当前的票还剩余1张。
鲍勃,买到票了。
艾伦查:当前的票还剩余1张。
艾伦,买到票了。
爱丽查:当前的票还剩余1张。
爱丽,买到票了。

多进程抢票例子加锁:牺牲了效率,但是保障了数据安全和顺序

import json
import time
from multiprocessing import Process, Lock
?
?
def search(i):
    with open(ticket, encoding=utf-8, mode=r) as f:
        ticket = json.load(f)
        print(f{i}查:当前的票还剩余{ticket["count"]}张。)
?
?
def buy_ticket(i):
    with open(ticket, encoding=utf-8, mode=r) as f:
        ticket = json.load(f)
        if ticket[count] > 0:
            ticket[count] -= 1
            print(f{i},买到票了。)
?
    time.sleep(0.1)         # 模拟买到票后修改票数量网络延迟
    with open(ticket, encoding=utf-8, mode=w) as f:
        json.dump(ticket, f)
?
?
def get_ticket(i, lock):
    search(i)       # 查票没加锁,所以所有人都是同时查票的
    with lock:      # 只在买票这里加了锁,同时只有一个人能买票,和修改票的数量
        buy_ticket(i)
?
?
if __name__ == __main__:
    user = [小杨, 小红, 鲍勃, 艾伦, 爱丽]
    lock = Lock()
    for name in user:
        p = Process(target=get_ticket, args=(name, lock))
        p.start()
        
# 输出
小杨查:当前的票还剩余2张。
小杨,买到票了。
小红查:当前的票还剩余2张。
鲍勃查:当前的票还剩余2张。
爱丽查:当前的票还剩余2张。
艾伦查:当前的票还剩余2张。
小红,买到票了。

Lock为互斥锁,不能在同一个进程中连续acquire多次

from multiprocessing import Lock # 互斥锁 不能再同一个进程中连续acquire多次
lock = Lock()       # 创建锁
lock.acquire()      # 拿钥匙
print(1)
lock.acquire()      # 拿钥匙——>会卡在这里,因为没有还钥匙,就拿不到钥匙,拿钥匙和还钥匙要成对出现
print(2)
lock.release()      # 还钥匙
?
# 输出
1

三、队列

进程之间的通信(IPC):两种形式

1、基于文件的:同一台机器上的多个进程之间通信

Queue 队列

基于socket的文件级别的通信来完成数据传递的

2、基于网络的

第三方工具(消息中间件)

memcache

redis

rabbitmg

kafka

总之进程彼此之间内存是相互隔离,要实现进程间通信(IPC), multiprocessing模块支持队列,这种方法是使用消息传递的。

1、主要方法:

from multiprocessing import Queue
?
q = Queue()     # 实例化一个队列
?
q.get()         # 放入数据到队列中
q.put()         # 从队列中取出数据
q.get_nowait()  # 同q.get(False)
q.put_nowait()  # 同q.put(False)
# 遵循先进先出,后进后出原则

2、队列用法:

子进程和主进程之间的数据传递

from multiprocessing import Process, Queue
?
?
def func1(q):
    # 2、子进程中对主进程中的队列进行传值5次
    for i in range(5):
        q.put(f子进程func1数据{i})
?
?
if __name__ == __main__:
    q = Queue()     # 1、在主进程实例化一个队列
    p = Process(target=func1, args=(q, ))
    p.start()
?
    # 3、在主进程中查看子进程对主进程的队列所传的值
    for i in range(5):
        print(q.get())
# 输出
子进程func1数据0
子进程func1数据1
子进程func1数据2
子进程func1数据3
子进程func1数据4

子进程之间的数据传递

from multiprocessing import Process, Queue
?
?
def func1(q):
    # 子进程中对主进程中的队列进行传值3次
    for i in range(3):
        q.put(f子进程func1数据{i})
?
?
def func2(q):
    # 子进程中对主进程中的队列进行传值2次
    for i in range(2):
        q.put(f子进程func2数据{i})
?
?
def show_func(q):
    # 子进程查看其他子进程传入的数据
    for i in range(5):
        print(q.get())
?
?
if __name__ == __main__:
    q = Queue()     # 在主进程实例化一个队列
    p1 = Process(target=func1, args=(q, ))
    p1.start()
    p2 = Process(target=func2, args=(q,))
    p2.start()
    p3 = Process(target=show_func, args=(q,))
    p3.start()
    
# 输出
子进程func1数据0
子进程func1数据1
子进程func1数据2
子进程func2数据0
子进程func2数据1

四、生产者消费者模型

在并发编程中使用生产者消费者模式能够解决绝大多数并发问题。该模式通过平衡生产线和消费线的工作能力来提高程序的整体处理数据的速度。

为什么要使用生产者和消费者模式

在线程世界里,生产者就是生产数据的线程,消费者就是消费数据的线程。在多线程开发当中,如果生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,才能继续生产数据。同样的道理,如果消费者的处理能力大于生产者,那么消费者就必须等待生产者。为了解决这个问题于是引入了生产者和消费者模式。

什么是生产者消费者模式

生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。

本质 : 就是让生产数据和消费数据的效率达到平衡并且最大化的效率,这样的话能够提高我们程序的执行效率,并且也能够降低我们要开的并发数,还能让我们的操作系统压力小一下

基于队列实现生产者消费者模型:

from multiprocessing import Process, Queue
import time
?
?
def consumer(q, name):  # 消费者:通常取到数据之后还要进行某些操作
    while True:
        food = q.get()
        if food:
            time.sleep(1)   # 消费者吃的速度
            print(f{name}吃了{food})
        else:
            print(吃完了!!)
            break   # 当接收到了结束信号就会自动结束,不然会卡住。
?
?
def producer(q, name, food):  # 生产者:通常在放数据之前需要先通过某些代码来获取数据
    for i in range(3):
        # 生产一个食物需要2秒, 而吃的话只要1秒,这样生产者就需要两个才能平衡
        time.sleep(2)         
        print(f{name}生产了一个食物{food+str(i)})
        q.put(f{food+str(i)})
?
?
if __name__ == __main__:
    q = Queue()
    p1 = Process(target=producer, args=(q, 小红, 苹果))
    p2 = Process(target=producer, args=(q, 小杨, 橘子))
    c = Process(target=consumer, args=(q, 鲍勃))
    p1.start()  # 生产者p1执行
    p2.start()  # 生产者p2执行
    c.start()   # 消费者c执行
    p1.join()   # 等待p1生产者生产完
    p2.join()   # 等待p2生产者生产完
    q.put(False)    # 生产者全部生产完了之后就放入一个False结束信号
                    # 这样消费者发现这个信号就主动退出,消费者有几个信号就要有几个
?
# 输出
小红生产了一个食物苹果0
鲍勃吃了苹果0
小杨生产了一个食物橘子0
鲍勃吃了橘子0
小红生产了一个食物苹果1
鲍勃吃了苹果1
小杨生产了一个食物橘子1
鲍勃吃了橘子1
小红生产了一个食物苹果2
鲍勃吃了苹果2
小杨生产了一个食物橘子2
鲍勃吃了橘子2
吃完了!!

生产者消费者模型总结:

程序中有两类角色:

1、一类负责生产数据(生产者)

2、一类负责处理数据(消费者)

引入生产者消费者模型为了解决的问题是:

平衡生产者与消费者之间的工作能力,从而提高程序整体处理数据的速度

如何实现:

生产者<——>队列<——>消费者

守护进程,锁,队列(生产者消费者模型)

上一篇:Linux系统概念模型——课程总结


下一篇:04Linux网络编程基础 ---- 信号