简介:
ZeroMQ简称ZMQ,它对socket编程进行了封装,通俗的说,它就像一个框架一样,对socket lib进行了很好的封装,让socket编程变得更加简单。
ZMQ采用消息队列的方式对socket的包进行管理,它支持多线程,同时ZMQ是开源的,你可以在不同的Linux内核架构上缩减ZMQ模块,这在ARM嵌入式上非常有利。
ZMQ的明确目标是“成为标准网络协议栈的一部分,之后进入Linux内核”,目前还未成功。
ZMQ不是消息队列服务器,是一个基于消息队列模式的Socket库。
ZMQ的特点:
socket是1:1,而ZMQ对socket封装了以后具有n:1的特点,所谓的n:1就是无数客户端可以同时连接一个服务器,你无需去做线程服务分发这样的工作,因为ZMQ都已经为你实现好了。
同时socket的建立与通讯较为复杂,用户需要选择协议,端口,通讯类型、AF_INET、IPv4或者6等等,同时用户还需要考虑发送数据与接收数据的缓冲区大小,包括需要自己管理套接字,而ZMQ屏蔽了这些细节,让用户写少量的代码就能实现一个基本的服务器与客户端程序,同时它支持大文件传输,使得用户不用去关心很多细节,同时也降低了网络开发门槛,会让许多人可能都不太了解BSD的套接字协议。
所以不建议新手采用ZMQ,如果你是学习者则建议先从socket开始学起。
ZMQ的三种模式:
1.应答模式
应答模式在ZMQ里是REQ和REP
服务端是REP
客户端是REQ
典型的一问一答协议,即客户端需要首先发送hello,服务器则返回word,若客户端发送hello,服务器没有应答,后续通讯将不成立。
如:
客户端首先对服务端发送了hello,那么客户端会等待服务端应答,若在此期间客户端再次向服务端发送消息,服务端是收不到的,客户端有一个消息队列,会放入消息队列,只有在 客户端收到服务端的回应之后才会去依次处理消息队列里的内容。
2.订阅模式
即PUB/SUB
PUB代表服务器,SUB代表客户端
这种服务即服务器会不停发送数据,然后客户端对其进行订阅,客户端会收到服务器发送的数据,且不需要做出应答,客户端也不需要发送打招呼消息,只需要连接上就会收到服务器的订阅消息。
同时服务器不具有收客户端发送消息的能力。
这是单向的,即服务器只能发,客户端只能收,可以同时多个客户端订阅一个服务器。
3.分布式模式
PUSH/PULL
这种协议即服务器收到消息会立马推送给连接的客户端。
如一个使用PUSH协议的服务器,然后有四个PULL客户端连接上去。
如果我想同时发送消息给其它四个客户端,我不需要一个一个发,只需要给服务器发一个消息,服务器会自动推送给其它四个。
安装教程
1.Linux
去官方下载合适的版本:http://download.zeromq.org/ 下载完成后解压至任意目录并进入到此目录下
执行configure,prefix参数为安装目录
若出现缺失ibsodium,加上--without-libsodium参数
./configure --prefix=/usr/local/zeromq
编译与安装
make
make install
安装完成之后,安装目录下会有include和lib,一个是头文件目录一个是库文件目录。
目前发行版仓库已经自带了ZMQ
各大发行版安装方式:
Fedora
dnf install zeromq-devel
Ubuntu/Debian/Mint
apt-get install libzmq3-dev
Arch
pacman -S zeromq
SUSE
zypper install zeromq-devel
2.Windows
这里下载安装包https://zeromq.org/download/
解压,并打开.sln文件,编译生成后会在当前目录下生成include和lib文件,把include和lib添加到你的项目工程里就可以使用了。
使用教程
1.应答模式REQ/REP
客户端:
#include <zmq.h>
#include <string.h>
#include <stdio.h>
#include <unistd.h>
int main(void)
{
printf("Connecting to server...\n");
void * context = zmq_ctx_new();
void * socket = zmq_socket(context, ZMQ_REQ);
zmq_connect(socket, "tcp://localhost:6666");
while(1)
{
char buffer[10];
const char * requestMsg = "Hello";
int bytes = zmq_send(socket, requestMsg, strlen(requestMsg), 0);
printf("[Client][%d] Sended Request Message: %d bytes, content == \"%s\"\n", i, bytes, requestMsg);
bytes = zmq_recv(socket, buffer, 10, 0);
buffer[bytes] = '\0';
printf("[Client][%d] Received Reply Message: %d bytes, content == \"%s\"\n", i, bytes, buffer);
}
zmq_close(socket);
zmq_ctx_destroy(context);
return 0;
}
服务器:
#include <zmq.h>
#include <stdio.h>
#include <unistd.h>
#include <string.h>
#include <assert.h>
int main(void)
{
void * context = zmq_ctx_new();
void * socket = zmq_socket(context, ZMQ_REP);
zmq_bind(socket, "tcp://*:6666");
while(1)
{
char buffer[10];
int bytes = zmq_recv(socket, buffer, 10, 0);
buffer[bytes] = '\0';
printf("[Server] Recevied Request Message: %d bytes, content == \"%s\"\n", bytes, buffer);
sleep(1);
const char * replyMsg = "World";
bytes = zmq_send(socket, replyMsg, strlen(replyMsg), 0);
printf("[Server] Sended Reply Message: %d bytes, content == \"%s\"\n", bytes, replyMsg);
}
zmq_close(socket);
zmq_ctx_destroy(context);
return 0;
}
2.订阅模式PUB/SUB
服务器
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <assert.h>
#include <string.h>
#include <zmq.h>
int main()
{
printf("Hello world!\n");
void* context = zmq_ctx_new();
assert(context != NULL);
int ret = zmq_ctx_set(context, ZMQ_MAX_SOCKETS, 1);
assert(ret == 0);
void* publisher = zmq_socket(context, ZMQ_PUB);
assert(publisher != NULL);
ret = zmq_bind(publisher, "tcp://192.168.1.5:8888");
assert(ret == 0);
while(1)
{
ret = zmq_send(publisher, "Hi,I'm server", 16, 0);
assert(ret == 7);
printf("%d\n", ret);
sleep(1);
}
printf("1\n");
return 0;
}
客户端
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <assert.h>
#include <zmq.h>
int main()
{
printf("Hello world!\n");
void* context = zmq_ctx_new();
assert(context != NULL);
int ret = zmq_ctx_set(context, ZMQ_MAX_SOCKETS, 1);
assert(ret == 0);
void* subscriber = zmq_socket(context, ZMQ_SUB);
assert(subscriber != NULL);
ret = zmq_connect(subscriber, "tcp://192.168.1.5:8888");
assert(ret == 0);
ret = zmq_setsockopt(subscriber, ZMQ_SUBSCRIBE, "", 0);
assert(ret == 0);
char buf[16];
while(1)
{
ret = zmq_recv(subscriber, buf, 16, ZMQ_DONTWAIT);
if (ret != -1)
{
buf[ret] = '\0';
printf("%s\n", buf);
}
sleep(1);
}
return 0;
}
3.分布式推送PUSH/PULL
分发者 ventilator
执行者 worker
收集结果的接收者 sink
ventilator:
#include <zmq.h>
#include <stdio.h>
#include <unistd.h>
#include <string.h>
#include <assert.h>
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
int main(void)
{
void * context = zmq_ctx_new();
void * sender = zmq_socket(context, ZMQ_PUSH);
zmq_bind(sender, "tcp://*:6666");
printf ("Press Enter when the workers are ready: ");
getchar ();
printf ("Sending tasks to workers...\n");
while(1)
{
const char * replyMsg = "World";
zmq_send(sender, replyMsg, strlen(replyMsg), 0);
printf("[Server] Sended Reply Message content == \"%s\"\n", replyMsg);
}
zmq_close(sender);
zmq_ctx_destroy(context);
return 0;
}
worker:
#include <zmq.h>
#include <stdio.h>
#include <unistd.h>
#include <string.h>
#include <assert.h>
int main(void)
{
void * context = zmq_ctx_new();
void * recviver = zmq_socket(context, ZMQ_PULL);
zmq_connect(recviver, "tcp://localhost:6666");
void * sender = zmq_socket(context, ZMQ_PUSH);
zmq_connect(sender, "tcp://localhost:5555");
while(1)
{
char buffer [256];
int size = zmq_recv (recviver, buffer, 255, 0);
if(size < 0)
{
return -1;
}
printf("buffer:%s\n",buffer);
const char * replyMsg = "World";
zmq_send(sender, replyMsg, strlen(replyMsg), 0);
printf("[Server] Sended Reply Message content == \"%s\"\n", replyMsg);
}
zmq_close(recviver);
zmq_close(sender);
zmq_ctx_destroy(context);
return 0;
}
sink:
#include <zmq.h>
#include <stdio.h>
#include <unistd.h>
#include <string.h>
#include <assert.h>
int main(void)
{
void * context = zmq_ctx_new();
void * socket = zmq_socket(context, ZMQ_PULL);
zmq_bind(socket, "tcp://*:5555");
while(1)
{
char buffer [256];
int size = zmq_recv (socket, buffer, 255, 0);
if(size < 0)
{
return -1;
}
printf("buffer:%s\n",buffer);
}
zmq_close(socket);
zmq_ctx_destroy(context);
return 0;
}
流程图:
由ventilator通知work分发。
results连接到work,work连接至ventilator,当有需要推送的消息时,由ventilator推送给work,work在推送给其它客户端。