第五十六篇 并发之协程

目录

一、引子

1.线程队列

from queue import Queue, LifoQueue, PriorityQueue

1.普通线程队列

和JoinableQueue队列相似

# 普通线程队列
q = Queue(2)  # 设置可以最多放入的元素个数

q.put('a')
q.put('b') 
# q.put('c')  # 如果要放入的元素或要取出的元素大于设定的个数,则会卡死

print(q.get())
q.task_done()   # 调用task_done的个数和存入的个数相同时,join函数就不再阻塞程序
print(q.get())
q.task_done()   # 可以将多个task_done放在一起,也有效,只要和存入的元素个数相同即可

q.join()   # 阻塞函数,阻塞将一直持续到task_done的调用次数和存入线程队列的元素个数相等为止
print('over')

'''
a
b
over
'''

2.LifoQueue

1.lifo:last in first out 后进先出队列,用于模拟栈

2.和普通线程队列只有取元素的顺序不同,其他一样

lifoq = LifoQueue()

lifoq.put('a') 
lifoq.put('b')

print(lifoq.get())
print(lifoq.get())

'''
b
a
'''

3.PriorityQueue

1.具备优先级的队列,取出数据时,会比较大小,越小的数据优先级越高

# 对于数据则直接比较大小,对于字符串则比较首字母在字母表中的先后顺序(数字的优先级高于字母的优先级)
pq1 = PriorityQueue()

pq1.put(20)
pq1.put(2)

print(pq1.get())
print(pq1.get())
'''
2
20
'''

pq2 = PriorityQueue()

pq2.put('sir')
pq2.put('ace')

print(pq2.get())
print(pq2.get())
'''
ace
sir
'''

# 对于有序的容器对象,会先比较第一个元素的大小,如果相同再比较后面的相同位置上的元素大小,如果为空,则优先级最高
pq3 = PriorityQueue()

pq3.put([2,4])
pq3.put([1,5])
pq3.put([1])

print(pq3.get())
print(pq3.get())
'''
[1]
[1, 5]
[2, 4]
'''

2.如果存入的是一个自定义对象,我们可以通过运算符重载来规定比较规则,使得对象可以被比较

class Person:
    def __init__(self, name, age):
        self.age = age
        self.name = name
        
    # 覆盖比较运算符:当在两个对象之间使用比较运算符时,会自动执行该方法
    def __lt__(self, other):
        # 先比较年龄,如果相同,则比较姓名中字母的有序顺序
        # 返回的是bool值,当我们使用优先级队列时,它会根据bool值,谁小就返回谁
        if self.age == other.age:
            return self.name < other.name
        return self.age < other.age

q = PriorityQueue()

p1 = Person('king', 20)
p2 = Person('tom', 18)

q.put(p1)
q.put(p2)

print(q.get().name)
print(q.get().name)
'''
tom
king
'''

2.背景

1.上节课中我们知道GIL锁将导致CPython中的多线程无法并行执行,只能并发的执行,而实现并发的原理是切换+保存,那就意味着使用多线程实现并发,就需要为每一个任务创建一个线程,必然增加了线程创建/销毁/切换/保存所带来的开销

2.高并发下,由于任务数量太多导致无法开启新的线程,会存在既没有实际任务要执行,也无法创建新线程来处理新任务的情况

3.既要保证并发效果,也要避免创建线程带来的开销问题,在这个背景下,协程出现了,协程的原理是使用单线程来实现多任务并发

二、单线程实现并发

1.可行性

1.并发:多个任务同时处理,其实是切换加保存,由于CPU运行速度极快,所以看上去是同时进行

2.并行:利用多核CPU,真正实现多个任务同时处理

3.早期的计算机只有一个CPU,通过CPU切换线程来实现并发,所以线程内实现并发理论上是可行的

2.如何实现

并发 = 切换任务 + 保存状态,只要找到一种方案,能够在两个任务之间切换执行并且保存状态,就可以实现单线程并发

1.用yield实现

