day14 课堂笔记 协程

文章目录

1.异步调用+回调函数

1.2浏览器、爬虫原理

  • 浏览器原理

    浏览器会将你的请求数据通过网络发送到百度的服务器,服务器接收到请求数据,验证请求数据后,返回给浏览器软件一个HTML网页数据,浏览器接收到这个HTML页面数据通过浏览器内核的渲染机制,渲染成美丽的页面。

  • 爬虫原理

    • 爬虫模拟一个浏览器向服务器请求数据

    • 数据请求成功后,通过数据清洗获取目标的数据

      import requests
      ret = requests.get('http://www.baidu.com')
      if ret.status_code == 200:
          print(ret.text)
      

1.2 异步调用如何获取结果

发布三个任务:几种回收方式?

  • 所有的任务全部完成后,统一回收、

  • 那个任务先完成,回收其任务、

  • 方案一

    演示一下统一回收所有的结果。

    from concurrent.futures import ThreadPoolExecutor
    import requests
    
    
    def crawling(url: str):
        ret = requests.get(url)
        if ret.status_code == 200:
            return ret.text
       
    
    def parse(data: str):
        return len(data)
    
    
    if __name__ == '__main__':
        thread_poor = ThreadPoolExecutor(5)
        # 方法一
        # obj = threadi_poor.submit(crawling, 'http://www.baidu.com')
        # parse(obj.result)
        # obj = threadi_poor.submit(crawling, 'http://www.baidu.com')
        # parse(obj.result)
        # obj = threadi_poor.submit(crawling, 'http://www.baidu.com')
        # parse(obj.result)
        
        
        # 方法二
        url_list = [
            'http://www.baidu.com',
            'http://www.JD.com',
            'http://www.JD.com',
            'http://www.JD.com',
            'http://www.taobao.com',
            'https://www.cnblogs.com/jin-xin/articles/7459977.html',
            'https://www.luffycity.com/',
            'https://www.cnblogs.com/jin-xin/articles/9811379.html',
            'https://www.cnblogs.com/jin-xin/articles/11245654.html',
            'https://www.sina.com.cn/',
        ]
        
        task_list = []
        for url in url_list:
            obj = thread_poor.submit(crawling,url)
            task_list.append(obj)
           
        thread_poor.shutdown()
        for obj in task_list:
            print(parse(obj.result()))
    

    方案一过程描述:

    我们开启了5线程的线程池并发的处理了10个IO密集型的任务,但是我们还有10个计算密集型的任务是通过串行处理的,这样不合理,效率低。

  • 方案二

    from concurrent.futures import ThreadPoolExcutor
    import requests
    
    
    def crawling(url: str):
        ret = requests.get(url)
        if ret.status_code == 200:
            return parse(ret.text)
       
    
    def parse(data: str):
        return len(data)
    
    
    if __name__ == '__main__':
        thread_poor = ThreadPoolExecutor(5)
        url_list = [
            'http://www.baidu.com',
            'http://www.JD.com',
            'http://www.JD.com',
            'http://www.JD.com',
            'http://www.taobao.com',
            'https://www.cnblogs.com/jin-xin/articles/7459977.html',
            'https://www.luffycity.com/',
            'https://www.cnblogs.com/jin-xin/articles/9811379.html',
            'https://www.cnblogs.com/jin-xin/articles/11245654.html',
            'https://www.sina.com.cn/',
        ]
        task_list = []
        for url in url_list:
            obj = thread_poor.submit(crawling,url)
            task_list.append(obj)
        thread_poor.shutdown()
        for obj in task_list:
            print(obj.result())
    

    方案二过程:

    我们开启了5线程的线程池并发处理了10个IO密集型的人任务(每个任务分两个小任务IO+计算),方案二比方案一效率更高一些,但是也有问题:

    • 利用多线程(多进程,线程池等)处理的任务最好都是IO密集型,不要掺杂这计算密集型,当我们现在是一个IO+一个计算两种任务耦合到了一起,增加了耦合性。
    • 无论方案一还是方案二都没有做到实时获取结果。

1.3 异步调用+回调函数

from concurrent.futures import ThreadPoolExecutor
import requests


def crawling(url: str):
    ret = requests.get(url)
    if ret.status_code == 200:
        return ret.text
   

def parse(obj):
    print(len(obj.result()))
   

