-
同步、异步、多线程、单线程、并发、并行
多线程(异步)需要解决同步问题,单线程(同步)需要解决异步问题
并行(无共同资源,即多个CPU,每个进程对应一个CPU)
并发(有共同资源,即一个CPU,同一时段做多件事)
并行不需要解决,并发需要解决,多线程就是解决并发的处理方案
解决并发: 【 "食堂打饭模型", 中午12点,大家都涌向食堂,就是并发.人很多就是高并发.】 1. 队列, 缓冲区: 队列: 即排队. 缓冲区: 排成的队列. 优先队列: 如果有男生队伍和女生队伍,女生队伍优先打饭,就是优先队列. 2. 争抢: 锁机制: 争抢打饭,有人抢到,该窗口在某一时刻就只能为这个人服务,锁定窗口,即锁机制. 争抢也是一种高并发解决方案,但是有可能有人很长时间抢不到,所以不推荐. 3. 预处理: 统计大家爱吃的菜品,最爱吃的80%热门菜提前做好,20%冷门菜现做,这样即使有人锁定窗口,也能很快释放. 这是一种提前加载用户需要的数据的思路,预处理思想,缓存常用. 4. 并行: 开多个打饭窗口,同时提供服务. IT日常可以通过购买更多服务器,或多开线程,进程实现并行处理,解决并发问题. 这是一种水平扩展的思路. 注: 如果线程在单CPU上处理,就不是并行了. 5. 提速: 通过提高单个窗口的打饭速度,也是解决并发的方式. IT方面提高单个CPU性能,或单个服务器安装更多的CPU. 这是一种垂直扩展的思想. 6. 消息中间件: 如上地地铁站的九曲回肠的走廊,缓冲人流. 常见消息中间件: RabbitMQ, ActiveMQ(Apache), RocketMQ(阿里Apache), kafka(Apache)等.
-
后台线程和前台线程
is/setDaemon(bool):获取/设置是后台线程(默认前台线程为False)
-
如果是后台线程(当我们使用setDaemon(True)方法,设置子线程为守护线程时),主线程执行过程中,后台线程也在进行,主线程执行完毕后,后台线程不论成功与否,主线程和后台线程均停止
-
如果是前台线程(其实就是setDaemon(False)),主线程执行过程中,前台线程也在进行,主线程执行完毕后,等待前台线程也执行完成后,程序停止
if __name__ == '__main__': for i in range(4): t = MyThread(i) t.setDaemon(True) t.start() print('main thread end!')
-
-
join():阻塞当前上下文环境,直到调用此方法的线程终止或到达指定的timeout,即使设置了setDeamon(True)主线程依然要等待子线程结束。
join的原理就是依次检验线程池中的线程是否结束,没有结束就阻塞直到线程结束,如果结束则跳转执行下一个线程的join函数。
import threading from time import sleep lock = threading.Lock() def music(i): print('i am at music = ',i) sleep(1) lock.acquire() print('当前线程名字:{0}'.format(threading.current_thread())) lock.release() lock.acquire() print(threading.current_thread(),'->>>',threading.active_count()) lock.release() thread_list=[] for i in range(5): th = threading.Thread(target=music, args=(i,)) thread_list.append(th) th.start() for t in thread_list: t.join() # 为了阻塞主线程 print('===========这是主线程=========')
-
线程安全
import threading import time zero = 0 lock = threading.Lock() def change_zero(): global zero for i in range(5000000): lock.acquire() zero = zero + 1 # 不是原子操作 lock.release() lock.acquire() zero = zero - 1 # 不是原子操作 lock.release() th1 = threading.Thread(target= change_zero()) th2 = threading.Thread(target= change_zero()) # th1 和 th2 共享change_zero,只有一把锁 # th1 和 th2 的上下文会记住执行change_zero的状态 th1.start() th2.start() th1.join() th2.join() print(zero)
可能在一个线程执行时,两步只执行了一步x = zero + 1(即还没来得及对zero进行修改),GIL锁就给了另一个线程(不熟悉锁的概念以及GIL的可以先看这篇文章),等到GIL锁回到第一个线程,zero已经发生了改变,这时再执行接下来的zero = x,结果就不是对zero加1了。
-
i 加到 500000 使时间减少
import threading import time i = 0 lock = threading.Lock() def add_i(): global i while i < 500000: # 在这里停掉,会出现多加,即 500004 lock.acquire() i += 1 lock.release() if __name__ == '__main__': thread_list = [] for j in range(5): th = threading.Thread(target=add_i) thread_list.append(th) th.start() for t in thread_list: t.join() print(i) # 500004 # 【解决上述问题】 import threading import time i = 0 lock = threading.Lock() def add_i(): global i while True: lock.acquire() if i < 500000: i += 1 lock.release() else: lock.release() break if __name__ == '__main__': # print(time.perf_counter()) thread_list = [] for j in range(50): th = threading.Thread(target=add_i) thread_list.append(th) th.start() for t in thread_list: t.join() print(i) # 500000 # print(time.perf_counter())
-
Lock 和 RLock
- Lock(指令锁)是可用的最低级的同步指令。Lock处于锁定状态时,不被特定的线程拥有。Lock包含两种状态——锁定和非锁定,以及两个基本的方法。
可以认为Lock有一个锁定池,当线程请求锁定时,将线程至于池中,直到获得锁定后出池。池中的线程处于状态图中的同步阻塞状态。
- RLock(可重入锁)是一个可以被同一个线程请求多次的同步指令。RLock使用了“拥有的线程”和“递归等级”的概念,处于锁定状态时,RLock被某个线程拥有。拥有RLock的线程可以再次调用acquire(),释放锁时需要调用release()相同次数。
可以认为RLock包含一个锁定池和一个初始值为0的计数器,每次成功调用 acquire()/release(),计数器将+1/-1,为0时锁处于未锁定状态。
# Lock对比Rlock import threading lock = threading.Lock() #Lock对象 lock.acquire() lock.acquire() #产生了死锁。 lock.release() lock.release() print lock.acquire() import threading rLock = threading.RLock() #RLock对象 rLock.acquire() rLock.acquire() #在同一线程内,程序不会堵塞。 rLock.release() rLock.release()
-
Condition类
Condition(条件变量)通常与一个锁关联。需要在多个Contidion*享一个锁时,可以传递一个Lock/RLock实例给构造方法,否则它将自己生成一个RLock实例。
可以认为,除了Lock带有的锁定池外,Condition还包含一个等待池,池中的线程处于等待阻塞状态,直到另一个线程调用notify()/notifyAll()通知;得到通知后线程进入锁定池等待锁定。
构造方法:
Condition([lock/rlock])实例方法:
-
acquire([timeout])/release(): 调用关联的锁的相应方法。
-
wait([timeout]): 调用这个方法将使线程进入Condition的等待池等待通知,并释放锁。使用前线程必须已获得锁定,否则将抛出异常。
-
notify(): 调用这个方法将从等待池挑选一个线程并通知,收到通知的线程将自动调用acquire()尝试获得锁定(进入锁定池);其他线程仍然在等待池中。调用这个方法不会释放锁定。使用前线程必须已获得锁定,否则将抛出异常。
-
notifyAll(): 调用这个方法将通知等待池中所有的线程,这些线程都将进入锁定池尝试获得锁定。调用这个方法不会释放锁定。使用前线程必须已获得锁定,否则将抛出异常。
# encoding: UTF-8 import threading import time # 商品 product = None # 条件变量 con = threading.Condition() # 生产者方法 def produce(): global product if con.acquire(): while True: if product is None: print ('produce...') product = 'anything' # 通知消费者,商品已经生产 con.notify() # 等待通知 # con.wait() time.sleep(2) # 消费者方法 def consume(): global product if con.acquire(): while True: if product is not None: print('consume...') product = None # 通知生产者,商品已经没了 con.notify() # 等待通知 con.wait() time.sleep(2) t1 = threading.Thread(target=produce) t2 = threading.Thread(target=consume) t2.start() t1.start()
生产者消费者模型
import threading import time condition = threading.Condition() products = 0 class Producer(threading.Thread): def run(self): global products while True: if condition.acquire(): if products < 10: products += 1 print("Producer(%s):deliver one, now products:%s" %(self.name, products)) condition.notify()#不释放锁定,因此需要下面一句 condition.release() else: print("Producer(%s):already 10, stop deliver, now products:%s" %(self.name, products)) condition.wait()#自动释放锁定 time.sleep(2) class Consumer(threading.Thread): def run(self): global products while True: if condition.acquire(): if products > 1: products -= 1 print("Consumer(%s):consume one, now products:%s" %(self.name, products)) condition.notify() condition.release() else: print("Consumer(%s):only 1, stop consume, products:%s" %(self.name, products)) condition.wait() time.sleep(2) if __name__ == "__main__": for p in range(0, 2): p = Producer() p.start() for c in range(0, 3): c = Consumer() c.start()
condition.notifyAll()
import threading alist = None condition = threading.Condition() def doSet(): if condition.acquire(): while alist is None: condition.wait() for i in range(len(alist))[::-1]: alist[i] = 1 print(alist[i]) condition.notify() condition.release() def doPrint(): if condition.acquire(): while alist is None: condition.wait() for i in alist: print(i) print() condition.notify() condition.release() def doCreate(): global alist if condition.acquire(): if alist is None: alist = [0 for i in range(10)] condition.notifyAll() condition.release() tset = threading.Thread(target=doSet, name='tset') tprint = threading.Thread(target=doPrint, name='tprint') tcreate = threading.Thread(target=doCreate, name='tcreate') tset.start() tprint.start() tcreate.start()
-