线程和协程详解-python

1、前言

关于基本概念部分这里不再详述,可以参考之前的文章或者自行查阅相关文章。

由于python中线程的创建、属性和方法和进程很相似,这里也不再讲解。

这里重点讲解下多线程访问共享数据的相关问题。

2、多线程数据完全与解决

先看下示例:预测并执行看下结果是否和预测一致

from threading import Thread

sum = 0


def minus():
    global sum
    for i in range(1000000):
        sum += 1


def plus():
    global sum
    for i in range(1000000):
        sum -= 1


if __name__ == '__main__':
    t1 = Thread(target=minus)
    t2 = Thread(target=plus)

    t1.start()
    t2.start()

    t1.join()
    t2.join()

    print('sum=',sum)
  • 预测:就我们学习的数学知识而言,加减相同数字相同次数,那么结果不会改变
  • 结果:不可预测
  • 简单分析:对于sum的计算和赋值并不是原子操作,可细分为-取值,+1,赋值,存入内存,在但线程的情况下是没有任何问题的;在多线程的情况下,初值为0,一个线程加一个线程1减1,结果可能是0,-1,1
    • 0:正常结果
    • 1:减一线程执行计算,结果存入内存,加1线程执行完毕,1覆盖了-1
    • -1:加一线程执行计算,结果存入内存,减一线程执行完毕,-1覆盖了1

如何解决多线程下共享数据的安全问题呢?

2.1、加锁

  • 互斥锁:Lock

    • 同一时刻,同一资源只能被一个线程访问
    import time
    import threading
    
    R = threading.Lock()
    
    
    def sub():
        global num
        R.acquire()  # 加锁,保证同一时刻只有一个线程可以修改数据
        num -= 1
        R.release()  # 修改完成就可以解锁
        time.sleep(1)
    
    
    num = 100  # 定义一个全局变量
    l = []  # 定义一个空列表,用来存放所有的列表
    
    
    def main():
        for i in range(100):  # for循环100次
            t = threading.Thread(target=sub)  # 每次循环开启一个线程
            t.start()  # 开启线程
            l.append(t)  # 将线程加入列表l
        for i in l:
            i.join()  # 这里加上join保证所有的线程结束后才运行下面的代码
        print(num)
    
    
    if __name__ == '__main__':
        main()
    
    
  • 递归锁(可重入):RLock

    • 同一线程可多次获取同一资源
    import time
    import threading
    
    lock = threading.RLock()
    
    
    def minus1():
        global num2
        lock.acquire()
        num2 -= 1
        lock.release()
    
    
    def minus2():
        global num1  # 在每个线程中都获取这个全局变量
        lock.acquire()
        minus1()
        time.sleep(1)
        num1 -= 1  # 对此公共变量进行-1操作
        print('%s--get num1:%s,num2:%s' % (threading.current_thread().name, num1, num2))
        lock.release()
    
    
    num1, num2 = 5, 9  # 设定共享变量
    thread_list = []
    
    if __name__ == '__main__':
        for i in range(5):
            t = threading.Thread(target=minus2)
            t.start()
            thread_list.append(t)
    
        while threading.active_count() != 1:
            print(threading.active_count())
            time.sleep(1)
        else:
            print('----all threads done---')
        print('final num:', num1, num2)
    

2.2、信号量:semaphore

  • 互斥锁允许同一时刻只有一个线程修改数据,Semaphore 允许同一时刻运行一定数量的线程。
import time
import threading

semaphore = threading.BoundedSemaphore(3)


def task():
    semaphore.acquire()
    print('{}---running-{}'.format(threading.current_thread().name,
                                   time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))))
    time.sleep(1)
    semaphore.release()


if __name__ == '__main__':
    l = []
    for i in range(10):
        t = threading.Thread(target=task, name='task' + str(i))
        t.start()
        l.append(t)

    for i in l:
        i.join()

2.3、Events:事件驱动

  • Event对象通过设置/清除标志位来实现和其他线程的同步,例如交通灯来修改Event的标志位来控制车辆线程的状态。
import threading, time, random

events = threading.Event()


def lighter():
    if not events.isSet():
        events.set()  # 初始化绿灯Event set
    counter = 0
    while True:
        if counter < 5:
            print('\033[42;0mGreen is lighten...\033[0m')
        elif counter < 10:
            if events.isSet():
                events.clear()
            print('\033[41;0mRed is lighten...\033[0m')

        else:
            counter = 0
            print('\033[42;1m--green light on---\033[0m')
            events.set()
        time.sleep(1)
        counter += 1