if __name__ == '__main__':
    thread_poor = ThreadPoolExecutor(5)
    url_list = [
        'http://www.baidu.com',
        'http://www.JD.com',
        'http://www.JD.com',
        'http://www.JD.com',
        'http://www.taobao.com',
        'https://www.cnblogs.com/jin-xin/articles/7459977.html',
        'https://www.luffycity.com/',
        'https://www.cnblogs.com/jin-xin/articles/9811379.html',
        'https://www.cnblogs.com/jin-xin/articles/11245654.html',
        'https://www.sina.com.cn/',
    ]
    for url in url_list:
        obj = thread_poor.submit(crawling, url)
        obj.add_done_callback(parse)
    thread_poor.shutdown() # 等待线程池的所有线程将10个IO任务并发的处理完毕
    # print(task_list) # 所有的任务对象都是已完成的状态

异步调用+回调函数 机制:

我们开启了5线程的线程池并发的处理了10个IO密集型的任务,并且让每个任务设置了回调函数,回调函数处理的数据解析的功能,10个IO密集型的任务线程池的线程去执行,10个任务的回调函数交由空闲的线程(或者主线程)去执行。

参数解释:

add_done_callback:回调函数,只要有线程或者进程完成了当前网页爬取的任务,剩下的分析结果的任务交由回调函数去执行,线程或者进程继续进行网页爬取的任务。

**整体的执行流程:**线程池设置4个线程,异步发起10个任务,每个任务是通过网页获取源码,并发执行,当一个任务完成之后,将parse这个分析代码的任务交由剩余的空闲的线程去执行,你这个线程继续去处理其他任务。

什么情况下用异步调用+回调函数的机制呢?

你要具备两种类型的任务:第一种IO密集型的任务,第二种计算密集型(耗时很短)的任务,这样IO密集型的任务我们可以利用并发或者并行处理,任何一个处理完毕得到结果之后,直接抛给回调函数,回调函数一般都是主进程或者主线程(空余线程)处理,所以第二种任务一定要耗时短,尽量无IO阻塞。

进程池+回到函数:回调函数有主进程去执行。

线程池+回调函数:回调函数有空闲线程去执行。

2. 线程队列

2.1 先进先出队列

import queue
q = queue.Queue(3)
q.put(111)
q.put(222)
q.put(333)
# q.put(444)
print(q.get())
print(q.get())
print(q.get())
print(q.get())

2.2 后进先出队列

import queue
q = queue.LifoQueue(4)  # last in first out
q.put(111)
q.put(222)
q.put(333)
q.put('玮哥')

print(q.get())
print(q.get())
print(q.get())
print(q.get())

2.3 优先级队列

import queue
q = queue.PriorityQueue(5)
# 每次通过元组的形式插入,元组的第一个元素一定是int类型,数字越低,优先级越高。
q.put((0, '新闻'))
q.put((2, '峰哥'))
q.put((-10, '冲哥'))
q.put((5, '日天哥', 'fdsafd'))

print(q.get())
print(q.get())
print(q.get())
print(q.get())
print(q.get())

3. 事件Event

线程与进程都有event存在,先不说什么事件event,我们先聊一聊线程和进程的一个特性:每一个线程(进程)都是独立运行且状态不可预测。如果程序中的其他线程(进程)需要通过判断某个线程的状态来确定自己下一步操作,这时线程(进程)同步问题就会变得非常棘手。其实这是增加了线程(进程)之间的关联性,一个线程(进程)运行到某个节点决定另个一个线程(进程)是否继续运行。听起来很神秘,其实没有时间event照样可以实现。

from threading import Thread, Event
# from multiprocessing import Event
import time
import threading

event = Event()
def connect():
    print(f'{threading.currentThread()}检查服务器是否开启...')
    time.sleep(3)
    print(f'{threading.currentThread()}确定服务器已经正常开启')
    event.set()
def ask():

    print(f'{threading.currentThread()}尝试连接服务器....')
    # event.wait()  # 阻塞,直到event对象改变了状态在向下进行
    # event.wait(timeout=2)  # 可以设置阻塞时长,如果超过设置的世间仍未改变状态,则直接向下运行
    # event.wait(timeout=4)  #
    print(f'{threading.currentThread()}连接成功')


if __name__ == '__main__':
    t1 = Thread(target=connect,)
    t2 = Thread(target=ask,)
    t1.start()
    t2.start()
上一篇:架构期day14-四层负载均衡


下一篇:day14_ 权限修饰符 内部类 匿名内部类