同步,异步,阻塞,非阻塞,异步加回调机制,线程队列,event事件

1. 同步,异步,阻塞,非阻塞,

  • 站在任务发布的角度:
    • 同步:任务发出去之后,等待结果,直到这个任务最终结束后,返回结果,再发步下一个任务.
    • 异步:所有任务同时发出,不会在原地等待结果返回
  • 程序运行中表现得状态:阻塞.运行,就绪
    • 阻塞:程序遇到IO阻塞,程序遇到IO立马会停止(挂起),cpu马上切换,等IO结束后再执行

    • 非阻塞:程序没有IO或者遇到IO通过某种手段让CPU去执行同一个线程里面的其他的任务,尽可能的占用CPU

      # 异步回收任务的方式一: 将所有任务的结果统一收回
      from concurrent.futures import ProcessPoolExecutor
      import os
      import time
      import random
      def task():
          print(f'{os.getpid()} is running')
          time.sleep(random.randint(1, 3))
          return f'{os.getpid()} is finish'
      
      if __name__ == '__main__':
          p = ProcessPoolExecutor(4)
          lst = []
          for i in range(10):
              res = p.submit(task)   # 异步发出
              lst.append(res)
              # print(res.result())  # 在这里result()就会变成同步
          p.shutdown(wait=True)
          # 1.阻止再向进程池投放新的任务
          # 2.wait=True 一个任务完成了就减一,直至为0才执行下一行
          for res in lst:
              print(res.result())

2.异步加回调机制,

# 浏览器做的事情很简单,封装一些头部,发一个请求到服务器,服务器拿到请求信息,分析信息,分析正确之后,给浏览器返回一个文件,浏览器将这个文件的代码渲染就成了网页
# 爬虫: 利用requests模块,模拟浏览器,封装头给服务器发送请求,骗过服务器,服务器也给你返回一个文件,
# 爬虫拿到文件进行数据清洗,获取想要的信息
# 爬虫: 分两步
#   第一步: 爬取服务端的文件(IO阻塞)
#   第二步: 拿到文件,进行数据清洗(非IO,极少IO)

# 版本一
from concurrent.futures import ProcessPoolExecutor
import requests
import time
import os
import random
def get(url):  # 爬取文件
    response = requests.get(url)
    print(os.getpid(), '正在爬取:', url)
    time.sleep(random.randint(1, 3))
    if response.status_code == 200:
        return response.text

def parse(text): # 对爬取回来的字符串的分析,用len模拟一下
    print('分析结果:', len(text))

if __name__ == '__main__':
    url_list = [
        'https://www.baidu.com',
        'https://www.python.org',
        'https://www.openstack.org',
        'https://help.github.com/',
        'http://www.sina.com.cn/',
        'https://www.cnblogs.com/'
    ]
    pool = ProcessPoolExecutor(4)
    obj_list = []
    for url in url_list:
        obj = pool.submit(get, url)
        obj_list.append(obj)
    pool.shutdown(wait=True)
    for obj in obj_list:
        text = obj.result()
        parse(text)
# 问题出在哪里?
# 1.分析结果的过程是串行,效率低
# 2.将所有的结果全部爬取成功之后,放在一个列表中
-------------------------------------------------------
# 版本二:异步处理,获取结果的第二种方式
# 完成一个任务,返回一个结果,并发的获取结果
from concurrent.futures import ProcessPoolExecutor
import requests
import time
import os
import random
def get(url):  # 爬取文件
    response = requests.get(url)
    print(os.getpid(), '正在爬取:', url)
    time.sleep(random.randint(1, 3))
    if response.status_code == 200:
        parse(response.text)
        # return response.text

def parse(text): # 对爬取回来的字符串的分析,用len模拟一下
    print('分析结果:', len(text))

if __name__ == '__main__':
    url_list = [
        'https://www.baidu.com',
        'https://www.python.org',
        'https://www.openstack.org',
        'https://help.github.com/',
        'http://www.sina.com.cn/',
        'https://www.cnblogs.com/'
    ]
    pool = ProcessPoolExecutor(4)
    for url in url_list:
        obj = pool.submit(get, url)
    pool.shutdown(wait=True)
# 问题,增强了耦合性
------------------------------------------------------
# 版本三: 版本二,两个任务有耦合性.在上一个基础上,对其进行解耦
from concurrent.futures import ProcessPoolExecutor
import requests
import time
import os
import random
def get(url):  # 爬取文件
    response = requests.get(url)
    print(os.getpid(), '正在爬取:', url)
    time.sleep(random.randint(1, 3))
    if response.status_code == 200:
        return response.text