def car(i):
    while True:
        if events.isSet():
            print("car[%s] is running..." % i)
            time.sleep(random.randrange(10))
        else:
            print('car is waiting green lighten...')
            events.wait()


if __name__ == '__main__':
    lighter1 = threading.Thread(target=lighter)
    lighter1.start()
    for i in range(3):
        t = threading.Thread(target=car, args=(i,))
        t.start()

2.4、死锁

所谓死锁: 是指两个或两个以上的进程或线程在执行过程中,因争夺资源而造成的一种互相等待的现象,若无外力作用,它们都将无法推进下去。此时称系统处于死锁状态或系统产生了死锁,这些永远在互相等待的进程称为死锁进程

在线程间共享多个资源的时候,如果两个线程分别占有一部分资源并且同时等待对方的资源时,就会造成死锁。
尽管死锁很少发生,但一旦发生就会造成应用的停止响应。
示例代码:

import time
from threading import Thread, Lock

la = Lock()
lb = Lock()


def task1():
    if la.acquire():
        print('task1获取锁A')
        time.sleep(1)
        if lb.acquire():
            print('task1获取锁B')
            lb.release()
            print('task1释放锁B')
        la.release()
        print('task1释放锁A')


def task2():
    if lb.acquire():
        print('task2获取锁B')
        time.sleep(1)
        if la.acquire():
            print('task2获取锁A')
            la.release()
            print('task2释放锁A')
        lb.release()
        print('task2释放锁B')


if __name__ == '__main__':
    t1 = Thread(target=task1)
    t2 = Thread(target=task2)

    t1.start()
    t2.start()

    t1.join()
    t2.join()
  • 解决死锁

    • 添加超时时间

      import time
      from threading import Thread, Lock
      
      la = Lock()
      lb = Lock()
      
      
      def task1():
          if la.acquire():
              print('task1获取锁A')
              time.sleep(1)
              if lb.acquire(timeout=5):
                  print('task1获取锁B')
                  lb.release()
                  print('task1释放锁B')
              la.release()
              print('task1释放锁A')
      
      
      def task2():
          if lb.acquire():
              print('task2获取锁B')
              time.sleep(1)
              if la.acquire(timeout=4):
                  print('task2获取锁A')
                  la.release()
                  print('task2释放锁A')
              lb.release()
              print('task2释放锁B')
      
      
      if __name__ == '__main__':
          t1 = Thread(target=task1)
          t2 = Thread(target=task2)
      
          t1.start()
          t2.start()
      
          t1.join()
          t2.join()
      
    • 银行家算法:换没有学习,感兴趣的话自行查阅相关文档

3、线程间通信

线程间通信也是通过Queue来完成,这里不在讲解。

示例代码:

import queue
import random
import time
from threading import Thread


def produce(q):
    i = 0
    while i < 10:
        num = random.randint(1, 100)
        q.put(num)
        print('生产者生产数据:%d' % num)
        time.sleep(0.5)
        i += 1
    q.put(None)
    # 任务结束
    q.task_done()


def consume(q, n):
    while True:
        item = q.get()
        if item is None:
            break
        print('%s获取:%d' % (n, item))
        time.sleep(1)
    # 任务结束
    q.put(None)
    q.task_done()


if __name__ == '__main__':
    q = queue.Queue(10)

    # 创建生产者
    tp = Thread(target=produce, args=(q,))
    tp.start()

    # 创建消费者
    tc1 = Thread(target=consume, args=(q, '消费者1'))
    tc2 = Thread(target=consume, args=(q, '消费者2'))
    tc1.start()
    tc2.start()

    tp.join()
    tc1.join()
    tc2.join()

3、协程

3.1、简介

协程,又称微线程,纤程,是一种用户态的轻量级线程。

协程拥有自己的寄存器上下文和栈。协程调度切换时,将寄存器上下文和栈保存到其他地方,在切回来的时候,恢复先前保存的寄存器上下文和栈。

因此:协程能保留上一次调用时的状态(即所有局部状态的一个特定组合),每次过程重入时,就相当于进入上一次调用的状态(进入上一次离开时所处逻辑流的位置)

协程与线程类似,每个协程表示一个执行单元,既有自己的本地数据,也与其他协程共享全局数据和其他资源。

协程存在于线程中,需要用户来编写调度逻辑,对CPU而言,不需要考虑协程如何调度,切换上下文。

  • 优点
    • 无需线程上下文切换的开销
    • 无需原子操作锁定及同步的开销
    • 高并发+高扩展性+低成本