1.python中的生成器就具备这样一个特点,每次调用next方法,都会回到生成器函数中执行代码,意味着任务之间可以切换,并且是基于上一次运行的结果,也即是生成器会自动保存执行状态

def task1():
    while True:
        yield  # 不会终止函数,且每次返回一个值,并保留当前状态以供下次调用
        print('task1 start')
        
def task2():
    t = task1()  # 运行生成器函数
    while True:
        next(t)  # 利用next方法,每次循环都会去生成器中运行代码
        print('task2 start')

task2()
import time

# 将注释取消就是协程
def task1():
    a = 0
    for i in range(10000):
        a +=1
        # yield
        
def task2():
    # t = task1()
    a = 0 
    for i in range(10000):
        a += 1
        # next(t)
        
start_t = time.time()
task2()
print(time.time() - start_t)

# 单线程下串行两个任务,效率反而比线程内并发高,因为并发要切换加保存

2.对于纯计算任务,单线程并发效率比串行还低,所以我们需要用在io操作多的任务中,但是yield生成器方案无法解决阻塞问题,而且如果任务比较多时,代码将非常复杂

3.greenlet模块

1.greenlet模块简化了yield复杂的代码结构,实现了单线程多任务并发

2.但是无论直接使用yield还是greenlet都不能检测IO操作,遇到IO时都会进入阻塞状态,都对纯计算任务而言效率没有提升

3.通过greenlet模块导入greenlet类,实例化对象之后,需要将切换执行的两个任务通过switch方法放入对方的函数内,来不断实现切换加保存,并通过两个任务中的一个实例化对象使用switch方法来开启协程

from greenlet import greenlet

def task1():
    a = 0
    for i in range(10000):
        a +=1
        t2.switch()   # 用于切换加保存
        
def task2():
    a = 0 
    for i in range(10000):
        a += 1
        t1.switch()
        
start_t = time.time()
t1 = greenlet(task1)
t2 = greenlet(task2)
t1.switch()   # 开启任意一个任务即可
print(time.time() - start_t)

三、协程

1.协程是什么

1.单线程下的并发,又称为微线程、纤程,英文名Coroutine。是一种用户态的轻量级线程,即协程是由用户程序自身控制调度的

2.对比操作系统控制线程的切换,用户在单线程内控制协程的切换

3.详解

# 1. python的线程属于内核级别的,即由操作系统控制调度(如单线程遇到io或执行时间过长就会*交出cpu执行权限,切换其他线程运行)
# 2. 单线程内开启协程,一旦遇到io,就会从应用程序级别(而非操作系统)控制切换,以此来提升效率(!!!非io操作的切换与效率无关)

4.优点和缺点

优点:
# 1. 协程的切换开销更小,属于程序级别的切换,操作系统完全感知不到,因而更加轻量级
# 2. 单线程内就可以实现并发的效果,最大限度地利用cpu

缺点:
# 1. 协程的本质是单线程下,无法利用多核,可以是一个程序开启多个进程,每个进程内开启多个线程,每个线程内开启协程来尽可能提高效率
# 2. 协程本质是单个线程,因而一旦协程出现阻塞,将会阻塞整个线程

2.gevent模块

1.gevent简介

1.由于greenlet只是提供了一种比generator更加便捷的切换方式,当切到一个任务执行时,如果遇到IO,就会原地阻塞,任然实现不了遇到IO自动切换以提高效率的并发效果

2.任务的代码通常会既有计算操作又有阻塞操作,我们完全可以在执行任务1遇到阻塞时,就利用阻塞的时间去执行任务2,如此,才能提高效率,这里就需要Gevent模块

3.Gevent 是一个第三方库,可以轻松通过gevent实现并发编程,在gevent中用到的主要模式是Greenlet, 它是以C扩展模块形式接入Python的轻量级协程。 Greenlet全部运行在主程序操作系统进程的内部,但它们被协作式地调度

2.gevent如何使用

1.直接导入gevent类,并通过gevent.spawn实例化一个协程对象,spawn里面第一个参数是目标函数,后面的参数可以是目标函数所需要的参数(位置实参、关键字实参皆可)

