day35 python socket 单线程的并发 io多路复用 协程 greenlet模块 gevent模块

day35 python socket 单线程的并发 io多路复用 协程 greenlet模块 gevent模块   一.socket相关     1.socket: 所有的网络请求都是基于socket实现的,默认是阻塞的     2.requests是用的socket的客户端     3.socket到底哪端(谁)发生了变化         如果客户端向服务端发起连接时, 则服务端发生了变化         如果服务端向客户端发送数据时, 则客户端发生了变化             conn,addr = server.accept()             conn.recv()     4.如果想要提高并发(目前有以下两种)         多进程: 计算密集型         多线程: io密集型,如socket请求   二.单线程的并发     需求: 模拟浏览器发送请求: 向百度发送请求搜索三个关键词     1.单线程(串行):          使用requests模块 import requests key_list = ['bajie','wukong','datang']   for item in key_list:     ret = requests.get(         url="https://www.baidu.com/s?wd=%s" % (item,),         headers = {"User-Agent":"Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:69.0) Gecko/20100101 Firefox/69.0"}     )     print(ret.text)     2.单线程(串行):         用socket解析requests的原理 import socket   def task(key):     client = socket.socket()     client.connect(('www.baidu.com',80))            #阻塞: 和百度创建连接     client.sendall(b'GET /s?wd=wukong HTTP/1.0\r\nHost: www.baidu.com\r\n\r\n')                                                     #告诉百度我的请求, 用的是http协议(数据格式)     data_list = []     while 1:         data = client.recv(4096)                    #等着接收百度的回复         if not data:             break         data_list.append(data)     body = b''.join(data_list)     print(body) key_list = ['bajie','wukong','datang']               ## for item in key_list:     task(item)     3.多线程(并发) import socket import threading   def task(key):     client = socket.socket()     client.connect(('www.baidu.com',80))              #阻塞: 和百度创建连接     client.sendall('GET /s?wd=wukong HTTP/1.0\r\nHost: www.baidu.com\r\n\r\n'.encode('utf-8'))                                                           #告诉百度我的请求, 用的是http协议(数据格式)     data_list = []     while 1:         data = client.recv(4096)                      #等着接收百度的回复         if not data:             break         data_list.append(data)     body = b''.join(data_list)     print(body) key_list = ['bajie','wukong','datang']                ## for item in key_list:     t = threading.Thread(target=task,args=(item,))     t.start()     4.多线程并发时可能出现的问题         4.1.问题:              假设连接非常慢, 三个人就傻傻地等着,不能做其他事         4.2.怎么解决?              如果发现是io等待, 那么我就先把请求发出去, 然后去干其他事.             又假设连接请求是一个一个回来的, 那么等这个io请求回来了, 我再处理, 就实现了单线程的并发         4.3.单线程的并发本质是什么?             单线程的io操作不等待         4.4.单线程的并发如何实现?             需解决: socket如何让io不等待             需解决: 如何知道结果回来了     5.单线程(并发)         5.1.socket如何让io不等待: 把阻塞变成不阻塞             client.setblocking(False) import socket def task(key):     client = socket.socket()     client.setblocking(False)                             #socket默认是阻塞的: 那么我可以给它设置成不是阻塞的; 但是客户端再去连接的时候会报错     try:         client.connect(('www.baidu.com',80))              #加个异常处理: 上面设置了原来阻塞的位置不再阻塞, 这句执行了, 但是报错     except BlockingIOError as e:         pass     #此时我这里需要有个检测机制, 检测到连接成功, 才能继续下面的操作       client.sendall('GET /s?wd=wukong HTTP/1.0\r\nHost: www.baidu.com\r\n\r\n'.encode('utf-8'))       data_list = []     while 1:         data = client.recv(4096)                           if not data:             break         data_list.append(data)     body = b''.join(data_list)     print(body) key_list = ['bajie','wukong','datang']                    ## for item in key_list:     task(item)             5.2.如何知道结果回来了: 使用io多路复用             io多路复用的应用:                  工作的时候没用过, 但是很多的东西都是基于这个做的                 io多路复用和socket的非阻塞,实现了单线程的并发             作用:                 检测多个socket是否已经发生变化(是否已经连接成功/是否已经获取数据)(rlist/wlist)             语法:                 select.select(rlist=[],                    #检测列表发生变化则返回(返回数据了吗?)                               wlist=[],                    #检测列表发生变化则返回(连接成功了吗?)                               xlist=[],                    #检测列表发生变化则返回(有异常吗?)                               timeout=0.005,               #检测的时间间隔                               ) import socket import select   client1 = socket.socket()                                #创建了一个socket client1.setblocking(False) try:     client1.connect(('www.baidu.com',80)) except BlockingIOError as e:     pass   client2 = socket.socket()                                #socket的非阻塞.立即又创建了一个socket client2.setblocking(False) try:     client2.connect(('www.sogou.com',80)) except BlockingIOError as e:     pass   client3 = socket.socket()                                #socket的非阻塞.立即又创建了一个socket client3.setblocking(False) try:     client3.connect(('www.oldboyedu.com',80)) except BlockingIOError as e:     pass   dat_list = [client1,client2,client3] conn_list = [client1,client2,client3]   while True:                                             #不停地去检测socket的状态     rlist,wlist,xlist = select.select(dat_list,conn_list,[],0.005)                                                            #每 0.005秒 去检测socket的状态: 连接成功了吗?返回数据了吗?有异常吗?     for sk in wlist:                                    #处理连接成功的socket         if sk == client1:             sk.sendall('GET /s?wd=wukong HTTP/1.0\r\nHost: www.baidu.com\r\n\r\n'.encode('utf-8'))         elif sk == client2:             sk.sendall('GET /web?query=bajie HTTP/1.0\r\nHost: www.sogou.com\r\n\r\n'.encode('utf-8'))         else:             sk.sendall('GET /liaojie/index.html HTTP/1.0\r\nHost: www.oldboyedu.com\r\n\r\n'.encode('utf-8'))         conn_list.remove(sk)                            #处理完连接, 把sk从连接列表里剔除掉, 不用在检测       for sk in rlist:                                    #处理有数据回来的socket         data_list = []         while True:             try:                 data = sk.recv(8096)                    #因为上面的setblocking(False), 所有阻塞的地方都不阻塞, 所以这里应报错                 data_list.append(data)             except BlockingIOError as e:                 break         body = b''.join(data_list)         print(body)         sk.close()                                      #接收完数据的sk,断开socket连接         dat_list.remove(sk)                             #接收完数据的sk,把sk从数据检测的列表里剔除掉, 不用在检测       if not dat_list:                                    #都下载完数据了, 就停止检测         break         5.3.总结             基于io多路复用和socket实现并发请求             比多线程节省资源: 通过一个线程做20个线程做的事情 三.基于事件循环的异步非阻塞的框架         非阻塞: 不等待         异步: 指的是一个回调的过程,前面的某个事情完成了,自动执行一个'回调'函数; 上面的例子异步还没体现出来         事件循环: 循环地去做某个事         基于事件循环的异步非阻塞框架: Twisted              已经实现上述功能的模块, python中开源的模块          四.自定义异步非阻塞模块: 实现单线程并发     单线程并发的高级版:          和上面代码的原理一样, 只是封装好了         而且体现了异步的特性: 下载完数据就自动执行我给它的函数 import socket import select   class Foo(object):     def __init__(self,sk,func):         self.sk = sk         self.func = func     def fileno(self):         return self.sk.fileno() class Bajie(object):     def __init__(self):                                  #初始化,搞两个空检测列表         self.conn_list = []         self.socket_list = []         self.info_list = []     def add(self,domain,func):                           #创建socket, 加入到检测列表中         client = socket.socket()         client.setblocking(False)         try:             client.connect((domain, 80))         except BlockingIOError as e:             pass         obj = Foo(client,func)         self.socket_list.append(obj)         self.conn_list.append(obj)         self.info_list.append(obj)     def run(self):         while True:                                       #不停地去检测fileno()的状态             rlist, wlist, xlist = select.select(self.socket_list, self.conn_list, [],0.005)                                                             #每0.005秒去检测fileno()的状态: 连接成功了吗?返回数据了吗?有异常吗?               for obj in wlist:                             #处理连接成功的obj                 if obj == self.info_list[0]:                     obj.sk.sendall('GET /s?wd=wukong HTTP/1.0\r\nHost: www.baidu.com\r\n\r\n'.encode('utf-8'))                 elif obj == self.info_list[1]:                     obj.sk.sendall('GET /web?query=bajie HTTP/1.0\r\nHost: www.sogou.com\r\n\r\n'.encode('utf-8'))                 else:                     obj.sk.sendall('GET /liaojie/index.html HTTP/1.0\r\nHost: www.oldboyedu.com\r\n\r\n'.encode('utf-8'))                 self.conn_list.remove(obj)                #处理完连接, 把obj从连接列表里剔除掉, 不用在检测             for obj in rlist:                             #处理有数据回来的obj                 data_list = []                 while True:                     try:                         data = obj.sk.recv(8096)          #因为上面的setblocking(False), 所有阻塞的地方都不阻塞, 所以这里应报错                         data_list.append(data)                     except BlockingIOError as e:                         break                 body = b''.join(data_list)                 # print(body)                             #下载完不打印了, 而去执行他们各自的函数, (回调就是这样搞)                 obj.func(body)                 obj.sk.close()                            #接收完数据的obj,断开socket连接                 self.socket_list.remove(obj)              #接收完数据的obj,把obj从数据检测的列表里剔除掉, 不用在检测               if not self.socket_list:                      #都下载完数据了, 就停止检测                 break def baidu_func(body):     print('www.baidu.com--->',body) def sogou_func(body):     print('www.sogou.com--->', body) def old_func(body):     print('www.oldboyedu.com--->', body)   t1 = Bajie() t1.add('www.baidu.com',baidu_func)                        #异步的体现: 当执行完前面的动作, 自动执行baidu_func函数 t1.add('www.sogou.com',sogou_func) t1.add('www.oldboyedu.com',old_func) t1.run()   五.总结:      1.socket默认是阻塞的, 体现在连接和接收数据, 但是可以使用client.setblocking(False)变成非阻塞     2.io多路复用的作用:          检测多个socket是否发生变化         使用的select是调用的操作系统的功能         操作系统检测socket是否发生变化的三种模式:             select:原始的, 对检测的socket的个数有限制: 1024 个, 内部实现时用的是循环检测        :select.select()             poll:对检测的socket的个数不再限制: 内部实现时还是循环检测(水平触发)             epoll:对检测的socket的个数不再限制: 内部实现时不再是检测, 而是等着socket报告给我(边缘触发)    :select.epoll()             windows只支持select.select()             linux是都支持的, epoll会比select用法上复杂一点点     3.以后完成某件事提高并发的方案:         多进程         多线程         异步非阻塞模块:Twisted, scrapy框架(单线程完成并发)             会有问题: 如果发100个请求,倒是可以一起发请求出去, 如果100个是 同时回来的, 这种一个线程就完蛋了             怎么办:                  控制请求的数量                 使用生产者消费者模型(比如单线程并发下载,下载的数据放到队列,多进程处理数据)     4.什么是异步非阻塞(面试题)?         非阻塞:不等待, 比如创建一个socket进行连接某个地址, 或者接收数据时, 默认都是阻塞的(等待连接成功,或接受到数据), 才执行后续的操作,              如果设置成setblocking(False)非阻塞, 以上两个过程就不再等待, 但是会报BlockingIOError错误, 这个错误只要捕获即可         异步:通知和回调, 当我们完成某件事时,自动调用回调函数或自动执行某些操作, 比如做爬虫向某个地址发送请求, 当请求完成之后, 自动执行指定的回调函数(就是通知)     5.什么是同步阻塞?         阻塞: 等         同步: 按顺序一个一个执行, 比如for循环     6.io多路复用和性能有关系吗?         只是检测, 要看你和谁配合     7.需要懵懂的一个概念         封装: 当别人不改变调用代码的时候, 但是还要有这个东西, 那么你就给它封装进去     六.协程(本质遇到io就切换)     1.什么是协程:         线程: 操作系统中存在的         进程: 操作系统中存在的         协程: 不是一个真实存在的东西, 和线程和进程不同, 是由程序员创造的         协程: 可以认为是微线程, 对线程进行分片, 使线程可以在代码块之间进行来回切换执行,而不是原来的逐行执行     2.如何让两个代码块可以来回切换执行?    
        2.1.比如下面的两个函数, 让程序员来控制, 先执行11,再执行33,回去执行22,再执行44 def f1():     print(11)     print(22) def f2():     print(33)     print(44) f1() f2()         2.2.使用greenlet模块
            装包: pip3 install greenlet import greenlet def f1():     print(11)     gr2.switch()     print(22)     gr2.switch() def f2():     print(33)     gr1.switch()     print(44) gr1 = greenlet.greenlet(f1)                    #创建了两个协程gr1,gr2 gr2 = greenlet.greenlet(f2)        gr1.switch()                                    #执行协程gr1     3.协程存在的意义?         协程: 人为进行进程代码切换         单纯的协程是没有任何用处的, 可能会让性能更低         用作通过单线程实现并发: 单纯的它自己无法实现             需要额外 + 一段代码配合(代码里面一旦遇到io操作, 就通过协程切换出去执行其他的代码)             即协程 + 遇到io就切换     4.gevent模块         装包: pip3 install gevent         内部封装了 greenlet + 遇到io就切换 from gevent import monkey monkey.patch_all()                                             #必须要有; 以后代码中遇到IO操作, 内部会自动执行greenlet的switch进行切换  (patch 补丁)   import requests import gevent   def get_page(url):     rst = requests.get(url)     print(url, rst.content)   gevent.joinall(     [         gevent.spawn(get_page,'https://www.python.org/'),       #协程1    (spawn: 产卵)         gevent.spawn(get_page,'https://www.yahoo.com/'),        #协程2         gevent.spawn(get_page,'https://github.com/')            #协程3     ] )       5.总结         5.1.什么是协程?             协程也可以称之为微线程, 就是开发者控制着线程的执行流程: 控制先执行某段代码然后再切换到另外函数执行         5.2.协程可以提高并发吗?             协程自己本身无法实现并发(甚至性能会降低)             协程 + IO切换性能提高         5.3.进程,线程,协程的区别?             进程:             线程:             协程:         5.4.如果是单线程并发:(以下两个方式本质上是一样的)             gevent: 协程 + io切换             Twisted: 基于事件循环的异步非阻塞模块框架             协程没有回调函数, 但是它可以自动切换             协程本质上用的操作系统的一个库, 内部用的是也是事件循环             所以这两个用哪个都一样的       6.手动实现协程: yield关键字生成器 def f1():     print(11)     yield     print(22)     yield     print(33) def f2():     print(44)     yield     print(55)     yield     print(66)   v2 = f2() v1 = f1()   v1.__next__()                                                       #用生成器就可以控制两个函数中的代码来回交替 next(v2) next(v1) v2.__next__()            
上一篇:并发和数据库


下一篇:Django框架简介