python – zero-mq:socket.recv()调用阻塞

我正在尝试使用zero-mq.My要求非常简单.我希望能够在网络中的两个对等体之间进行通信.我在本书的示例中遇到了这个程序.

$pub_server.py

import zmq
import random
import sys
import time

port = "5556"
if len(sys.argv) > 1:
    port =  sys.argv[1]
    int(port)

context = zmq.Context()
socket = context.socket(zmq.PUB)
socket.bind("tcp://*:%s" % port)

while True:
    topic = random.randrange(9999,10005)
    messagedata = random.randrange(1,215) - 80
    print "%d %d" % (topic, messagedata)
    socket.send("%d %d" % (topic, messagedata))
    time.sleep(1)

$sub_client.py

import sys
import zmq

port = "5556"
if len(sys.argv) > 1:
    port =  sys.argv[1]
    int(port)

# Socket to talk to server
context = zmq.Context()
socket = context.socket(zmq.SUB)

print "Collecting updates from weather server..."
socket.connect ("tcp://localhost:%s" % port)

# Subscribe to zipcode, default is NYC, 10001
topicfilter = "10001"
socket.setsockopt(zmq.SUBSCRIBE, topicfilter)

# Process 5 updates
total_value = 0
for update_nbr in range (5):
    string = socket.recv()
    topic, messagedata = string.split()
    total_value += int(messagedata)
    print ('{} {}'.format(topic, messagedata))

print('Avg data value for topic {} was {}'.format(topicfilter, (total_value/update_nbr)))

我对这个模型的问题是

string = socket.recv()

块直到我收到消息.我不希望这种情况发生.我希望消息在接收方排队,以便我可以将它从队列中取出(或类似的东西)

是否存在零mq的某些模型允许这样做?

解决方法:

如果传递zmq.NOBLOCK标志参数,则zmq.Socket.recv不会阻止.

文档说:

If NOBLOCK is set, this method will raise a ZMQError with EAGAIN if a message is not ready.

zmq将对它接收的消息进行排队,并为每个recv()调用返回一条消息,直到此队列耗尽,然后引发ZMQError.

下面的exmaples中使用的zmq.Again是zmq.EAGAIN的包装器.

例如:

while True:
    try:
        #check for a message, this will not block
        message = socket.recv(flags=zmq.NOBLOCK)

        #a message has been received
        print "Message received:", message

    except zmq.Again as e:
        print "No message received yet"

    # perform other important stuff
    time.sleep(10)

也许可以编写sub_client.py示例来使用非阻塞行为,如下所示:

import sys, time
import zmq

port = "5556"
if len(sys.argv) > 1:
    port =  sys.argv[1]
    int(port)

# Socket to talk to server
context = zmq.Context()
socket = context.socket(zmq.SUB)

print "Collecting updates from weather server..."
socket.connect ("tcp://localhost:%s" % port)

# Subscribe to zipcode, default is NYC, 10001
topicfilter = "10001"
socket.setsockopt(zmq.SUBSCRIBE, topicfilter)

# Process 5 updates
total_value = 0
received_value_count = 0
do_receive_loop = True
while do_receive_loop:
    try:
        #process all messages waiting on subscribe socket
        while True:
            #check for a message, this will not block
            string = socket.recv(flags=zmq.NOBLOCK)

            #message received, process it
            topic, messagedata = string.split()
            total_value += int(messagedata)
            print ('{} {}'.format(topic, messagedata))

            #check if we have all the messages we want
            received_value_count += 1
            if received_value_count > 4:
                do_receive_loop = False
                break

    except zmq.Again as e:
        #No messages waiting to be processed
        pass

    #Here we can do other stuff while waiting for messages
    #contemplate answer to 'The Last Question'
    time.sleep(15)
    print "INSUFFICIENT DATA FOR MEANINGFUL ANSWER"

print('Avg data value for topic {} was {}'.format(topicfilter, (total_value/5)))
上一篇:python – ZMQ DEALER ROUTER高频丢失消息?


下一篇:c# – 尝试使用ZeroMQ构建分布式爬虫