码迷,mamicode.com
首页 > 编程语言 > 详细

zk系列-c++下zookeeper使用实例

时间:2015-08-12 10:24:27      阅读:383      评论:0      收藏:0      [点我收藏+]

标签:

ZooKeeper是一个分布式的,开放源码的分布式应用程序协调服务。分布式应用可以使用它来实现诸如:统一命名服务、配置管理、分布式锁服务、集群管理等功能。公司常用到的是Java服务集群的管理。

1.函数介绍

[cpp]  view plain copy 技术分享 技术分享
  1. //create a handle to used communicate with zookeeper  
  2. zhandle_t *zookeeper_init(const char *host, watcher_fn fn, int recv_timeout, const clientid_t *clientid, void *context, int flags)  
  3. //create a node synchronously  
  4. int zoo_create(zhandle_t *zh, const char *path, const char *value,int valuelen,  
  5.    const struct ACL_vector *acl, int flags,char *path_buffer, int path_buffer_len);  
  6. //lists the children of a node synchronously.  
  7. int zoo_wget_children(zhandle_t *zh, const char *path, watcher_fn watcher, void* watcherCtx, struct String_vector *strings)  
  8. //close the zookeeper handle and free up any resources.  
  9. int zookeeper_close(zhandle_t *zh)  

2.实例

用上面3个函数,就能创建一个简单的集群管理。

数据存储结构为

技术分享

服务端向Zk注册服务

[cpp]  view plain copy 技术分享 技术分享
  1. //连接zk  
  2. void ZkRegistry::ConnectZK() {  
  3.   if (zhandle_) {  
  4.     zookeeper_close(zhandle_);  
  5.   }  
  6.   int count = 0;  
  7.   do {  
  8.     ++count;  
  9.     zhandle_ = zookeeper_init(zk_hosts_.c_str(),InitWatcher, timeout_, NULL, NULL, 0);  
  10.   } while (!connected_ && count < ZK_MAX_CONNECT_TIMES);  
  11.   
  12.   if (count >= ZK_MAX_CONNECT_TIMES) {  
  13.     SLOG_WARN("ZkRegistry::Init --> connect host " << zk_hosts_ << " over max times:" << count);  
  14.     return;  
  15.   }  
  16. }  
  17.   
  18. //发布服务,建立临时节点  
  19. void ZkRegistry::PublishService() {  
  20.   if (zhandle_ == NULL) {  
  21.     ConnectZK();  
  22.   }  
  23.   string server_path = PING_SERVER + "/" + PingConfig::instance().get_index() + "/"  
  24.     + GetIp() + ":" + PingConfig::instance().get_port();  
  25.   char res_path[128];  
  26.   int rc = zoo_create(zhandle_, server_path.c_str(), GetIp().c_str(), GetIp().size(),  
  27.       &ZOO_OPEN_ACL_UNSAFE, ZOO_EPHEMERAL, res_path, 128);  
  28.   if (rc) {  
  29.     SLOG_INFO("ZkRegistry::PublishService --> zoo_create() path=" << server_path << "," << zerror(rc));  
  30.   }  
  31. }  
  32.   
  33. //每隔10s注册一次服务  
  34. void ZkRegistry::run() {  
  35.   while(true) {  
  36.     SLOG_INFO("publish service");  
  37.     ConnectZK();  
  38.     PublishService();  
  39.     sleep(10);  
  40.   }  
  41. }  
  42.   
  43. void ZkRegistry::InitWatcher(zhandle_t *zh, int type, int state, const char *path, void *watcher_ctx) {  
  44.   if (state == ZOO_CONNECTED_STATE) {  
  45.     connected_ = true;  
  46.     SLOG_INFO("InitWatcher() ZOO_CONNECTED_STATE");  
  47.   } else if (state == ZOO_AUTH_FAILED_STATE) {  
  48.     SLOG_INFO("InitWatcher() ZOO_AUTH_FAILED_STATE");  
  49.   } else if (state == ZOO_EXPIRED_SESSION_STATE) {  
  50.     SLOG_INFO("InitWatcher() ZOO_EXPIRED_SESSION_STATE");  
  51.   } else if (state == ZOO_CONNECTING_STATE) {  
  52.     SLOG_INFO("InitWatcher() ZOO_CONNECTING_STATE");  
  53.   } else if (state == ZOO_ASSOCIATING_STATE) {  
  54.     SLOG_INFO("InitWatcher() ZOO_ASSOCIATING_STATE");  
  55.   }  
  56. }  

