码迷,mamicode.com
首页 > 其他好文 > 详细

Zookeeper原生客户端

时间:2019-08-21 23:19:23      阅读:79      评论:0      收藏:0      [点我收藏+]

标签:his   round   ram   tde   列表   标识   无限   结果   获取   

1.1.1.1. 客户端基本操作

package cn.enjoy.javaapi;

 

import org.apache.zookeeper.*;

 

import java.io.IOException;

import java.util.concurrent.CountDownLatch;

 

public class TestJavaApi implements Watcher {

 

    private static final int SESSION_TIMEOUT = 10000;

    private static final String CONNECTION_STRING = "192.168.30.10:2181";

    private static final String ZK_PATH = "/leader";

    private ZooKeeper zk = null;

 

    private CountDownLatch connectedSemaphore = new CountDownLatch(1);

 

    /**

     * 创建ZK连接

     *

     * @param connectString  ZK服务器地址列表

     * @param sessionTimeout Session超时时间

     */

    public void createConnection(String connectString, int sessionTimeout) {

        this.releaseConnection();

        try {

            zk = new ZooKeeper(connectString, sessionTimeout, this);

            connectedSemaphore.await();

        } catch (InterruptedException e) {

            System.out.println("连接创建失败,发生 InterruptedException");

            e.printStackTrace();

        } catch (IOException e) {

            System.out.println("连接创建失败,发生 IOException");

            e.printStackTrace();

        }

    }

 

    /**

     * 关闭ZK连接

     */

    public void releaseConnection() {

        if (null != this.zk) {

            try {

                this.zk.close();

            } catch (InterruptedException e) {

                // ignore

                e.printStackTrace();

            }

        }

    }

 

    /**

     * 创建节点

     *

     * @param path 节点path

     * @param data 初始数据内容

     * @return

     */

    public boolean createPath(String path, String data) {

        try {

            System.out.println("节点创建成功, Path: "

                    + this.zk.create(path, // 节点路径

                    data.getBytes(), // 节点内容

                    ZooDefs.Ids.OPEN_ACL_UNSAFE, //节点权限

                    CreateMode.EPHEMERAL) //节点类型

                    + ", content: " + data);

        } catch (KeeperException e) {

            System.out.println("节点创建失败,发生KeeperException");

            e.printStackTrace();

        } catch (InterruptedException e) {

            System.out.println("节点创建失败,发生 InterruptedException");

            e.printStackTrace();

        }

        return true;

    }

 

    /**

     * 读取指定节点数据内容

     *

     * @param path 节点path

     * @return

     */

    public String readData(String path) {

        try {

            System.out.println("获取数据成功,path" + path);

            return new String(this.zk.getData(path, false, null));

        } catch (KeeperException e) {

            System.out.println("读取数据失败,发生KeeperExceptionpath: " + path);

            e.printStackTrace();

            return "";

        } catch (InterruptedException e) {

            System.out.println("读取数据失败,发生 InterruptedExceptionpath: " + path);

            e.printStackTrace();

            return "";

        }

    }

 

    /**

     * 更新指定节点数据内容

     *

     * @param path 节点path

     * @param data 数据内容

     * @return

     */

    public boolean writeData(String path, String data) {

        try {

            System.out.println("更新数据成功,path" + path + ", stat: " +

                    this.zk.setData(path, data.getBytes(), -1));

        } catch (KeeperException e) {

            System.out.println("更新数据失败,发生KeeperExceptionpath: " + path);

            e.printStackTrace();

        } catch (InterruptedException e) {

            System.out.println("更新数据失败,发生 InterruptedExceptionpath: " + path);

            e.printStackTrace();

        }

        return false;

    }

 

    /**

     * 删除指定节点

     *

     * @param path 节点path

     */

    public void deleteNode(String path) {

        try {

            this.zk.delete(path, -1);

            System.out.println("删除节点成功,path" + path);

        } catch (KeeperException e) {

            System.out.println("删除节点失败,发生KeeperExceptionpath: " + path);

            e.printStackTrace();

        } catch (InterruptedException e) {

            System.out.println("删除节点失败,发生 InterruptedExceptionpath: " + path);

            e.printStackTrace();

        }

    }

 

    public static void main(String[] args) {

 

        TestJavaApi sample = new TestJavaApi();

        sample.createConnection(CONNECTION_STRING, SESSION_TIMEOUT);

        if (sample.createPath(ZK_PATH, "我是节点初始内容")) {

            System.out.println();

            System.out.println("数据内容: " + sample.readData(ZK_PATH) + "\n");

            sample.writeData(ZK_PATH, "更新后的数据");

            System.out.println("数据内容: " + sample.readData(ZK_PATH) + "\n");

            sample.deleteNode(ZK_PATH);

        }

 

        sample.releaseConnection();

    }

 

    /**

     * 收到来自ServerWatcher通知后的处理。

     */

    @Override

    public void process(WatchedEvent event) {

        System.out.println("收到事件通知:" + event.getState() + "\n");

        if (Event.KeeperState.SyncConnected == event.getState()) {

            connectedSemaphore.countDown();

        }

 

    }

 

}

 

1.1.1.2. Watch机制

package cn.enjoy.javaapi;

 

import org.apache.zookeeper.*;

import org.apache.zookeeper.data.Stat;

 

import java.util.List;

import java.util.concurrent.CountDownLatch;

import java.util.concurrent.atomic.AtomicInteger;

 

public class ZooKeeperWatcher implements Watcher  {

 

    /** 定义原子变量 */

    AtomicInteger seq = new AtomicInteger();

    /** 定义session失效时间 */

    private static final int SESSION_TIMEOUT = 10000;

    /** zookeeper服务器地址 */

    private static final String CONNECTION_ADDR = "192.168.30.10:2181";

    /** zk父路径设置 */

    private static final String PARENT_PATH = "/testWatch";

    /** zk子路径设置 */

    private static final String CHILDREN_PATH = "/testWatch/children";

    /** 进入标识 */

    private static final String LOG_PREFIX_OF_MAIN = "Main";

    /** zk变量 */

    private ZooKeeper zk = null;

    /** 信号量设置,用于等待zookeeper连接建立之后 通知阻塞程序继续向下执行 */

    private CountDownLatch connectedSemaphore = new CountDownLatch(1);

 

    /**

     * 创建ZK连接

     * @param connectAddr ZK服务器地址列表

     * @param sessionTimeout Session超时时间