day35 python socket 单线程的并发 io多路复用 协程 greenlet模块 gevent模块
一.socket相关
1.socket: 所有的网络请求都是基于socket实现的,默认是阻塞的
2.requests是用的socket的客户端
3.socket到底哪端(谁)发生了变化
如果客户端向服务端发起连接时, 则服务端发生了变化
如果服务端向客户端发送数据时, 则客户端发生了变化
conn,addr = server.accept()
conn.recv()
4.如果想要提高并发(目前有以下两种)
多进程: 计算密集型
多线程: io密集型,如socket请求
二.单线程的并发
需求: 模拟浏览器发送请求: 向百度发送请求搜索三个关键词
1.单线程(串行):
使用requests模块
import requests
key_list = ['bajie','wukong','datang']
for item in key_list:
ret = requests.get(
url="https://www.baidu.com/s?wd=%s" % (item,),
headers = {"User-Agent":"Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:69.0) Gecko/20100101 Firefox/69.0"}
)
print(ret.text)
2.单线程(串行):
用socket解析requests的原理
import socket
def task(key):
client = socket.socket()
client.connect(('www.baidu.com',80)) #阻塞: 和百度创建连接
client.sendall(b'GET /s?wd=wukong HTTP/1.0\r\nHost: www.baidu.com\r\n\r\n')
#告诉百度我的请求, 用的是http协议(数据格式)
data_list = []
while 1:
data = client.recv(4096) #等着接收百度的回复
if not data:
break
data_list.append(data)
body = b''.join(data_list)
print(body)
key_list = ['bajie','wukong','datang'] ##
for item in key_list:
task(item)
3.多线程(并发)
import socket
import threading
def task(key):
client = socket.socket()
client.connect(('www.baidu.com',80)) #阻塞: 和百度创建连接
client.sendall('GET /s?wd=wukong HTTP/1.0\r\nHost: www.baidu.com\r\n\r\n'.encode('utf-8'))
#告诉百度我的请求, 用的是http协议(数据格式)
data_list = []
while 1:
data = client.recv(4096) #等着接收百度的回复
if not data:
break
data_list.append(data)
body = b''.join(data_list)
print(body)
key_list = ['bajie','wukong','datang'] ##
for item in key_list:
t = threading.Thread(target=task,args=(item,))
t.start()
4.多线程并发时可能出现的问题
4.1.问题:
假设连接非常慢, 三个人就傻傻地等着,不能做其他事
4.2.怎么解决?
如果发现是io等待, 那么我就先把请求发出去, 然后去干其他事.
又假设连接请求是一个一个回来的, 那么等这个io请求回来了, 我再处理, 就实现了单线程的并发
4.3.单线程的并发本质是什么?
单线程的io操作不等待
4.4.单线程的并发如何实现?
需解决: socket如何让io不等待
需解决: 如何知道结果回来了
5.单线程(并发)
5.1.socket如何让io不等待: 把阻塞变成不阻塞
client.setblocking(False)
import socket
def task(key):
client = socket.socket()
client.setblocking(False) #socket默认是阻塞的: 那么我可以给它设置成不是阻塞的; 但是客户端再去连接的时候会报错
try:
client.connect(('www.baidu.com',80)) #加个异常处理: 上面设置了原来阻塞的位置不再阻塞, 这句执行了, 但是报错
except BlockingIOError as e:
pass
#此时我这里需要有个检测机制, 检测到连接成功, 才能继续下面的操作
client.sendall('GET /s?wd=wukong HTTP/1.0\r\nHost: www.baidu.com\r\n\r\n'.encode('utf-8'))
data_list = []
while 1:
data = client.recv(4096)
if not data:
break
data_list.append(data)
body = b''.join(data_list)
print(body)
key_list = ['bajie','wukong','datang'] ##
for item in key_list:
task(item)
5.2.如何知道结果回来了: 使用io多路复用
io多路复用的应用:
工作的时候没用过, 但是很多的东西都是基于这个做的
io多路复用和socket的非阻塞,实现了单线程的并发
作用:
检测多个socket是否已经发生变化(是否已经连接成功/是否已经获取数据)(rlist/wlist)
语法:
select.select(rlist=[], #检测列表发生变化则返回(返回数据了吗?)
wlist=[], #检测列表发生变化则返回(连接成功了吗?)
xlist=[], #检测列表发生变化则返回(有异常吗?)
timeout=0.005, #检测的时间间隔
)
import socket
import select
client1 = socket.socket() #创建了一个socket
client1.setblocking(False)
try:
client1.connect(('www.baidu.com',80))
except BlockingIOError as e:
pass
client2 = socket.socket() #socket的非阻塞.立即又创建了一个socket
client2.setblocking(False)
try:
client2.connect(('www.sogou.com',80))
except BlockingIOError as e:
pass
client3 = socket.socket() #socket的非阻塞.立即又创建了一个socket
client3.setblocking(False)
try:
client3.connect(('www.oldboyedu.com',80))
except BlockingIOError as e:
pass
dat_list = [client1,client2,client3]
conn_list = [client1,client2,client3]
while True: #不停地去检测socket的状态
rlist,wlist,xlist = select.select(dat_list,conn_list,[],0.005)
#每 0.005秒 去检测socket的状态: 连接成功了吗?返回数据了吗?有异常吗?
for sk in wlist: #处理连接成功的socket
if sk == client1:
sk.sendall('GET /s?wd=wukong HTTP/1.0\r\nHost: www.baidu.com\r\n\r\n'.encode('utf-8'))
elif sk == client2:
sk.sendall('GET /web?query=bajie HTTP/1.0\r\nHost: www.sogou.com\r\n\r\n'.encode('utf-8'))
else:
sk.sendall('GET /liaojie/index.html HTTP/1.0\r\nHost: www.oldboyedu.com\r\n\r\n'.encode('utf-8'))
conn_list.remove(sk) #处理完连接, 把sk从连接列表里剔除掉, 不用在检测
for sk in rlist: #处理有数据回来的socket
data_list = []
while True:
try:
data = sk.recv(8096) #因为上面的setblocking(False), 所有阻塞的地方都不阻塞, 所以这里应报错
data_list.append(data)
except BlockingIOError as e:
break
body = b''.join(data_list)
print(body)
sk.close() #接收完数据的sk,断开socket连接
dat_list.remove(sk) #接收完数据的sk,把sk从数据检测的列表里剔除掉, 不用在检测
if not dat_list: #都下载完数据了, 就停止检测
break
5.3.总结
基于io多路复用和socket实现并发请求
比多线程节省资源: 通过一个线程做20个线程做的事情
三.基于事件循环的异步非阻塞的框架
非阻塞: 不等待
异步: 指的是一个回调的过程,前面的某个事情完成了,自动执行一个'回调'函数; 上面的例子异步还没体现出来
事件循环: 循环地去做某个事
基于事件循环的异步非阻塞框架: Twisted
已经实现上述功能的模块, python中开源的模块
四.自定义异步非阻塞模块: 实现单线程并发
单线程并发的高级版:
和上面代码的原理一样, 只是封装好了
而且体现了异步的特性: 下载完数据就自动执行我给它的函数
import socket
import select
class Foo(object):
def __init__(self,sk,func):
self.sk = sk
self.func = func
def fileno(self):
return self.sk.fileno()
class Bajie(object):
def __init__(self): #初始化,搞两个空检测列表
self.conn_list = []
self.socket_list = []
self.info_list = []
def add(self,domain,func): #创建socket, 加入到检测列表中
client = socket.socket()
client.setblocking(False)
try:
client.connect((domain, 80))
except BlockingIOError as e:
pass
obj = Foo(client,func)
self.socket_list.append(obj)
self.conn_list.append(obj)
self.info_list.append(obj)
def run(self):
while True: #不停地去检测fileno()的状态
rlist, wlist, xlist = select.select(self.socket_list, self.conn_list, [],0.005)
#每0.005秒去检测fileno()的状态: 连接成功了吗?返回数据了吗?有异常吗?
for obj in wlist: #处理连接成功的obj
if obj == self.info_list[0]:
obj.sk.sendall('GET /s?wd=wukong HTTP/1.0\r\nHost: www.baidu.com\r\n\r\n'.encode('utf-8'))
elif obj == self.info_list[1]:
obj.sk.sendall('GET /web?query=bajie HTTP/1.0\r\nHost: www.sogou.com\r\n\r\n'.encode('utf-8'))
else:
obj.sk.sendall('GET /liaojie/index.html HTTP/1.0\r\nHost: www.oldboyedu.com\r\n\r\n'.encode('utf-8'))
self.conn_list.remove(obj) #处理完连接, 把obj从连接列表里剔除掉, 不用在检测
for obj in rlist: #处理有数据回来的obj
data_list = []
while True:
try:
data = obj.sk.recv(8096) #因为上面的setblocking(False), 所有阻塞的地方都不阻塞, 所以这里应报错
data_list.append(data)
except BlockingIOError as e:
break
body = b''.join(data_list)
# print(body) #下载完不打印了, 而去执行他们各自的函数, (回调就是这样搞)
obj.func(body)
obj.sk.close() #接收完数据的obj,断开socket连接
self.socket_list.remove(obj) #接收完数据的obj,把obj从数据检测的列表里剔除掉, 不用在检测
if not self.socket_list: #都下载完数据了, 就停止检测
break
def baidu_func(body):
print('www.baidu.com--->',body)
def sogou_func(body):
print('www.sogou.com--->', body)
def old_func(body):
print('www.oldboyedu.com--->', body)
t1 = Bajie()
t1.add('www.baidu.com',baidu_func) #异步的体现: 当执行完前面的动作, 自动执行baidu_func函数
t1.add('www.sogou.com',sogou_func)
t1.add('www.oldboyedu.com',old_func)
t1.run()
五.总结:
1.socket默认是阻塞的, 体现在连接和接收数据, 但是可以使用client.setblocking(False)变成非阻塞
2.io多路复用的作用:
检测多个socket是否发生变化
使用的select是调用的操作系统的功能
操作系统检测socket是否发生变化的三种模式:
select:原始的, 对检测的socket的个数有限制: 1024 个, 内部实现时用的是循环检测 :select.select()
poll:对检测的socket的个数不再限制: 内部实现时还是循环检测(水平触发)
epoll:对检测的socket的个数不再限制: 内部实现时不再是检测, 而是等着socket报告给我(边缘触发) :select.epoll()
windows只支持select.select()
linux是都支持的, epoll会比select用法上复杂一点点
3.以后完成某件事提高并发的方案:
多进程
多线程
异步非阻塞模块:Twisted, scrapy框架(单线程完成并发)
会有问题: 如果发100个请求,倒是可以一起发请求出去, 如果100个是 同时回来的, 这种一个线程就完蛋了
怎么办:
控制请求的数量
使用生产者消费者模型(比如单线程并发下载,下载的数据放到队列,多进程处理数据)
4.什么是异步非阻塞(面试题)?
非阻塞:不等待, 比如创建一个socket进行连接某个地址, 或者接收数据时, 默认都是阻塞的(等待连接成功,或接受到数据), 才执行后续的操作,
如果设置成setblocking(False)非阻塞, 以上两个过程就不再等待, 但是会报BlockingIOError错误, 这个错误只要捕获即可
异步:通知和回调, 当我们完成某件事时,自动调用回调函数或自动执行某些操作, 比如做爬虫向某个地址发送请求, 当请求完成之后, 自动执行指定的回调函数(就是通知)
5.什么是同步阻塞?
阻塞: 等
同步: 按顺序一个一个执行, 比如for循环
6.io多路复用和性能有关系吗?
只是检测, 要看你和谁配合
7.需要懵懂的一个概念
封装: 当别人不改变调用代码的时候, 但是还要有这个东西, 那么你就给它封装进去
六.协程(本质遇到io就切换)
1.什么是协程:
线程: 操作系统中存在的
进程: 操作系统中存在的
协程: 不是一个真实存在的东西, 和线程和进程不同, 是由程序员创造的
协程: 可以认为是微线程, 对线程进行分片, 使线程可以在代码块之间进行来回切换执行,而不是原来的逐行执行
2.如何让两个代码块可以来回切换执行?
2.1.比如下面的两个函数, 让程序员来控制, 先执行11,再执行33,回去执行22,再执行44
def f1():
print(11)
print(22)
def f2():
print(33)
print(44)
f1()
f2()
2.2.使用greenlet模块
装包: pip3 install greenlet
import greenlet
def f1():
print(11)
gr2.switch()
print(22)
gr2.switch()
def f2():
print(33)
gr1.switch()
print(44)
gr1 = greenlet.greenlet(f1) #创建了两个协程gr1,gr2
gr2 = greenlet.greenlet(f2)
gr1.switch() #执行协程gr1
3.协程存在的意义?
协程: 人为进行进程代码切换
单纯的协程是没有任何用处的, 可能会让性能更低
用作通过单线程实现并发: 单纯的它自己无法实现
需要额外 + 一段代码配合(代码里面一旦遇到io操作, 就通过协程切换出去执行其他的代码)
即协程 + 遇到io就切换
4.gevent模块
装包: pip3 install gevent
内部封装了 greenlet + 遇到io就切换
from gevent import monkey
monkey.patch_all() #必须要有; 以后代码中遇到IO操作, 内部会自动执行greenlet的switch进行切换 (patch 补丁)
import requests
import gevent
def get_page(url):
rst = requests.get(url)
print(url, rst.content)
gevent.joinall(
[
gevent.spawn(get_page,'https://www.python.org/'), #协程1 (spawn: 产卵)
gevent.spawn(get_page,'https://www.yahoo.com/'), #协程2
gevent.spawn(get_page,'https://github.com/') #协程3
]
)
5.总结
5.1.什么是协程?
协程也可以称之为微线程, 就是开发者控制着线程的执行流程: 控制先执行某段代码然后再切换到另外函数执行
5.2.协程可以提高并发吗?
协程自己本身无法实现并发(甚至性能会降低)
协程 + IO切换性能提高
5.3.进程,线程,协程的区别?
进程:
线程:
协程:
5.4.如果是单线程并发:(以下两个方式本质上是一样的)
gevent: 协程 + io切换
Twisted: 基于事件循环的异步非阻塞模块框架
协程没有回调函数, 但是它可以自动切换
协程本质上用的操作系统的一个库, 内部用的是也是事件循环
所以这两个用哪个都一样的
6.手动实现协程: yield关键字生成器
def f1():
print(11)
yield
print(22)
yield
print(33)
def f2():
print(44)
yield
print(55)
yield
print(66)
v2 = f2()
v1 = f1()
v1.__next__() #用生成器就可以控制两个函数中的代码来回交替
next(v2)
next(v1)
v2.__next__()