一. 事件循环
1.注:
实现搭配:事件循环+回调(驱动生成器【协程】)+epoll(IO多路复用),asyncio是Python用于解决异步编程的一整套解决方案;
与asyncio思路类似的异步框架:tornado、gevent、twisted(Scrapy、django channels基于twisted);其中tornado自带web服务器,可以直接部署(生产环境通常仍会在前面加nginx);django、flask则通过uwsgi或gunicorn+nginx部署
import asyncio
import time
async def get_html(url):
    """Simulate fetching ``url``: print a start marker, wait 2 s, print an end marker.

    Args:
        url: Address being "fetched"; it is not used by the simulation.
    """
    print('start get url')
    # time.sleep() must not be used here: it blocks the whole thread, so N
    # concurrent coroutines would take N * 2 seconds instead of about 2.
    await asyncio.sleep(2)
    print('end get url')
if __name__ == '__main__':
    start_time = time.time()
    # Create and install the loop explicitly: recent Python versions no
    # longer create one implicitly in asyncio.get_event_loop().
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        # asyncio.wait() rejects bare coroutine objects on Python 3.11+,
        # so each coroutine is wrapped in a Task first.
        tasks = [loop.create_task(get_html('www.baidu.com')) for _ in range(10)]
        loop.run_until_complete(asyncio.wait(tasks))
    finally:
        loop.close()
    # Elapsed wall-clock time for all ten concurrent coroutines.
    print(time.time() - start_time)
2.如何获取协程的返回值(和线程池类似):
import asyncio
import time
from functools import partial
async def get_html(url):
    """Simulate fetching ``url`` and hand back a result.

    Args:
        url: Address being "fetched"; it is not used by the simulation.

    Returns:
        The fixed string ``"HAHA"``, available after a 2 second sleep.
    """
    print('start get url')
    await asyncio.sleep(2)
    print('end get url')
    return "HAHA"
# A done-callback always receives the finished task as its last argument.
# Extra arguments are bound with functools.partial, so they must come first
# in the signature.
def callback(url, future):
    """Report that ``url`` finished.

    Args:
        url: Value pre-bound by ``functools.partial``.
        future: The completed task supplied by the event loop (unused here).
    """
    print(url + ' success')
    print('send email')
if __name__ == '__main__':
    # Create and install the loop explicitly: recent Python versions no
    # longer create one implicitly in asyncio.get_event_loop().
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        # asyncio.ensure_future(get_html('www.baidu.com')) would also work:
        # it looks up the current loop and calls create_task on it.
        task = loop.create_task(get_html('www.baidu.com'))
        # partial() pre-binds the url; the loop passes the finished task.
        task.add_done_callback(partial(callback, 'www.baidu.com'))
        # run_until_complete accepts a coroutine, a Future, or a Task
        # (Task is a Future subclass); a list must go through wait()/gather().
        loop.run_until_complete(task)
        print(task.result())
    finally:
        loop.close()
3.wait和gather的区别:
3.1wait简单使用:
import asyncio
import time
from functools import partial
async def get_html(url):
    """Simulate fetching ``url``: print a start marker, wait 2 s, print an end marker."""
    print('start get url')
    await asyncio.sleep(2)
    print('end get url')


if __name__ == '__main__':
    # Create and install the loop explicitly: recent Python versions no
    # longer create one implicitly in asyncio.get_event_loop().
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        # asyncio.wait() rejects bare coroutine objects on Python 3.11+,
        # so each coroutine is wrapped in a Task first.
        tasks = [loop.create_task(get_html('www.baidu.com')) for _ in range(10)]
        # asyncio.wait resembles concurrent.futures.wait (timeout, return_when).
        loop.run_until_complete(asyncio.wait(tasks))
    finally:
        loop.close()
协程的wait和线程的wait相似,也有timeout,return_when(什么时候返回)等参数
3.2gather简单使用:
import asyncio
import time
from functools import partial
async def get_html(url):
    """Simulate fetching ``url``: print a start marker, wait 2 s, print an end marker."""
    print('start get url')
    await asyncio.sleep(2)
    print('end get url')


if __name__ == '__main__':
    # Create and install the loop explicitly: recent Python versions no
    # longer create one implicitly in asyncio.get_event_loop().
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        # Wrapping in Tasks keeps gather() from having to look up a loop
        # itself, which is deprecated when no loop is running.
        tasks = [loop.create_task(get_html('www.baidu.com')) for _ in range(10)]
        # gather() takes awaitables as separate arguments, hence the "*".
        loop.run_until_complete(asyncio.gather(*tasks))
    finally:
        loop.close()
