ZeroMQ_10 节点协调

当你想要对节点进行协调时,PAIR套接字就不怎么合适了,这也是线程和节点之间的不同之处。一般来说,节点是来去*的,而线程则较为稳定。使用PAIR套接字时,若远程节点断开连接后又进行重连,PAIR不会予以理会。

第二个区别在于,线程的数量一般是固定的,而节点数量则会经常变化。让我们以气象信息模型为基础,看看要怎样进行节点的协调,以保证客户端不会丢失最开始的那些消息。

下面是程序运行逻辑:

  • 发布者知道预期的订阅者数量,这个数字可以任意指定;
  • 发布者启动后会先等待所有订阅者进行连接,也就是节点协调。每个订阅者会使用另一个套接字来告知发布者自己已就绪;
  • 当所有订阅者准备就绪后,发布者才开始发送消息。

ZeroMQ_10 节点协调

 

 

这里我们会使用REQ-REP套接字来同步发布者和订阅者。发布者的代码如下:

syncpub: Synchronized publisher in C

#include "../zhelpers.h"
#define SUBSCRIBERS_EXPECTED  10  //  We wait for 10 subscribers 

int main (void)
{
    void *context = zmq_ctx_new ();

    //  Socket to talk to clients
    void *publisher = zmq_socket (context, ZMQ_PUB);

    int sndhwm = 11000000;
    // ZMQ_SNDHWM:对向外发送的消息设置高水位(最大缓存量)
    zmq_setsockopt (publisher, ZMQ_SNDHWM, &sndhwm, sizeof (int));

    zmq_bind (publisher, "tcp://*:5561");

    //  Socket to receive signals
    void *syncservice = zmq_socket (context, ZMQ_REP);
    zmq_bind (syncservice, "tcp://*:5562");

    //  Get synchronization from subscribers
    printf ("Waiting for subscribers\n");
    int subscribers = 0;
    while (subscribers < SUBSCRIBERS_EXPECTED) {
        //  - wait for synchronization request
        char *string = s_recv (syncservice);
        free (string);
        //  - send synchronization reply
        s_send (syncservice, "");
        subscribers++;
    }
    //  Now broadcast exactly 1M updates followed by END
    printf ("Broadcasting messages\n");
    int update_nbr;
    for (update_nbr = 0; update_nbr < 1000000; update_nbr++)
        s_send (publisher, "Rhubarb");

    s_send (publisher, "END");

    zmq_close (publisher);
    zmq_close (syncservice);
    zmq_ctx_destroy (context);
    return 0;
}

以下是订阅者的代码:

syncsub: Synchronized subscriber in C

#include "../zhelpers.h"
#define SUBSCRIBERS_EXPECTED  10  //  We wait for 10 subscribers 

int main (void)
{
    void *context = zmq_ctx_new ();

    //  First, connect our subscriber socket
    void *subscriber = zmq_socket (context, ZMQ_SUB);
    zmq_connect (subscriber, "tcp://localhost:5561");
    zmq_setsockopt (subscriber, ZMQ_SUBSCRIBE, "", 0);

    //  0MQ is so fast, we need to wait a while...
    sleep (1);

    //  Second, synchronize with publisher
    void *syncclient = zmq_socket (context, ZMQ_REQ);
    zmq_connect (syncclient, "tcp://localhost:5562");

    //  - send a synchronization request
    s_send (syncclient, "");

    //  - wait for synchronization reply
    char *string = s_recv (syncclient);
    free (string);

    //  Third, get our updates and report how many we got
    int update_nbr = 0;
    while (1) {
        char *string = s_recv (subscriber);
        if (strcmp (string, "END") == 0) {
            free (string);
            break;
        }
        free (string);
        update_nbr++;
    }
    printf ("Received %d updates\n", update_nbr);

    zmq_close (subscriber);
    zmq_close (syncclient);
    zmq_ctx_destroy (context);
    return 0;
}

shell脚本

#!/bin/bash
echo "正在启动订阅者..."
for a in 1 2 3 4 5 6 7 8 9 10; do
    ./syncsub &
done
echo "正在启动发布者..."
./syncpub

out:

zf@eappsvr-0:~/ds/zmq/test/syncsub> ./run.sh 
正在启动订阅者...
正在启动发布者...
Waiting for subscribers
Broadcasting messages
Received 1000000 updates
Received 1000000 updates
Received 1000000 updates
Received 1000000 updates
Received 1000000 updates
Received 1000000 updates
Received 1000000 updates
Received 1000000 updates
Received 1000000 updates
Received 1000000 updates

 

上一篇:ZMQ的相关介绍


下一篇:zmq模块的理解和使用