当你想要对节点进行协调时,PAIR套接字就不怎么合适了,这也是线程和节点之间的不同之处。一般来说,节点是来去*的,而线程则较为稳定。使用PAIR套接字时,若远程节点断开连接后又进行重连,PAIR不会予以理会。
第二个区别在于,线程的数量一般是固定的,而节点数量则会经常变化。让我们以气象信息模型为基础,看看要怎样进行节点的协调,以保证客户端不会丢失最开始的那些消息。
下面是程序运行逻辑:
- 发布者知道预期的订阅者数量,这个数字可以任意指定;
- 发布者启动后会先等待所有订阅者进行连接,也就是节点协调。每个订阅者会使用另一个套接字来告知发布者自己已就绪;
- 当所有订阅者准备就绪后,发布者才开始发送消息。
这里我们会使用REQ-REP套接字来同步发布者和订阅者。发布者的代码如下:
syncpub: Synchronized publisher in C
#include "../zhelpers.h" #define SUBSCRIBERS_EXPECTED 10 // We wait for 10 subscribers int main (void) { void *context = zmq_ctx_new (); // Socket to talk to clients void *publisher = zmq_socket (context, ZMQ_PUB); int sndhwm = 11000000; // ZMQ_SNDHWM:对向外发送的消息设置高水位(最大缓存量) zmq_setsockopt (publisher, ZMQ_SNDHWM, &sndhwm, sizeof (int)); zmq_bind (publisher, "tcp://*:5561"); // Socket to receive signals void *syncservice = zmq_socket (context, ZMQ_REP); zmq_bind (syncservice, "tcp://*:5562"); // Get synchronization from subscribers printf ("Waiting for subscribers\n"); int subscribers = 0; while (subscribers < SUBSCRIBERS_EXPECTED) { // - wait for synchronization request char *string = s_recv (syncservice); free (string); // - send synchronization reply s_send (syncservice, ""); subscribers++; } // Now broadcast exactly 1M updates followed by END printf ("Broadcasting messages\n"); int update_nbr; for (update_nbr = 0; update_nbr < 1000000; update_nbr++) s_send (publisher, "Rhubarb"); s_send (publisher, "END"); zmq_close (publisher); zmq_close (syncservice); zmq_ctx_destroy (context); return 0; }
以下是订阅者的代码:
syncsub: Synchronized subscriber in C
#include "../zhelpers.h" #define SUBSCRIBERS_EXPECTED 10 // We wait for 10 subscribers int main (void) { void *context = zmq_ctx_new (); // First, connect our subscriber socket void *subscriber = zmq_socket (context, ZMQ_SUB); zmq_connect (subscriber, "tcp://localhost:5561"); zmq_setsockopt (subscriber, ZMQ_SUBSCRIBE, "", 0); // 0MQ is so fast, we need to wait a while... sleep (1); // Second, synchronize with publisher void *syncclient = zmq_socket (context, ZMQ_REQ); zmq_connect (syncclient, "tcp://localhost:5562"); // - send a synchronization request s_send (syncclient, ""); // - wait for synchronization reply char *string = s_recv (syncclient); free (string); // Third, get our updates and report how many we got int update_nbr = 0; while (1) { char *string = s_recv (subscriber); if (strcmp (string, "END") == 0) { free (string); break; } free (string); update_nbr++; } printf ("Received %d updates\n", update_nbr); zmq_close (subscriber); zmq_close (syncclient); zmq_ctx_destroy (context); return 0; }
shell脚本
#!/bin/bash echo "正在启动订阅者..." for a in 1 2 3 4 5 6 7 8 9 10; do ./syncsub & done echo "正在启动发布者..." ./syncpub
out:
zf@eappsvr-0:~/ds/zmq/test/syncsub> ./run.sh 正在启动订阅者... 正在启动发布者... Waiting for subscribers Broadcasting messages Received 1000000 updates Received 1000000 updates Received 1000000 updates Received 1000000 updates Received 1000000 updates Received 1000000 updates Received 1000000 updates Received 1000000 updates Received 1000000 updates Received 1000000 updates