3.3gather和wait的区别:(定制性不强时可以优先考虑gather)
gather更加高层,可以将tasks分组;还可以成批的取消任务
import asyncio
import time
from functools import partial
async def get_html(url):
    """Simulate fetching ``url``: print a start marker, wait 2 s, print an end marker."""
    print('start get url')
    await asyncio.sleep(2)
    print('end get url')


if __name__ == '__main__':
    # Create and install the loop explicitly: recent Python versions no
    # longer create one implicitly in asyncio.get_event_loop().
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        # Tasks are created up front so gather() never has to look up a loop.
        groups1 = [loop.create_task(get_html('www.baidu.com')) for _ in range(10)]
        groups2 = [loop.create_task(get_html('www.baidu.com')) for _ in range(10)]
        # gather() takes awaitables as separate arguments, hence the "*".
        loop.run_until_complete(asyncio.gather(*groups1, *groups2))
        # Alternative: gather each group on its own, then gather the groups.
        # A whole group can then be cancelled in one call:
        #     group_a = asyncio.gather(*groups1)
        #     group_b = asyncio.gather(*groups2)
        #     group_b.cancel()
        #     loop.run_until_complete(
        #         asyncio.gather(group_a, group_b, return_exceptions=True))
        # return_exceptions=True keeps the cancelled group's CancelledError
        # from propagating out of run_until_complete.
    finally:
        loop.close()
二. 协程嵌套
1.run_until_complete()源码:和run_forever()区别并不大,只是在指定的协程运行完后会把loop停止掉,而run_forever()不会自动停止
2.loop会被放在future里面,future又会放在loop中
3.取消future(task):
3.1子协程调用原理:
官网例子:
解释: await相当于yield from,loop运行协程print_sum(),print_sum又会去调用另一个协程compute,run_until_complete会把协程print_sum注册到loop中。
1.event_loop会为print_sum创建一个task,通过驱动task执行print_sum(task首先会进入pending【等待】的状态);
2.print_sum直接进入子协程的调度,这个时候转向执行另一个协程(compute),所以print_sum变为suspended【暂停】状态;
3.compute这个协程首先打印,然后去调用asyncio的sleep(此时compute进入suspended【暂停】状态),直接把返回值返回给Task(没有经过print_sum,相当于yield from,直接在调用方和子生成器之间通信,通道是由委托方print_sum建立的);
4.Task会告诉Event_loop暂停,Event_loop等待一秒后,通过Task唤醒(越过print_sum和compute建立一个通道);
5.compute继续执行,变为状态done【执行完成】,然后抛一个StopIteration的异常,会被await语句捕捉到,然后提取出1+2=3的值,进入print_sum,print_sum也被激活(因为抛出了StopIteration的异常被print_sum捕捉),print_sum执行完也会被标记为done的状态,同时抛出StopIteration会被Task接收
三. call_soon、call_later、call_at、call_soon_threadsafe
1.call_soon:可以直接接收函数,而不用协程
import asyncio
# A plain function (not a coroutine) that the loop can schedule directly.
def callback(sleep_time):
    """Print a success message containing ``sleep_time``."""
    print('sleep {} success'.format(sleep_time))
# Scheduled as a callback so that run_forever() can return.
def stoploop(loop):
    """Stop ``loop``."""
    loop.stop()
if __name__ == '__main__':
    # Create and install the loop explicitly: recent Python versions no
    # longer create one implicitly in asyncio.get_event_loop().
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        # call_soon takes a plain function plus its arguments; callbacks run
        # in registration order on the next loop iteration.
        loop.call_soon(callback, 2)
        loop.call_soon(stoploop, loop)
        # There is no coroutine to hand to run_until_complete, so the loop
        # runs with run_forever() until stoploop stops it.
        loop.run_forever()
    finally:
        loop.close()
2.call_later:可以指定多长时间后启动(实际调用call_at,时间不是传统的时间,而是loop内部的时间)
import asyncio
# A plain function (not a coroutine) that the loop can schedule directly.
def callback(sleep_time):
    """Print a success message containing ``sleep_time``."""
    print('sleep {} success'.format(sleep_time))
# Scheduled as a callback so that run_forever() can return.
def stoploop(loop):
    """Stop ``loop``."""
    loop.stop()
