标签:timeout mode 9.1 hbase 包括 ack 不同的 很多 同步数据
什么zookeeper?
ZooKeeper是一个分布式的,开放源码的分布式应用程序协调服务,是Google的Chubby一个开源的实现,是Hadoop和Hbase的重要组件。它是一个为分布式应用提供一致性服务的软件,提供的功能包括:配置维护、域名服务、分布式同步、组服务等。
ZooKeeper代码版本中,提供了分布式独享锁、选举、队列的接口,代码在zookeeper-3.4.3\src\recipes。其中分布锁和队列有java和C两个版本,选举只有Java版本。
zookeeper的原理:
5、集群中大多数的机器得到响应并接受选出的Leader。
为什么zookeeper适合作为注册中心?
Zookeeper的数据模型很简单,有一系列被称为ZNode的数据节点组成,与传统的磁盘文件系统不同的是,zk将全量数据存储在内存中,可谓是高性能,而且支持集群,可谓高可用,另外支持事件监听。这些特点决定了zk特别适合作为注册中心(数据发布/订阅)。
下面介绍两种zookeeper客户端的实现,第一种使用zookeeper自带的原生客户端,第二种使用Apache Curator封装后的zookeeper客户端,第一种接近zookeeper底层的源码,它底层也是用了这些方法,用户使用起来较繁琐,推荐使用第二种,Apache Curator封装后简化了用户的使用。
maven引入:
<!--zookeeper自带的原生客户端依赖引入--> <dependency> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> <version>3.4.6</version> </dependency> <!--Apache Curator封装后的zookeeper客户端使用依赖引入--> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-framework</artifactId> <version>2.9.0</version> </dependency>
先介绍第一种zookeeper自带的原生客户端:
package com.wenbing.zookeeper; import org.apache.zookeeper.*; //原生zookeeper客户端使用 public class zookeeperSelfTest { private static final String connectString = "192.168.159.128:2181,192.168.159.133:2181,192.168.159.134:2181"; private static final int sessionTimeout = 3000; public static void main(String[] args) throws Exception { // 创建一个与服务器的连接,需要(服务器的ip+端口)(session过期时间)(Watcher监听注册) ZooKeeper zk = new ZooKeeper(connectString, sessionTimeout, new Watcher() { // 监听所有被触发的事件 @Override public void process(WatchedEvent watchedEvent) { System.out.println("监听来了:" + watchedEvent.toString()); } }); System.out.println("OK!"); // 创建一个目录节点 /** * CreateMode: * PERSISTENT(持续的,相对于EPHEMERAL,不会随着client的断开而消失 * PERSISTENT_SEQUENTIAL(持久的且带顺序的) * EPHEMERAL (短暂的,生命周期依赖于client session) * EPHEMERAL_SEQUENTIAL (短暂的,带顺序的) */ if (zk.exists("/test01", false) == null) { zk.create("/test01", "goodboy".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } // 创建一个子目录节点 zk.create("/test01/test01", "goodgirl".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); System.out.println(zk.getData("/test01", false,null).toString()); // 取出子目录节点列表 System.out.println(zk.getChildren("/test01", true)); // 创建另一个子目录节点 zk.create("/test01/test02", "goodgirl2".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); System.out.println(zk.getChildren("/test01", true)); // 修改子目录节点数据 zk.setData("/test01/test01", "goodboy/boy02".getBytes(), -1); byte[] datas = zk.getData("/test01/test01", false, null); String str = new String(datas, "UTF-8"); System.out.println(str); // 删除整个子目录 -1代表version版本号,-1是删除所有版本 zk.delete("/test01/test01", -1); zk.delete("/test01/test02", -1); zk.delete("/test01", -1); System.out.println(str); Thread.sleep(15000); zk.close(); System.out.println("OK!结束!"); } }
Apache Curator封装的zookeeper客户端使用:
package com.wenbing.zookeeper; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.api.CuratorWatcher; import org.apache.curator.retry.RetryNTimes; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.ZooDefs; import java.util.List; //Apache Curator封装的zookeeperk客户端使用 public class CuratorTest { // psvm快捷键main方法生成 public static void main(String[] args) throws Exception { CuratorFramework client = CuratorFrameworkFactory.newClient("192.168.159.128:2181", new RetryNTimes(10, 5000)); // 连接 client.start(); // 获取子节点,顺便监控子节点 List<String> children = client.getChildren().usingWatcher(new CuratorWatcher() { @Override public void process(WatchedEvent watchedEvent) throws Exception { System.out.println("监控:"+watchedEvent); } }).forPath("/"); System.out.println(children); // 创建节点 String result = client.create().withMode(CreateMode.PERSISTENT).withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE).forPath("/test", "Data".getBytes()); System.out.println(result); // 设置节点数据 client.setData().forPath("/test", "111".getBytes()); client.setData().forPath("/test", "222".getBytes()); // 删除节点 System.out.println(client.checkExists().forPath("/test")); // client.delete().withVersion(-1).forPath("/test"); System.out.println(client.checkExists().forPath("/test")); Thread thread = new Thread(new Runnable(){ @Override public void run(){ } }); thread.sleep(Long.MAX_VALUE); client.close(); System.out.println("OK!"); } }
标签:timeout mode 9.1 hbase 包括 ack 不同的 很多 同步数据
原文地址:https://www.cnblogs.com/wenbinshen/p/9895468.html