标签:
1、jar包引入,演示版本为3.4.6,非maven项目,可以下载jar包导入到项目中
<dependency> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> <version>3.4.6</version> </dependency>
2、创建zookeeper连接
ZooKeeper(java.lang.String connectString, int sessionTimeout, org.apache.zookeeper.Watcher watcher)
ZooKeeper zooKeeper = new ZooKeeper("192.168.117.128:2181",5000, new MyWatcher());
示例代码:
import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooKeeper; import java.io.IOException; /** * * zookeeper连接 */ public class CreateSession implements Watcher{ private static ZooKeeper zooKeeper; @Override public void process(WatchedEvent watchedEvent) { if(watchedEvent.getState().equals(Event.KeeperState.SyncConnected)) { doBus(); } System.out.println("接收内容:"+watchedEvent.toString()); } private void doBus() { System.out.println("做业务!"); } public static void main(String[] args) { try { zooKeeper = new ZooKeeper("192.168.117.128:2181",5000, new CreateSession()); System.out.println(zooKeeper.getState()); Thread.sleep(5000); } catch (IOException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } } }
ps:此处没有新创建新的java类实现Watcher,而是直接在本类中实现Watcher接口并重写process方法
3、同步创建
create(java.lang.String path, byte[] data, java.util.List<org.apache.zookeeper.data.ACL> acl, org.apache.zookeeper.CreateMode createMode)
ACL aclIp = new ACL(ZooDefs.Perms.READ,new Id("ip","127.0.0.1")); ACL aclDigest = new ACL(ZooDefs.Perms.READ| ZooDefs.Perms.WRITE, new Id("digest", DigestAuthenticationProvider.generateDigest("id:pass")));
zk.addAuthInfo("digest", "id:pass".getBytes());
示例代码:
import org.apache.zookeeper.*; import org.apache.zookeeper.data.ACL; import org.apache.zookeeper.data.Id; import org.apache.zookeeper.server.auth.DigestAuthenticationProvider; import java.io.IOException; import java.security.NoSuchAlgorithmException; /** * 创建节点(同步) * Created by scot on 2016/6/8. */ public class CreateSessionSync implements Watcher{ private static ZooKeeper zooKeeper; @Override public void process(WatchedEvent watchedEvent) { if(watchedEvent.getState().equals(Event.KeeperState.SyncConnected)) { doBus(); } System.out.println("接收内容:"+watchedEvent.toString()); } private void doBus() { try { if(null != zooKeeper.exists("/note_scot/note_scot_a",false)) { System.out.println("/note_scot/note_scot_a 节点已存在"); return; } String path = zooKeeper.create("/note_scot/note_scot_a","aa".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); /*权限相关 try { ACL aclIp = new ACL(ZooDefs.Perms.READ,new Id("ip","127.0.0.1")); ACL aclDigest = new ACL(ZooDefs.Perms.READ| ZooDefs.Perms.WRITE, new Id("digest", DigestAuthenticationProvider.generateDigest("id:pass"))); zooKeeper.addAuthInfo("digest", "id:pass".getBytes()); } catch (NoSuchAlgorithmException e) { e.printStackTrace(); }*/ System.out.println("zookeeper return:" + path); } catch (KeeperException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } } public static void main(String[] args) { try { zooKeeper = new ZooKeeper("192.168.117.128:2181",5000, new CreateSessionSync()); System.out.println(zooKeeper.getState()); Thread.sleep(5000); } catch (IOException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } } }
4、异步创建
create(java.lang.String path, byte[] data, java.util.List<org.apache.zookeeper.data.ACL> acl,
org.apache.zookeeper.CreateMode createMode, org.apache.zookeeper.AsyncCallback.StringCallback cb, java.lang.Object ctx)
示例代码:
import org.apache.zookeeper.*; import java.io.IOException; /** * 创建节点(异步) * Created by scot on 2016/6/8. */ public class CreateSessionASync implements Watcher{ private static ZooKeeper zooKeeper; @Override public void process(WatchedEvent watchedEvent) { if(watchedEvent.getState().equals(Event.KeeperState.SyncConnected)) { doBus(); } System.out.println("接收内容:"+watchedEvent.toString()); } private void doBus() { zooKeeper.create("/note_scot/note_scot_b","aa".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL,new IStringCallBack(),"testAsync"); } public static void main(String[] args) { try { zooKeeper = new ZooKeeper("192.168.117.128:2181",5000, new CreateSessionASync()); System.out.println(zooKeeper.getState()); Thread.sleep(5000); } catch (IOException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } } static class IStringCallBack implements AsyncCallback.StringCallback { @Override public void processResult(int i, String s, Object o, String s2) { System.out.println("i="+i);//创建成功返回0 System.out.println("s="+s);//自定义节点名称 System.out.println("o="+o);//自定义回调数据 System.out.println("s2="+s2);//最终节点名称(顺序节点最终名称与自定义名称不同) } } }
标签:
原文地址:http://www.cnblogs.com/shengkejava/p/5611671.html