码迷,mamicode.com
首页 > 其他好文 > 详细

Zookeeper原生客户端

时间:2019-08-21 23:19:23      阅读:153      评论:0      收藏:0      [点我收藏+]

标签:his   round   ram   tde   列表   标识   无限   结果   获取   

1.1.1.1. 客户端基本操作

package cn.enjoy.javaapi;

 

import org.apache.zookeeper.*;

 

import java.io.IOException;

import java.util.concurrent.CountDownLatch;

 

public class TestJavaApi implements Watcher {

 

    private static final int SESSION_TIMEOUT = 10000;

    private static final String CONNECTION_STRING = "192.168.30.10:2181";

    private static final String ZK_PATH = "/leader";

    private ZooKeeper zk = null;

 

    private CountDownLatch connectedSemaphore = new CountDownLatch(1);

 

    /**

     * 创建ZK连接

     *

     * @param connectString  ZK服务器地址列表

     * @param sessionTimeout Session超时时间

     */

    public void createConnection(String connectString, int sessionTimeout) {

        this.releaseConnection();

        try {

            zk = new ZooKeeper(connectString, sessionTimeout, this);

            connectedSemaphore.await();

        } catch (InterruptedException e) {

            System.out.println("连接创建失败,发生 InterruptedException");

            e.printStackTrace();

        } catch (IOException e) {

            System.out.println("连接创建失败,发生 IOException");

            e.printStackTrace();

        }

    }

 

    /**

     * 关闭ZK连接

     */

    public void releaseConnection() {

        if (null != this.zk) {

            try {

                this.zk.close();

            } catch (InterruptedException e) {

                // ignore

                e.printStackTrace();

            }

        }

    }

 

    /**

     * 创建节点

     *

     * @param path 节点path

     * @param data 初始数据内容

     * @return

     */

    public boolean createPath(String path, String data) {

        try {

            System.out.println("节点创建成功, Path: "

                    + this.zk.create(path, // 节点路径

                    data.getBytes(), // 节点内容

                    ZooDefs.Ids.OPEN_ACL_UNSAFE, //节点权限

                    CreateMode.EPHEMERAL) //节点类型

                    + ", content: " + data);

        } catch (KeeperException e) {

            System.out.println("节点创建失败,发生KeeperException");

            e.printStackTrace();

        } catch (InterruptedException e) {

            System.out.println("节点创建失败,发生 InterruptedException");

            e.printStackTrace();

        }

        return true;

    }

 

    /**

     * 读取指定节点数据内容

     *

     * @param path 节点path

     * @return

     */

    public String readData(String path) {

        try {

            System.out.println("获取数据成功,path" + path);

            return new String(this.zk.getData(path, false, null));

        } catch (KeeperException e) {

            System.out.println("读取数据失败,发生KeeperExceptionpath: " + path);

            e.printStackTrace();

            return "";

        } catch (InterruptedException e) {

            System.out.println("读取数据失败,发生 InterruptedExceptionpath: " + path);

            e.printStackTrace();

            return "";

        }

    }

 

    /**

     * 更新指定节点数据内容

     *

     * @param path 节点path

     * @param data 数据内容

     * @return

     */

    public boolean writeData(String path, String data) {

        try {

            System.out.println("更新数据成功,path" + path + ", stat: " +

                    this.zk.setData(path, data.getBytes(), -1));

        } catch (KeeperException e) {

            System.out.println("更新数据失败,发生KeeperExceptionpath: " + path);

            e.printStackTrace();

        } catch (InterruptedException e) {

            System.out.println("更新数据失败,发生 InterruptedExceptionpath: " + path);

            e.printStackTrace();

        }

        return false;

    }

 

    /**

     * 删除指定节点

     *

     * @param path 节点path

     */

    public void deleteNode(String path) {

        try {

            this.zk.delete(path, -1);

            System.out.println("删除节点成功,path" + path);

        } catch (KeeperException e) {

            System.out.println("删除节点失败,发生KeeperExceptionpath: " + path);

            e.printStackTrace();

        } catch (InterruptedException e) {

            System.out.println("删除节点失败,发生 InterruptedExceptionpath: " + path);

            e.printStackTrace();

        }

    }

 

    public static void main(String[] args) {

 

        TestJavaApi sample = new TestJavaApi();

        sample.createConnection(CONNECTION_STRING, SESSION_TIMEOUT);

        if (sample.createPath(ZK_PATH, "我是节点初始内容")) {

            System.out.println();

            System.out.println("数据内容: " + sample.readData(ZK_PATH) + "\n");

            sample.writeData(ZK_PATH, "更新后的数据");

            System.out.println("数据内容: " + sample.readData(ZK_PATH) + "\n");

            sample.deleteNode(ZK_PATH);

        }

 

        sample.releaseConnection();

    }

 

    /**

     * 收到来自ServerWatcher通知后的处理。

     */

    @Override

    public void process(WatchedEvent event) {

        System.out.println("收到事件通知:" + event.getState() + "\n");

        if (Event.KeeperState.SyncConnected == event.getState()) {

            connectedSemaphore.countDown();

        }

 

    }

 

}

 

1.1.1.2. Watch机制

package cn.enjoy.javaapi;

 

import org.apache.zookeeper.*;

import org.apache.zookeeper.data.Stat;

 

import java.util.List;

import java.util.concurrent.CountDownLatch;

import java.util.concurrent.atomic.AtomicInteger;

 

public class ZooKeeperWatcher implements Watcher  {

 

    /** 定义原子变量 */

    AtomicInteger seq = new AtomicInteger();

    /** 定义session失效时间 */

    private static final int SESSION_TIMEOUT = 10000;

    /** zookeeper服务器地址 */

