Python: threading.Semaphore & threading.BoundedSemaphore & GIL

Python: threading.Semaphore & threading.BoundedSemaphore & GIL

 

 

import threading, time, logging, random

# Log record layout: timestamp, process id, source line number,
# [thread name and thread id], then the message itself.
FORMAT = '%(asctime)-15s %(process)d %(lineno)-3s [%(threadName)-11s %(thread)6d] %(message)s'
# Root logger at DEBUG so every record emitted by this script is shown.
logging.basicConfig(format=FORMAT, level=logging.DEBUG)


def vacuity(semaphore: threading.Semaphore):
    """Take one permit from *semaphore*, logging around the blocking call.

    An INFO record is written before the call, the value returned by
    ``acquire`` is written at ERROR level, and a CRITICAL record is written
    once the call has returned.
    """
    logging.info('before semaphore.acquire()')
    acquired = semaphore.acquire(blocking=True)
    logging.error(acquired)
    logging.critical('after semaphore.acquire()')


# Create a semaphore with three permits and take all three on the main
# thread; the value returned by each acquire() is logged.
semaphore = threading.Semaphore(3)
logging.info(semaphore.acquire())
logging.info(semaphore.acquire())
logging.info(semaphore.acquire())

# Start a worker that performs a blocking acquire() on the same semaphore.
threading.Thread(target=vacuity, args=(semaphore,)).start()
logging.info('~' * 60)
# Non-blocking attempt: returns immediately instead of waiting for a permit.
logging.info(semaphore.acquire(blocking=False))
# Timed attempt: waits at most 2 seconds for a permit.
logging.info(semaphore.acquire(timeout=2))

# release() returns None, so the logged text ends in "None"; the permit it
# frees is what allows a waiting acquire() to proceed.
logging.info(f'semaphore.release {semaphore.release()}')

Python: threading.Semaphore & threading.BoundedSemaphore & GIL

 

 Python: threading.Semaphore & threading.BoundedSemaphore & GIL

 

 

import threading, time, logging, random

# Log record layout: timestamp, process id, source line number,
# [thread name and thread id], then the message itself.
FORMAT = '%(asctime)-15s %(process)d %(lineno)-3s [%(threadName)-11s %(thread)6d] %(message)s'
# Root logger at DEBUG so every record emitted by this script is shown.
logging.basicConfig(format=FORMAT, level=logging.DEBUG)


class Connection:
    """Minimal stand-in for a pooled connection, identified only by a name."""

    def __init__(self, name):
        self.name = name

    def __repr__(self):
        return 'Connection: {}'.format(self.name)

    # str() and repr() render the same text.
    __str__ = __repr__


class Pool:
    """Hand out a fixed set of ``Connection`` objects to competing threads.

    A counting semaphore initialised to ``count`` tracks how many connections
    are available, so ``get`` waits for a permit before popping from the list.
    """

    def __init__(self, count):
        self.count = count
        self.__semaphores = threading.Semaphore(count)
        self.pool = []
        for index in range(self.count):
            self.pool.append(Connection(f'Connection-{index}'))

    def get(self):
        """Block until a permit is available, then pop and return a connection."""
        # The semaphore replaces an earlier "check len(), then pop()" variant.
        self.__semaphores.acquire(blocking=True)
        connection = self.pool.pop()
        return connection

    def revert(self, connection: Connection):
        """Put *connection* back into the pool and release one permit."""
        # The release must come last: if it ran first, a thread switch right
        # after it could let a waiter pop() before the connection is back.
        self.pool.append(connection)
        self.__semaphores.release()


# Module-level pool holding three connections.
p = Pool(3)


def get_revert(pool: Pool):
    """Borrow a connection from *pool*, hold it briefly, then hand it back."""
    connection = pool.get()
    logging.info(f'get {connection}')
    # Hold the connection for a random 1-5 ms; waiting on a fresh Event that
    # nobody sets simply times out, which serves as the pause.
    hold_seconds = random.randint(1, 5) / 1000
    threading.Event().wait(hold_seconds)
    pool.revert(connection)