if __name__ == '__main__':
    # Create and install the loop explicitly: recent Python versions no
    # longer create one implicitly in asyncio.get_event_loop().
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    # call_later(delay, fn, *args): the delay is in seconds, not an ordering.
    loop.call_later(3, callback, 1)
    loop.call_later(1, callback, 2)
    loop.call_later(1, callback, 2)
    loop.call_later(1, callback, 2)
    # call_soon callbacks run first, before any of the delayed ones.
    loop.call_soon(callback, 4)
    # loop.call_soon(stoploop, loop) is left out on purpose: stopping on the
    # first iteration would end the loop before the delayed callbacks fire.
    # Without it run_forever() keeps running until the process is interrupted.
    loop.run_forever()
3.call_at:指定某个时间执行
import asyncio
# A plain function (not a coroutine) that the loop can schedule directly.
def callback(sleep_time):
    """Print a success message containing ``sleep_time``."""
    print('sleep {} success'.format(sleep_time))
# Scheduled as a callback so that run_forever() can return.
def stoploop(loop):
    """Stop ``loop``."""
    loop.stop()
if __name__ == '__main__':
    # Create and install the loop explicitly: recent Python versions no
    # longer create one implicitly in asyncio.get_event_loop().
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    # call_at expects a timestamp on the loop's own clock (loop.time()),
    # not a wall-clock time such as time.time().
    now = loop.time()
    print(now)
    loop.call_at(now + 3, callback, 1)
    loop.call_at(now + 1, callback, 0.5)
    loop.call_at(now + 1, callback, 2)
    loop.call_at(now + 1, callback, 2)
    # loop.call_soon(stoploop, loop) is left out on purpose: stopping on the
    # first iteration would end the loop before the timed callbacks fire.
    # Without it run_forever() keeps running until the process is interrupted.
    loop.run_forever()
4.call_soon_threadsafe:
线程安全的方法,不仅能解决协程,也能解决线程,进程,和call_soon几乎一致,多了self._write_to_self(),和call_soon用法一致
四. ThreadPoolExecutor+asyncio(线程池和协程结合)
1.使用run_in_executor:就是把阻塞的代码放进线程池运行,性能并不是特别高,和多线程差不多
#使用多线程,在协程中集成阻塞io
import asyncio
import socket
from urllib.parse import urlparse
from concurrent.futures import ThreadPoolExecutor
import time
def get_url(url):
    """Fetch ``url`` with a blocking socket and print the response body.

    Sends a raw ``GET`` request with ``Connection: close`` and reads until
    the server closes the connection.

    Args:
        url: Absolute ``http://`` URL; port 80 is used when none is given.
    """
    parsed = urlparse(url)
    path = parsed.path or "/"
    # netloc may carry ":port", which is valid in the Host header but not as
    # a connect() host name, so the two are taken from separate fields.
    request = "GET {} HTTP/1.1\r\nHost:{}\r\nConnection:close\r\n\r\n".format(
        path, parsed.netloc)
    # The context manager closes the socket even when connect/send/recv raises.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as client:
        client.connect((parsed.hostname, parsed.port or 80))
        # sendall() keeps writing until the whole request is sent.
        client.sendall(request.encode("utf8"))
        chunks = []
        while True:
            chunk = client.recv(1024)
            if not chunk:
                break
            chunks.append(chunk)
    # The raw response starts with the headers; the body follows the first
    # blank line. partition() yields "" instead of raising when it is missing.
    data = b"".join(chunks).decode('utf8')
    _, _, body = data.partition('\r\n\r\n')
    print(body)


if __name__ == '__main__':
    start_time = time.time()
    # Create and install the loop explicitly: recent Python versions no
    # longer create one implicitly in asyncio.get_event_loop().
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        # run_in_executor moves the blocking call onto a worker thread and
        # returns a future the loop can wait on.
        with ThreadPoolExecutor() as executor:
            tasks = [
                loop.run_in_executor(executor, get_url, 'http://www.baidu.com')
                for _ in range(20)
            ]
            loop.run_until_complete(asyncio.wait(tasks))
    finally:
        loop.close()
    print(time.time() - start_time)
五. asyncio模拟http请求
注:asyncio目前没有提供http协议的接口
# asyncio目前没有提供http协议的接口
import asyncio
from urllib.parse import urlparse
import time