    private static final String CONNECTION_ADDR = "192.168.30.10:2181";

    /** zk父路径设置 */

    private static final String PARENT_PATH = "/testWatch";

    /** zk子路径设置 */

    private static final String CHILDREN_PATH = "/testWatch/children";

    /** 进入标识 */

    private static final String LOG_PREFIX_OF_MAIN = "Main";

    /** zk变量 */

    private ZooKeeper zk = null;

    /** 信号量设置,用于等待zookeeper连接建立之后 通知阻塞程序继续向下执行 */

    private CountDownLatch connectedSemaphore = new CountDownLatch(1);

 

    /**

     * 创建ZK连接

     * @param connectAddr ZK服务器地址列表

     * @param sessionTimeout Session超时时间

     */

    public void createConnection(String connectAddr, int sessionTimeout) {

        this.releaseConnection();

        try {

            zk = new ZooKeeper(connectAddr, sessionTimeout, this);

            System.out.println(LOG_PREFIX_OF_MAIN + "开始连接ZK服务器");

            connectedSemaphore.await();

        } catch (Exception e) {

            e.printStackTrace();

        }

    }

 

    /**

     * 关闭ZK连接

     */

    public void releaseConnection() {

        if (this.zk != null) {

            try {

                this.zk.close();

            } catch (InterruptedException e) {

                e.printStackTrace();

            }

        }

    }

 

    /**

     * 创建节点

     * @param path 节点路径

     * @param data 数据内容

     * @return

     */

    public boolean createPath(String path, String data) {

        try {

            //设置监控(由于zookeeper的监控都是一次性的所以 每次必须设置监控)

            this.zk.exists(path, true);

            System.out.println(LOG_PREFIX_OF_MAIN + "节点创建成功, Path: " +

                    this.zk.create( /**路径*/

                            path,

                            /**数据*/

                            data.getBytes(),

                            /**所有可见*/

                            ZooDefs.Ids.OPEN_ACL_UNSAFE,

                            /**永久存储*/

                            CreateMode.PERSISTENT ) +

                    ", content: " + data);

        } catch (Exception e) {

            e.printStackTrace();

            return false;

        }

        return true;

    }

 

    /**

     * 读取指定节点数据内容

     * @param path 节点路径

     * @return

     */

    public String readData(String path, boolean needWatch) {

        try {

            return new String(this.zk.getData(path, needWatch, null));

        } catch (Exception e) {

            e.printStackTrace();

            return "";

        }

    }

 

    /**

     * 更新指定节点数据内容

     * @param path 节点路径

     * @param data 数据内容

     * @return

     */

    public boolean writeData(String path, String data) {

        try {

            System.out.println(LOG_PREFIX_OF_MAIN + "更新数据成功,path" + path + ", stat: " +

                    this.zk.setData(path, data.getBytes(), -1));

        } catch (Exception e) {

            e.printStackTrace();

        }

        return false;

    }

 

    /**

     * 删除指定节点

     *

     * @param path

     *            节点path

     */

    public void deleteNode(String path) {

        try {

            this.zk.delete(path, -1);

            System.out.println(LOG_PREFIX_OF_MAIN + "删除节点成功,path" + path);

        } catch (Exception e) {

            e.printStackTrace();

        }

    }

 

    /**

     * 判断指定节点是否存在

     * @param path 节点路径

     */

    public Stat exists(String path, boolean needWatch) {

        try {

            return this.zk.exists(path, needWatch);

        } catch (Exception e) {

            e.printStackTrace();

            return null;

        }

    }

 

    /**

     * 获取子节点

     * @param path 节点路径

     */

    private List<String> getChildren(String path, boolean needWatch) {

        try {

            return this.zk.getChildren(path, needWatch);

        } catch (Exception e) {

            e.printStackTrace();

            return null;

        }

    }

 

    /**

     * 删除所有节点

     */

    public void deleteAllTestPath() {

        if(this.exists(CHILDREN_PATH, false) != null){

            this.deleteNode(CHILDREN_PATH);

        }

        if(this.exists(PARENT_PATH, false) != null){

            this.deleteNode(PARENT_PATH);

        }

    }

 

    /**

     * 收到来自ServerWatcher通知后的处理。

     */

    @Override

    public void process(WatchedEvent event) {

 

        System.out.println("进入 process 。。。。。event = " + event);

 

        try {

            Thread.sleep(200);

        } catch (InterruptedException e) {

            e.printStackTrace();

        }

 

        if (event == null) {

            return;

        }

 

        // 连接状态

        Watcher.Event.KeeperState keeperState = event.getState();

        // 事件类型

        Watcher.Event.EventType eventType = event.getType();

        // 受影响的path

        String path = event.getPath();

 

        String logPrefix = "Watcher-" + this.seq.incrementAndGet() + "";

 

        System.out.println(logPrefix + "收到Watcher通知");

        System.out.println(logPrefix + "连接状态:\t" + keeperState.toString());

        System.out.println(logPrefix + "事件类型:\t" + eventType.toString());

 

        if (Event.KeeperState.SyncConnected == keeperState) {

            // 成功连接上ZK服务器

            if (Event.EventType.None == eventType) {

                System.out.println(logPrefix + "成功连接上ZK服务器");

                connectedSemaphore.countDown();

            }

            //创建节点

            else if (Event.EventType.NodeCreated == eventType) {

                System.out.println(logPrefix + "节点创建");

                try {

                    Thread.sleep(100);

                } catch (InterruptedException e) {

                    e.printStackTrace();

                }

                this.exists(path, true);

            }

            //更新节点

            else if (Event.EventType.NodeDataChanged == eventType) {

                System.out.println(logPrefix + "节点数据更新");

                try {

                    Thread.sleep(100);

                } catch (InterruptedException e) {

                    e.printStackTrace();

                }

                System.out.println(logPrefix + "数据内容: " + this.readData(PARENT_PATH, true));

            }

            //更新子节点

            else if (Event.EventType.NodeChildrenChanged == eventType) {

                System.out.println(logPrefix + "子节点变更");

                try {

                    Thread.sleep(3000);

                } catch (InterruptedException e) {

                    e.printStackTrace();

                }

                System.out.println(logPrefix + "子节点列表:" + this.getChildren(PARENT_PATH, true));

            }

            //删除节点

            else if (Event.EventType.NodeDeleted == eventType) {

                System.out.println(logPrefix + "节点 " + path + " 被删除");

            }

        }

        else if (Watcher.Event.KeeperState.Disconnected == keeperState) {

            System.out.println(logPrefix + "ZK服务器断开连接");

        }

        else if (Watcher.Event.KeeperState.AuthFailed == keeperState) {

            System.out.println(logPrefix + "权限检查失败");

        }

        else if (Watcher.Event.KeeperState.Expired == keeperState) {

            System.out.println(logPrefix + "会话失效");

        }

 

        System.out.println("--------------------------------------------");

 

    }

 

 

