1、前言
关于基本概念部分这里不再详述,可以参考之前的文章或者自行查阅相关文章。
由于python中线程的创建、属性和方法和进程很相似,这里也不再讲解。
这里重点讲解下多线程访问共享数据的相关问题。
2、多线程数据完全与解决
先看下示例:预测并执行看下结果是否和预测一致
from threading import Thread
sum = 0
def minus():
global sum
for i in range(1000000):
sum += 1
def plus():
global sum
for i in range(1000000):
sum -= 1
if __name__ == '__main__':
t1 = Thread(target=minus)
t2 = Thread(target=plus)
t1.start()
t2.start()
t1.join()
t2.join()
print('sum=',sum)
- 预测:就我们学习的数学知识而言,加减相同数字相同次数,那么结果不会改变
- 结果:不可预测
- 简单分析:对于sum的计算和赋值并不是原子操作,可细分为-取值,+1,赋值,存入内存,在但线程的情况下是没有任何问题的;在多线程的情况下,初值为0,一个线程加一个线程1减1,结果可能是0,-1,1
- 0:正常结果
- 1:减一线程执行计算,结果存入内存,加1线程执行完毕,1覆盖了-1
- -1:加一线程执行计算,结果存入内存,减一线程执行完毕,-1覆盖了1
如何解决多线程下共享数据的安全问题呢?
2.1、加锁
-
互斥锁:Lock
- 同一时刻,同一资源只能被一个线程访问
import time import threading R = threading.Lock() def sub(): global num R.acquire() # 加锁,保证同一时刻只有一个线程可以修改数据 num -= 1 R.release() # 修改完成就可以解锁 time.sleep(1) num = 100 # 定义一个全局变量 l = [] # 定义一个空列表,用来存放所有的列表 def main(): for i in range(100): # for循环100次 t = threading.Thread(target=sub) # 每次循环开启一个线程 t.start() # 开启线程 l.append(t) # 将线程加入列表l for i in l: i.join() # 这里加上join保证所有的线程结束后才运行下面的代码 print(num) if __name__ == '__main__': main()
-
递归锁(可重入):RLock
- 同一线程可多次获取同一资源
import time import threading lock = threading.RLock() def minus1(): global num2 lock.acquire() num2 -= 1 lock.release() def minus2(): global num1 # 在每个线程中都获取这个全局变量 lock.acquire() minus1() time.sleep(1) num1 -= 1 # 对此公共变量进行-1操作 print('%s--get num1:%s,num2:%s' % (threading.current_thread().name, num1, num2)) lock.release() num1, num2 = 5, 9 # 设定共享变量 thread_list = [] if __name__ == '__main__': for i in range(5): t = threading.Thread(target=minus2) t.start() thread_list.append(t) while threading.active_count() != 1: print(threading.active_count()) time.sleep(1) else: print('----all threads done---') print('final num:', num1, num2)
2.2、信号量:semaphore
- 互斥锁允许同一时刻只有一个线程修改数据,Semaphore 允许同一时刻运行一定数量的线程。
import time
import threading
semaphore = threading.BoundedSemaphore(3)
def task():
semaphore.acquire()
print('{}---running-{}'.format(threading.current_thread().name,
time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))))
time.sleep(1)
semaphore.release()
if __name__ == '__main__':
l = []
for i in range(10):
t = threading.Thread(target=task, name='task' + str(i))
t.start()
l.append(t)
for i in l:
i.join()
2.3、Events:事件驱动
- Event对象通过设置/清除标志位来实现和其他线程的同步,例如交通灯来修改Event的标志位来控制车辆线程的状态。
import threading, time, random
events = threading.Event()
def lighter():
if not events.isSet():
events.set() # 初始化绿灯Event set
counter = 0
while True:
if counter < 5:
print('\033[42;0mGreen is lighten...\033[0m')
elif counter < 10:
if events.isSet():
events.clear()
print('\033[41;0mRed is lighten...\033[0m')
else:
counter = 0
print('\033[42;1m--green light on---\033[0m')
events.set()
time.sleep(1)
counter += 1
def car(i):
while True:
if events.isSet():
print("car[%s] is running..." % i)
time.sleep(random.randrange(10))
else:
print('car is waiting green lighten...')
events.wait()
if __name__ == '__main__':
lighter1 = threading.Thread(target=lighter)
lighter1.start()
for i in range(3):
t = threading.Thread(target=car, args=(i,))
t.start()
2.4、死锁
所谓死锁: 是指两个或两个以上的进程或线程在执行过程中,因争夺资源而造成的一种互相等待的现象,若无外力作用,它们都将无法推进下去。此时称系统处于死锁状态或系统产生了死锁,这些永远在互相等待的进程称为死锁进程
在线程间共享多个资源的时候,如果两个线程分别占有一部分资源并且同时等待对方的资源时,就会造成死锁。
尽管死锁很少发生,但一旦发生就会造成应用的停止响应。
示例代码:
import time
from threading import Thread, Lock
la = Lock()
lb = Lock()
def task1():
if la.acquire():
print('task1获取锁A')
time.sleep(1)
if lb.acquire():
print('task1获取锁B')
lb.release()
print('task1释放锁B')
la.release()
print('task1释放锁A')
def task2():
if lb.acquire():
print('task2获取锁B')
time.sleep(1)
if la.acquire():
print('task2获取锁A')
la.release()
print('task2释放锁A')
lb.release()
print('task2释放锁B')
if __name__ == '__main__':
t1 = Thread(target=task1)
t2 = Thread(target=task2)
t1.start()
t2.start()
t1.join()
t2.join()
-
解决死锁
-
添加超时时间
import time from threading import Thread, Lock la = Lock() lb = Lock() def task1(): if la.acquire(): print('task1获取锁A') time.sleep(1) if lb.acquire(timeout=5): print('task1获取锁B') lb.release() print('task1释放锁B') la.release() print('task1释放锁A') def task2(): if lb.acquire(): print('task2获取锁B') time.sleep(1) if la.acquire(timeout=4): print('task2获取锁A') la.release() print('task2释放锁A') lb.release() print('task2释放锁B') if __name__ == '__main__': t1 = Thread(target=task1) t2 = Thread(target=task2) t1.start() t2.start() t1.join() t2.join()
-
银行家算法:换没有学习,感兴趣的话自行查阅相关文档
-
3、线程间通信
线程间通信也是通过Queue来完成,这里不在讲解。
示例代码:
import queue
import random
import time
from threading import Thread
def produce(q):
i = 0
while i < 10:
num = random.randint(1, 100)
q.put(num)
print('生产者生产数据:%d' % num)
time.sleep(0.5)
i += 1
q.put(None)
# 任务结束
q.task_done()
def consume(q, n):
while True:
item = q.get()
if item is None:
break
print('%s获取:%d' % (n, item))
time.sleep(1)
# 任务结束
q.put(None)
q.task_done()
if __name__ == '__main__':
q = queue.Queue(10)
# 创建生产者
tp = Thread(target=produce, args=(q,))
tp.start()
# 创建消费者
tc1 = Thread(target=consume, args=(q, '消费者1'))
tc2 = Thread(target=consume, args=(q, '消费者2'))
tc1.start()
tc2.start()
tp.join()
tc1.join()
tc2.join()
3、协程
3.1、简介
协程,又称微线程,纤程,是一种用户态的轻量级线程。
协程拥有自己的寄存器上下文和栈。协程调度切换时,将寄存器上下文和栈保存到其他地方,在切回来的时候,恢复先前保存的寄存器上下文和栈。
因此:协程能保留上一次调用时的状态(即所有局部状态的一个特定组合),每次过程重入时,就相当于进入上一次调用的状态(进入上一次离开时所处逻辑流的位置)
协程与线程类似,每个协程表示一个执行单元,既有自己的本地数据,也与其他协程共享全局数据和其他资源。
协程存在于线程中,需要用户来编写调度逻辑,对CPU而言,不需要考虑协程如何调度,切换上下文。
- 优点
- 无需线程上下文切换的开销
- 无需原子操作锁定及同步的开销
- 高并发+高扩展性+低成本
"原子操作(atomic operation)是指不会被线程调度机制打断的操作;这种操作一旦开始,就一直运行到结束,中间不会有任何上下文切换 (切换到另一个线程)。原子操作可以是一个步骤,也可以是多个操作步骤,但是其顺序是不可以被打乱,或者切割掉只执行部分。视作整体是原子性的核心。
3.2、greenlet
greenlet是一个用C实现的协程模块,相比与python自带的yield,它可以使你在任意函数之间随意切换,而不需把这个函数先声明为generator。
greenlet的工作流程:进行访问网络的IO操作时,出现阻塞,greenlet就显式切换到另一段没有被阻塞的代码执行,直到原来的阻塞状况消失以后,再切换回原来代码段继续处理。因此,greenlet是一种合理安排的串行方法。
greenlet.switch()可实现协程的切换,greenlet并不能实现自动切换。
示例:模拟2个耗时任务
import time
from greenlet import greenlet
def task1():
for i in range(5):
print('A ' + str(i))
g2.switch()
# 模拟耗时操作
time.sleep(0.1)
def task2():
for i in range(5):
print('B ' + str(i))
g1.switch()
# 模拟耗时操作
time.sleep(0.1)
if __name__ == '__main__':
g1 = greenlet(task1)
g2 = greenlet(task2)
g1.switch()
3.3、gevent和猴子补丁
gevent 是一个第三方库,可以轻松通过gevent实现并发同步或异步编程。gevent是对greenlet进行封装,实现协程的自动切换。
通过gevent.sleep模仿IO操作,实现协程的切换。
- 常用方法
- gevent.spawn 用来形成协程
- gevent.joinall 添加这些协程任务,并且执行给定的gevent,同时阻塞当前程序流程,当所有gevent执行完毕程序继续向下执行
- gevent.sleep 模拟IO操作多少时间
示例:
import time
import gevent
def task1():
for i in range(5):
print('A ' + str(i))
# 模拟耗时操作
time.sleep(0.1)
def task2():
for i in range(5):
print('B ' + str(i))
# 模拟耗时操作
time.sleep(0.1)
if __name__ == '__main__':
g1 = gevent.spawn(task1)
g2 = gevent.spawn(task2)
g1.join()
g2.join()
协程切换是在IO操作时自动完成,在启动时通过monkey.patch_all()实现将一些常见的阻塞,如socket,select,urllib等地方实现协程跳转,因为gevent并不能完全识别所有当前操作是否为IO操作,而未切换。
示例:
import time
import gevent
from gevent import monkey
monkey.patch_all()
def task1():
for i in range(5):
print('A ' + str(i))
# 模拟耗时操作
time.sleep(0.1)
def task2():
for i in range(5):
print('B ' + str(i))
# 模拟耗时操作
time.sleep(0.1)
if __name__ == '__main__':
g1 = gevent.spawn(task1)
g2 = gevent.spawn(task2)
g1.join()
g2.join()
3.4、简单应用
示例:模拟爬取3个网页
from gevent import monkey
monkey.patch_all()
import requests
import gevent
def download(url):
resp = requests.get(url)
print('下载了{}的数据, 长度:{}'.format(url, len(resp.text)))
if __name__ == '__main__':
urls = ['https://www.baidu.com', 'https://www.163.com', 'https://www.qq.com']
arr = []
for url in urls:
g = gevent.spawn(download, url)
arr.append(g)
gevent.joinall(arr)
- 注意:打补丁放在import requests之前,否则补丁打不上会报错
gevent还提供对池的支持,当拥有动态数量的greenlet需要进行并发管理(限制并发数)时,就可以使用池,在处理大量的网络或IO操作时非常重要。
示例:注意python版本
from gevent import monkey
monkey.patch_all()
from gevent.pool import Pool
import requests
def run_task(url):
print('Visit --> %s'% url)
try:
res = requests.get(url)
data = res.text
print('%s bytes received from %s' %(len(data),url))
except Exception as value:
print(value)
return 'url:%s --> finished' % url
if __name__ == '__main__':
urls = ['https://www.baidu.com/','https://github.com/','https://www.python.org/']
pool = Pool(2)
result = pool.map(run_task, urls)
print(result)
参考地址:
参考视频:https://www.bilibili.com/video/BV1R7411F7JV p274~281
源代码仓库地址:https://gitee.com/gaogzhen/python-study
QQ群:433529853