我正在编写简单的生产者/消费者程序.
import zmq
@asyncio.coroutine
def receive_data(future,s):
print("begin to recv sth from.....socket"
my_data = s.recv()
future.set_result(my_data)
@asyncio.coroutine
def producer(loop,q,s):
while True:
future = asyncio.Future()
yield from receive_data(future,s)
data = str(future.result())
yield from q.put(data)
@asyncio.coroutine
def consumer(loop,q):
while True:
a = yield from q.get()
print("i am get..."+str(a)+"..."+str(type(a)))
loop = asyncio.get_event_loop()
c = zmq.Context()
s = c.socket(zmq.REP)
s.bind('tcp://127.0.0.1:5515')
q = asyncio.Queue()
tasks=[asyncio.Task(producer(loop,q,s)),asyncio.Task(comsumer(loop,q))]
loop.run_until_complete(asyncio.gather(*tasks))
loop.close()
s.close()
看来消费者没有机会执行.
套接字每500毫秒接收一次数据,因此当receive_data函数中的yield挂起生产者协程时,消费者协程将打印信息.
有什么可以解释的?
解决方法:
s.recv()阻止了调用,因此receive_data会挂起,直到新的ZMQ消息到达为止.
这阻止了事件循环,并且使用者没有机会执行自己.
您可以将zmq.NOBLOCK标志传递给.recv,如果没有可用数据使eventloop可以迭代其他准备好的任务,则可以调用asyncio.sleep(0).
或者只使用aiozmq库:)