    public static void main(String[] args) throws Exception {

 

        //建立watcher

        ZooKeeperWatcher zkWatch = new ZooKeeperWatcher();

        //创建连接

        zkWatch.createConnection(CONNECTION_ADDR, SESSION_TIMEOUT);

        //System.out.println(zkWatch.zk.toString());

 

        Thread.sleep(1000);

 

        // 清理节点

        zkWatch.deleteAllTestPath();

 

        if (zkWatch.createPath(PARENT_PATH, System.currentTimeMillis() + "")) {

 

 

            // 读取数据,在操作节点数据之前先调用zookeepergetData()方法是为了可以watch到对节点的操作。watch是一次性的,

            // 也就是说,如果第二次又重新调用了setData()方法,在此之前需要重新调用一次。

             System.out.println("---------------------- read parent ----------------------------");

            zkWatch.readData(PARENT_PATH, true);

            // 更新数据

           zkWatch.writeData(PARENT_PATH, System.currentTimeMillis() + "");

 

 

 

            /** 读取子节点,设置对子节点变化的watch,如果不写该方法,则在创建子节点是只会输出NodeCreated,而不会输出NodeChildrenChanged

             也就是说创建子节点时没有watch

             如果是递归的创建子节点,如path="/p/c1/c2"的话,getChildren(PARENT_PATH, ture)只会在创建c1watch,输出c1NodeChildrenChanged

             而不会输出创建c2时的NodeChildrenChanged,如果watchc2NodeChildrenChanged,则需要再调用一次getChildren(String path, true)方法,

             其中path="/p/c1"

             */

            System.out.println("---------------------- read children path ----------------------------");

            zkWatch.getChildren(PARENT_PATH, true);

 

 

           Thread.sleep(1000);

 

            // 创建子节点,同理如果想要watchNodeChildrenChanged状态,需要调用getChildren(CHILDREN_PATH, true)

            zkWatch.createPath(CHILDREN_PATH, System.currentTimeMillis() + "");

 

            Thread.sleep(1000);

 

            zkWatch.readData(CHILDREN_PATH, true);

            zkWatch.writeData(CHILDREN_PATH, System.currentTimeMillis() + "");

        }

 

        Thread.sleep(50000);

        // 清理节点

        zkWatch.deleteAllTestPath();

        Thread.sleep(1000);

        zkWatch.releaseConnection();

    }

 

}

 

 

1.1.1.3. ZK认证机制

package cn.enjoy.javaapi;

 

import org.apache.zookeeper.Watcher;

import org.apache.zookeeper.*;

import org.apache.zookeeper.data.ACL;

import org.apache.zookeeper.data.Stat;

 

import java.util.ArrayList;

import java.util.List;

import java.util.concurrent.CountDownLatch;

import java.util.concurrent.atomic.AtomicInteger;

 

public class TestZookeeperAuth implements Watcher {

 

    /** 连接地址 */

    final static String CONNECT_ADDR = "192.168.30.10:2181";

    /** 测试路径 */

    final static String PATH = "/testAuth";

    final static String PATH_DEL = "/testAuth/delNode";

    /** 认证类型 */

    final static String authentication_type = "digest";

    /** 认证正确方法 */

    final static String correctAuthentication = "123456";

    /** 认证错误方法 */

    final static String badAuthentication = "654321";

 

    static ZooKeeper zk = null;

    /** 计时器 */

    AtomicInteger seq = new AtomicInteger();

    /** 标识 */

    private static final String LOG_PREFIX_OF_MAIN = "Main";

 

    private CountDownLatch connectedSemaphore = new CountDownLatch(1);

 

    @Override

    public void process(WatchedEvent event) {

        try {

            Thread.sleep(200);

        } catch (InterruptedException e) {

            e.printStackTrace();

        }

        if (event==null) {

            return;

        }

        // 连接状态

        Event.KeeperState keeperState = event.getState();

        // 事件类型

        Event.EventType eventType = event.getType();

        // 受影响的path

        String path = event.getPath();

 

        String logPrefix = "Watcher-" + this.seq.incrementAndGet() + "";

 

        System.out.println(logPrefix + "收到Watcher通知");

        System.out.println(logPrefix + "连接状态:\t" + keeperState.toString());

        System.out.println(logPrefix + "事件类型:\t" + eventType.toString());

        if (Event.KeeperState.SyncConnected == keeperState) {

            // 成功连接上ZK服务器

            if (Event.EventType.None == eventType) {

                System.out.println(logPrefix + "成功连接上ZK服务器");

                connectedSemaphore.countDown();

            }

        } else if (Event.KeeperState.Disconnected == keeperState) {

            System.out.println(logPrefix + "ZK服务器断开连接");

        } else if (Event.KeeperState.AuthFailed == keeperState) {

            System.out.println(logPrefix + "权限检查失败");

        } else if (Event.KeeperState.Expired == keeperState) {

            System.out.println(logPrefix + "会话失效");

        }

        System.out.println("--------------------------------------------");

    }

