ZeroMQ使用教程

介:

ZeroMQ简称ZMQ,它对socket编程进行了封装,通俗的说,它就像一个框架一样,对socket lib进行了很好的封装,让socket编程变得更加简单。

ZMQ采用消息队列的方式对socket的包进行管理,它支持多线程,同时ZMQ是开源的,你可以在不同的Linux内核架构上缩减ZMQ模块,这在ARM嵌入式上非常有利。

ZMQ的明确目标是“成为标准网络协议栈的一部分,之后进入Linux内核”,目前还未成功。

ZMQ不是消息队列服务器,是一个基于消息队列模式的Socket库。

 

ZMQ的特点:

socket是1:1,而ZMQ对socket封装了以后具有n:1的特点,所谓的n:1就是无数客户端可以同时连接一个服务器,你无需去做线程服务分发这样的工作,因为ZMQ都已经为你实现好了。

同时socket的建立与通讯较为复杂,用户需要选择协议,端口,通讯类型、AF_INET、IPv4或者6等等,同时用户还需要考虑发送数据与接收数据的缓冲区大小,包括需要自己管理套接字,而ZMQ屏蔽了这些细节,让用户写少量的代码就能实现一个基本的服务器与客户端程序,同时它支持大文件传输,使得用户不用去关心很多细节,同时也降低了网络开发门槛,会让许多人可能都不太了解BSD的套接字协议。

所以不建议新手采用ZMQ,如果你是学习者则建议先从socket开始学起。

 

ZMQ的三种模式:

1.应答模式

应答模式在ZMQ里是REQ和REP

服务端是REP

客户端是REQ

典型的一问一答协议,即客户端需要首先发送hello,服务器则返回word,若客户端发送hello,服务器没有应答,后续通讯将不成立。

如:

客户端首先对服务端发送了hello,那么客户端会等待服务端应答,若在此期间客户端再次向服务端发送消息,服务端是收不到的,客户端有一个消息队列,会放入消息队列,只有在 客户端收到服务端的回应之后才会去依次处理消息队列里的内容。

2.订阅模式

即PUB/SUB

PUB代表服务器,SUB代表客户端

这种服务即服务器会不停发送数据,然后客户端对其进行订阅,客户端会收到服务器发送的数据,且不需要做出应答,客户端也不需要发送打招呼消息,只需要连接上就会收到服务器的订阅消息。

同时服务器不具有收客户端发送消息的能力。

这是单向的,即服务器只能发,客户端只能收,可以同时多个客户端订阅一个服务器。

3.分布式模式

PUSH/PULL

这种协议即服务器收到消息会立马推送给连接的客户端。

如一个使用PUSH协议的服务器,然后有四个PULL客户端连接上去。

如果我想同时发送消息给其它四个客户端,我不需要一个一个发,只需要给服务器发一个消息,服务器会自动推送给其它四个。

 

安装教程

1.Linux

去官方下载合适的版本:http://download.zeromq.org/ 下载完成后解压至任意目录并进入到此目录下

执行configure,prefix参数为安装目录

若出现缺失ibsodium,加上--without-libsodium参数

./configure --prefix=/usr/local/zeromq

编译与安装

make
make install

安装完成之后,安装目录下会有include和lib,一个是头文件目录一个是库文件目录。

目前发行版仓库已经自带了ZMQ

各大发行版安装方式:

Fedora

dnf install zeromq-devel

Ubuntu/Debian/Mint

apt-get install libzmq3-dev

Arch

pacman -S zeromq

SUSE

zypper install zeromq-devel

2.Windows

这里下载安装包https://zeromq.org/download/

解压,并打开.sln文件,编译生成后会在当前目录下生成include和lib文件,把include和lib添加到你的项目工程里就可以使用了。

 

使用教程

1.应答模式REQ/REP

客户端:

#include <zmq.h>
#include <string.h>
#include <stdio.h>
#include <unistd.h>

int main(void)
{
    printf("Connecting to server...\n");

    void * context = zmq_ctx_new();
    void * socket = zmq_socket(context, ZMQ_REQ);
    zmq_connect(socket, "tcp://localhost:6666");

    while(1)
    {
        char buffer[10];
        const char * requestMsg = "Hello";
        int bytes = zmq_send(socket, requestMsg, strlen(requestMsg), 0);
        printf("[Client][%d] Sended Request Message: %d bytes, content == \"%s\"\n", i, bytes, requestMsg);

        bytes = zmq_recv(socket, buffer, 10, 0);
        buffer[bytes] = '\0';
        printf("[Client][%d] Received Reply Message: %d bytes, content == \"%s\"\n", i, bytes, buffer);

    }

    zmq_close(socket);
    zmq_ctx_destroy(context);

    return 0;
}

服务器:

#include <zmq.h>
#include <stdio.h>
#include <unistd.h>
#include <string.h>
#include <assert.h>

