多线程
(博客为本人学习与总结使用,内容借鉴了较多网络内容,进行的归总
爱生活 love11)
线程也叫轻量级进程,是操作系统中进行运算调度的最小单位,它被包含在进程当中,是进程实际运作单位。线程不拥有系统资源,但是它能够跟同属一个进程的其他线程共享进程所拥有的全部资源。一个线程可以创建和撤销另一个线程,同一个进程间线程能够并行运行。
线程的优点
线程在程序中是独立的、并发的执行流。与分割的进程相比,线程之间的隔离程度较低,他们共享内存、文件句柄以及其他进程应有的状态。
因为线程的划分尺度小于进程,使得线程的并发性高。进程在执行过程中拥有独立的内存单元,而多个线程能够共享内存,从而极大的提高了程序的运行效率。
线程比进程具有更高的性能,这是由于同一个进程中的多个线程能够共享同一个进程的虚拟空间。线程共享的环境包括进程代码块,进程的公有数据等,利用这些公有数据,线程之间很容易进行通信。
操作系统在创建进程是需要为进程分配独立的内存空间,并分配大量的相关资源,但创建线程要简单的多。因此使用多线程实现并发比使用多线程实现并发性能要高的多。
线程的实现
线程的实现有两种方式
普通创建模式
from threading import Thread,current_thread
import time
def run(n):
print(current_thread().name,"run")
time.sleep(n)
print(current_thread().name,'sleep',n)
if __name__ == '__main__':
t1 = Thread(target=run,args=(2,))
t2 = Thread(target=run,args=(3,))
t1.start()
t2.start()
自定义线程,通过继承方式创建线程,本质是重写run’方法
from threading import Thread,current_thread
import time
class MyThread(Thread):
def __init__(self,n):
super(MyThread, self).__init__()
self.n = n
def run(self):
print(current_thread().name, "run")
time.sleep(self.n)
print(current_thread().name, 'sleep', self.n)
if __name__ == '__main__':
t1 = MyThread(2)
t2 = MyThread(3)
t1.start()
t2.start()
Thread实例对象的一些方法:
isAlive(): 返回线程是否活动的。已经修改为is_alive()
getName(): 返回线程名。
setName(): 设置线程名。
threading模块提供的一些方法:
threading.currentThread(): 返回当前的线程变量。
threading.enumerate(): 返回一个包含正在运行的线程的list。正在运行指线程启动后、结束前,不包括启动前和终止后的线程。
threading.activeCount(): 返回正在运行的线程数量,与len(threading.enumerate())有相同的结果。
守护线程
无论是进程还是线程,都遵循一个规则,守护线程会等待主线程运行完毕后销毁
需要注意的是,运行完毕并不一定是终止运行
对主进程来说,运行完毕是指主进程的代码块运行完毕
对主线程来说,运行完毕是指主线程内所有非守护线程运行完毕,主线程才算运行完毕
- 主进程在其代码运行结束之后就已经算运行完毕了(守护进程在此时就被收回),然后主进程一直等待非守护的子进程都运行完毕后回收子进程的资源(否则会产生僵尸进程),才会完全结束。
- 主线程在其它非守护线程运行完毕才算运行完毕(守护线程在此时就被回收)。因为主线程的结束意味着进程的结束,进程整体的资源会被回收,而进程必须保证非守护线程全部运行结束才能算运行完毕。
import threading
import time
def run(n):
print(threading.current_thread().name,'run')
time.sleep(n)
print(threading.current_thread().name,'end')
if __name__ == '__main__':
t1 = threading.Thread(target=run,args=(2,))
t2 = threading.Thread(target=run,args=(3,))
t2.setDaemon(True)#将t2设置为守护线程
t1.start()
t2.start()
print('main end')#主代码块运行结束,等待非守护线程运行结束
代码里将t2作为了守护线程,t1等待2秒,t2等待3秒,当t1,t2都start之后,主线程先运行完其代码块,然后等待非守护线程的t1运行结束,然后回收资源,所以t2线程不打印end信息。
互斥锁
多线程之间的数据是共享的,如何在多个线程抢占同一个资源的同时,保证数据的安全,以及合理的顺序,这就是互斥锁出现的必要条件。
现有需求,订单需要生成1000个杯子,雇佣10个工人进行生产
import threading
import time
def work(count = 10):
flag = False
print('i am work for you')
while True:
if len(cups) >= count:
flag = True
time.sleep(0.01)
if not flag:
cups.append(1)
if flag:
break
if __name__ == '__main__':
cups = []
work_list = []
for i in range(10):
t = threading.Thread(target=work,args=(1000,))
work_list.append(t)
t.start()
for i in work_list:
i.join()
print(len(cups))
print('main end')
此时判断失效,多生产了9个杯子。
将资源进行加锁
import threading
import time
from threading import Lock
def work(count = 10):
flag = False
print('i am work for you')
while True:
lock.acquire()
if len(cups) >= count:
flag = True
time.sleep(0.01)
if not flag:
cups.append(1)
lock.release()
if flag:
break
print(len(cups))
if __name__ == '__main__':
cups = []
work_list = []
lock = Lock()
for i in range(10):
t = threading.Thread(target=work,args=(1000,))
work_list.append(t)
t.start()
'''for i in work_list:
i.join()'''
print('main end')
一般来说,加锁后还需要一些代码实现,在释放锁之前还有可能出现异常,如果出现异常导致锁无法释放,但是当前线程可能会因为这个异常终止了,这就产生了死锁。
所以,加锁、解锁都是成对出现的,为了避免因为异常导致锁未释放的情况,可以使用try/finally语句保证锁最后被释放,或者使用with语句,通过上下文管理器管理加锁,解锁的过程。所以上述代码的加、解锁的过程可以做如下修改。
1.通过try/finally进行加锁
try:
lock.acquire()
if len(cups) >= count:
flag = True
time.sleep(0.01)
if not flag:
cups.append(1)
finally:
lock.release()
if flag:
break
2.使用witth进行加锁
with lock:
if len(cups) >= count:
flag = True
time.sleep(0.01)
if not flag:
cups.append(1)
if flag:
break
锁的使用场景
锁适用于访问和修改同一个共享资源的情况,即读写同一个资源的时候。如果是全部都是读取同一个资源的情况,不需要进行加锁。因为读取不会改变共享资源的值,每次读取的值都是一样的,可以认为该共享资源是不可变的,所以不需要加锁。
使用锁时,应该注意:
- 少用锁。必要的时候用锁,使用了锁,多线程访问时就变成了串行,要么排队,要么争抢
- 加锁的时间越短越好,不需要了就立刻释放锁。
- 一定要避免死锁。
死锁
当线程A持有独占锁a,并尝试去获取独占锁b的同时,线程B持有独占锁b,并尝试获取独占锁a的情况下,就会发生AB两个线程由于互相持有对方需要的锁,而发生阻塞现象,我们称之为死锁。即死锁是指多个进程因为竞争资源而造成的一种僵局,若无外力作用,这些进程都将无法推进。
死锁产生的原因
竞争系统资源
进程运行推进的顺序不当
资源分配不当
死锁产生的四个必要条件
互斥条件:一个资源每次只能被一个进程使用
请求与保持条件:一个进程因请求资源而阻塞时,对方已获得的资源保持不放
不剥夺条件:进程已获得的资源,在未使用完之前,不能被强行剥夺
循环等待条件:若干进程之间形成一种头尾相接的循环等待资源关系
解决死锁的办法
减少资源占用的时间,可以降低死锁发生的概率
银行家算法,银行家算法的本质是优先满足占用资源较少的任务
理解了死锁的原因,尤其是死锁产生的四个必要条件,就可以最大的避免、预防和解除死锁。
产生死锁的代码
from threading import Thread,current_thread,Lock
import time
def task1(lock1,lock2):
if lock1.acquire():
print('%s获取到lock1的锁'%current_thread().name)
for i in range(5):
print('%s ----------> %d'%(current_thread().name,i))
time.sleep(0.01)
if lock2.acquire():
print('%s获取到lock1,lock2的锁'%current_thread().name)
lock2.release()
lock1.release()
def task2(lock1,lock2):
if lock2.acquire():
print('%s获取到lock2的锁' % current_thread().name)
for i in range(5):
print('%s ----------> %d' % (current_thread().name , i))
time.sleep(0.01)
if lock1.acquire():
print('%s获取到lock1,lock2的锁' % current_thread().name)
lock1.release()
lock2.release()
if __name__ == '__main__':
lock1 = Lock()
lock2 = Lock()
t1 = Thread(target=task1,args=(lock1,lock2))
t2 = Thread(target=task2,args=(lock1,lock2))
t1.start()
t2.start()
t1首先获取锁lock1,t2获取lock2,t1在lock2未释放的情况下再去获取lock2的锁,如果获取不到则将等待,并且不释放lock1的锁。t2再lock1未释放的情况下去获取lock1的锁,如果获取不到则将等待,并且不释放lock2的锁。两个进程互相竞争资源,产生死锁。
递归锁
为了解决死锁的问题,为了支持在同一个线程中多次请求同一个资源的情况,python提供了可重入锁,RLOCK。
RLock内部维护着一个Lock和一个counter变量,counter记录了acquire的次数,从而使得资源可以被多次require。直到一个线程所有的acquire都被release,其他的线程才能获得资源。因此使用递归锁对上面的代码进行修改。
from threading import Thread,current_thread,Lock,RLock
import time
def task1(lock1,lock2):
if lock1.acquire():
print('%s获取到lock1的锁'%current_thread().name)
for i in range(5):
print('%s ----------> %d'%(current_thread().name,i))
time.sleep(0.01)
if lock2.acquire():
print('%s获取到lock1,lock2的锁'%current_thread().name)
lock2.release()
lock1.release()
def task2(lock1,lock2):
if lock2.acquire():
print('%s获取到lock2的锁' % current_thread().name)
for i in range(5):
print('%s ----------> %d' % (current_thread().name , i))
time.sleep(0.01)
if lock1.acquire():
print('%s获取到lock1,lock2的锁' % current_thread().name)
lock1.release()
lock2.release()
if __name__ == '__main__':
lock1 = lock2= RLock()
t1 = Thread(target=task1,args=(lock1,lock2))
t2 = Thread(target=task2,args=(lock1,lock2))
t1.start()
t2.start()
通过递归锁能够将产生死锁的代码解决死锁的问题,但是需要注意的是,lock1与lock2需要的是同一个锁。
Queue消息队列
上述操作都是通过加锁来控制共享数据的,保证数据的准确性。
python中提供了更加方便的线程间的通信机制,queue消息队列。
queue中提供了三类,代表了三种队列:
- Queue(maxsize=0),先进先出队列。若给定最大值,队列没有空间时阻塞,否则是无限队列。
- LifoQueue(maxsize=0),后进先出队列,相当于栈。
- PriorityQueue(maxsize=0),创建一个优先级队列。
Queue对象的方法:
- qsize():返回队列大小,是近似值(返回时可能队列大小被修改了)
- empty():判断队列是否为空
- full():判断队列是否为满
- put(item, block=True, timeout=None):将item加入队列,可选阻塞和阻塞时间
- put_nowait(item):即put(item, False)
- get(block=True, timeout=None):从队列中获取元素,可选阻塞和阻塞时间
- get_nowait():即get(False)
- task_done():用于表示队列中某个元素已经执行完成,会被join()调用
- join():队列中所有元素执行完毕并调用task_done()信号之前,保持阻塞
Queue模块异常:
- Empty:对空队列调用 get(timeout=n),如果等待n秒钟队列还是空的就会抛出异常
- Full:对满队列调用put(item,timeout=n),如果等待n秒钟仍然是满的就会抛出异常
简单的queue代码实例:
import random
import time
from queue import Queue
queue = Queue(3)
queue.put('香蕉')
queue.put('榴莲')
queue.put('西瓜')
print(queue.get())
print(queue.get())
print(queue.get())
queue在声明时,需要指定队列大小
使用Queue实现线程间的通信
from queue import Queue
from threading import Thread,current_thread
import time
import random
def producer(queue):
print('{}开门了'.format(current_thread().name))
foods = ['红烧狮子头', '香肠烤饭', '蒜蓉生蚝', '酸辣土豆丝', '肉饼']
for i in range(20):
food = random.choice(foods)
print('{}正在加工中'.format(food))
time.sleep(1)
print('{}做完了'.format(food))
queue.put(food)
queue.put(None)
def consumer(queue):
print('{}来吃饭了'.format(current_thread().name))
while True:
food = queue.get()
if food:
print('正在享用美食{}'.format(food))
time.sleep(0.5)
else:
print('{}把饭店吃光了,走人。。。'.format(current_thread().name))
break
if __name__ == '__main__':
queue = Queue(8)
t1 = Thread(target=producer,name = '肉饼',args=(queue,))
t2 = Thread(target=consumer,name='坤坤',args=(queue,))
t1.start()
t2.start()
信号量
semaphore管理一个内置的计数器
每当调用acquire时内置计数器-1
每当调用release时内置计数器+1
计数器不能小于0,当计数器为0时,acquire将阻塞,直到其他线程调用release
例如,公共厕所内只有5个坑,每次上厕所最多能有5个人同时上厕所,多了的人需要等待其他人上完了,才能上。
from threading import Thread,current_thread,Semaphore
import time
import random
def go_publice_wc():
sem.acquire()
print('{}正在上厕所'.format(current_thread().name))
time.sleep(0.5)
sem.release()
if __name__ == '__main__':
sem = Semaphore(5)
for i in range(20):
t = Thread(target=go_publice_wc)
t.start()
锁和信号量
锁只允许同一时间只有一个线程独占资源,它时特殊的信号量,即信号量计数器初始为1
信号量,允许有限个线程访问共享资源
全局解释器
由于全局解释器的存在,python的多线程是假的!!!!!
首先,一些语言(java,c,c++)是支持同一个进程中的多线程是可以应用多核cpu的,也就是我们现在听到的4核8核这种多核cpu技术的牛逼之处。那么我们之前说过应用多进程的时候如果有共享数据是不是会出现数据不安全的问题啊。就是多个进程同时进一个文件中去抢一个数据,大家把这个数据改了,但是还没有更新到原来的文件中,就被其他进程也计算了,这就导致了数据不安全的问题,这种问题在并发程序中确实会存在这样的问题。
python使用了GIL(全局解释器锁),这个是解释器级别的,锁的是整个线程,而不是对线程中的某些数据操作,每次只能有一个线程使用cpu,也就是说多线程用不了多核,但是这不是python语言的问题,是cpython解释器的特性,如果是jpython解释器就没有这种问题。
但是有了这个锁,我们就没办法并发了嘛。当我们的程序是偏计算的,也就是cpu占用很高(cpu)一直在计算,就不行了。但是如果是I\O密集型的(input,访问网址网络延迟,打开/关闭文件,文件读写)就可以。在多用户网站、爬网页、聊天软件、处理文件等操作一般很少占用cpu,那么多线程还是可以并发的,因为cpu只是快速的调度线程,而线程并没有什么计算,就像一对网络请求,cpu快速的将一个一个的线程调度出去,线程自己去执行I\O操作。
- 在python2.x里,GIL的释放逻辑是当前线程遇见IO操作或者ticks计数达到100时进行释放。(ticks可以看作是python自身的一个计数器,专门做用于GIL,每次释放后归零,这个计数可以通过sys.setcheckinterval 来调整)。而每次释放GIL锁,线程进行锁竞争、切换线程,会消耗资源。并且由于GIL锁存在,python里一个进程永远只能同时执行一个线程(拿到GIL的线程才能执行),这就是为什么在多核CPU上,python的多线程效率并不高。
- 在python3.x中,GIL不使用ticks计数,改为使用计时器(执行时间达到阈值后,当前线程释放GIL),这样对CPU密集型程序更加友好,但依然没有解决GIL导致的同一时间只能执行一个线程的问题,所以效率依然不尽如人意。
GIL vs Lock
GIL已经保证了同一时间只有一个线程占用cpu,为什么还需要Lock。
首先我们需要明确,Lock的目的是为了保护共享数据,同一时间只有一个线程修改共享数据。
然后,我们得出结论,不同的锁保护不同的数据。
最后,问题就很明朗了。GIL和Lock是两把锁,保护的数据不一样。前者是解释器级别的锁(保护的是解释器级别的数据,例如垃圾回收的数据),后者保护的是自己开发的应用程序的数据,很明显GIL并不负责这件事,只能用户自定义加锁处理,这就需要Lock。
过程分析:
所有线程抢的是GIL的锁,或者说抢的是执行权限。
线程1抢到GIL,拿到执行权限,开始执行,然后加了Lock,还没有执行完毕,即线程1还没有释放Lock,有可能线程2抢到GIL,开始执行,执行过程发现Lock还没有被线程1释放,于是线程2进入阻塞状态,被夺走执行权限。有可能线程1又拿到了执行权限,然后正常执行,直到释放Lock,这就导致了串行的效果。
但是,以下代码仍然达到了串行的效果。
t1.start()
t1.join()
t2.start()
t2.join()
其实这与加锁是不同的。JOIN是直接强制其他线程进入阻塞,等待该线程结束,锁的是该线程所有的代码,而Lock锁的仅仅只是一部分操作共享数据的代码,Lock释放之后就进入并行状态。
因为python解释器帮你自动定期进行内存回收,你可以理解为python有一个独立的线程,每一段时间内它起wake up做一次全局轮询看看哪些数据是可以被清空的,此时你的程序里的线程和python解释器自己的线程是在并行的。假如你的线程删除了一个变量,python的解释器的垃圾回收线程在清空这个变量的过程中的clearing时刻,可能一个其他线程又正好重新个这个还没来得及清空的内存空间赋值了,结果有可能就是这个新赋值的数据被删除了。为了解决类似问题,python解释器简单的加了个锁,即当一个线程运行时,其他人都不能动,这就解决了上述问题。
事件EVENT
线程的一个特性是每个线程都是独立运行且状态不可预测的。如果程序中的其他线程需要通过判断某个线程的状态来确定自己的下一步操作,这时线程同步问题将变得十分棘手。为了解决这个问题,我们可以通过使用threading库中的Event对象。该对象包含一个可由线程设置的信号标志,它允许线程等待某件事情的发生。在初始情况下,Event对象中的信号标志被设置为False。如果有线程等待一个Event对象,而这个Event对象的标志为False,那么这个线程将会被一直阻塞,直到该标志为True。一个线程如果将一个Event对象的标志设置为True,它将唤醒所有等待这个Event对象的线程。如果一个线程等待一个已经被设置为真的Event对象,那么它将忽略这个事件,继续执行。
event.isSet():返回event的状态值;
event.wait():如果 event.isSet()==False将阻塞线程;
event.set(): 设置event的状态值为True,所有阻塞池的线程激活进入就绪状态, 等待操作系统调度;
event.clear():恢复event的状态值为False。
使用信号量模拟红绿灯
#利用event模拟红绿灯
from threading import Thread,Event
import time
def lighter():
count = 0
event.set() #初始值为绿灯
while True:
if 5 < count <= 10:
event.clear() #红灯,清除标志位
print("\33[41;1mred light is on...\033[0m")
elif count > 10:
event.set() #绿灯,设置标志位
count = 0
else:
print("\33[42;1mgreen light is on...\033[0m")
time.sleep(5)
count += 1
def car(name):
while True:
if event.isSet():
print("[%s] running..." % name)
time.sleep(2)
else:
print("[%s] sees red light,waiting..." % name)
event.wait()
print("[%s] green light is on,start going..." % name)
if __name__ == '__main__':
event = Event()
light = Thread(target=lighter, )
light.start()
car = Thread(target=car, args=("MINI",))
car.start()
条件condition
在python cookbook中建议event作为一次性事件使用 ,即一旦调用了set()方法,就弃用此event,因为多次使用clear()可能会因为程序员考虑上的疏忽造成线程混乱。如果线程打算重复通知某个时间,最好使用Condition对象来处理。
构造方法Condition(lock=None),可以传入一个Lock或RLock对象,默认是RLock
Conditon用于生产者,消费者模型,为了解决生产者消费者速度匹配问题
acquire(*args) : 获取锁
wait(self,time=None) : 等待或超时
notify(n=1) : 唤醒至多指定数目个数的等待线程,没有等待的线程就没有任何操作
notif_all() : 唤醒所有等待的线程
实现场景:当a同学王火锅里面添加鱼丸加满后(最多5个,加满后通知b去吃掉),通知b同学去吃掉鱼丸(吃到0的时候通知a同学继续添加)
# coding=utf-8
import threading
import time
con = threading.Condition()
num = 0
# 生产者
class Producer(threading.Thread):
def __init__(self):
threading.Thread.__init__(self)
def run(self):
# 锁定线程
global num
con.acquire()
while True:
print "开始添加!!!"
num += 1
print "火锅里面鱼丸个数:%s" % str(num)
time.sleep(1)
if num >= 5:
print "火锅里面里面鱼丸数量已经到达5个,无法添加了!"
# 唤醒等待的线程
con.notify() # 唤醒小伙伴开吃啦
# 等待通知
con.wait()
# 释放锁
con.release()
# 消费者
class Consumers(threading.Thread):
def __init__(self):
threading.Thread.__init__(self)
def run(self):
con.acquire()
global num
while True:
print "开始吃啦!!!"
num -= 1
print "火锅里面剩余鱼丸数量:%s" %str(num)
time.sleep(2)
if num <= 0:
print "锅底没货了,赶紧加鱼丸吧!"
con.notify() # 唤醒其它线程
# 等待通知
con.wait()
con.release()
p = Producer()
c = Consumers()
p.start()
c.start()
condition对线程启动顺序敏感
condition有两层锁:
- 一把底层锁会在线程调用了wait方法的时候释放(是先创建一把锁(这就是第二把锁),然后再release),底层还是Lock或者RLock,
- 第二把锁会在每次调用wait的时候创建一把新的,并放入到cond的等待队列中(采用的是dqueue),一直acquire,等到notify方法的唤醒。notify方法会出队一把锁,这把锁就是刚刚创建的锁,然后release。
栅栏Barrier
有人翻译成栅栏,建设理解成屏障,可以想象成路障,道闸
Barrier(parties, action=None, timeout=None): 构建Barrier对象,指定参与方数目,timeout是wait方法未指定超时的默认值
n_waiting : 当前在屏障中等待的线程数
parties : 各方数,就是需要多少个等待
wait(timeout=None) : 等待通过屏障,返回0到线程数-1的整数,每个线程返回不同,如果wait方法设置了超时,并超时发送,屏障将处于broken状态。
Barrier实例:
broken :如果屏障处于打破的状态,返回True
abort() : 将屏障置于broken状态,等待中的线程或者调用等待方法的线程中都会抛出BrokenBarrierError异常,直到reset方法来恢复屏障
reset(): 恢复屏障,重新开始拦截
import threading
def plyer_display():
print('初始化通过完成,音视频同步完成,可以开始播放....')
def player_init(statu):
print(statu)
try:
# 设置超时时间,如果2秒内,没有达到障碍线程数量,
# 会进入断开状态,引发BrokenBarrierError错误
barrier.wait(2)
except Exception as e: # 断开状态,引发BrokenBarrierError错误
print("等待超时了... ")
else:
print("xxxooooxxxxxooooxxxoooo")
if __name__ == '__main__':
# 设置3个障碍对象
barrier = threading.Barrier(3, action=plyer_display, timeout=None)
statu_list = ["init ready", "video ready", "audio ready"]
thread_list = list()
for _ in range(0, 3):
for i in range(0, 3):
t = threading.Thread(target=player_init, args=(statu_list[i],))
t.start()
thread_list.append(t)
for t in thread_list:
t.join()
'''
init ready
video ready
audio ready
初始化通过完成,音视频同步完成,可以开始播放....
xxxooooxxxxxooooxxxoooo
xxxooooxxxxxooooxxxoooo
xxxooooxxxxxooooxxxoooo
'''
重新设置reset()
# 导入线程模块
import threading
def plyer_display():
print('初始化通过完成,音视频同步完成,可以开始播放....')
# 设置3个障碍对象
barrier = threading.Barrier(3, action=plyer_display, timeout=None)
def player_init(statu):
while True:
print(statu)
try:
# 设置超时时间,如果2秒内,没有达到障碍线程数量,
# 会进入断开状态,引发BrokenBarrierError错误
barrier.wait(2)
except Exception as e: # 断开状态,引发BrokenBarrierError错误
# print("断开状态... ")
continue
else:
print("xxxooyyyxxxooyyyxxxooyyy")
break
if __name__ == '__main__':
statu_list = ["init ready","video ready","audio ready"]
thread_list = list()
for i in range(0,3):
t = threading.Thread(target=player_init,args=(statu_list[i],))
t.start()
thread_list.append(t)
if i == 1: # 重置状态
print("不想看爱情片,我要看爱情动作片....")
barrier.reset()
for t in thread_list:
t.join()
线程池
系统启动一个新线程的成本是比较高的,因为涉及到与操作系统的交互。在这种情况下,使用线程池能够很好的提高性能,尤其是当程序中需要创建大量生存期很短暂的线程,更应该考虑使用线程池。
线程池在系统启动时就创建大量空闲线程,程序只要将一个函数提交给线程池,线程池就会启动一个空闲的线程来执行它。当函数执行结束之后,线程并不会死亡,而是返回到线程池中变成空闲状态,等待执行下一个函数。
此外线程池还能够有效的控制系统中并发线程的数量。当系统中包含大量的并发线程时,会导致系统性能极具下降,甚至导致python解释器崩溃,而线程池最大的线程数参数能够控制系统中并发线程的数量。
线程池的基类是 concurrent.futures 模块中的 Executor,Executor 提供了两个子类,即
ThreadPoolExecutor 和 ProcessPoolExecutor,其中 ThreadPoolExecutor 用于创建线程池,而 ProcessPoolExecutor 用于创建进程池。
Exectuor 提供了如下常用方法:
- submit(fn,*args,**kwargs):将 fn 函数提交给线程池。*args 代表传给 fn 函数的参数,**kwargs 代表以关键字参数的形式为 fn 函数传入参数。
- map(func, *iterables, timeout=None, chunksize=1):该函数类似于全局函数 map(func, *iterables),只是该函数将会启动多个线程,以异步方式立即对 iterables 执行 map 处理。
- shutdown(wait=True):关闭线程池。程序将 task 函数提交(submit)给线程池后,submit 方法会返回一个 Future 对象,Future 类主要用于获取线程任务函数的返回值。由于线程任务会在新线程中以异步方式执行,因此,线程执行的函数相当于一个“将来完成”的任务,所以 Python 使用 Future 来代表。
- 在用完一个线程池后,应该调用该线程池的 shutdown() 方法,该方法将启动线程池的关闭序列。调用 shutdown() 方法后的线程池不再接收新任务,但会将以前所有的已提交任务执行完成。当线程池中的所有任务都执行完成后,该线程池中的所有线程都会死亡。
使用线程池来执行线程任务的步骤如下:
- 调用 ThreadPoolExecutor 类的构造器创建一个线程池。
- 定义一个普通函数作为线程任务。
- 调用 ThreadPoolExecutor 对象的 submit() 方法来提交线程任务。
- 当不想提交任何任务时,调用 ThreadPoolExecutor 对象的 shutdown() 方法来关闭线程池。
from concurrent.futures import ThreadPoolExecutor
import threading
import time
# 定义一个准备作为线程任务的函数
def action(max):
my_sum = 0
for i in range(max):
print(threading.current_thread().name + ' ' + str(i))
my_sum += i
return my_sum
# 创建一个包含2条线程的线程池
pool = ThreadPoolExecutor(max_workers=2)
# 向线程池提交一个task, 50会作为action()函数的参数
future1 = pool.submit(action, 50)
# 向线程池再提交一个task, 100会作为action()函数的参数
future2 = pool.submit(action, 100)
# 判断future1代表的任务是否结束
print(future1.done())
time.sleep(3)
# 判断future2代表的任务是否结束
print(future2.done())
# 查看future1代表的任务返回的结果
print(future1.result())
# 查看future2代表的任务返回的结果
print(future2.result())
# 关闭线程池
pool.shutdown()