    /**

     * 创建ZK连接

     *

     * @param connectString

     *            ZK服务器地址列表

     * @param sessionTimeout

     *            Session超时时间

     */

    public void createConnection(String connectString, int sessionTimeout) {

        this.releaseConnection();

        try {

            zk = new ZooKeeper(connectString, sessionTimeout, this);

            //添加节点授权

            zk.addAuthInfo(authentication_type,correctAuthentication.getBytes());

            System.out.println(LOG_PREFIX_OF_MAIN + "开始连接ZK服务器");

            //倒数等待

            connectedSemaphore.await();

        } catch (Exception e) {

            e.printStackTrace();

        }

    }

 

    /**

     * 关闭ZK连接

     */

    public void releaseConnection() {

        if (this.zk!=null) {

            try {

                this.zk.close();

            } catch (InterruptedException e) {

            }

        }

    }

 

    /**

     *

     * @param args

     * @throws Exception

     */

    public static void main(String[] args) throws Exception {

 

        TestZookeeperAuth testAuth = new TestZookeeperAuth();

        testAuth.createConnection(CONNECT_ADDR,2000);

 

        List<ACL> acls = new ArrayList<ACL>(1);

        for (ACL ids_acl : ZooDefs.Ids.CREATOR_ALL_ACL) {

            acls.add(ids_acl);

        }

 

        try {

            zk.create(PATH, "init content".getBytes(), acls, CreateMode.PERSISTENT);

            System.out.println("使用授权key" + correctAuthentication + "创建节点:"+ PATH + ", 初始内容是: init content");

        } catch (Exception e) {

            e.printStackTrace();

        }

        try {

            zk.create(PATH_DEL, "will be deleted! ".getBytes(), acls, CreateMode.PERSISTENT);

            System.out.println("使用授权key" + correctAuthentication + "创建节点:"+ PATH_DEL + ", 初始内容是: init content");

        } catch (Exception e) {

            e.printStackTrace();

        }

 

        // 获取数据

        getDataByNoAuthentication();

        getDataByBadAuthentication();

        getDataByCorrectAuthentication();

 

        // 更新数据

        updateDataByNoAuthentication();

        updateDataByBadAuthentication();

        updateDataByCorrectAuthentication();

 

        // 删除数据

        deleteNodeByBadAuthentication();

        deleteNodeByNoAuthentication();

        deleteNodeByCorrectAuthentication();

        //

        Thread.sleep(1000);

 

        deleteParent();

        //释放连接

        testAuth.releaseConnection();

    }

    /** 获取数据:采用错误的密码 */

    static void getDataByBadAuthentication() {

        String prefix = "[使用错误的授权信息]";

        try {

            ZooKeeper badzk = new ZooKeeper(CONNECT_ADDR, 2000, null);

            //授权

            badzk.addAuthInfo(authentication_type,badAuthentication.getBytes());

            Thread.sleep(2000);

            System.out.println(prefix + "获取数据:" + PATH);

            System.out.println(prefix + "成功获取数据:" + badzk.getData(PATH, false, null));

        } catch (Exception e) {

            System.err.println(prefix + "获取数据失败,原因:" + e.getMessage());

        }

    }

 

    /** 获取数据:不采用密码 */

    static void getDataByNoAuthentication() {

        String prefix = "[不使用任何授权信息]";

        try {

            System.out.println(prefix + "获取数据:" + PATH);

            ZooKeeper nozk = new ZooKeeper(CONNECT_ADDR, 2000, null);

            Thread.sleep(2000);

            System.out.println(prefix + "成功获取数据:" + nozk.getData(PATH, false, null));

        } catch (Exception e) {

            System.err.println(prefix + "获取数据失败,原因:" + e.getMessage());

        }

    }

 

    /** 采用正确的密码 */

    static void getDataByCorrectAuthentication() {

        String prefix = "[使用正确的授权信息]";

        try {

            System.out.println(prefix + "获取数据:" + PATH);

 

            System.out.println(prefix + "成功获取数据:" + zk.getData(PATH, false, null));

        } catch (Exception e) {

            System.out.println(prefix + "获取数据失败,原因:" + e.getMessage());

        }

    }

 

    /**

     * 更新数据:不采用密码

     */

    static void updateDataByNoAuthentication() {

 

        String prefix = "[不使用任何授权信息]";

 

        System.out.println(prefix + "更新数据: " + PATH);

        try {

            ZooKeeper nozk = new ZooKeeper(CONNECT_ADDR, 2000, null);

            Thread.sleep(2000);

            Stat stat = nozk.exists(PATH, false);

            if (stat!=null) {

                nozk.setData(PATH, prefix.getBytes(), -1);

                System.out.println(prefix + "更新成功");

            }

        } catch (Exception e) {

            System.err.println(prefix + "更新失败,原因是:" + e.getMessage());

        }

    }

 

    /**

     * 更新数据:采用错误的密码

     */

    static void updateDataByBadAuthentication() {

 

        String prefix = "[使用错误的授权信息]";

 

        System.out.println(prefix + "更新数据:" + PATH);

        try {

            ZooKeeper badzk = new ZooKeeper(CONNECT_ADDR, 2000, null);

            //授权

            badzk.addAuthInfo(authentication_type,badAuthentication.getBytes());

            Thread.sleep(2000);

            Stat stat = badzk.exists(PATH, false);

            if (stat!=null) {

                badzk.setData(PATH, prefix.getBytes(), -1);

                System.out.println(prefix + "更新成功");

            }

        } catch (Exception e) {

            System.err.println(prefix + "更新失败,原因是:" + e.getMessage());

        }

    }

 

