和Lock很像,信号量对象内部维护一个倒计算器,每次调用acquire()都会 -1 , 当acquire()时,计算器为0就会阻塞请求线程,直到其它线程对信号量 release()后,计数器 +1,恢复阻塞线程
name | implication |
Semaphore(value=1) | 构造方法,value<0,抛ValueError异常 |
acquire(blocking=True,timeout=None) | 获取Semaphore,计数器 -1,获取成功返回True |
release() | 释放信号量,计数器+1 |
计数器永远>=0
import threading,logging,time FORMAT='%(asctime)-15s\t [ %(threadName)s %(thread)8d ] %(message)s' logging.basicConfig(level=logging.ERROR,format=FORMAT) def w(barrier:threading.Semaphore): logging.error('in sub thread') logging.error(s.acquire()) logging.error('sub thread over') s=threading.Semaphore(3) logging.error(s.acquire()) logging.error(s.acquire()) logging.error(s.acquire()) # logging.error(s.acquire()) threading.Thread(target=w,args=(s,)).start() threading.Event().wait(2) logging.error(s.acquire(blocking=False)) logging.error(s.acquire(timeout=3)) logging.error(s.release())
连接池:
连接池应该有容量,有一个工厂方法可以获取连接,能够把不用的连接返回
import threading,logging,time FORMAT='%(asctime)-15s\t [ %(threadName)s %(thread)8d ] %(message)s' logging.basicConfig(level=logging.ERROR,format=FORMAT) class Conn: def __init__(self,name): self.name=name class Pool: def __init__(self,num,C:Conn): self.num=num self.__pool=[C('conn-{}'.format(b)) for b in range(num)] self.semaphore=threading.Semaphore(num) def acquire(self): # if len(self.__pool)>0: # return self.__pool.pop() # else: # return None self.semaphore.acquire() return self.__pool.pop() def release(self,conn:Conn): self.__pool.append(conn) self.semaphore.release() pool=Pool(3,Conn) pool.acquire() pool.acquire() pool.acquire()
class Conn: def __init__(self,name): self.name=name class Pool: def __init__(self,num:int): self.num=num self.__pool=[self.__connect('conn-{}'.format(b)) for b in range(self.num)] def __connect(self,name): return Conn(name) def acquire(self): if len(self.__pool)>0: return self.__pool.pop() else: pass def release(self,conn:Conn): self.__pool.append(conn)
上面acquire()方法,在多线程的时候,有线程安全问题
假设池中正好有一个链接,有可能多个线程判断池的长度是>0的,当一个线程拿走了连接对象,其他线程再来pop就会抛异常?
- 加锁,在读写的地方加锁
- 使用信号量Semaphore