def parse(obj): # 对爬取回来的字符串的分析,用len模拟一下
    print(f'{os.getpid()}分析结果:', len(obj.result()))

if __name__ == '__main__':
    url_list = [
        'https://www.baidu.com',
        'https://www.python.org',
        'https://www.openstack.org',
        'https://help.github.com/',
        'http://www.sina.com.cn/',
        'https://www.cnblogs.com/'
    ]
    pool = ProcessPoolExecutor(4)
    for url in url_list:
        obj = pool.submit(get, url)
        obj.add_done_callback(parse)  # 增加一个回调函数
        # 现在的进程完成的还是网络爬取的任务,拿到返回值之后,丢给回调函数,
        # 进程继续完成下一个任务,回调函数进行分析结果
    pool.shutdown(wait=True)
# 回调函数是主进程实现的,回调函数帮我们进行分析任务
# 明确了进程的任务就是网络爬取,分析任务交给回调函数执行,对函数之间解耦

# 极值情况: 如果回调函数是IO任务,那么由于回调函数是主进程做的,所以有可能影响效率
# 回调不是万能的,如果回调的任务是IO,那么异步+回调机制不好,此时如果需要效率,只能再开一个线程或进程池

# 异步就是回调?
# 这个论点是错的,异步,回调是两个概念

# 如果多个任务,多进程多线程处理的IO任务
# 1. 剩下的任务 非IO阻塞   异步+回调机制
# 2. 剩下的任务有 IO  远小于  多个任务的IO   异步+回调机制
# 3. 剩下的任务 IO 大于等于 多个任务的IO  第二种解决方式,或者开启两个进程/线程池

3.线程队列(进程相同)

  • FIFO: 先进先出

    import queue
    #不需要通过threading模块里面导入,直接import queue就可以了,这是python自带的
    #用法基本和我们进程multiprocess中的queue是一样的
    q = queue.Queue()
    q.put('first')
    q.put('second')
    q.put('third')
    print(q.get())
    print(q.get())
    print(q.get())
    '''
    结果(先进先出):
    first
    second
    third
    '''
  • LIFO: 后进先出(栈)

    import queue
    q = queue.LifoQueue() # 队列,类似于栈
    q.put('first')
    q.put('second')
    q.put('third')
    
    print(q.get())
    print(q.get())
    print(q.get())
    '''
    结果(后进先出):
    third
    second
    first
    '''
  • Priority: 优先级队列

    import queue
    
    q = queue.PriorityQueue()
    # put进入一个元组,元组的第一个元素是优先级(通常是数字,也可以是非数字之间的比较),数字越小优先级越高
    q.put((-10, 'a'))
    q.put((-5, 'a'))  #负数也可以
    # q.put((20, 'ws'))  #如果两个值的优先级一样,那么按照后面的值的acsii码顺序来排序,如果字符串第一个数元素相同,比较第二个元素的acsii码顺序
    # q.put((20, 'wd'))
    # q.put((20, {'a': 11})) #TypeError: unorderable types: dict() < dict() 不能是字典
    # q.put((20, ('w', 1)))  #优先级相同的两个数据,他们后面的值必须是相同的数据类型才能比较,可以是元祖,也是通过元素的ascii码顺序来排序
    
    q.put((20, 'b'))
    q.put((20, 'a'))
    q.put((0, 'b'))
    q.put((30, 'c'))
    
    print(q.get())
    print(q.get())
    print(q.get())
    print(q.get())
    print(q.get())
    print(q.get())
    '''
    结果(数字越小优先级越高,优先级高的优先出队):
    (-10, 'a')
    (-5, 'a')
    (0, 'b')
    (20, 'a')
    (20, 'b')
    (30, 'c')
    '''