    /**

     * 更新数据:采用正确的密码

     */

    static void updateDataByCorrectAuthentication() {

 

        String prefix = "[使用正确的授权信息]";

 

        System.out.println(prefix + "更新数据:" + PATH);

        try {

            Stat stat = zk.exists(PATH, false);

            if (stat!=null) {

                zk.setData(PATH, prefix.getBytes(), -1);

                System.out.println(prefix + "更新成功");

            }

        } catch (Exception e) {

            System.err.println(prefix + "更新失败,原因是:" + e.getMessage());

        }

    }

 

    /**

     * 不使用密码 删除节点

     */

    static void deleteNodeByNoAuthentication() throws Exception {

 

        String prefix = "[不使用任何授权信息]";

 

        try {

            System.out.println(prefix + "删除节点:" + PATH_DEL);

            ZooKeeper nozk = new ZooKeeper(CONNECT_ADDR, 2000, null);

            Thread.sleep(2000);

            Stat stat = nozk.exists(PATH_DEL, false);

            if (stat!=null) {

                nozk.delete(PATH_DEL,-1);

                System.out.println(prefix + "删除成功");

            }

        } catch (Exception e) {

            System.err.println(prefix + "删除失败,原因是:" + e.getMessage());

        }

    }

 

    /**

     * 采用错误的密码删除节点

     */

    static void deleteNodeByBadAuthentication() throws Exception {

 

        String prefix = "[使用错误的授权信息]";

 

        try {

            System.out.println(prefix + "删除节点:" + PATH_DEL);

            ZooKeeper badzk = new ZooKeeper(CONNECT_ADDR, 2000, null);

            //授权

            badzk.addAuthInfo(authentication_type,badAuthentication.getBytes());

            Thread.sleep(2000);

            Stat stat = badzk.exists(PATH_DEL, false);

            if (stat!=null) {

                badzk.delete(PATH_DEL, -1);

                System.out.println(prefix + "删除成功");

            }

        } catch (Exception e) {

            System.err.println(prefix + "删除失败,原因是:" + e.getMessage());

        }

    }

 

    /**

     * 使用正确的密码删除节点

     */

    static void deleteNodeByCorrectAuthentication() throws Exception {

 

        String prefix = "[使用正确的授权信息]";

 

        try {

            System.out.println(prefix + "删除节点:" + PATH_DEL);

            Stat stat = zk.exists(PATH_DEL, false);

            if (stat!=null) {

                zk.delete(PATH_DEL, -1);

                System.out.println(prefix + "删除成功");

            }

        } catch (Exception e) {

            System.out.println(prefix + "删除失败,原因是:" + e.getMessage());

        }

    }

 

    /**

     * 使用正确的密码删除节点

     */

    static void deleteParent() throws Exception {

        try {

            Stat stat = zk.exists(PATH_DEL, false);

            if (stat == null) {

                zk.delete(PATH, -1);

            }

        } catch (Exception e) {

            e.printStackTrace();

        }

    }

 

}

 

 

1.1.1. ZkClient

1.1.1.1. 基本操作

 package cn.enjoy.zkclient;

 

import org.I0Itec.zkclient.ZkClient;

import org.I0Itec.zkclient.ZkConnection;

 

import java.util.List;

 

/**

 * Created by VULCAN on 2018/11/7.

 */

public class ZkClientOperator {

 

    /** zookeeper地址 */

    static final String CONNECT_ADDR = "192.168.30.10:2181";

    /** session超时时间 */

    static final int SESSION_OUTTIME = 10000;//ms

 

 

    public static void main(String[] args) throws Exception {

       // ZkClient zkc = new ZkClient(new ZkConnection(CONNECT_ADDR), SESSION_OUTTIME);

        ZkClient zkc = new ZkClient(CONNECT_ADDR, SESSION_OUTTIME);

 

        //1. create and delete方法

        zkc.createEphemeral("/temp");

        zkc.createPersistent("/super/c1", true);

        Thread.sleep(10000);

        zkc.delete("/temp");

        zkc.deleteRecursive("/super");

 

        //2. 设置pathdata 并且读取子节点和每个节点的内容

        zkc.createPersistent("/super", "1234");

        zkc.createPersistent("/super/c1", "c1内容");

        zkc.createPersistent("/super/c2", "c2内容");

        List<String> list = zkc.getChildren("/super");

        for(String p : list){

            System.out.println(p);

            String rp = "/super/" + p;

            String data = zkc.readData(rp);

            System.out.println("节点为:" + rp + ",内容为: " + data);

        }

 

        //3. 更新和判断节点是否存在

        zkc.writeData("/super/c1", "新内容");

        System.out.println(zkc.readData("/super/c1").toString());

        System.out.println(zkc.exists("/super/c1"));

 

// 4.递归删除/super内容

        zkc.deleteRecursive("/super");

    }

}

 

1.1.1.2. 监听机制

package cn.enjoy.zkclient;

 

import org.I0Itec.zkclient.IZkChildListener;

import org.I0Itec.zkclient.IZkDataListener;

import org.I0Itec.zkclient.ZkClient;

import org.I0Itec.zkclient.ZkConnection;

import org.junit.Test;

 

import java.util.List;

 

public class TestZkClientWatcher {

 

 

    /** zookeeper地址 */

    static final String CONNECT_ADDR = "192.168.30.10:2181";

    /** session超时时间 */