async def get_url(url):
    """Fetch ``url`` over an asyncio stream and return the raw HTTP response.

    Args:
        url: Absolute ``http://`` URL; port 80 is used when none is given.

    Returns:
        The decoded response (status line, headers and body) as one string.
    """
    parsed = urlparse(url)
    path = parsed.path or "/"
    # netloc may carry ":port", which is valid in the Host header but not as
    # a connection host name, so the two are taken from separate fields.
    request = "GET {} HTTP/1.1\r\nHost:{}\r\nConnection:close\r\n\r\n".format(
        path, parsed.netloc)
    # open_connection performs the non-blocking connect for us.
    reader, writer = await asyncio.open_connection(
        parsed.hostname, parsed.port or 80)
    try:
        writer.write(request.encode("utf8"))
        await writer.drain()
        all_lines = []
        # StreamReader is an async iterator that yields one line at a time.
        async for line in reader:
            all_lines.append(line.decode('utf8'))
    finally:
        # Release the transport even when reading or decoding fails.
        writer.close()
    # Every line still ends with its own line break, so nothing is inserted.
    return ''.join(all_lines)


async def main():
    """Start 20 downloads and print each response as soon as it completes."""
    url = "http://www.baidu.com/"
    tasks = [asyncio.ensure_future(get_url(url)) for _ in range(20)]
    for finished in asyncio.as_completed(tasks):
        result = await finished
        print(result)


if __name__ == '__main__':
    start_time = time.time()
    # Create and install the loop explicitly: recent Python versions no
    # longer create one implicitly in asyncio.get_event_loop().
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        # Alternative that collects the results outside the coroutine:
        #     tasks = [loop.create_task(get_url('http://www.baidu.com'))
        #              for _ in range(10)]
        #     loop.run_until_complete(asyncio.wait(tasks))
        #     for task in tasks:
        #         print(task.result())
        # main() prints each result as soon as it is ready instead.
        loop.run_until_complete(main())
    finally:
        loop.close()
    print(time.time() - start_time)
六. future和task
1.future:协程中的future和线程池中的future相似
future中的方法,都和线程池中的相似
set_result方法
不像线程池中运行完直接运行代码(这是单线程的,会调用call_soon方法)
2.task:是future的子类,是future和协程之间的桥梁
会首先启动_step方法
该方法会首先启动协程,把返回值(StopIteration的值)做处理,用于解决协程和线程不一致的地方
七. asyncio同步和通信
1.单线程协程不需要锁:
import asyncio
# Shared counter mutated by both coroutines below.
total = 0


async def add():
    """Increment the module-level ``total`` one million times."""
    global total
    # No await inside the loop, so the coroutine never yields mid-update and
    # no lock is needed.
    for _ in range(1000000):
        total += 1


async def decs():
    """Decrement the module-level ``total`` one million times."""
    global total
    for _ in range(1000000):
        total -= 1
if __name__ == '__main__':
    # Create and install the loop explicitly: recent Python versions no
    # longer create one implicitly in asyncio.get_event_loop().
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        # asyncio.wait() rejects bare coroutine objects on Python 3.11+,
        # so each coroutine is wrapped in a Task first.
        tasks = [loop.create_task(add()), loop.create_task(decs())]
        loop.run_until_complete(asyncio.wait(tasks))
    finally:
        loop.close()
    print(total)
2.某种情况需要锁:
asyncio中的锁(同步机制)
import asyncio,aiohttp
#asyncio的Lock并没有调用系统的锁,只是自己简单实现的(注意是非阻塞的),Queue也是非阻塞的,内部都用了yield from;因为是单线程,所以不需要用到Condition
#Queue还可以限流;如果只需要通信,可以直接使用全局变量,否则(例如需要限流时)可以使用Queue
from asyncio import Lock,Queue
# url -> response text, shared by every coroutine in this module.
catche = {}
lock = Lock()


async def get_stuff(url='http://www.baidu.com'):
    """Return the response text for ``url``, requesting it at most once.

    The lock is held across the cache check and the request, so a second
    caller waits for the first request and then reads the cached value.

    Args:
        url: Address to fetch; the default keeps the no-argument call working.

    Returns:
        The response body as text.
    """
    # "async with lock" is the explicit form of
    # "await lock.acquire() ... lock.release()"; acquire() is a coroutine,
    # release() is a plain method.
    async with lock:
        if url in catche:
            return catche[url]
        # The request must be awaited; otherwise only an un-started request
        # object would be cached.
        async with aiohttp.request('GET', url) as resp:
            stuff = await resp.text()
        catche[url] = stuff
        return stuff


async def parse_stuff():
    """First consumer of the shared resource."""
    stuff = await get_stuff()  # placeholder: parsing of ``stuff`` goes here


async def use_stuff():
    """Second consumer of the shared resource."""
    stuff = await get_stuff()  # placeholder: use of ``stuff`` goes here
# Without a synchronisation mechanism both coroutines would issue the same
# request; the lock inside get_stuff() makes the second one reuse the result.
try:
    # Reuse the default loop when it exists: on Python < 3.10 a lock created
    # at import time is bound to that loop.
    loop = asyncio.get_event_loop()
