我正在尝试使用zero-mq.My要求非常简单.我希望能够在网络中的两个对等体之间进行通信.我在本书的示例中遇到了这个程序.
$pub_server.py
import zmq
import random
import sys
import time
port = "5556"
if len(sys.argv) > 1:
port = sys.argv[1]
int(port)
context = zmq.Context()
socket = context.socket(zmq.PUB)
socket.bind("tcp://*:%s" % port)
while True:
topic = random.randrange(9999,10005)
messagedata = random.randrange(1,215) - 80
print "%d %d" % (topic, messagedata)
socket.send("%d %d" % (topic, messagedata))
time.sleep(1)
$sub_client.py
import sys
import zmq
port = "5556"
if len(sys.argv) > 1:
port = sys.argv[1]
int(port)
# Socket to talk to server
context = zmq.Context()
socket = context.socket(zmq.SUB)
print "Collecting updates from weather server..."
socket.connect ("tcp://localhost:%s" % port)
# Subscribe to zipcode, default is NYC, 10001
topicfilter = "10001"
socket.setsockopt(zmq.SUBSCRIBE, topicfilter)
# Process 5 updates
total_value = 0
for update_nbr in range (5):
string = socket.recv()
topic, messagedata = string.split()
total_value += int(messagedata)
print ('{} {}'.format(topic, messagedata))
print('Avg data value for topic {} was {}'.format(topicfilter, (total_value/update_nbr)))
我对这个模型的问题是
string = socket.recv()
块直到我收到消息.我不希望这种情况发生.我希望消息在接收方排队,以便我可以将它从队列中取出(或类似的东西)
是否存在零mq的某些模型允许这样做?
解决方法:
如果传递zmq.NOBLOCK标志参数,则zmq.Socket.recv不会阻止.
文档说:
If NOBLOCK is set, this method will raise a ZMQError with EAGAIN if a message is not ready.
zmq将对它接收的消息进行排队,并为每个recv()调用返回一条消息,直到此队列耗尽,然后引发ZMQError.
下面的exmaples中使用的zmq.Again是zmq.EAGAIN的包装器.
例如:
while True:
try:
#check for a message, this will not block
message = socket.recv(flags=zmq.NOBLOCK)
#a message has been received
print "Message received:", message
except zmq.Again as e:
print "No message received yet"
# perform other important stuff
time.sleep(10)
也许可以编写sub_client.py示例来使用非阻塞行为,如下所示:
import sys, time
import zmq
port = "5556"
if len(sys.argv) > 1:
port = sys.argv[1]
int(port)
# Socket to talk to server
context = zmq.Context()
socket = context.socket(zmq.SUB)
print "Collecting updates from weather server..."
socket.connect ("tcp://localhost:%s" % port)
# Subscribe to zipcode, default is NYC, 10001
topicfilter = "10001"
socket.setsockopt(zmq.SUBSCRIBE, topicfilter)
# Process 5 updates
total_value = 0
received_value_count = 0
do_receive_loop = True
while do_receive_loop:
try:
#process all messages waiting on subscribe socket
while True:
#check for a message, this will not block
string = socket.recv(flags=zmq.NOBLOCK)
#message received, process it
topic, messagedata = string.split()
total_value += int(messagedata)
print ('{} {}'.format(topic, messagedata))
#check if we have all the messages we want
received_value_count += 1
if received_value_count > 4:
do_receive_loop = False
break
except zmq.Again as e:
#No messages waiting to be processed
pass
#Here we can do other stuff while waiting for messages
#contemplate answer to 'The Last Question'
time.sleep(15)
print "INSUFFICIENT DATA FOR MEANINGFUL ANSWER"
print('Avg data value for topic {} was {}'.format(topicfilter, (total_value/5)))