"原子操作(atomic operation)是指不会被线程调度机制打断的操作;这种操作一旦开始,就一直运行到结束,中间不会有任何上下文切换 (切换到另一个线程)。原子操作可以是一个步骤,也可以是多个操作步骤,但是其顺序是不可以被打乱,或者切割掉只执行部分。视作整体是原子性的核心。

3.2、greenlet

greenlet是一个用C实现的协程模块,相比与python自带的yield,它可以使你在任意函数之间随意切换,而不需把这个函数先声明为generator。

greenlet的工作流程:进行访问网络的IO操作时,出现阻塞,greenlet就显式切换到另一段没有被阻塞的代码执行,直到原来的阻塞状况消失以后,再切换回原来代码段继续处理。因此,greenlet是一种合理安排的串行方法。

greenlet.switch()可实现协程的切换,greenlet并不能实现自动切换。

示例:模拟2个耗时任务

import time

from greenlet import greenlet


def task1():
    for i in range(5):
        print('A ' + str(i))
        g2.switch()
        # 模拟耗时操作
        time.sleep(0.1)


def task2():
    for i in range(5):
        print('B ' + str(i))
        g1.switch()
        # 模拟耗时操作
        time.sleep(0.1)


if __name__ == '__main__':
    g1 = greenlet(task1)
    g2 = greenlet(task2)

    g1.switch()

3.3、gevent和猴子补丁

gevent 是一个第三方库,可以轻松通过gevent实现并发同步或异步编程。gevent是对greenlet进行封装,实现协程的自动切换。

通过gevent.sleep模仿IO操作,实现协程的切换。

  • 常用方法
    • gevent.spawn 用来形成协程
    • gevent.joinall 添加这些协程任务,并且执行给定的gevent,同时阻塞当前程序流程,当所有gevent执行完毕程序继续向下执行
    • gevent.sleep 模拟IO操作多少时间

示例:

import time
import gevent


def task1():
    for i in range(5):
        print('A ' + str(i))
        # 模拟耗时操作
        time.sleep(0.1)


def task2():
    for i in range(5):
        print('B ' + str(i))
        # 模拟耗时操作
        time.sleep(0.1)


if __name__ == '__main__':
    g1 = gevent.spawn(task1)
    g2 = gevent.spawn(task2)

    g1.join()
    g2.join()

协程切换是在IO操作时自动完成,在启动时通过monkey.patch_all()实现将一些常见的阻塞,如socket,select,urllib等地方实现协程跳转,因为gevent并不能完全识别所有当前操作是否为IO操作,而未切换。

示例:

import time
import gevent
from gevent import monkey
monkey.patch_all()


def task1():
    for i in range(5):
        print('A ' + str(i))
        # 模拟耗时操作
        time.sleep(0.1)


def task2():
    for i in range(5):
        print('B ' + str(i))
        # 模拟耗时操作
        time.sleep(0.1)


if __name__ == '__main__':
    g1 = gevent.spawn(task1)
    g2 = gevent.spawn(task2)

    g1.join()
    g2.join()

3.4、简单应用

示例:模拟爬取3个网页

from gevent import monkey

monkey.patch_all()
import requests
import gevent


def download(url):
    resp = requests.get(url)
    print('下载了{}的数据, 长度:{}'.format(url, len(resp.text)))


if __name__ == '__main__':
    urls = ['https://www.baidu.com', 'https://www.163.com', 'https://www.qq.com']

    arr = []

    for url in urls:
        g = gevent.spawn(download, url)
        arr.append(g)

    gevent.joinall(arr)

  • 注意:打补丁放在import requests之前,否则补丁打不上会报错

gevent还提供对池的支持,当拥有动态数量的greenlet需要进行并发管理(限制并发数)时,就可以使用池,在处理大量的网络或IO操作时非常重要。

示例:注意python版本

from gevent import monkey
monkey.patch_all()
from gevent.pool import Pool
import requests

def run_task(url):
    print('Visit --> %s'% url)
    try:
        res = requests.get(url)
        data = res.text
        print('%s bytes received from %s' %(len(data),url))
    except Exception as value:
        print(value)
    return 'url:%s --> finished' % url

if __name__ == '__main__':
    urls = ['https://www.baidu.com/','https://github.com/','https://www.python.org/']
    pool = Pool(2)
    result = pool.map(run_task, urls)
    print(result)

参考地址

参考视频https://www.bilibili.com/video/BV1R7411F7JV p274~281

源代码仓库地址:https://gitee.com/gaogzhen/python-study

QQ群:433529853

上一篇:OpenCV 通道分离:split() 函数


下一篇:并发编程 协程 | IO模型简介