条件锁condition与Queue()

在学习之前你应该先了解锁和队列基础

import queue
import time
import random
import threading
import asyncio
import   logging
# from queue import Empty
logging.basicConfig(level = logging.INFO,format = '%(asctime)s  - %(levelname)s -->%(funcName)s  at line %(lineno)d: \n %(message)s')
log= logging.getLogger()
# queue.qsize 不可以限制,作为应答式condition
q_init = queue.Queue(5)

async def jobs(item):
    time.sleep(random.randint(1,3))
    status = random.randint(0, 1)
    if status == 0:
        return ("success",item)
    else:
        return ("failed",item)

async def do_work(item):
    logging.info("do something %s,time start %s" % (item, time.asctime()))
    a =await jobs(item)
    return a

def async_runner(checker):
    new_loop = asyncio.new_event_loop()
    asyncio.set_event_loop(new_loop)
    loop = asyncio.get_event_loop()
    task = asyncio.ensure_future(do_work(checker))
    loop.run_until_complete(asyncio.wait([task]))
    st = task.result()
    return st

def worker_consumer(q_init,cond):
        # checker=None
        # while checker !="stop":
        while True:
            try:
                if cond.acquire():
                    if not q_init.empty():
                        cond.notify()
                        checker = q_init.get()
                        if checker == "stop":
                            cond.release()
                            break
                        st = async_runner(checker)
                        if st[0] in ["success", "failed"]:
                            logging.info("%s task finished status is %s" % (st[1], st[0]))
                            logging.info("大王我们吃完了%s鹌鹑蛋,再来点吧" % checker)
                            q_init.task_done()
                    cond.release()
            except Exception:
                 logging.debug("queue is empty ")

def producer(cond,q_init):
        item = 1
        while True:
            if cond.acquire():
                if q_init.qsize() <5:
                    q_init.put(item)
                    logging.info("孩儿们,加了%s鹌鹑蛋通知下可以吃啦" % item)
                    cond.notify()
                else:
                    cond.wait()
                cond.release()
                item += 1
                if item >= 11:
                   break
        q_init.join()
        for  i in range(thread_num):
            q_init.put("stop")

if __name__ == '__main__':
        # 消费者>生产者
        thread_num=5
        cond=threading.Condition()
        producer = [threading.Thread(target=producer,args=(cond,q_init)) for i in range(1)]
        consumer = [threading.Thread(target=worker_consumer, args=(q_init,cond)) for i in range(thread_num)]
        for p in producer:
                p.start()
        for k in consumer:
                k.start()

        for m in consumer:
            m.join()

  结果:

2019-12-21 12:50:01,781  - INFO -->producer  at line 70: 
 孩儿们,加了1鹌鹑蛋通知下可以吃啦
2019-12-21 12:50:01,781  - INFO -->producer  at line 70: 
 孩儿们,加了2鹌鹑蛋通知下可以吃啦
2019-12-21 12:50:01,781  - INFO -->producer  at line 70: 
 孩儿们,加了3鹌鹑蛋通知下可以吃啦
2019-12-21 12:50:01,781  - INFO -->producer  at line 70: 
 孩儿们,加了4鹌鹑蛋通知下可以吃啦
2019-12-21 12:50:01,782  - INFO -->producer  at line 70: 
 孩儿们,加了5鹌鹑蛋通知下可以吃啦
2019-12-21 12:50:01,785  - INFO -->do_work  at line 22: 
 do something 1,time start Sat Dec 21 12:50:01 2019
2019-12-21 12:50:04,787  - INFO -->worker_consumer  at line 57: 
 1 task finished status is failed
2019-12-21 12:50:04,787  - INFO -->worker_consumer  at line 58: 
 大王我们吃完了1鹌鹑蛋,再来点吧
2019-12-21 12:50:04,788  - INFO -->do_work  at line 22: 
 do something 2,time start Sat Dec 21 12:50:04 2019
