第三十四天- 线程队列、线程池(map/submit/shutdown/回调函数)

1.线程列队

  queue队列 :使用import queue,用法与进程Queue一样

  class queue.Queue(maxsize=0)

 # 先进先出:
q = queue.Queue(3) # 也可以不加数字表示不限
q.put('约吗')
q.put('你个糟老头')
q.put('约个鬼!')
# q.put_nowait() # 没有数据会报错 可以try解决
print(q.get())
print(q.get())
print(q.get())
q.get_nowait()

先进先出

  class queue.LifoQueue(maxsize=0)

 import queue

 # 后进先出:
q = queue.LifoQueue(4)
q.put('first')
q.put('second')
q.put('third')
q.put_nowait(1)
print(q.get())
print(q.get())
print(q.get())
print(q.get_nowait())

后进先出

  class queue.PriorityQueue(maxsize=0)

 # 设置优先
q = queue.PriorityQueue(6) # put进入元组,元组第一个元素是优先级(通常是数字,也可以是非数字之间的比较),数字越小优先级越高
q.put((3,'a'))
q.put((2,'b'))
q.put((-3,'c')) # 优先级相同的两个数据,比较第二个元素ASCII码大小,若第二元素为字符串且第一个字符相同则比较第二个
q.put((20,'ww'))
q.put((20,'ws'))
# q.put(20,{'ws',22}) # 不能是字典
# q.put((20,('w',1))) # 后面的值必须是相同数据类型才能比较,可以是元祖,ascii码顺序排序 print(q.get())
print(q.get())
print(q.get())
print(q.get())
print(q.get())
# print(q.get())

这三种队列都是线程安全的,不会出现多个线程抢占同一个资源或数据的情况。

2.线程池

  线程池:早期的时候没有线程池,现在python提供了一个新的内置模块 concurrent.futures,模块里面提供了新的线程池和进程池,之前的进程池是在multiprocessing里面,现在这个在这个新的模块里面,他俩用法上是一样的。为了统一使用方式将进程池和线程池放到一起,使用threadPollExecutor和ProcessPollExecutor的方式一样,且只要通过concurrent.futures导入就可用他们两个

  基本方法:

 #2 基本方法
#submit(fn, *args, **kwargs)
异步提交任务 #map(func, *iterables, timeout=None, chunksize=1)
取代for循环submit的操作 #shutdown(wait=True)
相当于进程池的pool.close()+pool.join()操作
wait=True,等待池内所有任务执行完毕回收完资源后才继续
wait=False,立即返回,并不会等待池内的任务执行完毕
但不管wait参数为何值,整个程序都会等到所有任务执行完毕
submit和map必须在shutdown之前 #result(timeout=None)
取得结果 #add_done_callback(fn)
回调函数
 import time
from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor def func(n):
time.sleep(0.5)
return n*n if __name__ == '__main__':
t_pool = ThreadPoolExecutor(max_workers=4)
# t_pool = ProcessPoolExecutor(max_workers=4) # 用concurrent.futures可直接转换成进程
t_list = []
for i in range(10):
res = t_pool.submit(func,i) # submit异步提交任务
# print(res.result()) # 等待res的执行结果,拿到了就运行,拿不到就阻塞,放这会变成串行
t_list.append(res) t_pool.shutdown() # 相当于进程池的pool.close() pool.join() print('主线程结束') for res1 in t_list:
print(res1.result())

线程池代码示例

 import time
from concurrent.futures import ThreadPoolExecutor def func(n):
time.sleep(1)
return n*n if __name__ == '__main__': t_pool = ThreadPoolExecutor(6)
res = t_pool.map(func,range(10)) # map 取代for循环submit的操作
t_pool.shutdown() print('主线程执行结束')
for i in res:
print(i) # map包含了for循环和submit操作

map方法

 import time
from concurrent.futures import ThreadPoolExecutor def func1(n):
time.sleep(1)
return n*n def callback(s):
# print(s)
ss = s.result()+1
print(ss) if __name__ == '__main__':
t_pool = ThreadPoolExecutor(4)
for i in range(10):
t_pool.submit(func1,i).add_done_callback(callback) # 注意map函数没有callback,不能如下使用
# res = t_pool.map(func1,range(10))
# for i in res:
# i.add_done_callback(callback)

回调函数

  需注意:注意map函数没有callback

3.GIL锁

  背景:

  一些语言(java、c++、c)是支持同一个进程中的多个线程是可以应用多核CPU的,也就是我们会听到的现在4核8核这种多核CPU技术的牛逼之处。那么我们之前说过应用多进程的时候如果有共享数据是不是会出现数据不安全的问题啊,就是多个进程同时一个文件中去抢这个数据,大家都把这个数据改了,但是还没来得及去更新到原来的文件中,就被其他进程也计算了,导致数据不安全的问题啊,所以我们是不是通过加锁可以解决啊,多线程大家想一下是不是一样的,并发执行就是有这个问题。但是python最早期的时候对于多线程也加锁,但是python比较极端的(在当时电脑cpu确实只有1核)加了一个GIL全局解释锁,是解释器级别的,锁的是整个线程,而不是线程里面的某些数据操作,每次只能有一个线程使用cpu,也就说多线程用不了多核,但是他不是python语言的问题,是CPython解释器的特性,如果用Jpython解释器是没有这个问题的,Cpython是默认的,因为速度快,Jpython是java开发的,在Cpython里面就是没办法用多核,这是python的弊病,历史问题,虽然众多python团队的大神在致力于改变这个情况,但是暂没有解决。(这和解释型语言(python,php)和编译型语言有关系吗???待定!,编译型语言一般在编译的过程中就帮你分配好了,解释型要边解释边执行,所以为了防止出现数据不安全的情况加上了这个锁,这是所有解释型语言的弊端??)

  如图:

第三十四天- 线程队列、线程池(map/submit/shutdown/回调函数)

  但是有了这个锁我们就不能并发了吗?当我们的程序是偏计算的,也就是cpu占用率很高的程序(cpu一直在计算),就不行了,但是如果你的程序是I/O型的(一般你的程序都是这个)(input、访问网址网络延迟、打开/关闭文件读写),在什么情况下用的到高并发呢(金融计算会用到,人工智能(阿尔法狗),但是一般的业务场景用不到,爬网页,多用户网站、聊天软件、处理文件),I/O型的操作很少占用CPU,那么多线程还是可以并发的,因为cpu只是快速的调度线程,而线程里面并没有什么计算,就像一堆的网络请求,我cpu非常快速的一个一个的将你的多线程调度出去,你的线程就去执行I/O操作了

  GIL锁与Lock:

  GIL保护的是解释器级的数据,保护用户自己的数据则需要自己加锁处理,如下图

第三十四天- 线程队列、线程池(map/submit/shutdown/回调函数)

  

上一篇:转 Kafka、RabbitMQ、RocketMQ等消息中间件的对比 —— 消息发送性能和优势


下一篇:冲刺阶段 day 10