Python_学习之多协程
一、yield和yield from区别
二、gevent构建多协程
三、asyncio构建多协程
1、名词简介
2、常用方法(api)
Loop
await
Task
Future
3、asyncio 通过yield from构建多任务协程
4、asyncio通过async和await【官方推荐】
5、asyncio.run() 构建循环事件【官方推荐】
6、实例操作
6.1、批量异步处理类型相同的数据?
6.2、希望不等待所有完成了才对结果处理,而是消费了就对结果进行处理?
6.3、动态的异步消费,比如动态接收mq或redis插入的数据?
6.4、如何将线程池或进程池与协程一起用,且让不支持协程的模块支持?
6.5. 在flask中应用
6.6、代替requests实现的aiohttp支持异步
前面已经分布介绍了Python中经常用的多线程和多线程,有兴趣可以参考一下,希望能给你一点帮助,本文主要记录下Python中的多线程用法。
协程又称微线程,是一种用户态的上下文切换技术,由程序(用户)自身控制代码与代码之间的切换,效率高,安全,无锁机制。
Python中实现的方式主要以下几种:
yield,yield from 生成器
greenlet为第三方模块【没有解决遇IO自动切换,只是阻塞】,生产中主要用gevent模块来实现
asyncio,Python3.4引入的模块【主流】
async&awiat ,Python3.5中引入的两个关键字,结合asynio模块使用【主流】
一、yield和yield from区别
def study_yield1(items):
"""一次全部返回个列表"""
yield items
def study_yield2(items):
"""一个一个返回,yield from 等价于 for i in items: yield i"""
yield from items
item = ["I", "Like", "Python"]
for i in study_yield1(item):
print(i) # ['I', 'Like', 'Python']
for j in study_yield2(item):
print(j) # 'I', 'Like', 'Python'
# 函数中有yield关键字,即是生成器函数,遇到yield关键字就返回后面的值,同时记录状态,下次调用从该状态处执行,而不是重新开始
# yield 直接返回一个列表,而yield from 是一个一个返回,本质将后面可迭代对象转成生成器了,因此后面可接生成器函数
# yield from 内部解决了很多异常
二、gevent构建多协程
官方文档:http://www.gevent.org/index.html
安装 pip install gevent
# 多任务
def asynchronous_func(func_name, job_list):
"""多少个任务,就开启多少个协程处理"""
g_list = [gevent.spawn(func_name, job) for job in job_list]
gevent.joinall(g_list)
return [g.value for g in g_list]
三、asyncio构建多协程
官方文档:https://docs.python.org/zh-cn/3.8/library/asyncio.html
借用廖雪峰老师的话,asyncio的编程模型就是一个消息循环,我们从asyncio模块中直接获取一个EventLoop的引用,然后把需要执行的协程扔到EventLoop中执行,就实现了异步IO。
Python3.4引入asyncio,通过装饰器@asyncio.coroutine标识函数是一个协程,使用yield from来驱动即遇IO切换到另一个任务。
1、名词简介
异步IO:发起一个IO操作,因其耗时,不用等其结束,可以做其他的事情,结束时会发来通知告知。
事件循环loop:管理所有的事件【任务】,在整个程序运行过程中不断循环执行并追踪事件发生的顺序将它们放到队列中,空闲时,调用相应的事件处理者来处理这些事件。
任务对象Task:是Future的子类,作用是将一个协程打包成一个task对象,为这个协程自动排一个日程准备立即执行,并追踪其状态。
结果对象Future:表示一个异步运算的最终结果的处理,asyncio 中需要 Future 对象以便允许通过 async/await 使用基于回调的代码。
可等待对象:如果一个对象可以在await语句中使用,那么他就是可等待对象。主要有三种类型:协程、任务Task、Future
2、常用方法(api)
Loop
关于循环的文章可参考:https://mp.weixin.qq.com/s/fCWQAT-O27mbi8UvKIrjWw
loop = asyncio.get_event_loop()
获取一个标准事件循环loop对象,所有协程都是通过它来循环作业的,可以把它理解为一个队列
loop.run_until_complete(future_obj)
阻塞调用,入参为Future对象,作用是运行所有的协程,直到所有的协程都处理完了返回结果或异常才结束。
loop.close()
关闭事件循环,清除所有队列并立即关闭执行器,不会等没有完成的任务完成,幂等【相同的参数执行相同的函数结果必须一样】不可逆。
await
asyncio.wait(可等待对象awaitable,timeout=None)
内部将我们传入的任务封装成task对象,返回值为元祖,一个为完成的done列表,一个为还未完成的pending列表,如果设置timeout,在规定时间内,没返回的都放到未完成列表中,协程返回的结果顺序是无序的,完成的结果调用d.result()方法获取任务的返回值
asyncio.gather(可等待对象)
功能通asyncio.wait(),但返回的结果顺序,是按放入任务的顺序,有序的
asyncio.sleep(秒数,result="默认返回的结果")
模拟IO操作,这种休眠不会阻塞事件循环,前面加上await后将控制权交给主事件循环,不能用time.sleep(),因其会释放GIL,从而阻塞整个主线程,继而阻塞整个事件循环。
Task
Task:是Future的子类,作用是将一个协程打包成一个task对象,为这个协程自动排一个日程准备立即执行【白话就是将多个协程任务,排一个时间表自动并发的执行(遇到io就自动切另一个任务)】。
底层接口:loop.create_task()
asyncio.create_task(协程)
将一个协程打包为一个task,排入日程准备执行,返回task对象,Python3.7加入的,3.7之前通过asyncio.ensure_future(协程)实现
Future
Future:表示一个异步运算的最终结果,是一个awaitable对象,协程可以等待 Future 对象直到它们有结果或异常集合或被取消。在 asyncio 中需要 Future 对象以便允许通过 async/await 使用基于回调的代码。
asyncio.create_future(协程)
3、asyncio 通过yield from构建多任务协程
Python3.4引入asyncio,通过装饰器@asyncio.coroutine标识函数是一个协程,使用yield from来驱动即遇IO切换到另一个任务。
# 最常见用法
import time
import asyncio
@asyncio.coroutine
def task(n):
print(f"in {task.__name__}:{n} start")
yield from asyncio.sleep(n) # 模拟IO时间
print(f"in {task.__name__}:{n} end")
return f"01_{n}"
@asyncio.coroutine
def main():
# 构建任务集合
tasks = [
asyncio.ensure_future(task(1)),
asyncio.ensure_future(task(2)),
]
#或者如下也行,加入asyncio.wait() 会自动将协程转成task对象
#tasks = [task(1), task(2)]
print(tasks)
done, pending = yield from asyncio.wait(tasks,timeout=None)
# 返回两个列表,done是完成的任务返回的结果列表,pending是未完成的列表,调用result()得到返回值
# timeout默认为None,表示一直等任务都完成,如果设置了时间,则规定时间没有返回则放入未完成列表中
# 完成列表每个元素调用result()方法就可以得到对应任务的结果
task_result = [d.result() for d in done]
return task_result
for d in done:
print(f"协程任务结果为:{d.result()}")
if __name__ == '__main__':
start = time.time()
# 创建主线程的事件循环对象
loop = asyncio.get_event_loop()
# 装载任务
result = loop.run_until_complete(main())
# 关闭循环
loop.close()
print(f"任务结果为:{result}")
print(f"总耗时:{time.time()-start}")
"""结果为:
[<Task pending name='Task-2' coro=<task() running at ...>>, <Task pending name='Task-3' coro=<task() running at...>>]
def main():
in task:1 start
in task:2 start
in task:1 end
in task:2 end
协程任务结果为:01_2
协程任务结果为:01_1
任务结果为:['01_2', '01_1']
总耗时:2.002303123474121
"""
4、asyncio通过async和await【官方推荐】
python3.5后官方推荐为了区分生成器和协程,其实就是将@asyncio.coroutine 换成了 async,yield from 换成了 await
import asyncio
async def task01(n):
print(1)
await asyncio.sleep(n)
print(2)
return n
async def task02(n):
print(3)
await asyncio.sleep(n)
print(4)
return n
if __name__ == '__main__':
tasks = [task01(3), task02(2)]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
loop.close()
5、asyncio.run() 构建循环事件【官方推荐】
python3.7才有run方法
"""
asyncio.run() 函数用来运行最高层级的入口点 "main()" 函数
此函数运行传入的协程,负责管理 asyncio 事件循环并 完结异步生成器。
当有其他 asyncio 事件循环在同一线程中运行时,此函数不能被调用。
如果 debug 为 True,事件循环将以调试模式运行。
此函数---->总是会创建一个新的事件循环并在结束时关闭之<-----。
它应当被用作 asyncio 程序的主入口点,理想情况下应当只被调用一次。
"""
import asyncio
async def task(m):
print(f"start{m}")
await asyncio.sleep(m) # IO耗时,通过await 挂起当前协程,事件循环去执行其他的协程,等IO耗时结束了,再继续。
print(f"end{m}")
return m
async def main1(): # 入口
# 创建多个任务
task1 = asyncio.create_task(task(3))
task2 = asyncio.create_task(task(2))
ret1 = await task1
ret2 = await task2
ret = asyncio.run(main1())
# 写法2:
async def main2():
tasks = [task(3), task(2)]
done, p = await asyncio.wait(tasks, timeout=None)
return [d.result() for d in done]
ret = asyncio.run(main2())
# 写法3:
tasks = [task(3), task(2)]
done,p = asyncio.run(asyncio.wait(tasks))
ret = [d.result() for d in done]
# 不允许这样写tasks = [asyncio.create_task(task(3)), asyncio.create_task(task(2))]
"""
因为这时候还没有loop,看create_task源码
def create_task(coro, *, name=None):
"""Schedule the execution of a coroutine object in a spawn task.
Return a Task object.
"""
loop = events.get_running_loop()
task = loop.create_task(coro)
_set_task_name(task, name)
return task
"""
注:通过asyncio模块执行异步,所有的流程第一步是先有事件循环,然后才能将可等待对象封装成task对象,放入事件循环,才能实现异步,决不允许先建task对象,然后建loop,这样会报错
6、实例操作
6.1、批量异步处理类型相同的数据?
import asyncio
async def task(data):
"""消费消息,解析消息,处理消息,返回处理结果成功失败"""
# 假设data的格式为{"order_id":订单号,"num":数量}
await asyncio.sleep(2)
if data["num"]:
return {"order_id": data["order_id"], "check": 1}
else:
return {"order_id": data["order_id"], "check": 0}
if __name__ == '__main__':
import time
import random
start_time = time.time()
messages = [{"order_id": str(o), "num": random.choice([0, 1])} for o in range(835001, 835501)]
jobs = [task(d) for d in messages]
done, pending = asyncio.run(asyncio.wait(jobs))
print(f"消费结果为:{[r.result() for r in done]}")
print(f"总耗时:{time.time()-start_time}")
6.2、希望不等待所有完成了才对结果处理,而是消费了就对结果进行处理?
import asyncio
success_count = 0
fail_count = 0
async def task(data):
"""消费消息,解析消息,处理消息,返回处理结果成功失败"""
# 假设data的格式为{"order_id":订单号,"num":数量}
print(f"开始执行:{data}")
await asyncio.sleep(2)
if data["num"]:
res = {"order_id": data["order_id"], "check": 1}
else:
res = {"order_id": data["order_id"], "check": 0}
print(f"执行完成:{data}")
return res
def my_call_back(future):
"""对消费的消息结果进行实时处理,比如统计成功率,用于实时可视化展示"""
time.sleep(1)
result = future.result()
global success_count, fail_count
if result["check"] == 1:
# 可以是操作缓存
success_count += 1
else:
fail_count += 1
print(f"{result['order_id']}现在成功数:{success_count}")
print(f"{result['order_id']}现在失败数:{fail_count}")
if __name__ == '__main__':
import time
import random
start_time = time.time()
messages = [{"order_id": str(o), "num": random.choice([0, 1])} for o in range(835001, 835006)]
jobs = []
loop = asyncio.get_event_loop()
for m in messages:
job = loop.create_task(task(m))
job.add_done_callback(my_call_back) # 加入回调函数
jobs.append(job)
loop.run_until_complete(asyncio.wait(jobs))
loop.close()
print(f"总耗时:{time.time() - start_time}")
print(f"最终成功数:{success_count}")
print(f"最终失败数:{fail_count}")
# 注意:回调函数不能是协程,不能是协程,不能是协程!!!
6.3、动态的异步消费,比如动态接收mq或redis插入的数据?
# 生产者代码忽略
# 消费者
"""
解决方案是:创建一个线程,用于让事件循环永远执行
"""
import asyncio
import threading
def always_run_loop(event_loop):
asyncio.set_event_loop(event_loop)
event_loop.run_forever()
def get_redis():
"""获取连接"""
import redis
conn_pool = redis.ConnectionPool(host="127.0.0.1", port=6379, max_connections=10)
return redis.Redis(connection_pool=conn_pool)
async def call_back_task(data):
"""消费数据"""
print(data)
await asyncio.sleep(2)
return data
if __name__ == '__main__':
redis_pool = get_redis()
loop = asyncio.new_event_loop()
loop_th = threading.Thread(target=always_run_loop, args=(loop,))
loop_th.setDaemon(True)
loop_th.start()
while True:
message = redis_pool.rpop("check")
if message:
# 异步动态添加到协程中
asyncio.run_coroutine_threadsafe(call_back_task(message), loop)
6.4、如何将线程池或进程池与协程一起用,且让不支持协程的模块支持?
import asyncio
import requests
async def download_image(url):
# 发送网络请求,下载图片(遇到网络下载图片的IO请求,自动化切换到其他任务)
print("开始下载:", url)
loop = asyncio.get_event_loop()
# requests模块默认不支持异步操作,所以就使用线程池来配合实现了。
future = loop.run_in_executor(None, requests.get, url)
response = await future
print('下载完成')
# 图片保存到本地文件
file_name = url.rsplit('_')[-1]
with open(file_name, mode='wb') as file_object:
file_object.write(response.content)
if __name__ == '__main__':
url_list = [
'https://img.yituyu.com/gallery/1110/01_PnhbzecG.jpg',
'https://img.yituyu.com/gallery/1110/02_rWAsk0kY.JPG',
'https://img.yituyu.com/gallery/1114/00_8Q85y28B.jpg'
]
tasks = [download_image(url) for url in url_list]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
6.5. 在flask中应用
import asyncio
from flask import Flask
app = Flask(__name__)
async def first():
await asyncio.sleep(20)
return 'first'
async def second():
await asyncio.sleep(10)
return 'second'
async def third():
await asyncio.sleep(10)
return 'third'
def ordinary_generator():
import sys
if not sys.platform.startswith("win"):
import uvloop
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
for future in asyncio.as_completed([first(), second(), third()]):
print('reached')
yield loop.run_until_complete(future)
@app.route('/')
def healthcheck():
"""
Retrieves the health of the service.
"""
import time
time_s = time.time()
for element in ordinary_generator():
print(element)
print(f"{time.time() - time_s}")
return "Health check passed"
if __name__ == '__main__':
app.run(debug=True)
6.6、代替requests实现的aiohttp支持异步
pip install aiohttp
官方文档:https://docs.aiohttp.org/en/stable/
import asyncio
import aiohttp
# 客户端使用
async def get_page(session, url):
async with session.get(url) as response:
if response.status == 200:
text = await response.content.read()
with open(f"图片-{url.split('/')[-1]}", "wb") as f:
f.write(text)
f.flush()
async def my_main():
async with aiohttp.ClientSession() as session:
urls = [
"http://pic1.win4000.com/wallpaper/4/53ec50e410310.jpg",
"http://pic1.win4000.com/m00/a5/d1/8ab24d2d749ad08fe2b99830d5b30065.jpg",
"http://pic1.win4000.com/m00/f8/40/a0f4ea98e5b518c410b189a36704f459.jpg"
]
tasks = [asyncio.create_task(get_page(session, url)) for url in urls]
await asyncio.wait(tasks)
asyncio.run(my_main())
# 服务端
from aiohttp import web
async def health_check(request):
print(f"version:请求HTTP版本->{request.version}")
print(f"method:请求HTTP方法->{request.method}")
print(f"scheme:是http还是https->{request.scheme}")
print(f"secure:是否是https,返回bool->{request.secure}")
print(f"host:服务器地址->{request.host}")
print(f"remote:请求来源的地址->{request.remote}")
print(f"url:请求url全路径->{request.url}")
print(f"rel_url:请求url相对路径,无host->{request.rel_url}")
print(f"path_qs:包含路径及参数->{request.path_qs}")
print(f"path:url解码过的->{request.path}")
print(f"raw_path:url未解码前的信息->{request.raw_path}")
print(f"query:获取get请求url中的参数->{request.query}")
print(f"query_string:原始请求数据->{request.query_string}")
print(f"headers:获取header头部信息->{request.headers}")
print(f"content_type:获取请求消息的格式->{request.content_type}")
print(f"keep_alive:是否保持长链接->{request.keep_alive}")
print(f"cookies:{request.cookies}")
print(f"content:{request.content}")
print(f"match_info:{request.match_info}->路由解析结果")
return web.Response(text="aiohttp server is ok")
async def save_db(request):
data = await request.post()
print(f"获取post请求体:{data}")
return web.json_response({"key": 1111})
# 创建应用实例
app = web.Application()
# 注册路由
app.add_routes(
[
web.get('/', health_check),
web.get('/{version}', health_check),
web.post('/send', save_db)
]
)
if __name__ == '__main__':
web.run_app(app, host="127.0.0.88", port=9527)