标签:des style blog http color io os 使用 ar
双星模式是一对具有主从机制的高可靠节点。任一时间,某个节点会充当主机,接收所有客户端的请求;另一个则作为一种备机存在。两个节点会互相监控对方,当主机从网络中消失时,备机会替代主机的位置。
双星模式由Pieter Hintjens和Martin Sustrik设计,应用在iMatix的OpenAMQ服务器中。它的设计理念是:
假设我们有一组双星模式的服务器,以下是可能发生的故障:
恢复步骤如下:
恢复过程是人工进行的,惨痛的经验告诉我们自动恢复是很可怕的:
故障的发生会造成10-30秒之间的服务暂停,如果这是一个真正的突发状况,那最好还是让主机暂停服务的好,因为立刻重启服务可能造成另一个10-30秒的暂停,不如让用户停止使用。
当有紧急状况发生时,可以在修复的过程中记录故障发生原因,而不是让系统自动恢复,管理员因此无法用其经验抵御下一次突发状况。
最后,如果自动恢复确实成功了,管理员将无从得知故障的发生原因,因而无法进行分析。
双星模式的故障恢复过程是:在修复了主机的问题后,将备机做关闭处理,稍后再重新开启:
双星模式的关闭过程有两种:
关闭时,间隔时间要比故障切换时间短,否则会导致应用程序失去连接、重新连接、并再次失去连接,导致用户投诉。
双星模式可以非常简单,但能工作得很出色。事实上,这里的实现方法已经历经三个版本了,之前的版本都过于复杂,想要做太多的事情,因而被我们抛弃。我们需要的只是最基本的功能,能够提供易理解、易开发、高可靠的解决方法就可以了。
以下是该架构的详细需求:
我们做如下架假设:
双星模式不会用到:
以下是双星模式中的几个术语:
配置双星模式的步骤:
比较重要的配置是应让两台机器间隔多久检查一次对方的状态,以及多长时间后采取行动。在我们的示例中,故障恢复时间设置为2000毫秒,超过这个时间备机就会代替主机的位置。但若你将主机的服务包裹在一个shell脚本中进行重启,就需要延长这个时间,否则备机可能在主机恢复连接的过程中转换成master。
要让客户端应用程序和双星模式配合,你需要做的是:
这不是件容易的事,所以我们一般会将其封装成一个API,供程序员使用。
双星模式的主要限制有:
“精神分裂”症状指的是一个集群中的不同部分同时认为自己是master,从而停止对对方的检测。双星模式中的算法会降低这种症状的发生几率:主备机在决定自己是否为master时会检测自身是否收到了应用程序的请求,以及对方是否已经从网络中消失。
但在某些情况下,双星模式也会发生精神分裂。比如说,主备机被配置在两幢大楼里,每幢大楼的局域网中又分布了一些应用程序。这样,当两幢大楼的网络通信被阻断,双星模式的主备机就会分别在两幢大楼里接受和处理请求。
为了防止精神分裂,我们必须让主备机使用专用的网络进行连接,最简单的方法当然是用一根双绞线将他们相连。
我们不能将双星部署在两个不同的岛屿上,为各自岛屿的应用程序服务。这种情况下,我们会使用诸如联邦模式的机制进行可靠性设计。
最好但最夸张的做法是,将两台机器之间的连接和应用程序的连接完全隔离开来,甚至是使用不同的网卡,而不仅仅是不同的端口。这样做也是为了日后排查错误时更为明确。
闲话少说,下面是双星模式的服务端代码:
bstarsrv: Binary Star server in C
// // 双星模式 - 服务端 // #include "czmq.h" // 发送状态信息的间隔时间 // 如果对方在两次心跳过后都没有应答,则视为断开 #define HEARTBEAT 1000 // In msecs // 服务器状态枚举 typedef enum { STATE_PRIMARY = 1, // 主机,等待同伴连接 STATE_BACKUP = 2, // 备机,等待同伴连接 STATE_ACTIVE = 3, // 激活态,处理应用程序请求 STATE_PASSIVE = 4 // 被动态,不接收请求 } state_t; // 对话节点事件 typedef enum { PEER_PRIMARY = 1, // 主机 PEER_BACKUP = 2, // 备机 PEER_ACTIVE = 3, // 激活态 PEER_PASSIVE = 4, // 被动态 CLIENT_REQUEST = 5 // 客户端请求 } event_t; // 有限状态机 typedef struct { state_t state; // 当前状态 event_t event; // 当前事件 int64_t peer_expiry; // 判定节点死亡的时限 } bstar_t; // 执行有限状态机(将事件绑定至状态); // 发生异常时返回TRUE。 static Bool s_state_machine (bstar_t *fsm) { Bool exception = FALSE; // 主机等待同伴连接 // 该状态下接收CLIENT_REQUEST事件 if (fsm->state == STATE_PRIMARY) { if (fsm->event == PEER_BACKUP) { printf ("I: 已连接至备机(slave),可以作为master运行。\n"); fsm->state = STATE_ACTIVE; } else if (fsm->event == PEER_ACTIVE) { printf ("I: 已连接至备机(master),可以作为slave运行。\n"); fsm->state = STATE_PASSIVE; } } else // 备机等待同伴连接 // 该状态下拒绝CLIENT_REQUEST事件 if (fsm->state == STATE_BACKUP) { if (fsm->event == PEER_ACTIVE) { printf ("I: 已连接至主机(master),可以作为slave运行。\n"); fsm->state = STATE_PASSIVE; } else if (fsm->event == CLIENT_REQUEST) exception = TRUE; } else // 服务器处于激活态 // 该状态下接受CLIENT_REQUEST事件 if (fsm->state == STATE_ACTIVE) { if (fsm->event == PEER_ACTIVE) { // 若出现两台master,则抛出异常 printf ("E: 严重错误:双master。正在退出。\n"); exception = TRUE; } } else // 服务器处于被动态 // 若同伴已死,CLIENT_REQUEST事件将触发故障恢复 if (fsm->state == STATE_PASSIVE) { if (fsm->event == PEER_PRIMARY) { // 同伴正在重启 - 转为激活态,同伴将转为被动态。 printf ("I: 主机(slave)正在重启,可作为master运行。\n"); fsm->state = STATE_ACTIVE; } else if (fsm->event == PEER_BACKUP) { // 同伴正在重启 - 转为激活态,同伴将转为被动态。 printf ("I: 备机(slave)正在重启,可作为master运行。\n"); fsm->state = STATE_ACTIVE; } else if (fsm->event == PEER_PASSIVE) { // 若出现两台slave,集群将无响应 printf ("E: 严重错误:双slave。正在退出\n"); exception = TRUE; } else if (fsm->event == CLIENT_REQUEST) { // 若心跳超时,同伴将成为master; // 此行为由客户端请求触发。 assert (fsm->peer_expiry > 0); if (zclock_time () >= fsm->peer_expiry) { // 同伴已死,转为激活态。 printf ("I: 故障恢复,可作为master运行。\n"); fsm->state = STATE_ACTIVE; } else // 同伴还在,拒绝请求。 exception = TRUE; } } return exception; } int main (int argc, char *argv []) { // 命令行参数可以为: // -p 作为主机启动, at tcp://localhost:5001 // -b 作为备机启动, at tcp://localhost:5002 zctx_t *ctx = zctx_new (); void *statepub = zsocket_new (ctx, ZMQ_PUB); void *statesub = zsocket_new (ctx, ZMQ_SUB); void *frontend = zsocket_new (ctx, ZMQ_ROUTER); bstar_t fsm = { 0 }; if (argc == 2 && streq (argv [1], "-p")) { printf ("I: 主机master,等待备机(slave)连接。\n"); zsocket_bind (frontend, "tcp://*:5001"); zsocket_bind (statepub, "tcp://*:5003"); zsocket_connect (statesub, "tcp://localhost:5004"); fsm.state = STATE_PRIMARY; } else if (argc == 2 && streq (argv [1], "-b")) { printf ("I: 备机slave,等待主机(master)连接。\n"); zsocket_bind (frontend, "tcp://*:5002"); zsocket_bind (statepub, "tcp://*:5004"); zsocket_connect (statesub, "tcp://localhost:5003"); fsm.state = STATE_BACKUP; } else { printf ("Usage: bstarsrv { -p | -b }\n"); zctx_destroy (&ctx); exit (0); } // 设定下一次发送状态的时间 int64_t send_state_at = zclock_time () + HEARTBEAT; while (!zctx_interrupted) { zmq_pollitem_t items [] = { { frontend, 0, ZMQ_POLLIN, 0 }, { statesub, 0, ZMQ_POLLIN, 0 } }; int time_left = (int) ((send_state_at - zclock_time ())); if (time_left < 0) time_left = 0; int rc = zmq_poll (items, 2, time_left * ZMQ_POLL_MSEC); if (rc == -1) break; // 上下文对象被关闭 if (items [0].revents & ZMQ_POLLIN) { // 收到客户端请求 zmsg_t *msg = zmsg_recv (frontend); fsm.event = CLIENT_REQUEST; if (s_state_machine (&fsm) == FALSE) // 返回应答 zmsg_send (&msg, frontend); else zmsg_destroy (&msg); } if (items [1].revents & ZMQ_POLLIN) { // 收到状态消息,作为事件处理 char *message = zstr_recv (statesub); fsm.event = atoi (message); free (message); if (s_state_machine (&fsm)) break; // 错误,退出。 fsm.peer_expiry = zclock_time () + 2 * HEARTBEAT; } // 定时发送状态信息 if (zclock_time () >= send_state_at) { char message [2]; sprintf (message, "%d", fsm.state); zstr_send (statepub, message); send_state_at = zclock_time () + HEARTBEAT; } } if (zctx_interrupted) printf ("W: 中断\n"); // 关闭套接字和上下文 zctx_destroy (&ctx); return 0; }
下面是客户端代码:
bstarcli: Binary Star client in C
// // 双星模式 - 客户端 // #include "czmq.h" #define REQUEST_TIMEOUT 1000 // 毫秒 #define SETTLE_DELAY 2000 // 超时时间 int main (void) { zctx_t *ctx = zctx_new (); char *server [] = { "tcp://localhost:5001", "tcp://localhost:5002" }; uint server_nbr = 0; printf ("I: 正在连接服务器 %s...\n", server [server_nbr]); void *client = zsocket_new (ctx, ZMQ_REQ); zsocket_connect (client, server [server_nbr]); int sequence = 0; while (!zctx_interrupted) { // 发送请求并等待应答 char request [10]; sprintf (request, "%d", ++sequence); zstr_send (client, request); int expect_reply = 1; while (expect_reply) { // 轮询套接字 zmq_pollitem_t items [] = { { client, 0, ZMQ_POLLIN, 0 } }; int rc = zmq_poll (items, 1, REQUEST_TIMEOUT * ZMQ_POLL_MSEC); if (rc == -1) break; // 中断 // 处理应答 if (items [0].revents & ZMQ_POLLIN) { // 审核应答编号 char *reply = zstr_recv (client); if (atoi (reply) == sequence) { printf ("I: 服务端应答正常 (%s)\n", reply); expect_reply = 0; sleep (1); // 每秒发送一个请求 } else { printf ("E: 错误的应答内容: %s\n", reply); } free (reply); } else { printf ("W: 服务器无响应,正在重试\n"); // 重开套接字 zsocket_destroy (ctx, client); server_nbr = (server_nbr + 1) % 2; zclock_sleep (SETTLE_DELAY); printf ("I: 正在连接服务端 %s...\n", server [server_nbr]); client = zsocket_new (ctx, ZMQ_REQ); zsocket_connect (client, server [server_nbr]); // 使用新套接字重发请求 zstr_send (client, request); } } } zctx_destroy (&ctx); return 0; }
运行以下命令进行测试,顺序随意:
bstarsrv -p # Start primary
bstarsrv -b # Start backup
bstarcli
可以将主机进程杀掉,测试故障恢复机制;再开启主机,杀掉备机,查看还原机制。要注意是由客户端触发这两个事件的。
下图展现了服务进程的状态图。绿色状态下会接收客户端请求,粉色状态会拒绝请求。事件指的是同伴的状态,所以“同伴激活态”指的是同伴机器告知我们它处于激活态。“客户请求”表示我们从客户端获得了请求,“客户投票”则指我们从客户端获得了请求并且同伴已经超时死亡。
需要注意的是,服务进程使用PUB-SUB套接字进行状态交换,其它类型的套接字在这里不适用。比如,PUSH和DEALER套接字在没有节点相连的时候会发生阻塞;PAIR套接字不会在节点断开后进行重连;ROUTER套接字需要地址才能发送消息。
These are the main limitations of the Binary Star pattern:
我们可以将双星模式打包成一个类似反应堆的类,供以后复用。在C语言中,我们使用czmq的zloop类,其他语言应该会有相应的实现。以下是C语言版的bstar接口:
// 创建双星模式实例,使用本地(绑定)和远程(连接)端点来设置节点对。
bstar_t *bstar_new (int primary, char *local, char *remote);
// 销毁实例
void bstar_destroy (bstar_t **self_p);
// 返回底层的zloop反应堆,用以添加定时器、读取器、注册和取消等功能。
zloop_t *bstar_zloop (bstar_t *self);
// 注册投票读取器
int bstar_voter (bstar_t *self, char *endpoint, int type,
zloop_fn handler, void *arg);
// 注册状态机处理器
void bstar_new_master (bstar_t *self, zloop_fn handler, void *arg);
void bstar_new_slave (bstar_t *self, zloop_fn handler, void *arg);
// 开启反应堆,当回调函数返回-1,或进程收到SIGINT、SIGTERM信号时中止。
int bstar_start (bstar_t *self);
以下是类的实现:
bstar: Binary Star core class in C
/* =====================================================================
bstar - Binary Star reactor
---------------------------------------------------------------------
Copyright (c) 1991-2011 iMatix Corporation <www.imatix.com>
Copyright other contributors as noted in the AUTHORS file.
This file is part of the ZeroMQ Guide: http://zguide.zeromq.org
This is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License as published by
the Free Software Foundation; either version 3 of the License, or (at
your option) any later version.
This software is distributed in the hope that it will be useful, but
WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public
License along with this program. If not, see
<http://www.gnu.org/licenses/>.
=====================================================================
*/
#include "bstar.h"
// 服务器状态枚举
typedef enum {
STATE_PRIMARY = 1, // 主机,等待同伴连接
STATE_BACKUP = 2, // 备机,等待同伴连接
STATE_ACTIVE = 3, // 激活态,处理应用程序请求
STATE_PASSIVE = 4 // 被动态,不接收请求
} state_t;
// 对话节点事件
typedef enum {
PEER_PRIMARY = 1, // 主机
PEER_BACKUP = 2, // 备机
PEER_ACTIVE = 3, // 激活态
PEER_PASSIVE = 4, // 被动态
CLIENT_REQUEST = 5 // 客户端请求
} event_t;
// 发送状态信息的间隔时间
// 如果对方在两次心跳过后都没有应答,则视为断开
#define BSTAR_HEARTBEAT 1000 // In msecs
// 类结构
struct _bstar_t {
zctx_t *ctx; // 私有上下文
zloop_t *loop; // 反应堆循环
void *statepub; // 状态发布者
void *statesub; // 状态订阅者
state_t state; // 当前状态
event_t event; // 当前事件
int64_t peer_expiry; // 判定节点死亡的时限
zloop_fn *voter_fn; // 投票套接字处理器
void *voter_arg; // 投票处理程序的参数
zloop_fn *master_fn; // 成为master时回调
void *master_arg; // 参数
zloop_fn *slave_fn; // 成为slave时回调
void *slave_arg; // 参数
};
// ---------------------------------------------------------------------
// 执行有限状态机(将事件绑定至状态);
// 发生异常时返回-1,正确时返回0。
static int
s_execute_fsm (bstar_t *self)
{
int rc = 0;
// 主机等待同伴连接
// 该状态下接收CLIENT_REQUEST事件
if (self->state == STATE_PRIMARY) {
if (self->event == PEER_BACKUP) {
zclock_log ("I: 已连接至备机(slave),可以作为master运行。");
self->state = STATE_ACTIVE;
if (self->master_fn)
(self->master_fn) (self->loop, NULL, self->master_arg);
}
else
if (self->event == PEER_ACTIVE) {
zclock_log ("I: 已连接至备机(master),可以作为slave运行。");
self->state = STATE_PASSIVE;
if (self->slave_fn)
(self->slave_fn