    static final int SESSION_OUTTIME = 10000;//ms

 

 

    @Test

    /**

     * subscribeChildChanges方法 订阅子节点变化

     */

    public  void testZkClientWatcher1() throws Exception {

        ZkClient zkc = new ZkClient(new ZkConnection(CONNECT_ADDR), SESSION_OUTTIME);

 

        //对父节点添加监听子节点变化。

        zkc.subscribeChildChanges("/super", new IZkChildListener() {

            @Override

            public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception {

                System.out.println("parentPath: " + parentPath);

                System.out.println("currentChilds: " + currentChilds);

            }

        });

 

        Thread.sleep(3000);

 

        zkc.createPersistent("/super");

        Thread.sleep(1000);

 

        zkc.createPersistent("/super" + "/" + "c1", "c1内容");

        Thread.sleep(1000);

 

        zkc.createPersistent("/super" + "/" + "c2", "c2内容");

        Thread.sleep(1000);

 

        zkc.delete("/super/c2");

        Thread.sleep(1000);

 

        zkc.deleteRecursive("/super");

        Thread.sleep(Integer.MAX_VALUE);

 

    }

 

    @Test

    /**

     * subscribeDataChanges 订阅内容变化

     */

    public void testZkClientWatcher2() throws Exception {

        ZkClient zkc = new ZkClient(new ZkConnection(CONNECT_ADDR), SESSION_OUTTIME);

 

        zkc.createPersistent("/super", "1234");

 

        //对父节点添加监听子节点变化。

        zkc.subscribeDataChanges("/super", new IZkDataListener() {

            @Override

            public void handleDataDeleted(String path) throws Exception {

                System.out.println("删除的节点为:" + path);

            }

 

            @Override

            public void handleDataChange(String path, Object data) throws Exception {

                System.out.println("变更的节点为:" + path + ", 变更内容为:" + data);

            }

        });

 

        Thread.sleep(3000);

        zkc.writeData("/super", "456", -1);

        Thread.sleep(1000);

 

        zkc.delete("/super");

        Thread.sleep(Integer.MAX_VALUE);

 

    }

 

}

 

1.1.2. Curator

1.1.2.1.  基本操作

package cn.enjoy.curator;

 

import org.apache.curator.RetryPolicy;

import org.apache.curator.framework.CuratorFramework;

import org.apache.curator.framework.CuratorFrameworkFactory;

import org.apache.curator.framework.api.BackgroundCallback;

import org.apache.curator.framework.api.CuratorEvent;

import org.apache.curator.framework.api.CuratorListener;

import org.apache.curator.framework.api.transaction.CuratorOp;

import org.apache.curator.framework.api.transaction.CuratorTransactionResult;

import org.apache.curator.retry.ExponentialBackoffRetry;

import org.apache.zookeeper.CreateMode;

import org.apache.zookeeper.data.Stat;

import org.junit.Before;

import org.junit.Test;

 

 

import java.util.List;

 

import static com.sun.xml.internal.ws.dump.LoggingDumpTube.Position.Before;

 

/**

 *  测试Apache Curator框架的基本用法

 */

public class OperatorTest {

    // ZooKeeper server address

    private static final String SERVER = "192.168.30.10:2181";

 

    // Session timeout (passed to CuratorFrameworkFactory.newClient in init())

    private final int SESSION_TIMEOUT = 30000;

 

    // Connection timeout (passed to CuratorFrameworkFactory.newClient in init())

    private final int CONNECTION_TIMEOUT = 5000;

 

    // Client instance; assigned in init()

    private CuratorFramework client = null;

 

    /**

     * Retry policy handed to the client in init().

     *

     * baseSleepTimeMs: initial wait before a retry

     * maxRetries: maximum number of retries

     *

     * Available policies:

     * ExponentialBackoffRetry: retries a fixed number of times, waiting longer each time

     * RetryNTimes: retries N times

     * RetryOneTime: retries once

     * RetryUntilElapsed: keeps retrying for a given period

     */

    RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);

 

