package cn.enjoy.javaapi;


import org.apache.zookeeper.*;


import java.io.IOException;

import java.util.concurrent.CountDownLatch;


public class TestJavaApi implements Watcher {


    private static final int SESSION_TIMEOUT = 10000;

    private static final String CONNECTION_STRING = "";

    private static final String ZK_PATH = "/leader";

    private ZooKeeper zk = null;


    private CountDownLatch connectedSemaphore = new CountDownLatch(1);



     * 创建ZK连接


     * @param connectString  ZK服务器地址列表

     * @param sessionTimeout Session超时时间


    public void createConnection(String connectString, int sessionTimeout) {


        try {

            zk = new ZooKeeper(connectString, sessionTimeout, this);


        } catch (InterruptedException e) {

            System.out.println("连接创建失败,发生 InterruptedException");


        } catch (IOException e) {

            System.out.println("连接创建失败,发生 IOException");






     * 关闭ZK连接


    public void releaseConnection() {

        if (null != this.zk) {

            try {


            } catch (InterruptedException e) {

                // ignore







     * 创建节点


     * @param path 节点path

     * @param data 初始数据内容

     * @return


    public boolean createPath(String path, String data) {

        try {

            System.out.println("节点创建成功, Path: "

                    + this.zk.create(path, // 节点路径

                    data.getBytes(), // 节点内容

                    ZooDefs.Ids.OPEN_ACL_UNSAFE, //节点权限

                    CreateMode.EPHEMERAL) //节点类型

                    + ", content: " + data);

        } catch (KeeperException e) {



        } catch (InterruptedException e) {

            System.out.println("节点创建失败,发生 InterruptedException");



        return true;




     * 读取指定节点数据内容


     * @param path 节点path

     * @return


    public String readData(String path) {

        try {

            System.out.println("获取数据成功,path" + path);

            return new String(this.zk.getData(path, false, null));

        } catch (KeeperException e) {

            System.out.println("读取数据失败,发生KeeperExceptionpath: " + path);


            return "";

        } catch (InterruptedException e) {

            System.out.println("读取数据失败,发生 InterruptedExceptionpath: " + path);


            return "";





     * 更新指定节点数据内容


     * @param path 节点path

     * @param data 数据内容

     * @return


    public boolean writeData(String path, String data) {

        try {

            System.out.println("更新数据成功,path" + path + ", stat: " +

                    this.zk.setData(path, data.getBytes(), -1));

        } catch (KeeperException e) {

            System.out.println("更新数据失败,发生KeeperExceptionpath: " + path);


        } catch (InterruptedException e) {

            System.out.println("更新数据失败,发生 InterruptedExceptionpath: " + path);



        return false;




     * 删除指定节点


     * @param path 节点path


    public void deleteNode(String path) {

        try {

            this.zk.delete(path, -1);

            System.out.println("删除节点成功,path" + path);

        } catch (KeeperException e) {

            System.out.println("删除节点失败,发生KeeperExceptionpath: " + path);


        } catch (InterruptedException e) {

            System.out.println("删除节点失败,发生 InterruptedExceptionpath: " + path);





    public static void main(String[] args) {


        TestJavaApi sample = new TestJavaApi();

        sample.createConnection(CONNECTION_STRING, SESSION_TIMEOUT);

        if (sample.createPath(ZK_PATH, "我是节点初始内容")) {


            System.out.println("数据内容: " + sample.readData(ZK_PATH) + "\n");

            sample.writeData(ZK_PATH, "更新后的数据");

            System.out.println("数据内容: " + sample.readData(ZK_PATH) + "\n");








     * 收到来自ServerWatcher通知后的处理。



    public void process(WatchedEvent event) {

        System.out.println("收到事件通知:" + event.getState() + "\n");

        if (Event.KeeperState.SyncConnected == event.getState()) {






} Watch机制

package cn.enjoy.javaapi;


import org.apache.zookeeper.*;

import org.apache.zookeeper.data.Stat;


import java.util.List;

import java.util.concurrent.CountDownLatch;

import java.util.concurrent.atomic.AtomicInteger;


public class ZooKeeperWatcher implements Watcher  {


    /** 定义原子变量 */

    AtomicInteger seq = new AtomicInteger();

    /** 定义session失效时间 */

    private static final int SESSION_TIMEOUT = 10000;

    /** zookeeper服务器地址 */

    private static final String CONNECTION_ADDR = "";

    /** zk父路径设置 */

    private static final String PARENT_PATH = "/testWatch";

    /** zk子路径设置 */

    private static final String CHILDREN_PATH = "/testWatch/children";

    /** 进入标识 */

    private static final String LOG_PREFIX_OF_MAIN = "Main";

    /** zk变量 */

    private ZooKeeper zk = null;

    /** 信号量设置,用于等待zookeeper连接建立之后 通知阻塞程序继续向下执行 */

    private CountDownLatch connectedSemaphore = new CountDownLatch(1);



     * 创建ZK连接

     * @param connectAddr ZK服务器地址列表

     * @param sessionTimeout Session超时时间


    public void createConnection(String connectAddr, int sessionTimeout) {


        try {

            zk = new ZooKeeper(connectAddr, sessionTimeout, this);

            System.out.println(LOG_PREFIX_OF_MAIN + "开始连接ZK服务器");


        } catch (Exception e) {






     * 关闭ZK连接


    public void releaseConnection() {

        if (this.zk != null) {

            try {


            } catch (InterruptedException e) {







     * 创建节点

     * @param path 节点路径

     * @param data 数据内容

     * @return


    public boolean createPath(String path, String data) {

        try {

            //设置监控(由于zookeeper的监控都是一次性的所以 每次必须设置监控)

            this.zk.exists(path, true);

            System.out.println(LOG_PREFIX_OF_MAIN + "节点创建成功, Path: " +

                    this.zk.create( /**路径*/







                            CreateMode.PERSISTENT ) +

                    ", content: " + data);

        } catch (Exception e) {


            return false;


        return true;




     * 读取指定节点数据内容

     * @param path 节点路径

     * @return


    public String readData(String path, boolean needWatch) {

        try {

            return new String(this.zk.getData(path, needWatch, null));

        } catch (Exception e) {


            return "";





     * 更新指定节点数据内容

     * @param path 节点路径

     * @param data 数据内容

     * @return


    public boolean writeData(String path, String data) {

        try {

            System.out.println(LOG_PREFIX_OF_MAIN + "更新数据成功,path" + path + ", stat: " +

                    this.zk.setData(path, data.getBytes(), -1));

        } catch (Exception e) {



        return false;




     * 删除指定节点


     * @param path

     *            节点path


    public void deleteNode(String path) {

        try {

            this.zk.delete(path, -1);

            System.out.println(LOG_PREFIX_OF_MAIN + "删除节点成功,path" + path);

        } catch (Exception e) {






     * 判断指定节点是否存在

     * @param path 节点路径


    public Stat exists(String path, boolean needWatch) {

        try {

            return this.zk.exists(path, needWatch);

        } catch (Exception e) {


            return null;





     * 获取子节点

     * @param path 节点路径


    private List<String> getChildren(String path, boolean needWatch) {

        try {

            return this.zk.getChildren(path, needWatch);

        } catch (Exception e) {


            return null;





     * 删除所有节点


    public void deleteAllTestPath() {

        if(this.exists(CHILDREN_PATH, false) != null){



        if(this.exists(PARENT_PATH, false) != null){






     * 收到来自ServerWatcher通知后的处理。



    public void process(WatchedEvent event) {


        System.out.println("进入 process 。。。。。event = " + event);


        try {


        } catch (InterruptedException e) {




        if (event == null) {




        // 连接状态

        Watcher.Event.KeeperState keeperState = event.getState();

        // 事件类型

        Watcher.Event.EventType eventType = event.getType();

        // 受影响的path

        String path = event.getPath();


        String logPrefix = "Watcher-" + this.seq.incrementAndGet() + "";


        System.out.println(logPrefix + "收到Watcher通知");

        System.out.println(logPrefix + "连接状态:\t" + keeperState.toString());

        System.out.println(logPrefix + "事件类型:\t" + eventType.toString());


        if (Event.KeeperState.SyncConnected == keeperState) {

            // 成功连接上ZK服务器

            if (Event.EventType.None == eventType) {

                System.out.println(logPrefix + "成功连接上ZK服务器");




            else if (Event.EventType.NodeCreated == eventType) {

                System.out.println(logPrefix + "节点创建");

                try {


                } catch (InterruptedException e) {



                this.exists(path, true);



            else if (Event.EventType.NodeDataChanged == eventType) {

                System.out.println(logPrefix + "节点数据更新");

                try {


                } catch (InterruptedException e) {



                System.out.println(logPrefix + "数据内容: " + this.readData(PARENT_PATH, true));



            else if (Event.EventType.NodeChildrenChanged == eventType) {

                System.out.println(logPrefix + "子节点变更");

                try {


                } catch (InterruptedException e) {



                System.out.println(logPrefix + "子节点列表:" + this.getChildren(PARENT_PATH, true));



            else if (Event.EventType.NodeDeleted == eventType) {

                System.out.println(logPrefix + "节点 " + path + " 被删除");



        else if (Watcher.Event.KeeperState.Disconnected == keeperState) {

            System.out.println(logPrefix + "ZK服务器断开连接");


        else if (Watcher.Event.KeeperState.AuthFailed == keeperState) {

            System.out.println(logPrefix + "权限检查失败");


        else if (Watcher.Event.KeeperState.Expired == keeperState) {

            System.out.println(logPrefix + "会话失效");








    public static void main(String[] args) throws Exception {



        ZooKeeperWatcher zkWatch = new ZooKeeperWatcher();


        zkWatch.createConnection(CONNECTION_ADDR, SESSION_TIMEOUT);





        // 清理节点



        if (zkWatch.createPath(PARENT_PATH, System.currentTimeMillis() + "")) {



            // 读取数据,在操作节点数据之前先调用zookeepergetData()方法是为了可以watch到对节点的操作。watch是一次性的,

            // 也就是说,如果第二次又重新调用了setData()方法,在此之前需要重新调用一次。

             System.out.println("---------------------- read parent ----------------------------");

            zkWatch.readData(PARENT_PATH, true);

            // 更新数据

           zkWatch.writeData(PARENT_PATH, System.currentTimeMillis() + "");




            /** 读取子节点,设置对子节点变化的watch,如果不写该方法,则在创建子节点是只会输出NodeCreated,而不会输出NodeChildrenChanged


             如果是递归的创建子节点,如path="/p/c1/c2"的话,getChildren(PARENT_PATH, ture)只会在创建c1watch,输出c1NodeChildrenChanged

             而不会输出创建c2时的NodeChildrenChanged,如果watchc2NodeChildrenChanged,则需要再调用一次getChildren(String path, true)方法,



            System.out.println("---------------------- read children path ----------------------------");

            zkWatch.getChildren(PARENT_PATH, true);





            // 创建子节点,同理如果想要watchNodeChildrenChanged状态,需要调用getChildren(CHILDREN_PATH, true)

            zkWatch.createPath(CHILDREN_PATH, System.currentTimeMillis() + "");




            zkWatch.readData(CHILDREN_PATH, true);

            zkWatch.writeData(CHILDREN_PATH, System.currentTimeMillis() + "");




        // 清理节点






} ZK认证机制

package cn.enjoy.javaapi;


import org.apache.zookeeper.Watcher;

import org.apache.zookeeper.*;

import org.apache.zookeeper.data.ACL;

import org.apache.zookeeper.data.Stat;


import java.util.ArrayList;

import java.util.List;

import java.util.concurrent.CountDownLatch;

import java.util.concurrent.atomic.AtomicInteger;


public class TestZookeeperAuth implements Watcher {


    /** 连接地址 */

    final static String CONNECT_ADDR = "";

    /** 测试路径 */

    final static String PATH = "/testAuth";

    final static String PATH_DEL = "/testAuth/delNode";

    /** 认证类型 */

    final static String authentication_type = "digest";

    /** 认证正确方法 */

    final static String correctAuthentication = "123456";

    /** 认证错误方法 */

    final static String badAuthentication = "654321";


    static ZooKeeper zk = null;

    /** 计时器 */

    AtomicInteger seq = new AtomicInteger();

    /** 标识 */

    private static final String LOG_PREFIX_OF_MAIN = "Main";


    private CountDownLatch connectedSemaphore = new CountDownLatch(1);



    public void process(WatchedEvent event) {

        try {


        } catch (InterruptedException e) {



        if (event==null) {



        // 连接状态

        Event.KeeperState keeperState = event.getState();

        // 事件类型

        Event.EventType eventType = event.getType();

        // 受影响的path

        String path = event.getPath();


        String logPrefix = "Watcher-" + this.seq.incrementAndGet() + "";


        System.out.println(logPrefix + "收到Watcher通知");

        System.out.println(logPrefix + "连接状态:\t" + keeperState.toString());

        System.out.println(logPrefix + "事件类型:\t" + eventType.toString());

        if (Event.KeeperState.SyncConnected == keeperState) {

            // 成功连接上ZK服务器

            if (Event.EventType.None == eventType) {

                System.out.println(logPrefix + "成功连接上ZK服务器");



        } else if (Event.KeeperState.Disconnected == keeperState) {

            System.out.println(logPrefix + "ZK服务器断开连接");

        } else if (Event.KeeperState.AuthFailed == keeperState) {

            System.out.println(logPrefix + "权限检查失败");

        } else if (Event.KeeperState.Expired == keeperState) {

            System.out.println(logPrefix + "会话失效");





     * 创建ZK连接


     * @param connectString

     *            ZK服务器地址列表

     * @param sessionTimeout

     *            Session超时时间


    public void createConnection(String connectString, int sessionTimeout) {


        try {

            zk = new ZooKeeper(connectString, sessionTimeout, this);



            System.out.println(LOG_PREFIX_OF_MAIN + "开始连接ZK服务器");



        } catch (Exception e) {






     * 关闭ZK连接


    public void releaseConnection() {

        if (this.zk!=null) {

            try {


            } catch (InterruptedException e) {







     * @param args

     * @throws Exception


    public static void main(String[] args) throws Exception {


        TestZookeeperAuth testAuth = new TestZookeeperAuth();



        List<ACL> acls = new ArrayList<ACL>(1);

        for (ACL ids_acl : ZooDefs.Ids.CREATOR_ALL_ACL) {




        try {

            zk.create(PATH, "init content".getBytes(), acls, CreateMode.PERSISTENT);

            System.out.println("使用授权key" + correctAuthentication + "创建节点:"+ PATH + ", 初始内容是: init content");

        } catch (Exception e) {



        try {

            zk.create(PATH_DEL, "will be deleted! ".getBytes(), acls, CreateMode.PERSISTENT);

            System.out.println("使用授权key" + correctAuthentication + "创建节点:"+ PATH_DEL + ", 初始内容是: init content");

        } catch (Exception e) {




        // 获取数据





        // 更新数据





        // 删除数据











    /** 获取数据:采用错误的密码 */

    static void getDataByBadAuthentication() {

        String prefix = "[使用错误的授权信息]";

        try {

            ZooKeeper badzk = new ZooKeeper(CONNECT_ADDR, 2000, null);




            System.out.println(prefix + "获取数据:" + PATH);

            System.out.println(prefix + "成功获取数据:" + badzk.getData(PATH, false, null));

        } catch (Exception e) {

            System.err.println(prefix + "获取数据失败,原因:" + e.getMessage());




    /** 获取数据:不采用密码 */

    static void getDataByNoAuthentication() {

        String prefix = "[不使用任何授权信息]";

        try {

            System.out.println(prefix + "获取数据:" + PATH);

            ZooKeeper nozk = new ZooKeeper(CONNECT_ADDR, 2000, null);


            System.out.println(prefix + "成功获取数据:" + nozk.getData(PATH, false, null));

        } catch (Exception e) {

            System.err.println(prefix + "获取数据失败,原因:" + e.getMessage());




    /** 采用正确的密码 */

    static void getDataByCorrectAuthentication() {

        String prefix = "[使用正确的授权信息]";

        try {

            System.out.println(prefix + "获取数据:" + PATH);


            System.out.println(prefix + "成功获取数据:" + zk.getData(PATH, false, null));

        } catch (Exception e) {

            System.out.println(prefix + "获取数据失败,原因:" + e.getMessage());





     * 更新数据:不采用密码


    static void updateDataByNoAuthentication() {


        String prefix = "[不使用任何授权信息]";


        System.out.println(prefix + "更新数据: " + PATH);

        try {

            ZooKeeper nozk = new ZooKeeper(CONNECT_ADDR, 2000, null);


            Stat stat = nozk.exists(PATH, false);

            if (stat!=null) {

                nozk.setData(PATH, prefix.getBytes(), -1);

                System.out.println(prefix + "更新成功");


        } catch (Exception e) {

            System.err.println(prefix + "更新失败,原因是:" + e.getMessage());





     * 更新数据:采用错误的密码


    static void updateDataByBadAuthentication() {


        String prefix = "[使用错误的授权信息]";


        System.out.println(prefix + "更新数据:" + PATH);

        try {

            ZooKeeper badzk = new ZooKeeper(CONNECT_ADDR, 2000, null);




            Stat stat = badzk.exists(PATH, false);

            if (stat!=null) {

                badzk.setData(PATH, prefix.getBytes(), -1);

                System.out.println(prefix + "更新成功");


        } catch (Exception e) {

            System.err.println(prefix + "更新失败,原因是:" + e.getMessage());





     * 更新数据:采用正确的密码


    static void updateDataByCorrectAuthentication() {


        String prefix = "[使用正确的授权信息]";


        System.out.println(prefix + "更新数据:" + PATH);

        try {

            Stat stat = zk.exists(PATH, false);

            if (stat!=null) {

                zk.setData(PATH, prefix.getBytes(), -1);

                System.out.println(prefix + "更新成功");


        } catch (Exception e) {

            System.err.println(prefix + "更新失败,原因是:" + e.getMessage());





     * 不使用密码 删除节点


    static void deleteNodeByNoAuthentication() throws Exception {


        String prefix = "[不使用任何授权信息]";


        try {

            System.out.println(prefix + "删除节点:" + PATH_DEL);

            ZooKeeper nozk = new ZooKeeper(CONNECT_ADDR, 2000, null);


            Stat stat = nozk.exists(PATH_DEL, false);

            if (stat!=null) {


                System.out.println(prefix + "删除成功");


        } catch (Exception e) {

            System.err.println(prefix + "删除失败,原因是:" + e.getMessage());





     * 采用错误的密码删除节点


    static void deleteNodeByBadAuthentication() throws Exception {


        String prefix = "[使用错误的授权信息]";


        try {

            System.out.println(prefix + "删除节点:" + PATH_DEL);

            ZooKeeper badzk = new ZooKeeper(CONNECT_ADDR, 2000, null);




            Stat stat = badzk.exists(PATH_DEL, false);

            if (stat!=null) {

                badzk.delete(PATH_DEL, -1);

                System.out.println(prefix + "删除成功");


        } catch (Exception e) {

            System.err.println(prefix + "删除失败,原因是:" + e.getMessage());





     * 使用正确的密码删除节点


    static void deleteNodeByCorrectAuthentication() throws Exception {


        String prefix = "[使用正确的授权信息]";


        try {

            System.out.println(prefix + "删除节点:" + PATH_DEL);

            Stat stat = zk.exists(PATH_DEL, false);

            if (stat!=null) {

                zk.delete(PATH_DEL, -1);

                System.out.println(prefix + "删除成功");


        } catch (Exception e) {

            System.out.println(prefix + "删除失败,原因是:" + e.getMessage());





     * 使用正确的密码删除节点


    static void deleteParent() throws Exception {

        try {

            Stat stat = zk.exists(PATH_DEL, false);

            if (stat == null) {

                zk.delete(PATH, -1);


        } catch (Exception e) {








1.1.1. ZkClient 基本操作

 package cn.enjoy.zkclient;


import org.I0Itec.zkclient.ZkClient;

import org.I0Itec.zkclient.ZkConnection;


import java.util.List;



 * Created by VULCAN on 2018/11/7.


public class ZkClientOperator {


    /** zookeeper地址 */

    static final String CONNECT_ADDR = "";

    /** session超时时间 */

    static final int SESSION_OUTTIME = 10000;//ms



    public static void main(String[] args) throws Exception {

       // ZkClient zkc = new ZkClient(new ZkConnection(CONNECT_ADDR), SESSION_OUTTIME);

        ZkClient zkc = new ZkClient(CONNECT_ADDR, SESSION_OUTTIME);


        //1. create and delete方法


        zkc.createPersistent("/super/c1", true);





        //2. 设置pathdata 并且读取子节点和每个节点的内容

        zkc.createPersistent("/super", "1234");

        zkc.createPersistent("/super/c1", "c1内容");

        zkc.createPersistent("/super/c2", "c2内容");

        List<String> list = zkc.getChildren("/super");

        for(String p : list){


            String rp = "/super/" + p;

            String data = zkc.readData(rp);

            System.out.println("节点为:" + rp + ",内容为: " + data);



        //3. 更新和判断节点是否存在

        zkc.writeData("/super/c1", "新内容");




// 4.递归删除/super内容



} 监听机制

package cn.enjoy.zkclient;


import org.I0Itec.zkclient.IZkChildListener;

import org.I0Itec.zkclient.IZkDataListener;

import org.I0Itec.zkclient.ZkClient;

import org.I0Itec.zkclient.ZkConnection;

import org.junit.Test;


import java.util.List;


public class TestZkClientWatcher {



    /** zookeeper地址 */

    static final String CONNECT_ADDR = "";

    /** session超时时间 */

    static final int SESSION_OUTTIME = 10000;//ms





     * subscribeChildChanges方法 订阅子节点变化


    public  void testZkClientWatcher1() throws Exception {

        ZkClient zkc = new ZkClient(new ZkConnection(CONNECT_ADDR), SESSION_OUTTIME);



        zkc.subscribeChildChanges("/super", new IZkChildListener() {


            public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception {

                System.out.println("parentPath: " + parentPath);

                System.out.println("currentChilds: " + currentChilds);









        zkc.createPersistent("/super" + "/" + "c1", "c1内容");



        zkc.createPersistent("/super" + "/" + "c2", "c2内容");













     * subscribeDataChanges 订阅内容变化


    public void testZkClientWatcher2() throws Exception {

        ZkClient zkc = new ZkClient(new ZkConnection(CONNECT_ADDR), SESSION_OUTTIME);


        zkc.createPersistent("/super", "1234");



        zkc.subscribeDataChanges("/super", new IZkDataListener() {


            public void handleDataDeleted(String path) throws Exception {

                System.out.println("删除的节点为:" + path);




            public void handleDataChange(String path, Object data) throws Exception {

                System.out.println("变更的节点为:" + path + ", 变更内容为:" + data);





        zkc.writeData("/super", "456", -1);










1.1.2. Curator  基本操作

package cn.enjoy.curator;


import org.apache.curator.RetryPolicy;

import org.apache.curator.framework.CuratorFramework;

import org.apache.curator.framework.CuratorFrameworkFactory;

import org.apache.curator.framework.api.BackgroundCallback;

import org.apache.curator.framework.api.CuratorEvent;

import org.apache.curator.framework.api.CuratorListener;

import org.apache.curator.framework.api.transaction.CuratorOp;

import org.apache.curator.framework.api.transaction.CuratorTransactionResult;

import org.apache.curator.retry.ExponentialBackoffRetry;

import org.apache.zookeeper.CreateMode;

import org.apache.zookeeper.data.Stat;

import org.junit.Before;

import org.junit.Test;



import java.util.List;


import static com.sun.xml.internal.ws.dump.LoggingDumpTube.Position.Before;



 *  测试Apache Curator框架的基本用法


public class OperatorTest {


    private static final String SERVER = "";



    private final int SESSION_TIMEOUT = 30000;



    private final int CONNECTION_TIMEOUT = 5000;



    private CuratorFramework client = null;



     * baseSleepTimeMs:初始的重试等待时间

     * maxRetries:最多重试次数



     * ExponentialBackoffRetry:重试一定次数,每次重试时间依次递增

     * RetryNTimes:重试N

     * RetryOneTime:重试一次

     * RetryUntilElapsed:重试一定时间


    RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);



    public void init(){

        //创建 CuratorFrameworkImpl实例

        client = CuratorFrameworkFactory.newClient(SERVER, SESSION_TIMEOUT, CONNECTION_TIMEOUT, retryPolicy);







     * 测试创建节点

     * @throws Exception



    public void testCreate() throws Exception{


        client.create().forPath("/curator","/curator data".getBytes());



        client.create().withMode(CreateMode.PERSISTENT_SEQUENTIAL).forPath("/curator_sequential","/curator_sequential data".getBytes());




                .forPath("/curator/ephemeral","/curator/ephemeral data".getBytes());




                .forPath("/curator/ephemeral_path1","/curator/ephemeral_path1 data".getBytes());







     * 测试检查某个节点是否存在

     * @throws Exception



    public void testCheck() throws Exception{

        Stat stat1 = client.checkExists().forPath("/curator");

        Stat stat2 = client.checkExists().forPath("/curator2");


        System.out.println("‘/curator‘是否存在: " + (stat1 != null ? true : false));

        System.out.println("‘/curator2‘是否存在: " + (stat2 != null ? true : false));





     * 测试异步设置节点数据

     * @throws Exception



    public void testSetDataAsync() throws Exception{


        CuratorListener listener = new CuratorListener() {



            public void eventReceived(CuratorFramework client, CuratorEvent event)

                    throws Exception {


















     * 测试另一种异步执行获取通知的方式

     * @throws Exception



    public void testSetDataAsyncWithCallback() throws Exception{

        BackgroundCallback callback = new BackgroundCallback() {



            public void processResult(CuratorFramework client, CuratorEvent event)

                    throws Exception {






        client.setData().inBackground(callback).forPath("/curator","/curator modified data with Callback".getBytes());








     * 测试删除节点

     * @throws Exception



    public void testDelete() throws Exception{



                .forPath("/curator/del_key1","/curator/del_key1 data".getBytes());



                .forPath("/curator/del_key2","/curator/del_key2 data".getBytes());


        client.create().forPath("/curator/del_key2/test_key","test_key data".getBytes());












     * 测试事务管理:碰到异常,事务会回滚

     * @throws Exception



    public void testTransaction() throws Exception{


        CuratorOp createOp = client.transactionOp().create()

                .forPath("/curator/one_path","some data".getBytes());


        CuratorOp setDataOp = client.transactionOp().setData()

                .forPath("/curator","other data".getBytes());


        CuratorOp deleteOp = client.transactionOp().delete()




        List<CuratorTransactionResult> results = client.transaction()




        for(CuratorTransactionResult result : results){

            System.out.println("执行结果是: " + result.getForPath() + "--" + result.getType());





package cn.enjoy.curator;


import org.apache.curator.RetryPolicy;

import org.apache.curator.framework.CuratorFramework;

import org.apache.curator.framework.CuratorFrameworkFactory;

import org.apache.curator.framework.api.CuratorEvent;

import org.apache.curator.framework.api.CuratorListener;

import org.apache.curator.framework.recipes.cache.*;

import org.apache.curator.retry.ExponentialBackoffRetry;

import org.apache.zookeeper.CreateMode;

import org.apache.zookeeper.WatchedEvent;

import org.apache.zookeeper.Watcher;

import org.junit.Test;


import java.util.concurrent.ExecutorService;

import java.util.concurrent.Executors;



public class EventTest {



    private static final String SERVER = "";



    private final int SESSION_TIMEOUT = 30000;



    private final int CONNECTION_TIMEOUT = 5000;



    private CuratorFramework client = null;



     * baseSleepTimeMs:初始的重试等待时间

     * maxRetries:最多重试次数



     * ExponentialBackoffRetry:重试一定次数,每次重试时间依次递增

     * RetryNTimes:重试N

     * RetryOneTime:重试一次

     * RetryUntilElapsed:重试一定时间


    RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);



    public void init(){

        //创建 CuratorFrameworkImpl实例

        client = CuratorFrameworkFactory.newClient(SERVER, SESSION_TIMEOUT, CONNECTION_TIMEOUT, retryPolicy);








     * @描述:第一种监听器的添加方式: 对指定的节点进行添加操作

     * 仅仅能监控指定的本节点的数据修改,删除 操作 并且只能监听一次 --->不好




    public  void TestListenterOne() throws Exception{



        // 注册观察者,当节点变动时触发

        byte[] data = client.getData().usingWatcher(new Watcher() {


            public void process(WatchedEvent event) {

                System.out.println("获取 test 节点 监听器 : " + event);








        System.out.println("节点数据: "+ new String(data));







     * @描述:第二种监听器的添加方式: Cache 的三种实现

     *   Path Cache:监视一个路径下1)孩子结点的创建、2)删除,3)以及结点数据的更新。

     *                  产生的事件会传递给注册的PathChildrenCacheListener

     *  Node Cache:监视一个结点的创建、更新、删除,并将结点的数据缓存在本地。

     *  Tree CachePath CacheNode Cache的“合体”,监视路径下的创建、更新、删除事件,并缓存路径下所有孩子结点的数据。



    //1.path Cache  连接  路径  是否获取数据

    //能监听所有的字节点 且是无限监听的模式 但是 指定目录下节点的子节点不再监听


    public void setListenterTwoOne() throws Exception{

        ExecutorService pool = Executors.newCachedThreadPool();

        PathChildrenCache childrenCache = new PathChildrenCache(client, "/test", true);

        PathChildrenCacheListener childrenCacheListener = new PathChildrenCacheListener() {


            public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {


                ChildData data = event.getData();

                switch (event.getType()) {

                    case CHILD_ADDED:

                        System.out.println("CHILD_ADDED : "+ data.getPath() +"  数据:"+ data.getData());


                    case CHILD_REMOVED:

                        System.out.println("CHILD_REMOVED : "+ data.getPath() +"  数据:"+ data.getData());


                    case CHILD_UPDATED:

                        System.out.println("CHILD_UPDATED : "+ data.getPath() +"  数据:"+ data.getData());


                    case INITIALIZED:

                        System.out.println("INITIALIZED : "+ data.getPath() +"  数据:"+ data.getData());








        System.out.println("Register zk watcher successfully!");
















    //2.Node Cache  监控本节点的变化情况   连接 目录 是否压缩

    //监听本节点的变化  节点可以进行修改操作  删除节点后会再次创建(空节点)


    public void setListenterTwoTwo() throws Exception{

        ExecutorService pool = Executors.newCachedThreadPool();


        final NodeCache nodeCache = new NodeCache(client, "/test", false);

        nodeCache.getListenable().addListener(new NodeCacheListener() {


            public void nodeChanged() throws Exception {

                System.out.println("the test node is change and result is :");

                System.out.println("path : "+nodeCache.getCurrentData().getPath());

                System.out.println("data : "+new String(nodeCache.getCurrentData().getData()));

                System.out.println("stat : "+nodeCache.getCurrentData().getStat());










    //3.Tree Cache

    // 监控 指定节点和节点下的所有的节点的变化--无限监听  可以进行本节点的删除(不在创建)


    public void TestListenterTwoThree() throws Exception{

        ExecutorService pool = Executors.newCachedThreadPool();


        TreeCache treeCache = new TreeCache(client, "/test");


        treeCache.getListenable().addListener(new TreeCacheListener() {


            public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {

                ChildData data = event.getData();

                if(data !=null){

                    switch (event.getType()) {

                        case NODE_ADDED:

                            System.out.println("NODE_ADDED : "+ data.getPath() +"  数据:"+ new String(data.getData()));


                        case NODE_REMOVED:

                            System.out.println("NODE_REMOVED : "+ data.getPath() +"  数据:"+ new String(data.getData()));


                        case NODE_UPDATED:

                            System.out.println("NODE_UPDATED : "+ data.getPath() +"  数据:"+ new String(data.getData()));







                    System.out.println( "data is null : "+ event.getType());
