# Start 100 named threads that all borrow from and return to the pool `p`.
for b in range(100):
    worker = threading.Thread(target=get_revert, name=f'Thread-{b}', args=(p,))
    worker.start()

Python: threading.Semaphore & threading.BoundedSemaphore & GIL

 

 Python: threading.Semaphore & threading.BoundedSemaphore & GIL

 

 Python: threading.Semaphore & threading.BoundedSemaphore & GIL

 

 Python: threading.Semaphore & threading.BoundedSemaphore & GIL

 

 Python: threading.Semaphore & threading.BoundedSemaphore & GIL

 

 

 Python: threading.Semaphore & threading.BoundedSemaphore & GIL

 

 Python: threading.Semaphore & threading.BoundedSemaphore & GIL

 

 

import threading, time, logging, random, datetime

# Log record layout: timestamp, process id, source line number,
# [thread name and thread id], then the message itself.
FORMAT = '%(asctime)-15s %(process)d %(lineno)-3s [%(threadName)-11s %(thread)6d] %(message)s'
# Root logger at DEBUG so every record emitted by this script is shown.
logging.basicConfig(format=FORMAT, level=logging.DEBUG)

# Start of the timed section; the elapsed time is computed from this later.
commence = datetime.datetime.now()


def calc(iterations=1000000000):
    """Burn CPU by counting up to *iterations* in a pure-Python loop.

    The loop is intentionally not replaced by a closed-form result: the
    function exists to keep the interpreter busy so that its running time
    can be measured.

    Args:
        iterations: Number of increments to perform. Defaults to
            1,000,000,000, the value that used to be hard-coded.

    Returns:
        The final counter value: ``iterations``, or 0 if it is not positive.
    """
    total = 0  # not named ``sum``, which would shadow the built-in
    for _ in range(iterations):
        total += 1
    return total


# Run the CPU-bound workload five times back to back on the main thread.
for _ in range(5):
    calc()

# Wall-clock seconds elapsed since `commence` was recorded.
elapsed = datetime.datetime.now() - commence
delta = elapsed.total_seconds()

logging.error(delta)

单进程单线程, 四核 CPU 利用率 (Utilization Rate): 25%

Python: threading.Semaphore & threading.BoundedSemaphore & GIL

 

 

import threading, time, logging, random, datetime

# Log record layout: timestamp, process id, source line number,
# [thread name and thread id], then the message itself.
FORMAT = '%(asctime)-15s %(process)d %(lineno)-3s [%(threadName)-11s %(thread)6d] %(message)s'
# Root logger at DEBUG so every record emitted by this script is shown.
logging.basicConfig(format=FORMAT, level=logging.DEBUG)


def calc(iterations=1000000000):
    """Burn CPU by counting up to *iterations* in a pure-Python loop.

    The loop is intentionally not replaced by a closed-form result: the
    function exists to keep the interpreter busy so that its running time
    can be measured.

    Args:
        iterations: Number of increments to perform. Defaults to
            1,000,000,000, the value that used to be hard-coded.

    Returns:
        The final counter value: ``iterations``, or 0 if it is not positive.
    """
    total = 0  # not named ``sum``, which would shadow the built-in
    for _ in range(iterations):
        total += 1
    return total


# Start of the timed section.
commence = datetime.datetime.now()

# Threads started below, kept so they can all be joined before timing stops.
vails = []

for _ in range(5):
    # Pass the function object itself as the target. Writing ``calc()`` here
    # would run the whole loop on the main thread and give the Thread a
    # target of None, so the five runs would execute one after another.
    t = threading.Thread(target=calc)
    t.start()
    vails.append(t)

# Wait for every worker to finish before measuring the elapsed time.
for vail in vails:
    vail.join()

# Wall-clock seconds taken by the five threaded runs.
delta = (datetime.datetime.now() - commence).total_seconds()

logging.error(delta)

单进程多线程
利用率: 25% (受 GIL 限制, 纯 Python 的 CPU 密集型任务无法通过多线程并行执行)

 

上一篇:Pandas数据探索分析,分享两个神器


下一篇:java学习笔记:Storm 常用配置