2019-12-21 12:50:06,789  - INFO -->worker_consumer  at line 57: 
 2 task finished status is failed
2019-12-21 12:50:06,789  - INFO -->worker_consumer  at line 58: 
 大王我们吃完了2鹌鹑蛋,再来点吧
2019-12-21 12:50:06,791  - INFO -->do_work  at line 22: 
 do something 3,time start Sat Dec 21 12:50:06 2019
2019-12-21 12:50:08,793  - INFO -->worker_consumer  at line 57: 
 3 task finished status is failed
2019-12-21 12:50:08,793  - INFO -->worker_consumer  at line 58: 
 大王我们吃完了3鹌鹑蛋,再来点吧
2019-12-21 12:50:08,794  - INFO -->do_work  at line 22: 
 do something 4,time start Sat Dec 21 12:50:08 2019
2019-12-21 12:50:09,795  - INFO -->worker_consumer  at line 57: 
 4 task finished status is failed
2019-12-21 12:50:09,795  - INFO -->worker_consumer  at line 58: 
 大王我们吃完了4鹌鹑蛋,再来点吧
2019-12-21 12:50:09,796  - INFO -->do_work  at line 22: 
 do something 5,time start Sat Dec 21 12:50:09 2019
2019-12-21 12:50:12,798  - INFO -->worker_consumer  at line 57: 
 5 task finished status is success
2019-12-21 12:50:12,798  - INFO -->worker_consumer  at line 58: 
 大王我们吃完了5鹌鹑蛋,再来点吧
2019-12-21 12:50:12,799  - INFO -->producer  at line 70: 
 孩儿们,加了7鹌鹑蛋通知下可以吃啦
2019-12-21 12:50:12,799  - INFO -->producer  at line 70: 
 孩儿们,加了8鹌鹑蛋通知下可以吃啦
2019-12-21 12:50:12,799  - INFO -->producer  at line 70: 
 孩儿们,加了9鹌鹑蛋通知下可以吃啦
2019-12-21 12:50:12,799  - INFO -->producer  at line 70: 
 孩儿们,加了10鹌鹑蛋通知下可以吃啦
2019-12-21 12:50:12,801  - INFO -->do_work  at line 22: 
 do something 7,time start Sat Dec 21 12:50:12 2019
2019-12-21 12:50:14,802  - INFO -->worker_consumer  at line 57: 
 7 task finished status is success
2019-12-21 12:50:14,802  - INFO -->worker_consumer  at line 58: 
 大王我们吃完了7鹌鹑蛋,再来点吧
2019-12-21 12:50:14,804  - INFO -->do_work  at line 22: 
 do something 8,time start Sat Dec 21 12:50:14 2019
2019-12-21 12:50:17,805  - INFO -->worker_consumer  at line 57: 
 8 task finished status is success
2019-12-21 12:50:17,805  - INFO -->worker_consumer  at line 58: 
 大王我们吃完了8鹌鹑蛋,再来点吧
2019-12-21 12:50:17,806  - INFO -->do_work  at line 22: 
 do something 9,time start Sat Dec 21 12:50:17 2019
2019-12-21 12:50:20,808  - INFO -->worker_consumer  at line 57: 
 9 task finished status is failed
2019-12-21 12:50:20,808  - INFO -->worker_consumer  at line 58: 
 大王我们吃完了9鹌鹑蛋,再来点吧
2019-12-21 12:50:20,810  - INFO -->do_work  at line 22: 
 do something 10,time start Sat Dec 21 12:50:20 2019
2019-12-21 12:50:23,810  - INFO -->worker_consumer  at line 57: 
 10 task finished status is failed
2019-12-21 12:50:23,810  - INFO -->worker_consumer  at line 58: 
 大王我们吃完了10鹌鹑蛋,再来点吧

  

上一篇:并发编程 — Condition 使用及原理详解


下一篇:JUC-8-lock和Condition使用