2.必须至少有一个join函数来控制协程的运行,否则将只会执行主线程

# 执行没有加join函数的代码时不会输出任何消息
# 这是因为协程任务都是以异步方式提交,所以主线程会继续往下执行,而一旦执行完最后一行主线程也就结束了,
# 导致了协程任务没有来的及执行,所以这时候必须join来让主线程等待协程任务执行完毕,也就是让主线程保持存活
# 后续在使用协程时也需要保证主线程一直存活,如果主线程不会结束也就意味着不需要调用join

3.gevent.joinall()函数里面的参数是一个容器,可以将多个协程对象放进去

4.仍然无法解决阻塞时自动切换的问题

import gevent, time

# 会一直等到task1执行完,才会执行task2中的代码
def task1(name):
    print('%s start'%name)
    time.sleep(2)   # 模拟阻塞
    print('%s end'%name)
    
def task2():
    print('task2 start')
    print('task2 end')
    
g1 = gevent.spawn(task1, 'king')
g2 = gevent.spawn(task2)

# g1.join()
gevent.joinall([g1, g2])
print('over')

3.monkey补丁

1.monkey补丁的原理是把原始的阻塞方法替换为修改后的非阻塞方法,偷梁换柱,来实现IO自动切换
monkey补丁实现原理(举例):

# myjson.py文件中

def dump():
    print("一个被替换的 dump函数")

def load():
    print("一个被替换的 load函数")
# test.py 文件中

import myjson
import json

# 补丁函数示例
def monkey_pacth_json():
    json.dump = myjson.dump
    json.load = myjson.load
    
# 打补丁
monkey_pacth_json()

# 测试是否替换 
json.dump()
json.load()

'''
一个被替换的 dump函数
一个被替换的 load函数
'''

2.通过monkey中的patch_all方法可以实现,遇到IO阻塞自动切换任务的并发效果

3.必须在打补丁后再使用相应的功能,避免忘记,建议写在最上方

from gevent import monkey  # 通过gevent导入monkey补丁
monkey.patch_all()  # 打补丁
import gevent, time

def task1(name):
    print('%s start'%name)
    time.sleep(2)   # 模拟阻塞
    print('%s end'%name)
    
def task2():
    print('task2 start')
    time.sleep(3)
    print('task2 end')
    
g1 = gevent.spawn(task1, 'king')
g2 = gevent.spawn(task2)

gevent.joinall([g1, g2])
print('over')

3.案例

1.爬虫

from gevent import monkey
monkey.patch_all()
import gevent, requests, time

def get_page(url):
    print('get %s'%url)
    response = requests.get(url)
    if response.status_code == 200:   # 
        print('%d bytes received from %s'%(len(response.text), url))
        
start_time = time.time()

gevent.joinall([
    gevent.spawn(get_page, 'https://www.python.org/'),
    gevent.spawn(get_page,'https://github.com/'),
    gevent.spawn(get_page,'https://baidu.com/'),
])

print(time.time() - start_time)

2.TCP通讯

# 服务器
from gevent import monkey
monkey.patch_all()

#如果不想用money.patch_all()打补丁,可以用gevent自带的socket
# from gevent import socket
# s=socket.socket()

import gevent
from socket import *

def server_task(server_ip, port):
    s = socket.socket()
    s.bind((server_ip, port))
    s.listen()

    while True:
        c, addr = s.accept()
        g = gevent.spawn(talk_task, c)

def talk_task(c):
    while True:
        msg = c.recv(1024).decode('utf-8')
        c.send((msg.upper()).encode('utf-8'))

server_task()   
# 客户端

from socket import *
import os
from threading import Thread, current_thread

c = socket()
c.connect(('127.0.0.1', 8000))

def client_task():
    while True:
        msg = '%s : %s'%(os.getpid(), current_thread().name)

        c.send(msg.encode('utf-8'))
        data = c.recv(1024).decode('utf-8')
        print(data)

for i in range (100):
    t = Thread(target=client_task)
    t.start()
上一篇:协程


下一篇:关于celery在windows下运行报错提示找不到模块的问题