int main(void)
{
    void * context = zmq_ctx_new();
    void * socket = zmq_socket(context, ZMQ_REP);
    zmq_bind(socket, "tcp://*:6666");

    while(1)
    {
        char buffer[10];
        int bytes = zmq_recv(socket, buffer, 10, 0);
        buffer[bytes] = '\0';
        printf("[Server] Recevied Request Message: %d bytes, content == \"%s\"\n", bytes, buffer);

        sleep(1);

        const char * replyMsg = "World";
        bytes = zmq_send(socket, replyMsg, strlen(replyMsg), 0);
        printf("[Server] Sended Reply Message: %d bytes, content == \"%s\"\n", bytes, replyMsg);
    }

    zmq_close(socket);
    zmq_ctx_destroy(context);

    return 0;
}

2.订阅模式PUB/SUB

服务器

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <assert.h>
#include <string.h>
#include <zmq.h>


int main()
{
    printf("Hello world!\n");

    void* context = zmq_ctx_new();
    assert(context != NULL);

    int ret = zmq_ctx_set(context, ZMQ_MAX_SOCKETS, 1);

    assert(ret == 0);

    void* publisher = zmq_socket(context, ZMQ_PUB);
    assert(publisher != NULL);

    ret = zmq_bind(publisher, "tcp://192.168.1.5:8888");

    assert(ret == 0);

    while(1)
    {
        ret = zmq_send(publisher, "Hi,I'm server", 16, 0);

        assert(ret == 7);

        printf("%d\n", ret);

        sleep(1);

    }

    printf("1\n");

    return 0;
}

客户端

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <assert.h>
#include <zmq.h>

int main()
{
    printf("Hello world!\n");

    void* context = zmq_ctx_new();
    assert(context != NULL);

    int ret = zmq_ctx_set(context, ZMQ_MAX_SOCKETS, 1);
    assert(ret == 0);

    void* subscriber = zmq_socket(context, ZMQ_SUB);
    assert(subscriber != NULL);

    ret = zmq_connect(subscriber, "tcp://192.168.1.5:8888");
    assert(ret == 0);

    ret = zmq_setsockopt(subscriber, ZMQ_SUBSCRIBE, "", 0);

    assert(ret == 0);

    char buf[16];
    while(1)
    {
        ret = zmq_recv(subscriber, buf, 16, ZMQ_DONTWAIT);
        if (ret != -1)
        {
            buf[ret] = '\0';
            printf("%s\n", buf);
        }
        sleep(1);
    }



    return 0;
}

3.分布式推送PUSH/PULL

分发者 ventilator
执行者 worker
收集结果的接收者 sink

ventilator:

#include <zmq.h>
#include <stdio.h>
#include <unistd.h>
#include <string.h>
#include <assert.h>
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
 
int main(void)
{
    void * context = zmq_ctx_new();
    void * sender = zmq_socket(context, ZMQ_PUSH);
    zmq_bind(sender, "tcp://*:6666");
	printf ("Press Enter when the workers are ready: ");
    getchar ();
	printf ("Sending tasks to workers...\n");
    while(1)
    { 
        const char * replyMsg = "World";
        zmq_send(sender, replyMsg, strlen(replyMsg), 0);
        printf("[Server] Sended Reply Message content == \"%s\"\n", replyMsg);
    }
 
    zmq_close(sender);
    zmq_ctx_destroy(context);
 
    return 0;
}

worker:

#include <zmq.h>
#include <stdio.h>
#include <unistd.h>
#include <string.h>
#include <assert.h>

int main(void)
{
void * context = zmq_ctx_new();
void * recviver = zmq_socket(context, ZMQ_PULL);
zmq_connect(recviver, "tcp://localhost:6666");

void * sender = zmq_socket(context, ZMQ_PUSH);
zmq_connect(sender, "tcp://localhost:5555");

while(1)
{
char buffer [256];
int size = zmq_recv (recviver, buffer, 255, 0);
if(size < 0)
{
return -1;
}
printf("buffer:%s\n",buffer);
const char * replyMsg = "World";
zmq_send(sender, replyMsg, strlen(replyMsg), 0);
printf("[Server] Sended Reply Message content == \"%s\"\n", replyMsg);
}

zmq_close(recviver);
zmq_close(sender);
zmq_ctx_destroy(context);

return 0;
}

sink:

#include <zmq.h>
#include <stdio.h>
#include <unistd.h>
#include <string.h>
#include <assert.h>
 
int main(void)
{
    void * context = zmq_ctx_new();
    void * socket = zmq_socket(context, ZMQ_PULL);
    zmq_bind(socket, "tcp://*:5555");
 
    while(1)
    { 
       	char buffer [256];
		int size = zmq_recv (socket, buffer, 255, 0);
		if(size < 0)
		{
			return -1;
		}
        printf("buffer:%s\n",buffer);
    }
 
    zmq_close(socket);
    zmq_ctx_destroy(context);
 
    return 0;
}

流程图:

ZeroMQ使用教程

由ventilator通知work分发。

results连接到work,work连接至ventilator,当有需要推送的消息时,由ventilator推送给work,work在推送给其它客户端。

上一篇:B类直播产品化建设浅谈


下一篇:树莓派入坑总结与小案例实时视频监控