except RuntimeError:
    # Python 3.14+ no longer creates a loop implicitly.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
# asyncio.wait() rejects bare coroutine objects on Python 3.11+, so each
# coroutine is wrapped in a Task first.
tasks = [loop.create_task(parse_stuff()), loop.create_task(use_stuff())]
loop.run_until_complete(asyncio.wait(tasks))
八. aiohttp实现高并发爬虫
# asyncio去重url,入库(异步的驱动aiomysql)
import aiohttp
import asyncio
import re
import aiomysql
from pyquery import PyQuery

start_url = 'http://www.jobbole.com/'
# URLs discovered but not fetched yet (consumed from the end).
waiting_urls = []
# URLs already fetched; a set gives O(1) membership tests.
seen_urls = set()
stopping = False
# Limit the number of concurrent requests.
sem = asyncio.Semaphore(3)
# Article pages look like http://<sub>.jobbole.com/<digits>/.
_ARTICLE_URL_RE = re.compile(r'http://.*?jobbole\.com/\d+/')


async def fetch(url, session):
    """Download ``url`` through ``session``.

    Returns:
        The response text for a 200/201 status, otherwise ``None`` (also
        when the request raised; the error is printed).
    """
    async with sem:
        await asyncio.sleep(1)
        try:
            async with session.get(url) as resp:
                print('url_status:{}'.format(resp.status))
                if resp.status in [200, 201]:
                    return await resp.text()
        except Exception as e:
            # Best effort: one failing page must not stop the crawler.
            print(e)
    return None


def extract_urls(html):
    """Collect absolute links from ``html`` and queue them for crawling.

    Pure parsing, no I/O. Every new link is also appended to the
    module-level ``waiting_urls``.

    Returns:
        The distinct ``http...`` links found; empty when ``html`` is falsy.
    """
    urls = []
    if not html:
        # fetch() returns None on failure; there is nothing to parse then.
        return urls
    pq = PyQuery(html)
    for link in pq.items('a'):
        url = link.attr('href')
        if url and url.startswith('http') and url not in urls:
            urls.append(url)
            waiting_urls.append(url)
    return urls


async def init_urls(url, session):
    """Fetch a non-article page and queue the links it contains."""
    html = await fetch(url, session)
    seen_urls.add(url)
    extract_urls(html)


async def handle_article(url, session, pool):
    """Fetch an article page, queue its links and store its title."""
    html = await fetch(url, session)
    seen_urls.add(url)
    if not html:
        return
    extract_urls(html)
    pq = PyQuery(html)
    title = pq('title').text()
    async with pool.acquire() as conn:
        async with conn.cursor() as cur:
            # Parameterised query: titles come from untrusted pages.
            insert_sql = "insert into Test(title) values(%s)"
            await cur.execute(insert_sql, (title,))


async def consumer(pool):
    """Take URLs off ``waiting_urls`` and schedule a handler for each."""
    async with aiohttp.ClientSession() as session:
        while not stopping:
            if not waiting_urls:
                await asyncio.sleep(0.5)
                continue
            url = waiting_urls.pop()
            print('start url:' + url)
            if _ARTICLE_URL_RE.match(url):
                if url not in seen_urls:
                    asyncio.ensure_future(handle_article(url, session, pool))
                    # NOTE(review): indentation was lost in the original;
                    # this pause is assumed to follow each scheduled article.
                    await asyncio.sleep(30)
            else:
                if url not in seen_urls:
                    asyncio.ensure_future(init_urls(url, session))


async def main():
    """Open the MySQL pool, seed the queue from ``start_url``, start the consumer."""
    # handle_article() calls pool.acquire(), so a pool is created here;
    # it uses the running loop by default.
    pool = await aiomysql.create_pool(
        host='localhost', port=3306, user='root', password='',
        db='my_aio', charset='utf8', autocommit=True)
    async with aiohttp.ClientSession() as session:
        html = await fetch(start_url, session)
        seen_urls.add(start_url)
        extract_urls(html)
    asyncio.ensure_future(consumer(pool))


if __name__ == '__main__':
    try:
        # Reuse the default loop when it exists: on Python < 3.10 the
        # module-level semaphore is bound to that loop.
        loop = asyncio.get_event_loop()
    except RuntimeError:
        # Python 3.14+ no longer creates a loop implicitly.
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
    # Schedule main() on the loop, then run until interrupted.
    loop.create_task(main())
    loop.run_forever()