标签:通过 time des 报错 错误 rsize nbsp text long
简单来说,就是服务端不断发布消息,客户端订阅了就会收到消息。
下面我们看个简单的实力:
Server:
#include <stdlib.h> #include <zmq.h> #include <string.h> #include <unistd.h> #include <time.h> #define buffersize 4096 #define randof(num) (int) ((float) (num) * random () / (RAND_MAX + 1.0)) int main(int argc, char* argv[]) { // [0]创建对象 void* ctx = zmq_ctx_new(); void* publisher = zmq_socket(ctx, ZMQ_PUB); // [1]绑定到5566端口 zmq_bind(publisher, "tcp://*:5566"); // 初始化随机数生成器 srandom ((unsigned) time (NULL)); while (1) { int zipcode, temperature, relhumidity; zipcode = randof (100000); temperature = randof (215) - 80; relhumidity = randof (50) + 10; // Send message to all subscribers char update [20]; sprintf (update, "%05d %d %d", zipcode, temperature, relhumidity); printf("server send: %s\n", update); //s_send (publisher, update); zmq_send (publisher, update, strlen (update), 0); sleep(1); } zmq_close(publisher); zmq_ctx_destroy(ctx); return 0; }
Client:
#include <stdlib.h> #include <zmq.h> #include <string.h> #include <unistd.h> #include <time.h> #include <assert.h> static char *s_recv (void *socket) { char buffer [256]; int size = zmq_recv (socket, buffer, 255, 0); if (size == -1) return NULL; buffer[size] = ‘\0‘; return strndup (buffer, sizeof(buffer) - 1); } int main (int argc, char *argv []) { // [0]创建对象,连接到5566端口 printf ("Collecting updates from weather server...\n"); void *context = zmq_ctx_new (); void *subscriber = zmq_socket (context, ZMQ_SUB); int rc = zmq_connect (subscriber, "tcp://localhost:5566"); assert (rc == 0); // [1]设置过滤条件,设置为空,表示全订阅,这里“1”表示匹配开头为“1”的数据 const char *filter = "1"; rc = zmq_setsockopt (subscriber, ZMQ_SUBSCRIBE, filter, strlen (filter)); assert (rc == 0); // [2]接受数据 int update_nbr; long total_temp = 0; for (update_nbr = 0; update_nbr < 100; update_nbr++) { char *string = s_recv (subscriber); printf ("client: %s\n", string); int zipcode, temperature, relhumidity; sscanf (string, "%d %d %d", &zipcode, &temperature, &relhumidity); total_temp += temperature; free (string); } printf ("Average temperature for zipcode ‘%s‘ was %dF\n", filter, (int) (total_temp / update_nbr)); zmq_close (subscriber); zmq_ctx_destroy (context); return 0; }
out:
// server server send: 43345 -41 19 server send: 44203 110 59 server send: 78038 2 25 server send: 55377 59 18 server send: 40135 -65 36 server send: 37950 43 10 // client zf@eappsvr-0:~/ds/zmq/test/pub_sub> ./client Collecting updates from weather server... client.... client: 10057 67 11 client: 16839 94 25
注意:
需要注意的是,在使用SUB套接字时,必须使用zmq_setsockopt()方法来设置订阅的内容。如果你不设置订阅内容,那将什么消息都收不到,新手很容易犯这个错误。订阅信息可以是任何字符串,可以设置多次。只要消息满足其中一条订阅信息,SUB套接字就会收到。订阅者可以选择不接收某类消息,也是通过zmq_setsockopt()方法实现的。
PUB-SUB套接字组合是异步的。客户端在一个循环体中使用zmq_recv()接收消息,如果向SUB套接字发送消息则会报错;类似地,服务端可以不断地使用zmq_send()发送消息,但不能在PUB套接字上使用zmq_recv()。
关于PUB-SUB套接字,还有一点需要注意:你无法得知SUB是何时开始接收消息的。就算你先打开了SUB套接字,后打开PUB发送消息,这时SUB还是会丢失一些消息的,因为建立连接是需要一些时间的。很少,但并不是零。
标签:通过 time des 报错 错误 rsize nbsp text long
原文地址:https://www.cnblogs.com/vczf/p/12751249.html