4. Event事件

  • 线程的一个关键特性是每个线程都是独立运行且状态不可预测.如果程序中的其他线程需要通过判断某个线程的状态来确定自己下一步的操作,这时线程同步问题就会变得非常棘手.为了解决这些问题,我们需要使用threading库中的Event对象. 对象包含一个可由线程设置的信号标志,它允许线程等待某些事件的发生.在初始情况下,Event对象中的信号标志被设置为假.如果有线程等待一个Event对象,而这个Event对象的标志为假,那么这个线程将会被一直阻塞直至该标志为真.一个线程如果将一个Event对象的信号标志设置为真,它将唤醒所有等待这个Event对象的线程.如果一个线程等待一个已经被设置为真的Event对象,那么它将忽略这个事件,继续执行(进程也一样)

    同步,异步,阻塞,非阻塞,异步加回调机制,线程队列,event事件

  • 方法

    event.isSet(): 返回event的状态值
    
    event.wait(): 如果 event.isSet() == False将阻塞线程
    
    event.set(): 设置event的状态值为True,所有阻塞池的线程激活进入就绪状态, 等待操作系统调度
    
    event.clear(): 恢复event的状态值为False
  • 示例

    from threading import Thread
    from threading import current_thread
    from threading import Event
    import time
    event = Event()  # 默认False
    def task():
        print(f'{current_thread().name}检测服务器是否正常开启....')
        time.sleep(3)
        event.set()  # 改成True
    
    def task1():
        print(f'{current_thread().name}正在尝试连接服务器')
        event.wait()  # 阻塞,轮询检测event是否为True,当其为True,继续下一行代码
        # event.wait(1) # 超时时间,超过时间无论是否为True都继续下一行代码
        print(f'{current_thread().name}连接成功')
    
    if __name__ == '__main__':
        t1 = Thread(target=task1)
        t2 = Thread(target=task1)
        t3 = Thread(target=task1)
        t = Thread(target=task)
        t1.start()
        t2.start()
        t3.start()
        t.start()

    协程

    1. 概念

    协程本质就是一条线程,多个任务在一条线程上来回切换 协程是操作系统不可见的 协程的概念本身并 没有规避I/O操作,但是我们可以利用协程这个概念来实现规避I/O操作,进而达到了我们将一条线程中 的I/O操作降到最低的目的 协程能够实现的大部分I/O操作都在网络

    2. 相关模块概览和协程的应用

    gevent:利用了greenlet底层模块(C语言写的)完成的切换 + 自动规避io的功能

    3. gevent模块

    import gevent
    def eat(name):
        print('%s eat 1' %name)
        gevent.sleep(2)
        print('%s eat 2' %name)
    
    def play(name):
        print('%s play 1' %name)
        gevent.sleep(1)
        print('%s play 2' %name)
    
    
    g1=gevent.spawn(eat,'egon')
    g2=gevent.spawn(play,name='egon')
    g1.join()
    g2.join()
    #或者gevent.joinall([g1,g2])
    print('主')
    
    遇到I/O切换

    上例gevent.sleep(2)模拟的是gevent可以识别的io阻塞,

      而time.sleep(2)或其他的阻塞,gevent是不能直接识别的需要用下面一行代码,打补丁,就可以识别了

      from gevent import monkey;monkey.patch_all()必须放到被打补丁者的前面,如time,socket模块之前

      或者我们干脆记忆成:要用gevent,需要将from gevent import monkey;monkey.patch_all()放到文件的开头

    from gevent import monkey;monkey.patch_all() #必须写在最上面,这句话后面的所有阻塞全部能够识别了
    
    import gevent  #直接导入即可
    import time
    def eat():
        #print()  
        print('eat food 1')
        time.sleep(2)  #加上mokey就能够识别到time模块的sleep了
        print('eat food 2')
    
    def play():
        print('play 1')
        time.sleep(1)  #来回切换,直到一个I/O的时间结束,这里都是我们个gevent做得,不再是控制不了的操作系统了。
        print('play 2')
    
    g1=gevent.spawn(eat)
    g2=gevent.spawn(play_phone)
    gevent.joinall([g1,g2])
    print('主')
    

    我们可以用threading.current_thread().getName()来查看每个g1和g2,查看的结果为DummyThread-n,即假线程,虚拟线程,其实都在一个线程里面

      进程线程的任务切换是由操作系统自行切换的,你自己不能控制

      协程是通过自己的程序(代码)来进行切换的,自己能够控制,只有遇到协程模块能够识别的IO操作的时候,程序才会进行任务切换,实现并发效果,如果所有程序都没有IO操作,那么就基本属于串行执行了。

Gevent之同步与异步

from gevent import spawn,joinall,monkey;monkey.patch_all()

import time
def task(pid):
    """
    Some non-deterministic task
    """
    time.sleep(0.5)
    print('Task %s done' % pid)


def synchronous():
    for i in range(10):
        task(i)

def asynchronous():
    g_l=[spawn(task,i) for i in range(10)]
    joinall(g_l)

if __name__ == '__main__':
    print('Synchronous:')
    synchronous()

    print('Asynchronous:')
    asynchronous()
#上面程序的重要部分是将task函数封装到Greenlet内部线程的gevent.spawn。 初始化的greenlet列表存放在数组threads中,此数组被传给gevent.joinall 函数,后者阻塞当前流程,并执行所有给定的greenlet。执行流程只会在 所有greenlet执行完后才会继续向下走。

协程:同步异步对比
上一篇:python 并发编程 协程池


下一篇:使用gevent在应用程序上下文之外工作的烧瓶蓝图