标签:mat 进一步 sse ati 官方 复杂 共享数据 clu 解决问题
如果熟悉linux socket编程的同学阅读完了第一章, 一定有一种说不上来的别扭感觉.因为通常情况下, 当我们讨论socket的时候, 我们一般指的是操作系统提供的网络编程接口里的那个socket概念. 而在ZMQ中, 只是借用了这个概念的名字, 在ZMQ中, 我们讨论到socket的时候, 一般指代的是调用zmq_socket()
接口返回的那个socket, 具体一点: zmq socket.
zmq socket比起linux socket来说, 逻辑理解起来比较类似, 虽然两者内部完全就不是同一种东西.
zmq_socket()
, zmq_close()
zmq_setsockopt()
, zmq_getsockopt()
zmq_bind()
, zmq_connect()
zmq_msg_send()
, zmq_msg_recv()
, zmq_send()
, zmq_recv()
但与linux socket不同的是, zmq socket没有listen这个逻辑概念.
需要注意的是, zmq socket是void指针, 而消息则是结构实例. 这就意味着, 在C语言的API中, 需要zmq socket的地方, 传递的一定是值, 而需要传递消息的时候, 比如使用zmq_msg_send()
和zmq_msg_recv()
这样的接口, 消息参数则传递其地址. 其设计哲学是: 在zmq中, socket不归程序员掌控, 所以你可能拿到一个句柄(地址), 但不能看到它长什么样(不能看到socket实例), 但消息是程序员创建的, 是受程序员掌控的.
在两个结点上用ZMQ实现通讯, 你需要分别为两个结点创建socket, 并在其中一个结点上调用zmq_bind()
, 在另一个结点上创建对应的zmq_connect()
. 在ZMQ中, 请不要再以死板的"客户端", "服务端"来区分网络结点. 而要这样理解: zmq_bind()
调用应该发生在网络拓扑中那些不易变的结点上, 而zmq_connect()
应该发生在网络拓扑中那些易变的结点上.
ZMQ建立起的数据连接和常见的TCP连接有一些不同, 但也有一些共通之处, 如下:
在请求-回应套路中, 我们把比较不易变的逻辑结点称为服务端, 把易变, 也就是会经常性的退出, 或重新加入网络拓扑的结点称为客户端. 服务端向外提供服务, 必须提供一个"地址"供客户端去上门, 换句话说, 在这个套路拓扑中, 那些经常来来去去的客户端应该知道去哪找服务端. 但反过来, 服务端完全不关心去哪找客户端, 你爱来不来, 不来就滚, 不要打扰我飞升. 对于不易变的结点, 应该使用zmq_bind()
函数, 对于易变的结点, 应该采用zmq_connect
在传统的linux socket编程中, 如果服务端还没有上线工作, 这个时候去启动客户端程序, 客户端程序的connect()
调用会返回错误. 但在ZMQ中, 它妥善处理了这种情况. 客户端调用zmq_connect()
, 不会报错, 仅会导致消息被阻塞而发不出去.
不要小看这一点设计, 它反映出ZMQ的设计思想: 在请求-应答套路中, 它不光允许客户端可以随时退出, 再回来. 甚至允许服务端去上个厕所.
另外, 一个服务端可以多次调用zmq_bind()
以将自己关联到多个endpoint上.(所谓的endpoint, 就是通讯协议+通讯地址的组合, 它一般情况下指代了在这种通讯协议中的一个网络结点, 但这个结点可以是逻辑性的, 不一定只是一台机器).这就意味着, zmq socket可以同时接受来自多个不同通讯协议的多簇请求消息.
zmq_bind(socket, "tcp://*:5555");
zmq_bind(socket, "tcp://*:999");
zmq_bind(socket, "inproc://suprise_motherfucker");
但是, 对于同一种通讯协议里的同一个endpoint, 你只能对其执行一次zmq_bind()
操作. 这里有个例外, 就是ipc进程间通信. 逻辑上允许另外一个进程去使用之前一个进程已经使用过的ipc endpoint, 但不要滥用这特性: 这只是ZMQ提供给程序崩溃后恢复现场的一种手段, 在正常的代码逻辑中, 不要做这样的事情.
所以看到这里你大概能理解zmq对bind和connect这两个概念的态度: ZMQ努力的将这两个概念之间的差异抹平, 但很遗憾, zmq并没有将这两个操作抽象成一个类似于touch的操作. 但还是请谨记, 在你的网络拓扑中, 让不易变结点去使用zmq_bind()
, 让易变结点去使用zmq_connect
zmq socket是分类型的, 不同类型的socket提供了差异化的服务, socket的类型与结点在拓扑中的角色有关, 也影响着消息的出入, 以及缓存策略. 不同类型的socket之间, 有些可以互相连接, 但有些并不能, 这些规则, 以及如何在套路中为各个结点安排合适类型的socket, 都是后续我们将要讲到的内容.
如果从网络通讯的角度来讲, zmq是一个将传统传输层封装起来的网络库. 但从数据传输, 消息传输, 以及消息缓存这个角度来讲, zmq似乎又称得上是一个消息队列库. 总之, zmq是一个优秀的库, 优秀不是指它的实现, 它的性能, 而是它能解决的问题, 它的设计思路.
在第一章里, 我们接触到了两个有关消息收发的函数, zmq_send()
和zmq_recv()
, 现在, 我们需要把术语规范一下.
zmq_send()
与zmq_recv()
是用来传输"数据"的接口. 而"消息"这个术语, 在zmq中有指定含义, 传递消息的接口是zmq_msg_send()
与zmq_msg_recv()
当我们说起"数据"的时候, 我们指的是二进制串. 当我们说"消息"的时候, 指提是zmq中的一种特定结构体.
需要额外注意的是, 无论是调用zmq_send()
还是zmq_msg_send()
, 当调用返回时, 消息并没有真正被发送出去, 更没有被对方收到. 调用返回只代表zmq将你要发送的"消息"或"数据"放进了一个叫"发送缓冲区"的地方. 这是zmq实现收发异步且带缓冲队列的一个设计.
ZMQ底层封装了三种单播通讯协议, 分别是: 共享内存实现的线程间通讯(inproc), 进程间通信(ipc), 以及TCP/IP协议栈里的TCP协议(tcp). 另外ZMQ底层还封装了两种广播协议: PGM, EPGM. 多播我们在非常后面的章节才会介绍到, 在你了解它之前, 请不要使用多播协议, 即便你是在做一些类似于发布-订阅套路的东西.
对于多数场景来说, 底层协议选用tcp都是没什么问题的. 需要注意的是, zmq中的tcp, 被称为 "无连接的tcp协议", 而之所以起这么一个精神分裂的名字, 是因为zmq允许在对端不存在的情况下, 结点去zmq_connect()
. 你大致可以想象zmq做了多少额外工作, 但这些对于你来说, 对于上层应用程序来说, 是透明了, 你不必去关心具体实现.
IPC通讯类似于tcp, 也是"无连接"的, 目前, 这种方式不能在windows上使用, 很遗憾. 并且, 按照惯例, 在使用ipc作为通讯方式时, 我们一般给endpoint加上一个.ipc
的后缀. 另外, 在Unix操作系统上, 使用ipc连接还请格外的注意不同进程的权限问题, 特别是从属于两个不同用户的进程.
最后来说一下inproc, 也就是线程间通信, 它只能用于同一进程内的不同线程通讯. 比起tcp和ipc, 这种通讯方式快的飞起. 它与tcp和ipc最大的区别是: 在有客户端调用connect之前, 必须确保已经有一个服务端在对应的endpoint上调用了bind, 这个缺陷可能会在未来的某个版本被修正, 但就目前来讲, 请务必小心注意.
很遗憾的是, ZMQ对于其底层封装的网络协议是有侵入性的, 换句话说, 你没法使用ZMQ去实现一个HTTP服务器. HTTP作为一个五层协议, 使用TCP作为传输层协议, 对TCP里的报文格式是有规约限制的, 而ZMQ作为一个封装了TCP的4.5层协议, 其在数据交互时, 已经侵入了TCP的报文格式. 你无法让TCP里的报文既满足HTTP的格式要求, 还满足ZMQ的格式要求.
关心ZMQ到底是如何侵入它封装的通讯协议的, 这个在第三章, 当我们接触到ZMQ_ROUTER_RAW
这种socket配置项的时候才会深入讨论, 目前你只需要明白, ZMQ对其底层封装的通讯协议有侵入.
这意味着, 你无法无损的将ZMQ引入到一些现成的项目中. 这很遗憾.
我们先前提到过, ZMQ在后台使用独立的线程来实现异步I/O处理. 一般情况下吧, 一个I/O线程就应该足以处理当前进程的所有socket的I/O作业, 但是这个凡事总有个极限情况, 所以总会存在一些很荀的场景, 你需要多开几个I/O线程.
当你创建一个context的时候, ZMQ就在背后创建了一个I/O处理线程. 如果这么一个I/O线程不能满足你的需求, 那么就需要在创建context的时候加一些料, 让ZMQ多创建几个I/O处理线程. 一般有一个简单估算I/O线程数量的方法: 每秒你的程序有几个G字节的吞吐量, 你就开几个I/O线程.
下面是自定义I/O线程数量的方法:
int io_threads = 4;
void * context = zmq_ctx_new();
zmq_ctx_set(context, ZMQ_IO_THREADS, io_threads);
assert(zmq_ctx_get(context, ZMQ_IO_THREADS) == io_threads);
回想一下你用linux socket + epoll编写服务端应用程序的套路, 一般都是一个tcp连接专门开一个线程. ZMQ不一样, ZMQ允许你在一个进程里持有上千个连接(不一定是TCP哦), 但处理这上千个连接的I/O作业, 可能只有一个, 或者几个线程而已, 并且事实也证明这样做是可行的. 可能你的进程里只有十几个线程, 但就是能处理超过上千个连接.
当你的程序只使用inproc作为通讯手段的时候, 其实是不需要线程来处理异步I/O的, 因为inproc是通过共享内存实现通讯的. 这个时候你可以手动设置I/O线程的数量为0. 这是一个小小的优化手段, 嗯, 对性能的提升基本为0.
ZMQ的设计是亲套路的, ZMQ的核心其实在于路由与缓存, 这也是为什么作为一个网络库, 它更多的被人从消息队列这个角度了解到的原因. 要用ZMQ实现套路, 关键在于使用正确的socket类型, 然后把拓扑中的socket组装配对起来. 所以, 要懂套路, 就需要懂zmq里的socket类型.
zmq提供了你构建如下套路的手段:
我们在第一章中已经大致接触了套路, 除了一夫一妻没有接触到, 这章稍后些部分我们也将接触这种套路.要了解具体socket的各个类型都是干嘛用的, 可以去阅读zmq_socket()
的manpage, 我建议你去阅读, 并且仔细阅读, 反复阅读.下面列出的是可以互相组合的socket类型. 双方可以替换bind
与connect
操作.
后续你还会看到有XPUB与XSUB两种类型的socket. 就目前来说, 只有上面的socket配对连接是有效的, 其它没列出的组合的行为是未定义的, 但就目前的版本来说, 错误的组合socket类型并不会导致连接时出错, 甚至可能会碰巧按你的预期运行, 但强烈不建议你这个瞎jb搞. 在未来的版本中, 组合非法的socket类型可能会导致API调用出错.
libzmq有两套收发消息的API接口, 这个之前我们已经讲过. 并且在第一章里建议你多使用zmq_send()
与zmq_recv()
, 建议你规避zmq_msg_send()
与zmq_msg_recv()
. 但zmq_recv
有一个缺陷, 就是当你提供给zmq_recv()
接口的接收buffer不够长时, zmq_recv()
会把数据截断. 如果你无法预测你要收到的二进制数据的长度, 那么你只能使用zmq_msg_xxx()
接口.
从接口名上的msg
三个字母就能看出, 这个系列的接口是操纵结构体, 也就是"消息"(其实是帧, 后面会讲到), 而不是"数据", 而非缓冲区的接口, 实际上它们操纵的是zmq_msg_t
类型的结构. 这个系列的接口功能更为丰富, 但使用起来也请务必万分小心.
zmq_msg_init()
, zmq_msg_init_size()
, zmq_msg_init_data()
zmq_msg_send()
, zmq_msg_recv()
zmq_close()
zmq_msg_data()
, zmq_msg_size()
, zmq_msg_more()
zmq_msg_get()
, zmq_msg_set()
zmq_msg_copy()
, zmq_msg_move()
消息结构中封装的数据是二进制的, 依然由程序员自己解释. 关于zmq_msg_t
结构类型, 下面是你需要知道的基础知识:
zmq_msg_t *
. 也就是说这是一个内部实现不对外开放的类型, 创建, 传递, 都应当以指针类型进行操作.zmq_msg_init()
创建一个消息对象, 然后将这个消息对象传递给zmq_msg_recv()
接口zmq_msg_init_size()
创建一个数据容量指定的消息对象, 然后把你要写入的二进制数据通过内存拷贝函数, 比如memcpy()
写入消息中, 最后调用zmq_msg_send()
, 看到这里你应该明白, zmq_msg_init_size()
接口内部进行了内存分配.zmq_msg_t
其实是引用计数方式实现的共享对象类型, "释放"是指当前上下文放弃了对该消息的引用, 内部导致了实例的引用计数-1, 而"销毁"则是彻底把实例本身给free掉了. 当你"释放"一个消息的时候, 应当调用zmq_msg_close()
接口. 如果消息实例在释放后引用计数归0, 那么这个消息实例会被ZMQ自动销毁掉.zmq_msg_data()
接口, 要获取消息中数据的长度, 调用zmq_msg_size()
zmq_msg_move()
, zmq_msg_copy()
, zmq_msg_init_data()
这三个接口zmq_msg_send()
调用将消息发送给socket后, 这个消息内部包装的数据会被清零, 也就是zmq_msg_size() == 0
, 所以, 你不应该连续两次使用同一个zmq_msg_t *
值调用zmq_msg_send()
. 但需要注意的是, 这里的"清零", 并不代表消息被"释放", 也不代表消息被"销毁". 消息还是消息, 只是其中的数据被扔掉了.如果你想把同一段二进制数据发送多次, 正确的做法是下面这样:
zmq_msg_init_size()
, 创建第一个消息, 再通过memcpy
或类似函数将二进制数据写入消息中zmq_msg_init()
创建第二个消息, 再调用zmq_msg_copy()
从第一个消息将数据"复制"过来zmq_msg_send()
发送上面的多个消息ZMQ还支持所谓的"多帧消息", 这种消息允许你把多段二进制数据一次性发送给对端. 这个特性在第三章我们再讲. (P.S.: 这是一个很重要的特性, 路由代理等高级套路就严重依赖这种多帧消息.). ZMQ中的消息有三层逻辑概念: 消息, 帧, 二进制数据. 用户自定义的二进制数据被包装成帧, 然后一个或多个帧组成一个消息. 消息是ZMQ拓扑网络中两个结点收发的单位, 但在ZMQ底层的传输协议中, 最小单位是帧.
换一个角度来讲, ZMQ使用其底层的传输协议, 比如tcp, 比如inproc, 比如ipc来传输数据, 当ZMQ调用这些传输协议传递数据的时候, 最小单元是帧. 帧的完整性由传输协议来保证, 即是ZMQ本身不关心这个帧会不会破损, 帧的完整传输应当由这些传输协议去保证. 而在使用ZMQ构建应用程序的程序员眼中, 最小的传输单位是消息, 一个消息里可能会有多个帧, 程序员不去关心消息从一端到另一端是否会出现丢帧, 消息的完整性与原子性应当由ZMQ库去保证.
前面我们讲过, ZMQ对其底层的传输协议是有侵入性的. 如果要了解ZMQ到底是如何在传输协议的基础上规定帧传输格式的, 可以去阅读这个规范.
在我们到达第三章之前, 我们所讨论的消息中都仅包含一个帧. 这就是为什么在这一小节的描述中, 我们几乎有引导性的让你觉得, zmq_msg_t
类型, 就是"消息", 其实不是, 其实zmq_msg_t
消息只是"帧".
zmq_msg_t
对象zmq_msg_send()
, zmq_msg_recv()
, 你可以一帧一帧的发送数据. 可以用多次调用这些接口的方式来发送一个完整的消息, 或者接收一个完整的消息: 在发送时传入ZMQ_SNDMORE
参数, 或在接收时, 通过zmq_getsockopt()
来获取ZMQ_RCVMORE
选项的值. 更多关于如何使用低级API收发多帧消息的信息, 请参见相关接口的manpage关于消息或帧, 还有下面的一些特性:
zmq_send()
是一致的.zmq_msg_close()
接口来释放这个zmq_msg_t
对象最后再强调一下, 在你不理解zmq_msg_t
的原理之前, 不要使用zmq_msg_init_data()
接口, 这是一个0拷贝接口, 如果不熟悉zmq_msg_t
结构的原理, 瞎jb用, 是会core dump的
在先前的所有例子程序中, 大多程序里干的都是这样的事情
如果你接触过linux中的select, pselect, epoll等多路IO复用接口, 你一定会好奇, 在使用zmq的时候, 如何实现类似的效果呢? 毕竟ZMQ不光把linux socket的细节给你封装了, 连文件描述符都给你屏蔽封装掉了, 显然你没法直接调用类似于select, pselect, epoll这种接口了.
答案是, ZMQ自己搞了一个类似的玩意, zmq_poll()
了解一下.
我们先看一下, 如果没有多路IO接口, 如果我们要从两个socket上接收数据, 我们会怎样做. 下面是一个没什么卵用的示例程序, 它试图从两个socket上读取数据, 使用了异步I/O. (如果你有印象的话, 应该记得对应的两个endpoint实际上是我们在第一章写的两个示例程序的数据生产方: 天气预报程序与村口的大喇叭)
#include <zmq.h>
#include <stdio.h>
int main(void)
{
void * context = zmq_ctx_new();
void * receiver = zmq_socket(context, ZMQ_PULL);
zmq_connect(receiver, "tcp://localhost:5557");
void * subscriber = zmq_socket(context, ZMQ_SUB);
zmq_connect(subscriber, "tcp://localhost:5556");
zmq_setsockopt(subscriber, ZMQ_SUBSCRIBE, "10001 ", 6);
while(1)
{
char msg[256];
while(1)
{
int size = zmq_recv(receiver, msg, 255, ZMQ_DONTWAIT);
if(size != -1)
{
// 接收数据成功
}
else
{
break;
}
}
while(1)
{
int size = zmq_recv(subscriber, msg, 255, ZMQ_DONTWAIT);
if(size == -1)
{
// 接收数据成功
}
else
{
break;
}
}
sleep(1); // 休息一下, 避免疯狂循环
}
zmq_close(receiver);
zmq_close(subscriber);
zmq_ctx_destroy(context);
return 0;
}
在没有多路IO手段之前, 这基本上就是你能做到的最好情形了. 大循环里的sleep()
让人浑身难受. 不加sleep()
吧, 在没有数据的时候, 这个无限空循环能把一个核心的cpu占满. 加上sleep()
吧, 收包又会有最坏情况下1秒的延时.
但有了zmq_poll()
接口就不一样了, 代码就会变成这样:
#include <zmq.h>
#include <stdio.h>
int main(void)
{
void * context = zmq_ctx_new();
void * receiver = zmq_socket(context, ZMQ_PULL);
zmq_connect(receiver, "tcp://localhost:5557");
void * subscriber = zmq_socket(context, ZMQ_SUB);
zmq_connect(subscriber, "tcp://localhost:5556");
zmq_setsockopt(subscriber, ZMQ_SUBSCRIBE, "10001 ", 6);
while(1)
{
char msg[256];
zmq_pollitem_t items[] = {
{receiver, 0, ZMQ_POLLIN, 0},
{subscriber,0, ZMQ_POLLIN, 0},
};
zmq_poll(items, 2, -1);
if(items[0].revents & ZMQ_POLLIN)
{
int size = zmq_recv(receiver, msg, 255, 0);
if(size != -1)
{
// 接收消息成功
}
}
if(items[1].revents & ZMQ_POLLIN)
{
int size = zmq_recv(subscriber, msg, 255, 0);
if(size != -1)
{
// 接收消息成功
}
}
}
zmq_close(receiver);
zmq_close(subscriber);
zmq_ctx_destroy(context);
return 0;
}
zmq_pollitem_t
类型定义如下, 这个定义可以从zmq_poll()
的manpage里查到
typedef struct{
void * socket; // ZMQ的socket
int fd; // 是的, zmq_poll()还可以用来读写linux file descriptor
short events; // 要被监听的事件, 基础事件有 ZMQ_POLLIN 和 ZMQ_POLLOUT, 分别是可读可写
short revents; // 从zmq_poll()调用返回后, 这里存储着触发返回的事件
} zmq_pollitem_t;
我们之前提到过, 用户数据被包装成zmq_msg_t
对象, 也就是帧, 而在帧上, 还有一个逻辑概念叫"消息". 那么在具体编码中, 如何发送多帧消息呢? 而又如何接收多帧消息呢? 简单的讲, 两点:
zmq_msg_send()
传入ZMQ_SNDMORE
选项, 告诉发送接口, "我后面还有其它帧"zmq_msg_recv()
接收一个帧, 就调用一次zmq_msg_more()
或者zmq_getsockopt() + ZMQ_RCVMORE
来判断是否这是消息的最后一个帧发送示例:
zmq_msg_send(&msg, socket, ZMQ_SNDMORE);
zmq_msg_send(&msg, socket, ZMQ_SNDMORE);
zmq_msg_send(&msg, socket, 0); // 消息的最后一个帧
接收示例:
while(1)
{
zmq_msg_t msg;
zmq_msg_init(&msg);
zmq_msg_recv(&msg, socket, 0);
// 做处理
zmq_msg_close(&msg);
if(!zmq_msg_more(&msg)) // 注意, zmq_msg_more可以在zmq_msg_close后被安全的调用
{
break;
}
}
这里有一个需要注意的有趣小细节: 要判断一个收来的帧是不是消息的最后一个帧, 有两种途径, 一种是zmq_getsockopt(socket, ZMQ_RCVMORE, &more, &more_size)
, 另外一种是zmq_msg_more(&msg)
. 前一种途径的入参是socket, 后一种途径的入参是msg. 这真是很因缺思汀. 目前来说, 两种方法都可以, 不过我建议你使用zmq_getsockopt()
, 至于原因嘛, 因为在zmq_msg_recv()
的manpage中, 是这样建议的.
关于多帧消息, 你需要注意以下几点:
zmq_poll()
时, 当socket可读, 并且用zmq_msg_recv()
读出一个帧时, 代表着不用等待下一次循环, 你直接继续读取, 一定能读取能整个消息中剩余的其它所有帧zmq_msg_more()
或zmq_getsockopt() + ZMQ_RCVMORE
检查消息是否接收完整, 你一帧帧的收, 也会把整个消息里的所有帧收集齐. 所以从这个角度看, zmq_msg_more()
可以在把所有可读的帧从socket里统一接收到手之后, 再慢慢判断这些帧应该怎么拼装. 所以这样看, 它和zmq_getsockopt()
的功能也不算是完全重复.ZMQ的目标是建立去中心化的消息通信网络拓扑. 但不要误解"去中心"这三个字, 这并不意味着你的网络拓扑在中心圈内空无一物. 实际上, 用ZMQ搭建的网络拓扑中常常充满了各种非业务处理的网络结点, 我们把这些感知消息, 传递消息, 分发消息, 但不实际处理消息的结点称为"中介", 在ZMQ构建的网络中, 它们按应用场景有多个细化的名字, 比如"代理", "中继", "装置", "掮客"等.
这套逻辑在现实世界里也很常见, 中间人, 中介公司, 它们不实际生产社会价值, 表面上看它们的存在是在吸两头的血, 这些皮条客在社会中的存在意义在于: 它们减少了沟通的复杂度, 对通信双方进行了封装, 提高了社会运行效率.
当构建一个稍有规模的颁式系统的时候, 一个避不开的问题就是, 网络中的结点是如何感知其它结点的存在的? 结点会当机, 会扩容, 在这些变化发生的时候, 网络中的其它正在工作的结点如何感知这些变化, 并保持系统整体正常运行呢? 这就是经典的"动态探索问题".
动态探索问题有一系列很经典的解决方案, 最简单的解决方案就是把问题本身解决掉: 把网络拓扑设计死, 代码都写死, 别让它瞎jb来回变, 问题消灭了, done!. 这种解决方案的缺点就是如果网络拓扑要有变更, 比如业务规模扩展了, 或者有个结点当机了, 网络配置管理员会骂娘.
拓扑规模小的时候, 消灭问题的思路没什么坏处, 但拓扑稍微复杂一点, 显然这就是一个很可笑的解决方案.比如说, 网络中有一个发布者, 有100多个订阅者, 发布者bind到endpoint上, 订阅者connect到endpoint上. 如果代码是写死的, 如果发布者本身出了点什么问题, 或者发布者一台机器搞不住了, 需要横向扩容, 你就得改代码, 然后手动部署到100多台订阅者上. 这样的运维成本太大了.
这种场景, 你就需要一个"中介", 对发布者而言, 它从此无需关心订阅者是谁, 在哪, 有多少人, 只需要把消息给中介就行了. 对于订阅者而言, 它从此无需关注发布者有几个, 是否使用了多个endpoint, 在哪, 有多少人. 只需要向中介索取消息就行了. 虽然这时发布者身上的问题转嫁到的中介身上: 即中介是网络中最易碎的结点, 如果中介挂了整个拓扑就挂了, 但由于中介不处理业务逻辑, 只是一个类似于交换机的存在, 所以同样的机器性能, 中介在单位时间能转发的消息数量, 比发布者和订阅者能处理的消息高一个甚至几个数量级. 是的, 使用中介引入了新的问题, 但解决了老的问题.
中介并没有解决所有问题, 当你引入中介的时候, 中介又变成了网络中最易碎的点, 所以在实际应用中, 要控制中介的权重, 避免整个网络拓扑严重依赖于一个中介这种情况出现: ZMQ提倡去中心化, 不要把中介变成一个垄断市场的掮客.
对于发布者而言, 中介就是订阅者, 而对于订阅者而言, 中介就是发布者. 中介使用两种额外的socket类型: XPUB与XSUB. XSUB与真实的发布者连接, XPUB与真实的订阅者连接.
在我们之前写的请求-回应套路程序中, 我们有一个客户端, 一个服务端. 这是一个十分简化的例子, 实际应用场景中的请求-回应套路中, 一般会有多个客户端与多个服务端.
请求-应答模式有一个隐含的条件: 服务端是无状态的. 否则就不能称之为"请求-应答"套路, 而应该称之为"唠嗑套路".
要连接多个客户端与多个服务端, 有两种思路.
第一种暴力思路就是: 让N个客户端与M个服务端建立起N*M的全连接. 这确实是一个办法, 虽然不是很优雅. 在ZMQ中, 实现起来还轻松不少: 因为ZMQ的socket可以向多个endpoint发起连接, 这对于客户端来说, 编码难度降低了. 客户端应用程序中可以创建一个zmq_socket, 然后connect到多个服务端的endpoint上就行了. 这种思路做的话, 客户端数量扩张很容易, 直接部署就可以, 代码不用改. 但是缺陷有两个:
总的来说, 这是一种很暴力的解决办法, 不适合用于健壮的生产环境. 但是这确实是一个办法.
为了解决上面两个缺陷, 自然而然的我们就会想到: 为什么不能把服务端抽象出来呢? 让一个掮客来做那个唯一的endpoint, 以供所有客户端connect, 然后掮客在背后再把请求体分发给各个服务端, 服务端做出回应后掮客再代替服务端把回应返回给客户端, 这样就解决了上面的问题:
并且, 掮客还可以做到以下
所以, 在请求回应套路中加入掮客, 是一个很明智的选择, 这就是第二种思路, 这种思路不是没有缺陷, 有, 而且很明显: 掮客是整个系统中最脆弱的部分.
但这个缺陷可以在一定程度上克服掉:
ZMQ中, 有两个特殊的socket类型特别适合掮客使用:
关于这两种特殊的socket的特性, 后续我们会仔细深入, 目前来说, 你只需要了解
多说无益, 来看代码. 下面是在客户端与服务端中插入掮客的代码实例:
客户端
#include <zmq.h>
#include "zmq_helper.h"
int main(void)
{
void * context = zmq_ctx_new();
void * socket = zmq_socket(context, ZMQ_REQ);
zmq_connect(socket, "tcp://localhost:5559");
for(int i = 0; i < 10; ++i)
{
s_send(socket, "Hello");
char * strRsp = s_recv(socket);
printf("Received reply %d [%s]\n", i, strRsp);
free(strRsp);
}
zmq_close(socket);
zmq_ctx_destroy(context);
return 0;
}
服务端
#include <zmq.h>
#include <unistd.h>
#include "zmq_helper.h"
int main(void)
{
void * context = zmq_ctx_new();
void * socket = zmq_socket(context, ZMQ_REP);
zmq_connect(socket, "tcp://localhost:5560");
while(1)
{
char * strReq = s_recv(socket);
printf("Received request: [%s]\n", strReq);
free(strReq);
sleep(1);
s_send(socket, "World");
}
zmq_close(socket);
zmq_ctx_destroy(context);
return 0;
}
掮客
#include <zmq.h>
#include "zmq_helper.h"
int main(void)
{
void * context = zmq_ctx_new();
void * socket_for_client = zmq_socket(context, ZMQ_ROUTER);
void * socket_for_server = zmq_socket(context, ZMQ_DEALER);
zmq_bind(socket_for_client, "tcp://*:5559");
zmq_bind(socket_for_server, "tcp://*:5560");
zmq_pollitem_t items[] = {
{ socket_for_client, 0, ZMQ_POLLIN, 0 },
{ socket_for_server, 0, ZMQ_POLLIN, 0 },
};
while(1)
{
zmq_msg_t message;
zmq_poll(items, 2, -1);
if(items[0].revents & ZMQ_POLLIN)
{
while(1)
{
zmq_msg_init(&message);
zmq_msg_recv(&message, socket_for_client, 0);
int more = zmq_msg_more(&message);
zmq_msg_send(&message, socket_for_server, more ? ZMQ_SNDMORE : 0);
zmq_msg_close(&message);
if(!more)
{
break;
}
}
}
if(items[1].revents & ZMQ_POLLIN)
{
while(1)
{
zmq_msg_init(&message);
zmq_msg_recv(&message, socket_for_server, 0);
int more = zmq_msg_more(&message);
zmq_msg_send(&message, socket_for_client, more ? ZMQ_SNDMORE : 0);
zmq_msg_close(&message);
if(!more)
{
break;
}
}
}
}
zmq_close(socket_for_client);
zmq_close(socket_for_server);
zmq_ctx_destroy(context);
return 0;
}
客户端和服务端由于掮客的存在, 代码都简单了不少, 对于掮客的代码, 有以下几点需要思考:
s_send
与s_recv
互相传递字符串, 但在掮客那里就需要用zmq_msg_t
进行转发呢?上面三点其实是同一个问题: 掮客是如何实现带会话追踪的转发消息的?
另外, 如果你先启动掮客, 再启动客户端, 再启动服务端. 你会看到在服务端正确启动后, 客户端显示它收到了回包.那么:
这就是有关掮客的第二个问题: 如何配置缓冲区.
本章目前暂时不会对这三个问题做出解答, 大家先思考一下. 我们将在下一章深入掮客的细节进行进一步探索.
在上面的掮客代码示例中, 核心代码就是zmq_poll
对两个socket的监听, 以及while(1)
循环. ZMQ将这两坨操作统一封装到了一个函数中, 省得大家每次都要写boring code.
int zmq_proxy (const void *frontend, const void *backend, const void *capture);
参数frontend
与backend
分别是与客户端相连的socket
和与服务端相连的socket
. 在使用zmq_proxy
函数之前, 这两个socket必须被正确配置好, 该调用connect就调用connect, 该调用bind就调用bind. 简单来讲, zmq_proxy
负责把frontend
与backend
之间的数据互相递送给对方. 而如果仅仅是单纯的递送的话, 第三个参数capture
就应当被置为NULL
, 而如果还想监听一下数据, 那么就再创建一个socket, 并将其值传递给capture
, 这样, frontend
与backend
之间的数据都会有一份拷贝被送到capture
上的socket.
当我们用zmq_proxy
重写上面的掮客代码的话, 代码会非常简洁, 会变成这样:
#include <zmq.h>
#include "zmq_helper.h"
int main(void)
{
void * context = zmq_ctx_new();
void * socket_for_client = zmq_socket(context, ZMQ_ROUTER);
void * socket_for_server = zmq_socket(context, ZMQ_DEALER);
zmq_bind(socket_for_client, "tcp://*:5559");
zmq_bind(socket_for_server, "tcp://*:5560");
zmq_proxy(socket_for_client, socket_for_server, NULL);
zmq_close(socket_for_client);
zmq_close(socket_for_server);
zmq_ctx_destroy(context);
return 0;
}
桥接是服务器后端的一种常用技巧. 所谓的桥接有点类似于掮客, 但是解决问题的侧重点不一样. 掮客主要解决了三个问题:
而桥接解决的问题的侧重点主要在:
这种设计思路常用于后台服务的接口层. 接口层一方面连接着后端内部局域网, 另外一方面对公提供服务. 这种服务可以是请求-回应式的服务, 也可以是发布-订阅式的服务(显然发布方在后端内部的局域网里). 这个时候接口层其实就完成了桥接的工作.
其实这种应用场景里, 把这种技巧称为桥接
并不是很合适. 因为桥接
是一个计算机网络中硬件层的术语, 最初是用于线缆过长信号衰减时, 在线缆末端再加一个信号放大器之类的设备, 为通信续命用的.
原版ZMQ文档在这里提出bridging
这个术语, 也只是为了说明一下, zmq_proxy
的适用场景不仅局限于做掮客, 而是应该在理解上更宽泛一点, zmq_proxy
函数就是互相传递两个socket之间数据函数, 仅此而已, 而具体这个函数能应用在什么样的场景下, 掮客与桥接场景均可以使用, 但绝不局限于此. 写代码思维要活.
ZMQ库对待错误, 或者叫异常, 的设计哲学是: 见光死. 前文中写的多数示例代码, 都没有认真的检查ZMQ库函数调用的返回值, 也没有关心它们执行失败后会发生什么. 一般情况下, 这些函数都能正常工作, 但凡事总有个万一, 万一创建socket失败了, 万一bind或connect调用失败了, 会发生什么?
按照见光死的字面意思: 按我们上面写代码的风格, 一旦出错, 程序就挂掉退出了.
所以正确使用ZMQ库的姿势是: 生产环境运行的代码, 务必为每一个ZMQ库函数的调用检查返回值, 考虑调用失败的情况. ZMQ库函数的设计也继续了POSIX接口风格里的一些设计, 这些设计包括:
errno
中, 或zmq_errno()
中zmq_strerror()
可能获得真正健壮的代码, 应该像下面这样写, 是的, 它很啰嗦, 但它很健壮:
// ...
void * context = zmq_ctx_new();
assert(context);
void * socket = zmq_socket(context, ZMQ_REP);
assert(socket);
int rc = zmq_bind(socket, "tcp://*:5555");
if(rc == -1)
{
printf("E: bind failed: %s\n", strerror(errno));
return -1;
}
// ...
有两个比较例外的情况需要你注意一下:
ZMQ_DONTWAIT
的函数返回-1时, 一般情况下不是一个致命错误, 不应当导致程序退出. 比如在收包函数里带上这个标志, 那么语义只是说"没数据可收", 是的, 收包函数会返回-1, 并且会置error
值为EAGAIN
, 但这并不代表程序发生了不可逆转的错误.zmq_ctx_destroy()
时, 如果此时有其它线程在忙, 比如在写数据或者收数据什么的, 那么这会直接导致这些在干活的线程, 调用的这些阻塞式接口函数返回-1, 并且errno
被置为ETERM
. 这种情况在实际编码过程中不应当出现.下面我们写一个健壮的分治套路, 和我们在第一章中写过的类似, 不同的是, 这次, 在监理收到"所有工作均完成"的消息之后, 会发消息给各个工程队, 让工程队停止运行. 这个例子主要有两个目的:
原先的分治套路代码, 使用PUSH/PULL这两种socket类型, 将任务分发给多个工程队. 但在工作做完之后, 工程队的程序还在运行, 工程队的程序无法得知任务什么进修终止. 这里我们再掺入发布-订阅套路, 在工作做完之后, 监理向广大工程队, 通过PUB类型的socket发送"活干活了"的消息, 而工程队用SUB类型的socket一旦收到监理的消息, 就停止运行.
包工头ventilator的代码和上一章的一毛一样, 只是对所有的ZMQ库函数调用增加了错误处理. 照顾大家, 这里再帖一遍
#include <zmq.h>
#include <stdio.h>
#include <time.h>
#include <assert.h>
#include "zmq_helper.h"
int main(void)
{
void * context = zmq_ctx_new();
assert(context);
void * socket_to_sink = zmq_socket(context, ZMQ_PUSH);
assert(socket_to_sink);
void * socket_to_worker = zmq_socket(context, ZMQ_PUSH);
assert(socket_to_worker);
if(zmq_connect(socket_to_sink, "tcp://localhost:5558") == -1)
{
printf("E: connect failed: %s\n", strerror(errno));
return -1;
}
if(zmq_bind(socket_to_worker, "tcp://*:5557") == -1)
{
printf("E: bind failed: %s\n", strerror(errno));
return -1;
}
printf("Press Enter when all workers get ready:");
getchar();
printf("Sending tasks to workers...\n");
if(s_send(socket_to_sink, "Get ur ass up") == -1)
{
printf("E: s_send failed: %s\n", strerror(errno));
return -1;
}
srandom((unsigned)time(NULL));
int total_ms = 0;
for(int i = 0; i < 100; ++i)
{
int workload = randof(100) + 1;
total_ms += workload;
char string[10];
snprintf(string, sizeof(string), "%d", workload);
if(s_send(socket_to_worker, string) == -1)
{
printf("E: s_send failed: %s\n", strerror(errno));
return -1;
}
}
printf("Total expected cost: %d ms\n", total_ms);
zmq_close(socket_to_sink);
zmq_close(socket_to_worker);
zmq_ctx_destroy(context);
return 0;
}
接下来是工程队worker的代码, 这一版新增了一个socket_to_sink_of_control
来接收来自监理的停止消息:
#include <zmq.h>
#include <assert.h>
#include "zmq_helper.h"
int main(void)
{
void * context = zmq_ctx_new();
assert(context);
void * socket_to_ventilator = zmq_socket(context, ZMQ_PULL);
assert(socket_to_ventilator);
if(zmq_connect(socket_to_ventilator, "tcp://localhost:5557") == -1)
{
printf("E: connect failed: %s\n", strerror(errno));
return -1;
}
void * socket_to_sink = zmq_socket(context, ZMQ_PUSH);
assert(socket_to_sink);
if(zmq_connect(socket_to_sink, "tcp://localhost:5558") == -1)
{
printf("E: connect failed: %s\n", strerror(errno));
return -1;
}
void * socket_to_sink_of_control = zmq_socket(context, ZMQ_SUB);
assert(socket_to_sink_of_control);
if(zmq_connect(socket_to_sink_of_control, "tcp://localhost:5559") == -1)
{
printf("E: connect failed: %s\n", strerror(errno));
return -1;
}
if(zmq_setsockopt(socket_to_sink_of_control, ZMQ_SUBSCRIBE, "", 0) == -1)
{
printf("E: setsockopt failed: %s\n", strerror(errno));
}
zmq_pollitem_t items [] = {
{ socket_to_ventilator, 0, ZMQ_POLLIN, 0 },
{ socket_to_sink_of_control, 0, ZMQ_POLLIN, 0 },
};
while(1)
{
if(zmq_poll(items, 2, -1) == -1)
{
printf("E: poll failed: %s\n", strerror(errno));
return -1;
}
if(items[0].revents & ZMQ_POLLIN)
{
char * strWork = s_recv(socket_to_ventilator);
assert(strWork);
printf("%s.", strWork);
fflush(stdout);
s_sleep(atoi(strWork));
free(strWork);
if(s_send(socket_to_sink, "") == -1)
{
printf("E: s_send failed %s\n", strerror(errno));
return -1;
}
}
if(items[1].revents & ZMQ_POLLIN)
{
break;
}
}
zmq_close(socket_to_ventilator);
zmq_close(socket_to_sink);
zmq_close(socket_to_sink_of_control);
zmq_ctx_destroy(context);
return 0;
}
接下来是监理的代码, 这一版新增了socket_to_worker_of_control
来在任务结束之后给工程队发布停止消息:
#include <zmq.h>
#include <assert.h>
#include <stdint.h>
#include "zmq_helper.h"
int main(void)
{
void * context = zmq_ctx_new();
assert(context);
void * socket_to_worker = zmq_socket(context, ZMQ_PULL);
if(zmq_bind(socket_to_worker, "tcp://*:5558") == -1)
{
printf("E: bind failed: %s\n", strerror(errno));
return -1;
}
void * socket_to_worker_of_control = zmq_socket(context, ZMQ_PUB);
if(zmq_bind(socket_to_worker_of_control, "tcp://*:5559") == -1)
{
printf("E: bind failed: %s\n", strerror(errno));
return -1;
}
char * strBeginMsg = s_recv(socket_to_worker);
assert(strBeginMsg);
free(strBeginMsg);
int64_t i64StartTime = s_clock();
for(int i = 0; i < 100; ++i)
{
char * strRes = s_recv(socket_to_worker);
assert(strRes);
free(strRes);
if(i % 10 == 0)
{
printf(":");
}
else
{
printf(".");
}
fflush(stdout);
}
printf("Total elapsed time: %d msec\n", (int)(s_clock() - i64StartTime));
if(s_send(socket_to_worker_of_control, "STOP") == -1)
{
printf("E: s_send failed: %s\n", strerror(errno));
return -1;
}
zmq_close(socket_to_worker);
zmq_close(socket_to_worker_of_control);
zmq_ctx_destroy(context);
return 0;
}
这个例子也展示了如何将多种套路揉合在一个场景中. 所以说写代码, 思维要灵活.
一般情况下, Linux上的程序在接收到诸如SIGINT
和SIGTERM
这样的信号时, 其默认动作是让进程退出. 这种退出信号的默认行为, 只是简单的把进程干掉, 不会管什么缓冲区有没有正确刷新, 也不会管文件以及其它资源句柄是不是正确被释放了.
这对于实际应用场景中的程序来说是不可接受的, 所以在编写后台应用的时候一定要注意这一点: 要妥善的处理POSIX Signal. 限于篇幅, 这里不会对Signal进行进一步讨论, 如果对这部分内容不是很熟悉的话, 请参阅<Unix环境高级编程>(<Advanced Programming in the UNIX Environment>)第十章(chapter 10. Signals).
下面是妥善处理Signal的一个例子
#include <stdlib.h>
#include <stdio.h>
#include <signal.h>
#include <unistd.h>
#include <fcntl.h>
#include <assert.h>
#include <string.h>
#include <zmq.h>
#define S_NOTIFY_MSG " "
#define S_ERROR_MSG "Error while writing to self-pipe.\n"
static int s_fd;
static void s_signal_handler(int signal_value)
{
int rc = write(s_fd, S_NOTIFY_MSG, sizeof(S_NOTIFY_MSG));
if(rc != sizeof(S_NOTIFY_MSG))
{
write(STDOUT_FILENO, S_ERROR_MSG, sizeof(S_ERROR_MSG) - 1);
exit(1);
}
}
static void s_catch_signals(int fd)
{
s_fd = fd;
struct sigaction action;
action.sa_handler = s_signal_handler;
action.sa_flags = 0;
sigemptyset(&action.sa_mask);
sigaction(SIGINT, &action, NULL);
sigaction(SIGTERM, &action, NULL);
}
int main(void)
{
int rc;
void * context = zmq_ctx_new();
assert(context);
void * socket = zmq_socket(context, ZMQ_REP);
assert(socket);
if(zmq_bind(socket, "tcp://*:5555") == -1)
{
printf("E: bind failed: %s\n", strerror(errno));
return -__LINE__;
}
int pipefds[2];
rc = pipe(pipefds);
if(rc != 0)
{
printf("E: creating self-pipe failed: %s\n", strerror(errno));
return -__LINE__;
}
for(int i = 0; i < 2; ++i)
{
int flags = fcntl(pipefds[0], F_GETFL, 0);
if(flags < 0)
{
printf("E: fcntl(F_GETFL) failed: %s\n", strerror(errno));
return -__LINE__;
}
rc = fcntl(pipefds[0], F_SETFL, flags | O_NONBLOCK);
if(rc != 0)
{
printf("E: fcntl(F_SETFL) failed: %s\n", strerror(errno));
return -__LINE__;
}
}
s_catch_signals(pipefds[1]);
zmq_pollitem_t items[] = {
{ 0, pipefds[0], ZMQ_POLLIN, 0 },
{ socket, 0, ZMQ_POLLIN, 0 },
};
while(1)
{
rc = zmq_poll(items, 2, -1);
if(rc == 0)
{
continue;
}
else if(rc < 0)
{
if(errno == EINTR)
{
continue;
}
else
{
printf("E: zmq_poll failed: %s\n", strerror(errno));
return -__LINE__;
}
}
// Signal pipe FD
if(items[0].revents & ZMQ_POLLIN)
{
char buffer[2];
read(pipefds[0], buffer, 2); // clear notifying bytes
printf("W: interrupt received, killing server...\n");
break;
}
// Read socket
if(items[1].revents & ZMQ_POLLIN)
{
char buffer[255];
rc = zmq_recv(socket, buffer, 255, ZMQ_NOBLOCK);
if(rc <