一堆锁
死锁现象 (重点)
死锁指的是某个资源被占用后,一直得不到释放,导致其他需要这个资源的线程进入阻塞状态.
- 产生死锁的情况
- 对同一把互斥锁加了多次
- 一个共享资源,要访问必须同时具备多把锁,但是这些锁被不同线程或者进程所持有,就会导致相互等待对方释放从而程序就卡死了
- 第二种情况的解决方法:
- 抢锁一定按照相同的顺序去抢
- 给抢锁加上超时,如果超时则放弃执行
递归锁 (了解)
-
与普通的区别
- 相同: 多线程之间都有互斥的效果
- 不同: 同一个线程可以对这个锁执行多次acquire
-
解决方法
同一个线程必须保证,加锁的次数和解锁的次数相同,其他线程才能够抢到这把锁
信号量 (了解)
可以限制同时并执行公共代码的线程数量
如果限制数量为1,则与普通互斥锁没有区别(默认为1)
from threading import Semaphore,current_thread,Thread
import time
s = Semaphore(2)
def task():
s.acquire()
time.sleep(1)
print(current_thread().name)
s.release()
for i in range(10):
Thread(target=task).start()
# 结果是每次都会执行两个子线程
GIL全局锁 (重点)
什么是GIL锁?
在cpython中,全局解释器锁(GIL,是为了阻止多个本地线程在同一时间执行python字节码的互斥锁,
因为cpython的内存管理是非线程安全的,这个锁是非常必要的,因为其他越来越多的特性依赖这个特性.
'''
In CPython, the global interpreter lock, or GIL, is a mutex that prevents multiple
native threads from executing Python bytecodes at once. This lock is necessary mainly
because CPython’s memory management is not thread-safe. (However, since the GIL
exists, other features have grown to depend on the guarantees that it enforces.)
'''
为什么需要GIL锁
-
线程安全问题具体的表现
cpython解释器与python程序之间的关系:
- python程序本质就是一堆字符串,所以运行一个python程序时,必须要开启一个解释器,但是一个python程序中,解释器只有一个,所有代码都要交给它来解释执行;因此当多个线程都要执行代码时就会产生线程安全问题.
cpython解释器与GC线程
GC线程: 执行python变量名管理的线程
python会自动帮我们处理垃圾,清扫垃圾也是一堆代码,因此也需要开启一个线程来执行,这个线程就是GC线程.
而GC线程与我们程序中的线程就会产生安全问题
例如: 线程a 要定义一个变量
- 先申请一块内存空间
- 再把数据装进去
- 最后将引用计数加1
如果在进行到第二步的时候,CPU切换到了GC线程,GC线程就会把这个值当垃圾清理掉,这就会造成线程安全问题.
带来的问题
GIL是一把互斥锁,互斥锁将导致效率降低
具体表现:
在cpython即便开启了多线程,而且CPU也是多核的,却无法并行执行任务,因为解释器只有一个,同一时间只有一个任务在执行.
如何解决 (重点)
没办法解决,只能尽可能的避免GIL锁影响我们的效率
使用多进程能够实现并行,从而更好的利用多核CPU
-
对任务进行区分(分为两类): (重点)
-
计算密集型:
基本没有IO 大部分时间在计算,例如:人脸识别/图像处理
由于多线程不能 并行,所以应该使用多进程,将任务分给不同CPU核心
-
IO密集型:
由于网络IO速度对比CPU处理速度非常慢多线程并不会造成太大的影响
另外如有大量客户端连接服务,进程根本开不起来,只能用多线程
-
关于新能的探讨 (了解)
之所以加锁是为了解决线程安全问题,但是有了锁,导致cpython中多线程不能并行,只能并发
但是并不能急就此否认python,有一下几点原因
- python是一门语言,而GIL只是cpython解释器的问题,只是python多解释器中的一种
- 如果是单核CPU,GIL不会造成任何影响
- 由于目前大多数程序都是基于网络的,网络速度对比CPU是非常慢的,即使CPU也无法提高效率
- 对于IO密集型任务,不会有太大的影响
- 如果没有这把锁,我们程序员就必须自己来解决安全问题
性能测试
from multiprocessing import Process
from threading import Thread
import time
# # 计算密集型任务
#
# def task():
# for i in range(100000000):
# 1+1
#
#
# if __name__ == '__main__':
# start_time = time.time()
#
# ps = []
# for i in range(5):
# p = Process(target=task)
# # p = Thread(target=task)
# p.start()
# ps.append(p)
#
# for i in ps:i.join()
#
# print("共耗时:",time.time()-start_time)
# 多进程胜
# IO密集型任务
def task():
for i in range(100):
with open(r"1.死锁现象.py",encoding="utf-8") as f:
f.read()
if __name__ == '__main__':
start_time = time.time()
ps = []
for i in range(10):
p = Process(target=task)
# p = Thread(target=task)
p.start()
ps.append(p)
for i in ps:i.join()
print("共耗时:",time.time()-start_time)
# 多线程胜
GIL锁与自定义锁的区别
- GIL锁,锁住的是解释器级别的数据
- 自定义锁,锁的是解释器以外的共享资源,例如:硬盘上的文件/ 控制台,对于这种不属于解释器的数据资源就应该自己加锁处理
线程池与进程池
- 线程池: 就是装线程的容器
- 进程池: 就是装进程的容器
为什么要装到容器中
- 可以避免频繁的创建和销毁(进行/线程)所造成的资源开销
- 可以限制同时存在的线程数量,以保证服务器不会应为资源不足而导致崩溃
- 帮我们管理了线程的生命周期
- 管理了任务的分配
如果进程不结束,池子里面的进程或者线程也是一直存活的
import os
import time
from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
from threading import activeCount,enumerate,currentThread
# # 创建一个线程池 指定最多可以容纳两个线程
# pool = ThreadPoolExecutor(20)
#
# def task():
# print(currentThread().name)
#
# # 提交任务到池子中
# pool.submit(task)
# pool.submit(task)
#
# print(enumerate())
# 进程池的使用
def task():
time.sleep(1)
print(os.getpid())
if __name__ == '__main__':
pool = ProcessPoolExecutor(2)
pool.submit(task)
pool.submit(task)
pool.submit(task)
同步与异步
-
同步:
指的是,提交任务后必须在原地等待,直到任务结束. 同步不等于阻塞
-
异步:
指的是,提交任务后不需要在原地等待,可以继续往下执行代码
异步效率高于同步,异步任务将导致一个问题:就是任务的发起方不知道任务何时处理完毕
异步/同步指的是提交任务的方式
解决方法:
-
轮询: 每隔一段时间就问一次
效率低,无法及时获取结果 (不推荐使用)
-
异步回调: 让任务的执行方主动通知
可以及时拿到任务的结果,(推荐方式)
# 异步回调
from threading import Thread
# 具体的任务
def task(callback):
print("run")
for i in range(100000000):
1+1
callback("ok")
#回调函数 参数为任务的结果
def finished(res):
print("任务完成!",res)
print("start")
t = Thread(target=task,args=(finished,))
t.start() #执行task时 没有导致主线程卡主 而是继续运行
print("over")
线程池中回调的使用
# 使用案例:
def task(num):
time.sleep(1)
print(num)
return "hello python"
def callback(obj):
print(obj.result())
pool = ThreadPoolExecutor()
res = pool.submit(task,123)
res.add_done_callback(callback)
print("over")