我正在尝试编写一个服务器/客户端脚本,其中包含一个可以解决任务的服务器,以及执行它的多个工作者.
问题是我的呼吸机有很多任务,它会在心跳中填满记忆.
我尝试在绑定之前设置HWM,但没有成功.它只是在工作人员连接时继续发送消息,完全忽略已设置的HWM.我还有一个接收器,记录完成的任务.
server.py
import zmq
def ventilate():
context = zmq.Context()
# Socket to send messages on
sender = context.socket(zmq.PUSH)
sender.setsockopt(zmq.SNDHWM, 30) #Big messages, so I don't want to keep too many in queue
sender.bind("tcp://*:5557")
# Socket with direct access to the sink: used to syncronize start of batch
sink = context.socket(zmq.PUSH)
sink.connect("tcp://localhost:5558")
print "Sending tasks to workers…"
# The first message is "0" and signals start of batch
sink.send('0')
print "Sent starting signal"
while True:
sender.send("Message")
if __name__=="__main__":
ventilate()
worker.py
import zmq
from multiprocessing import Process
def work():
context = zmq.Context()
# Socket to receive messages on
receiver = context.socket(zmq.PULL)
receiver.connect("tcp://localhost:5557")
# Socket to send messages to
sender = context.socket(zmq.PUSH)
sender.connect("tcp://localhost:5558")
# Process t asks forever
while True:
msg = receiver.recv_msg()
print "Doing sth with msg %s"%(msg)
sender.send("Message %s done"%(msg))
if __name__ == "__main__":
for worker in range(10):
Process(target=work).start()
sink.py
import zmq
def sink():
context = zmq.Context()
# Socket to receive messages on
receiver = context.socket(zmq.PULL)
receiver.bind("tcp://*:5558")
# Wait for start of batch
s = receiver.recv()
print "Received start signal"
while True:
msg = receiver.recv_msg()
print msg
if __name__=="__main__":
sink()
解决方法:
好吧,我玩了一下,我认为问题不在于PUSH HWM,而是你无法为PULL设置HWM.如果你看一下this documentation,你可以在那里看到N / A表示在HWM上采取行动.
PULL套接字似乎每个都需要数百个消息(我确实尝试设置一个HWM,以防万一它在PULL套接字上做了什么.它没有.).我通过更改呼吸机来发送带有递增整数的消息来证明这一点,并且在调用recv()之间将池中的每个worker更改为等待2秒.工作人员打印出他们正在处理具有完全不同整数的消息.例如,一个工作人员将处理消息10,而下一个正在处理消息400.随着时间的推移,您看到处理消息10的工作人员正在处理消息11,12,13等,而另一个是处理401,402等.
这向我表明ZMQ_PULL套接字正在某处缓冲消息.因此,虽然ZMQ_PUSH套接字确实有HWM,但PULL套接字正在快速请求消息,尽管它们实际上并未通过调用recv()来访问.因此,如果连接了PULL套接字,则会导致PUSH HWM被忽略.据我所知,你无法控制PULL套接字缓冲区的长度(我希望RCVHWM套接字选项可以控制它,但它似乎没有).
这个行为当然引出了一个问题:ZMQ_PULL HWM选项的重点是什么,只有你还可以控制接收套接字HWM才有意义.
在这一点上,我开始问0MQ people你是否遗漏了一些明显的东西,或者这是否被认为是一个错误.
对不起,我无法提供更多帮助!