@org.junit.Before

    public void init(){

        //创建 CuratorFrameworkImpl实例

        client = CuratorFrameworkFactory.newClient(SERVER, SESSION_TIMEOUT, CONNECTION_TIMEOUT, retryPolicy);

 

        //启动

        client.start();

    }

 

    /**

     * 测试创建节点

     * @throws Exception

     */

    @Test

    public void testCreate() throws Exception{

        //创建永久节点

        client.create().forPath("/curator","/curator data".getBytes());

 

        //创建永久有序节点

        client.create().withMode(CreateMode.PERSISTENT_SEQUENTIAL).forPath("/curator_sequential","/curator_sequential data".getBytes());

 

        //创建临时节点

        client.create().withMode(CreateMode.EPHEMERAL)

                .forPath("/curator/ephemeral","/curator/ephemeral data".getBytes());

 

        //创建临时有序节点

        client.create().withMode(CreateMode.EPHEMERAL_SEQUENTIAL)

                .forPath("/curator/ephemeral_path1","/curator/ephemeral_path1 data".getBytes());

 

    }

 

 

 

    /**

     * 测试检查某个节点是否存在

     * @throws Exception

     */

    @Test

    public void testCheck() throws Exception{

        Stat stat1 = client.checkExists().forPath("/curator");

        Stat stat2 = client.checkExists().forPath("/curator2");

 

        System.out.println("‘/curator‘是否存在: " + (stat1 != null ? true : false));

        System.out.println("‘/curator2‘是否存在: " + (stat2 != null ? true : false));

    }

 

 

    /**

     * 测试异步设置节点数据

     * @throws Exception

     */

    @Test

    public void testSetDataAsync() throws Exception{

        //创建监听器

        CuratorListener listener = new CuratorListener() {

 

            @Override

            public void eventReceived(CuratorFramework client, CuratorEvent event)

                    throws Exception {

                System.out.println(event.getPath());

            }

        };

 

        //添加监听器

        client.getCuratorListenable().addListener(listener);

 

        //异步设置某个节点数据

        client.setData().inBackground().forPath("/curator","sync".getBytes());

 

        //为了防止单元测试结束从而看不到异步执行结果,因此暂停10

        Thread.sleep(10000);

    }

 

 

 

    /**

     * 测试另一种异步执行获取通知的方式

     * @throws Exception

     */

    @Test

    public void testSetDataAsyncWithCallback() throws Exception{

        BackgroundCallback callback = new BackgroundCallback() {

 

            @Override

            public void processResult(CuratorFramework client, CuratorEvent event)

                    throws Exception {

                System.out.println(event.getPath());

            }

        };

 

        //异步设置某个节点数据

        client.setData().inBackground(callback).forPath("/curator","/curator modified data with Callback".getBytes());

 

        //为了防止单元测试结束从而看不到异步执行结果,因此暂停10

        Thread.sleep(10000);

    }

 

 

    /**

     * 测试删除节点

     * @throws Exception

     */

    @Test

    public void testDelete() throws Exception{

        //创建测试节点

        client.create().orSetData().creatingParentsIfNeeded()

                .forPath("/curator/del_key1","/curator/del_key1 data".getBytes());

 

        client.create().orSetData().creatingParentsIfNeeded()

                .forPath("/curator/del_key2","/curator/del_key2 data".getBytes());

 

        client.create().forPath("/curator/del_key2/test_key","test_key data".getBytes());

 

 

        //删除该节点

        client.delete().forPath("/curator/del_key1");

 

        //级联删除子节点

        client.delete().guaranteed().deletingChildrenIfNeeded().forPath("/curator/del_key2");

    }

 

 

    /*

     * 测试事务管理:碰到异常,事务会回滚

     * @throws Exception

     */

    @Test

    public void testTransaction() throws Exception{

        //定义几个基本操作

        CuratorOp createOp = client.transactionOp().create()

                .forPath("/curator/one_path","some data".getBytes());

 

        CuratorOp setDataOp = client.transactionOp().setData()

                .forPath("/curator","other data".getBytes());

 

        CuratorOp deleteOp = client.transactionOp().delete()

                .forPath("/curator");

 

        //事务执行结果

        List<CuratorTransactionResult> results = client.transaction()

                .forOperations(createOp,setDataOp,deleteOp);

 

        //遍历输出结果

        for(CuratorTransactionResult result : results){

            System.out.println("执行结果是: " + result.getForPath() + "--" + result.getType());

        }

    }

 

}

 

 

 

1.1.2.2. 监听机制

package cn.enjoy.curator;

 

import org.apache.curator.RetryPolicy;

import org.apache.curator.framework.CuratorFramework;

import org.apache.curator.framework.CuratorFrameworkFactory;

import org.apache.curator.framework.api.CuratorEvent;

import org.apache.curator.framework.api.CuratorListener;

import org.apache.curator.framework.recipes.cache.*;

import org.apache.curator.retry.ExponentialBackoffRetry;

import org.apache.zookeeper.CreateMode;

import org.apache.zookeeper.WatchedEvent;

import org.apache.zookeeper.Watcher;

import org.junit.Test;

 

import java.util.concurrent.ExecutorService;

import java.util.concurrent.Executors;

 

 

public class EventTest {

 

    //ZooKeeper服务地址

    private static final String SERVER = "192.168.30.10:2181";

 

    //会话超时时间

    private final int SESSION_TIMEOUT = 30000;

 

    //连接超时时间

    private final int CONNECTION_TIMEOUT = 5000;

 

    //创建连接实例

    private CuratorFramework client = null;

 

    /**

     * baseSleepTimeMs:初始的重试等待时间

     * maxRetries:最多重试次数

     *

     *

     * ExponentialBackoffRetry:重试一定次数,每次重试时间依次递增

     * RetryNTimes:重试N

     * RetryOneTime:重试一次

     * RetryUntilElapsed:重试一定时间

     */

    RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);

 

    @org.junit.Before

    public void init(){

        //创建 CuratorFrameworkImpl实例

        client = CuratorFrameworkFactory.newClient(SERVER, SESSION_TIMEOUT, CONNECTION_TIMEOUT, retryPolicy);

 

        //启动

        client.start();

    }

 

    /**

     *

     * @描述:第一种监听器的添加方式: 对指定的节点进行添加操作

     * 仅仅能监控指定的本节点的数据修改,删除 操作 并且只能监听一次 --->不好

     */

 

    @Test

    public  void TestListenterOne() throws Exception{

        client.create().orSetData().withMode(CreateMode.PERSISTENT).forPath("/test","test".getBytes());

 

        // 注册观察者,当节点变动时触发

        byte[] data = client.getData().usingWatcher(new Watcher() {

            @Override

            public void process(WatchedEvent event) {

                System.out.println("获取 test 节点 监听器 : " + event);

            }

        }).forPath("/test");

 

        client.create().orSetData().withMode(CreateMode.PERSISTENT).forPath("/test","test".getBytes());

        Thread.sleep(1000);

        client.create().orSetData().withMode(CreateMode.PERSISTENT).forPath("/test","test".getBytes());

        Thread.sleep(1000);

        System.out.println("节点数据: "+ new String(data));

        Thread.sleep(10000);

    }

 

 

    /**

     *

     * @描述:第二种监听器的添加方式: Cache 的三种实现

     *   Path Cache:监视一个路径下1)孩子结点的创建、2)删除,3)以及结点数据的更新。

     *                  产生的事件会传递给注册的PathChildrenCacheListener

     *  Node Cache:监视一个结点的创建、更新、删除,并将结点的数据缓存在本地。

     *  Tree CachePath CacheNode Cache的“合体”,监视路径下的创建、更新、删除事件,并缓存路径下所有孩子结点的数据。

     */

 

    //1.path Cache  连接  路径  是否获取数据

    //能监听所有的字节点 且是无限监听的模式 但是 指定目录下节点的子节点不再监听

    @Test

    public void setListenterTwoOne() throws Exception{

        ExecutorService pool = Executors.newCachedThreadPool();

        PathChildrenCache childrenCache = new PathChildrenCache(client, "/test", true);

        PathChildrenCacheListener childrenCacheListener = new PathChildrenCacheListener() {

            @Override

            public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {

                System.out.println("开始进行事件分析:-----");

                ChildData data = event.getData();

                switch (event.getType()) {

                    case CHILD_ADDED:

                        System.out.println("CHILD_ADDED : "+ data.getPath() +"  数据:"+ data.getData());

                        break;

                    case CHILD_REMOVED:

                        System.out.println("CHILD_REMOVED : "+ data.getPath() +"  数据:"+ data.getData());

                        break;

                    case CHILD_UPDATED:

                        System.out.println("CHILD_UPDATED : "+ data.getPath() +"  数据:"+ data.getData());

                        break;

                    case INITIALIZED:

                        System.out.println("INITIALIZED : "+ data.getPath() +"  数据:"+ data.getData());

                        break;

                    default:

                        break;

                }

            }

        };

        childrenCache.getListenable().addListener(childrenCacheListener);

        System.out.println("Register zk watcher successfully!");

        childrenCache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);

 

        //创建一个节点

        client.create().orSetData().withMode(CreateMode.PERSISTENT).forPath("/test","test".getBytes());

 

        client.create().orSetData().withMode(CreateMode.EPHEMERAL).forPath("/test/node01","enjoy".getBytes());

        Thread.sleep(1000);

        client.create().orSetData().withMode(CreateMode.EPHEMERAL).forPath("/test/node02","deer".getBytes());

        Thread.sleep(1000);

        client.create().orSetData().withMode(CreateMode.EPHEMERAL).forPath("/test/node02","demo".getBytes());

        Thread.sleep(1000);

        client.delete().forPath("/test/node02");

        Thread.sleep(10000);

    }

 

    //2.Node Cache  监控本节点的变化情况   连接 目录 是否压缩

    //监听本节点的变化  节点可以进行修改操作  删除节点后会再次创建(空节点)

    @Test

    public void setListenterTwoTwo() throws Exception{

        ExecutorService pool = Executors.newCachedThreadPool();

        //设置节点的cache

        final NodeCache nodeCache = new NodeCache(client, "/test", false);

        nodeCache.getListenable().addListener(new NodeCacheListener() {

            @Override

            public void nodeChanged() throws Exception {

                System.out.println("the test node is change and result is :");

                System.out.println("path : "+nodeCache.getCurrentData().getPath());

                System.out.println("data : "+new String(nodeCache.getCurrentData().getData()));

                System.out.println("stat : "+nodeCache.getCurrentData().getStat());

            }

        });

        nodeCache.start();

 

        client.create().orSetData().withMode(CreateMode.PERSISTENT).forPath("/test","test".getBytes());

        Thread.sleep(1000);

        client.create().orSetData().withMode(CreateMode.PERSISTENT).forPath("/test","enjoy".getBytes());

        Thread.sleep(10000);

    }

    //3.Tree Cache

    // 监控 指定节点和节点下的所有的节点的变化--无限监听  可以进行本节点的删除(不在创建)

    @Test

    public void TestListenterTwoThree() throws Exception{

        ExecutorService pool = Executors.newCachedThreadPool();

        //设置节点的cache

        TreeCache treeCache = new TreeCache(client, "/test");

        //设置监听器和处理过程

        treeCache.getListenable().addListener(new TreeCacheListener() {

            @Override

            public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {

                ChildData data = event.getData();

                if(data !=null){

                    switch (event.getType()) {

                        case NODE_ADDED:

                            System.out.println("NODE_ADDED : "+ data.getPath() +"  数据:"+ new String(data.getData()));

                            break;

                        case NODE_REMOVED:

                            System.out.println("NODE_REMOVED : "+ data.getPath() +"  数据:"+ new String(data.getData()));

                            break;

                        case NODE_UPDATED:

                            System.out.println("NODE_UPDATED : "+ data.getPath() +"  数据:"+ new String(data.getData()));

                            break;

 

                        default:

                            break;

                    }

                }else{

                    System.out.println( "data is null : "+ event.getType());

                }

            }

        });

        //开始监听

        treeCache.start();

 

        //创建一个节点

        client.create().orSetData().withMode(CreateMode.PERSISTENT).forPath("/test","test".getBytes());

 

        Thread.sleep(1000);

        client.create().orSetData().withMode(CreateMode.EPHEMERAL).forPath("/test/node01","enjoy".getBytes());

        Thread.sleep(1000);

        client.create().orSetData().withMode(CreateMode.EPHEMERAL).forPath("/test/node01","deer".getBytes());

 

        Thread.sleep(1000);

        client.create().orSetData().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath("/test/node02/node02_2","deer".getBytes());

 

        Thread.sleep(10000);

 

    }

 

}

Zookeeper原生客户端

标签:his   round   ram   tde   列表   标识   无限   结果   获取   

原文地址:https://www.cnblogs.com/Soy-technology/p/11391701.html

(0)
(0)
   
举报
评论 一句话评论(0)
登录后才能评论!
© 2014 mamicode.com 版权所有 京ICP备13008772号-2
迷上了代码!