客户端获取服务列表

[cpp]  view plain copy 技术分享 技术分享
  1. //连接zk server  
  2. void ZkClient::ConnectZK() {  
  3.   cout << "ZkClient::ConnectZK" << endl;  
  4.   if (zhandle_) {  
  5.     zookeeper_close(zhandle_);  
  6.   }  
  7.   zhandle_ = NULL;  
  8.   connected_ = false;  
  9.   
  10.   int count = 0;  
  11.   do {  
  12.     ++count;  
  13.     zhandle_ = zookeeper_init(zk_hosts_.c_str(), InitWatcher, timeout_, NULL, NULL, 0);  
  14.     sleep(5 * ONE_SECONDS);  
  15.   } while (!connected_ && count < ZK_MAX_CONNECT_TIMES);  
  16.   
  17.   if (count >= ZK_MAX_CONNECT_TIMES){  
  18.     cout << "ZkClient::Init --> connecting zookeeper host: " << zk_hosts_ << " over times: " << count << endl;  
  19.   }  
  20. }  
  21. //更新服务列表,冷备和热备  
  22. void ZkClient::Update() {  
  23.   cout << "ZkClient::Update" << endl;  
  24.   if (zhandle_ == NULL || connected_ == false) {  
  25.     Init();  
  26.   }  
  27.   //获得服务份数  
  28.   struct String_vector str_vec;  
  29.   int ret = zoo_wget_children(zhandle_, PING_SERVER.c_str(), ServiceWatcher, NULL, &str_vec);  
  30.   if (ret) {  
  31.     cout << "Update --> read path:" << PING_SERVER << " wrong, " << zerror(ret) << endl;  
  32.     return;  
  33.   }  
  34.   
  35.   //获得各份服务ip:port  
  36.   for (int i = 0; i < str_vec.count; ++i) {  
  37.     struct String_vector node_vec;  
  38.     string path = PING_SERVER + "/" + str_vec.data[i];  
  39.     int ret = zoo_wget_children(zhandle_, path.c_str(), ServiceWatcher, NULL, &node_vec);  
  40.     cout << "Update --> path:" << path << ", ret:" << ret << ", node‘s size:" << node_vec.count << endl;  
  41.     if (ret || node_vec.count != 1) {  
  42.       continue;  
  43.     }  
  44.     ....  
  45.   }  
  46. }  
  47. //监控服务变化  
  48. void ZkClient::ServiceWatcher(zhandle_t *zh, int type, int state, const char *path, void *watcherCtx) {  
  49.   cout << "type:" << type << endl;  
  50.   cout << "state:" << state << endl;  
  51.   cout << "path:" << path << endl;  
  52. //  cout << "watcherCtx:" << (char*)watcherCtx << endl;  
  53.   cout << "ZOO_CHILD_EVENT:" << ZOO_CHILD_EVENT << endl;  
  54.   if (ZOO_CHILD_EVENT == type) {  
  55.     cout << "ServiceWatcher ZOO_CHILD_EVENT" << endl;  
  56.     ZkClient::Instance().Update();//更新服务列表      
  57.   }  
  58. }  

服务端会每隔一段时间重新注册自己;

客户端在第一次与zk建立连接获取服务列表时,注册监听函数。zk当节点发生变化时,通知客户端,客户端重新获取服务列表,并注册事件。

zk系列-c++下zookeeper使用实例

标签:

原文地址:http://my.oschina.net/sexgirl/blog/491022

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!