zmq模块的理解和使用

  最近项目中接触到ZeroMQ, 内部实现挺复杂的,没时间深入了解,简单记录下使用方法吧,有时间会来填坑。 官方指导文档http://zguide.zeromq.org/page:all

  项目主要用ZeroMQ在多个ip主机上的服务间进行项目通信,直接用scoket也可以实现,但比较费时费力,ZeroMQ建立在socket的基础上,提供了一套更加简单强大的API,可以快速搭建起跨进程,跨ip等的通信网络。很多文章中都提到了socket只能实现一对一的通信,ZeroMQ可以实现多对多的连接,而且有三种模式供选择,可以根据业务需要,进行选择和使用。

  ZeroMQ的三种通信模式分别是:Request-Reply,  Publisher-subscriber,  Parallel Pipeline

  python安装zmq模块:pip install pyzmq

1. Request-Reply(应答模式)

  应答模式特点:

    1. 客户端提出请求,服务端必须回答请求,每个请求只回答一次

    2.  客户端没有收到答复前,不能再次进行请求

    3. 可以有多个客户端提出请求,服务端能保证各个客户端只接收到自己的答复

       4. 如果服务端断掉或者客户端断掉会产生怎样的影响?

      如果是客户端断掉,对服务端没有任何影响,如果客户端随后又重新启动,那么两方继续一问一答,但是如果是服务端断掉了,就可能会产生一些问题,这要看服务端是在什么情况下断掉的,如果服务端收是在回答完问题后断掉的,那么没影响,重启服务端后,双发继续一问一答,但如果服务端是在收到问题后断掉了,还没来得及回答问题,这就有问题了,那个提问的客户端迟迟得不到答案,就会一直等待答案,因此不会再发送新的提问,服务端重启后,客户端迟迟不发问题,所以也就一直等待提问。

zmq模块的理解和使用

 

 

 python 实现客户端和服务端代码如下:

zmq_server.py

import zmq


context = zmq.Context()            #创建上下文
socket = context.socket(zmq.REP)   #创建Response服务端socket
socket.bind("tcp://*:5555")        #socket绑定,*表示本机ip,端口号为5555,采用tcp协议通信

while True:
    message = socket.recv()
    print(type(message))          #接收到的消息也会bytes类型(字节)
    print("收到消息:{}".format(message))
    socket.send(b"new message")   #发送消息,字节码消息

zmq_client.py

#coding:utf-8

import zmq

context = zmq.Context()
socket = context.socket(zmq.REQ)
socket.connect("tcp://localhost:5555")

socket.send(b"A message")
response = socket.recv()
print(response)

常用数据发送API如下:

#发送数据
socket.send_json(data)   #data 会被json序列化后进行传输 (json.dumps)
socket.send_string(data, encoding="utf-8")   #data为unicode字符串,会进行编码成子节再传输
socket.send_pyobj(obj)    #obj为python对象,采用pickle进行序列化后传输
socket.send_multipart(msg_parts)   # msg_parts, 发送多条消息组成的迭代器序列,每条消息是子节类型,
                                    # 如[b"message1", b"message2", b"message2"]

#接收数据
socket.recv_json()
socket.recv_string()
socket.recv_pyobj()
socket.recv_multipart()

 

2. Publisher-Subscriber (发布-订阅模式)

  publiser广播消息到所有客户端,客户端根据订阅主题过滤消息zmq模块的理解和使用

 

 

 python实现代码如下, 其中publisher发布两条消息,第一条消息的topic为client1, 被第一个subscriber接收到;第二条消息的topic为client2, 被第二个subscriber接收到。

注意的是subscriber在匹配时,并不是完全匹配的,消息的topic为client1开头的字符串都会被匹配到,如果topic为"client1cient2", 也会被第一个subscriber接收到

zmq_server.py

#coding:utf-8
import zmq

context = zmq.Context()
socket = context.socket(zmq.PUB)
socket.bind("tcp://*:5555")
topic = ["client1", "client2"]
while True:
    for t in topic:
        data = "message for {}".format(t)
        msg = [t.encode("utf-8"), data.encode("utf-8")]     #列表中的第一项作为消息的topic,sub根据topic过滤消息
        print(msg)
        socket.send_multipart(msg)

zmq_client1.py

#coding:utf-8

import zmq


context = zmq.Context()
socket = context.socket(zmq.SUB)
socket.subscribe("client1")          #订阅主题topic为:client1
socket.connect("tcp://localhost:5555")
msg = socket.recv_multipart()
print(msg)

结果:
zmq模块的理解和使用

 

 

zmq_client2.py

import zmq

context = zmq.Context()
socket = context.socket(zmq.SUB)
socket.subscribe("client2") #订阅主题topic为:client2
socket.connect("tcp://localhost:5555")
msg = socket.recv_multipart()
print(msg) 

结果:

zmq模块的理解和使用

3. Parallel Pipeline(并行管道模式)

    管道模式有三部分组成,如下图所示,最左边的producer通过push产生任务, 中间的consumer接收任务处理后转发,最后result collector接收所有任务的结果。 相比于publisher-subscriber,多了一个数据缓存和处理负载的部分,当连接断开,数据不会丢失,重连后数据继续发送到客户端。

zmq模块的理解和使用

 

 

 python实现producer, consumer, resultcollector

producer.py

import zmq

context = zmq.Context()
socket = context.socket(zmq.PUSH)
socket.bind("tcp://*:5577")
for num in range(2000):
    work_message = {"num": num}
    socket.send_json(work_message)

consumer.py

import random
import zmq
context = zmq.Context()
consumer_id = random.randint(1, 1000)
#接收工作
consumer_receiver = context.socket(zmq.PULL)
consumer_receiver.connect("tcp://localhost:5577")
#转发结果
consumer_sender = context.socket(zmq.PUSH)
consumer_sender.bind("tcp://*:5578")
while True:
    msg = consumer_receiver.recv_json()
    data = msg["num"]
    result = {"consumer_id":consumer_id, "num":data}
    consumer_sender.send_json(result)

resultcollector.py

#coding:utf-8

import zmq

context = zmq.Context()
result_receiver = context.socket(zmq.PULL)
result_receiver.connect("tcp://localhost:5578")
result = result_receiver.recv_json()
collecter_data = {}
for x in range(1000):
    if result['consumer_id'] in collecter_data:
        collecter_data[result['consumer_id']] = collecter_data[result['consumer_id']] + 1
    else:
        collecter_data[result['consumer_id']] = 1
    if x == 999:
        print(collecter_data)

执行顺序:

python producer.py
python consumer.py
python resultcollector.py

 

 参考文章:

https://learning-0mq-with-pyzmq.readthedocs.io/en/latest/pyzmq/patterns/pushpull.html 

https://segmentfault.com/a/1190000012010573

 

上一篇:ZeroMQ_10 节点协调


下一篇:ZeroMQ:16---模式之(ØMQ模式总览)