标签:sub ++ ber 断开连接 des 一般来说 发布 expec nal
当你想要对节点进行协调时,PAIR套接字就不怎么合适了,这也是线程和节点之间的不同之处。一般来说,节点是来去自由的,而线程则较为稳定。使用PAIR套接字时,若远程节点断开连接后又进行重连,PAIR不会予以理会。
第二个区别在于,线程的数量一般是固定的,而节点数量则会经常变化。让我们以气象信息模型为基础,看看要怎样进行节点的协调,以保证客户端不会丢失最开始的那些消息。
下面是程序运行逻辑:
这里我们会使用REQ-REP套接字来同步发布者和订阅者。发布者的代码如下:
syncpub: Synchronized publisher in C
#include "../zhelpers.h" #define SUBSCRIBERS_EXPECTED 10 // We wait for 10 subscribers int main (void) { void *context = zmq_ctx_new (); // Socket to talk to clients void *publisher = zmq_socket (context, ZMQ_PUB); int sndhwm = 11000000; // ZMQ_SNDHWM:对向外发送的消息设置高水位(最大缓存量) zmq_setsockopt (publisher, ZMQ_SNDHWM, &sndhwm, sizeof (int)); zmq_bind (publisher, "tcp://*:5561"); // Socket to receive signals void *syncservice = zmq_socket (context, ZMQ_REP); zmq_bind (syncservice, "tcp://*:5562"); // Get synchronization from subscribers printf ("Waiting for subscribers\n"); int subscribers = 0; while (subscribers < SUBSCRIBERS_EXPECTED) { // - wait for synchronization request char *string = s_recv (syncservice); free (string); // - send synchronization reply s_send (syncservice, ""); subscribers++; } // Now broadcast exactly 1M updates followed by END printf ("Broadcasting messages\n"); int update_nbr; for (update_nbr = 0; update_nbr < 1000000; update_nbr++) s_send (publisher, "Rhubarb"); s_send (publisher, "END"); zmq_close (publisher); zmq_close (syncservice); zmq_ctx_destroy (context); return 0; }
以下是订阅者的代码:
syncsub: Synchronized subscriber in C
#include "../zhelpers.h" #define SUBSCRIBERS_EXPECTED 10 // We wait for 10 subscribers int main (void) { void *context = zmq_ctx_new (); // First, connect our subscriber socket void *subscriber = zmq_socket (context, ZMQ_SUB); zmq_connect (subscriber, "tcp://localhost:5561"); zmq_setsockopt (subscriber, ZMQ_SUBSCRIBE, "", 0); // 0MQ is so fast, we need to wait a while... sleep (1); // Second, synchronize with publisher void *syncclient = zmq_socket (context, ZMQ_REQ); zmq_connect (syncclient, "tcp://localhost:5562"); // - send a synchronization request s_send (syncclient, ""); // - wait for synchronization reply char *string = s_recv (syncclient); free (string); // Third, get our updates and report how many we got int update_nbr = 0; while (1) { char *string = s_recv (subscriber); if (strcmp (string, "END") == 0) { free (string); break; } free (string); update_nbr++; } printf ("Received %d updates\n", update_nbr); zmq_close (subscriber); zmq_close (syncclient); zmq_ctx_destroy (context); return 0; }
shell脚本
#!/bin/bash echo "正在启动订阅者..." for a in 1 2 3 4 5 6 7 8 9 10; do ./syncsub & done echo "正在启动发布者..." ./syncpub
out:
zf@eappsvr-0:~/ds/zmq/test/syncsub> ./run.sh 正在启动订阅者... 正在启动发布者... Waiting for subscribers Broadcasting messages Received 1000000 updates Received 1000000 updates Received 1000000 updates Received 1000000 updates Received 1000000 updates Received 1000000 updates Received 1000000 updates Received 1000000 updates Received 1000000 updates Received 1000000 updates
标签:sub ++ ber 断开连接 des 一般来说 发布 expec nal
原文地址:https://www.cnblogs.com/vczf/p/12886588.html