zeromq:官网 安装 demo及各语言绑定 golang绑定
实验环境:win10 x64/centos6 x86 zeromq4.0.6
zmq三种模式:push/pull、pub/sub、req/resp
一、push/pull模式:
A程序PUSH代码如下:
import zmq
import time
context = zmq.Context() sender = context.socket(zmq.PUSH)
sender.bind("tcp://*:5557") for task_nbr in range(): workload = 'task2---'+str(task_nbr)
sender.send_string(u'%s' % workload)
time.sleep(0.1)
B程序PULL代码如下:
import time
import zmq context = zmq.Context() receiver = context.socket(zmq.PULL)
receiver.connect("tcp://localhost:5557") while True: s = receiver.recv()
time.sleep()
A程序准备发送1000个task任务,完事退出;B程序一直pull。
A先启动,此时push会一直阻塞,直到有一个pull连接,task才发出,假如中途B挂掉,A也继续进入阻塞,B再次上线,pull开始从上次A阻塞时的任务拉取。假如A挂掉,task任务发送中断,则B一直pull到A挂前发出的任务为止,这时候A再重启上线,则是另一回事了,任务for循环又重新开始,B继续。
B先启动,A后启动,场景和以上A先启动一样。
push一端具有负载均衡功能,pull一端可以多启动几个实例,均衡执行task,若中途pull一端挂掉,则push自动把任务分发到其他pull上。
二、 pub/sub模式
A程序代码如下:
pub_Queue = Queue.Queue()
context = zmq.Context()
publisher = context.socket(zmq.PUB)
publisher.bind("tcp://*:5557") while True:
publisher.send_string(msg)
BCD代码如下:
context = zmq.Context()
subscriber = context.socket(zmq.SUB)
subscriber.connect("tcp://*:5557")
subscriber.setsockopt(zmq.SUBSCRIBE, "") while True: contents = subscriber.recv()
print("%s" % contents)
A pub端通过sub端subscriber.setsockopt(zmq.SUBSCRIBE, "2")的“2”过滤信息,sub端可以订阅多个。远程公网使用该模式,需要pub端定期维持心跳包,比如半分钟pub一条信息。
三、req/resp模式
A程序代码如下:
import zmq context = zmq.Context() socket = context.socket(zmq.REQ)
socket.connect("tcp://localhost:5555") for request in range():
socket.send(b"Hello") message = socket.recv()
print "Received reply %s [ %s ]" % (request, message)
B程序代码如下:
import time
import zmq context = zmq.Context()
socket = context.socket(zmq.REP)
socket.bind("tcp://*:5555") while True:
message = socket.recv() time.sleep() socket.send(b"World")
可以没有先后启动顺序,假如A先启动,阻塞;假如B先启动,等待请求接入。
过程中,假如A挂掉,再启动,可以继续请求应答;而假如时B服务端挂掉再重启,则A客户端已经连不上了,阻塞。情形和以上几种模式一样的。
总结:
以上是通过官网python bind的源码做的实验。虽然不要求客户端/服务端的先后启动次序,但是中途若是服务端挂掉再启动,就是另一种情况了。在表现形式上来说,似乎此时的c/s线路不再是挂掉之前的,而是服务端又启动了一条线路一样。这个时候,就需要客户端去检测判断服务端状态,以确定重连机制。到底是不是这样呢?需要深入zeromq的代码一看究竟了!