码迷,mamicode.com
首页 > 编程语言 > 详细

六:ZooKeeper的java客户端api的使用

时间:2016-02-25 19:43:14      阅读:348      评论:0      收藏:0      [点我收藏+]

标签:

一:客户端链接测试

技术分享
 1 package com.yeepay.sxf.createConnection;
 2 
 3 import java.io.IOException;
 4 
 5 import org.apache.zookeeper.ZooKeeper;
 6 import org.apache.zookeeper.ZooKeeper.States;
 7 
 8 /**
 9  * 测试Zookeeper的链接
10  * @author sxf
11  *
12  */
13 public class TestCreateSession {
14 
15     //zooKeeper实例
16     private static ZooKeeper zooKeeper;
17     
18     public static void main(String[] args) throws IOException, InterruptedException {
19         
20         //实例化zooKeeper的实例
21         //参数:(ip地址:端口号  ,当前会话超时时间,自定义事件监听器)
22         zooKeeper=new ZooKeeper("10.151.30.75:2181",5000, new MyWatcher());
23         
24         //获取链接状态
25         States states=zooKeeper.getState();
26         
27         //此链接为异步链接
28         System.out.println("TestCreateSession.main(链接状态):"+states.toString());//CONNECTING
29         
30         Thread.sleep(Integer.MAX_VALUE);
31     }
32 }
33 
34 
35 
36 
37 
38 package com.yeepay.sxf.createConnection;
39 
40 import org.apache.zookeeper.WatchedEvent;
41 import org.apache.zookeeper.Watcher;
42 import org.apache.zookeeper.Watcher.Event.KeeperState;
43 /**
44  * zookeeper实例过程中的事件监听器
45  * @author sxf
46  *
47  */
48 public class MyWatcher implements Watcher{
49 
50 
51     
52     //该方法可以做相关的逻辑代码
53     @Override
54     public void process(WatchedEvent event) {
55         //MyWatcher.process(接收到的事件:)WatchedEvent state:SyncConnected type:None path:null
56         System.out.println("MyWatcher.process(接收到的事件:)"+event);
57         
58         //如果链接成功可以做一些事情
59         if(event.getState()==KeeperState.SyncConnected){
60             System.out.println("MyWatcher.process(链接成功做一些事情:)");
61         }
62         
63     }
64 
65     
66 }
View Code

二:客户端创建节点测试

技术分享
  1 package com.yeepay.sxf.createNode;
  2 
  3 import java.io.IOException;
  4 import java.security.NoSuchAlgorithmException;
  5 import java.util.ArrayList;
  6 import java.util.List;
  7 
  8 import org.apache.zookeeper.CreateMode;
  9 import org.apache.zookeeper.KeeperException;
 10 import org.apache.zookeeper.WatchedEvent;
 11 import org.apache.zookeeper.Watcher;
 12 import org.apache.zookeeper.ZooKeeper;
 13 import org.apache.zookeeper.Watcher.Event.KeeperState;
 14 import org.apache.zookeeper.ZooDefs.Ids;
 15 import org.apache.zookeeper.ZooDefs.Perms;
 16 import org.apache.zookeeper.data.ACL;
 17 import org.apache.zookeeper.data.Id;
 18 import org.apache.zookeeper.server.auth.DigestAuthenticationProvider;
 19 /**
 20  * zooKeeper同步创建节点测试
 21  * @author sxf
 22  *
 23  */
 24 public class TestCreateNodeSyn implements Watcher {
 25 
 26     private static ZooKeeper zooKeeper;
 27     
 28     public static void main(String[] args) throws IOException, InterruptedException, NoSuchAlgorithmException {
 29 //        //实例化zooKeeper链接
 30 //        zooKeeper=new ZooKeeper("10.151.30.75:2181", 5000,new TestCreateNodeSyn());
 31 //        //创建链接
 32 //        Thread.sleep(Integer.MAX_VALUE);
 33         
 34         System.out.println(DigestAuthenticationProvider.generateDigest("shangxiaofei:shangxiaofei"));
 35     }
 36 
 37     /**
 38      *权限模式(scheme): ip,digest
 39      *授权对象(ID):
 40      *            ip权限模式:具体的ip地址
 41      *            digest权限模式:username:Base64(SHA-1(username:password))
 42      *
 43      *权限(permission):CREATE(C),DELETE(D),READ(R),WRITE(W),ADMIN(A)
 44      *        注:单个权限,完全权限,复合权限
 45      *
 46      *权限组合:scheme+ID+permission
 47      */
 48     
 49     public void process(WatchedEvent event) {
 50         //链接成功
 51                 if(event.getState()==KeeperState.SyncConnected){
 52                     //同步创建节点
 53                     try {
 54                         //基于ip的权限,意味着这个ip的客户端对此节点有读取权限
 55                         ACL ipacl=new ACL(Perms.READ, new Id("ip", "10.151.30.75"));
 56                         //基于digest的权限,意味着只有这个用户名和密码的客户端才能读取和写的权限
 57                         ACL digetacl=new ACL(Perms.READ|Perms.WRITE,new Id("digest",DigestAuthenticationProvider.generateDigest("shangxiaofei:shangxiaofei")));
 58                     
 59                         List<ACL> myaclAcls=new ArrayList<ACL>();
 60                         myaclAcls.add(ipacl);
 61                         myaclAcls.add(digetacl);
 62                         
 63                         String path=zooKeeper.create("/node_128", "shangxiaofei".getBytes(), myaclAcls, CreateMode.PERSISTENT);
 64                         System.out.println("MyWatcher2.process(创建节点返回的路径:)"+path);
 65                     }catch (NoSuchAlgorithmException e){
 66                         e.printStackTrace();
 67                     } catch (KeeperException e) {
 68                         // TODO Auto-generated catch block
 69                         e.printStackTrace();
 70                     } catch (InterruptedException e) {
 71                         // TODO Auto-generated catch block
 72                         e.printStackTrace();
 73                     }
 74                 }
 75                 
 76     }
 77     
 78     
 79     
 80     
 81 }
 82 
 83 
 84 
 85 
 86 
 87 
 88 package com.yeepay.sxf.createNode;
 89 
 90 import java.io.IOException;
 91 
 92 import org.apache.zookeeper.AsyncCallback;
 93 import org.apache.zookeeper.CreateMode;
 94 import org.apache.zookeeper.WatchedEvent;
 95 import org.apache.zookeeper.Watcher;
 96 import org.apache.zookeeper.Watcher.Event.KeeperState;
 97 import org.apache.zookeeper.ZooDefs.Ids;
 98 import org.apache.zookeeper.ZooKeeper;
 99 /**
100  * zooKeeper异步创建节点测试
101  * @author sxf
102  *
103  */
104 public class TestCreateNodeAsyn implements Watcher {
105 
106     private static ZooKeeper zooKeeper;
107     
108     public static void main(String[] args) throws IOException, InterruptedException {
109         //实例化zooKeeper链接
110         zooKeeper=new ZooKeeper("10.151.30.75:2181", 5000,new TestCreateNodeAsyn());
111         //创建链接
112         Thread.sleep(Integer.MAX_VALUE);
113     }
114 
115     @Override
116     public void process(WatchedEvent event) {
117         //链接成功
118                 if(event.getState()==KeeperState.SyncConnected){
119                         //异步创建节点
120                         zooKeeper.create("/node_124", "shangxiaoshuai".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT,new ISstringCallBack(),"sxf创建");
121                 }
122                 
123     }
124     
125     
126     //该类数据异步回调接口实现
127     static class ISstringCallBack implements AsyncCallback.StringCallback{
128 
129         //创建成功rc=0
130         //path,创建节点的路径
131         //ctx,创建节点的传入的上下位
132         //name,创建节点的名字
133         @Override
134         public void processResult(int rc, String path, Object ctx, String name) {
135             StringBuffer sb=new StringBuffer();
136             sb.append("rc="+rc).append("\n");
137             sb.append("path="+path).append("\n");
138             sb.append("ctx="+ctx).append("\n");
139             sb.append("name="+name);
140             System.out.println(sb.toString());
141             /**
142              *rc=0
143              *path=/node_124
144              *ctx=sxf创建
145              *name=/node_124
146              *
147              */
148         }
149         
150     }
151     
152 }
View Code

三:客户端删除节点测试

技术分享
  1 package com.yeepay.sxf.deleteNode;
  2 
  3 import java.io.IOException;
  4 
  5 import org.apache.zookeeper.WatchedEvent;
  6 import org.apache.zookeeper.Watcher;
  7 import org.apache.zookeeper.Watcher.Event.EventType;
  8 import org.apache.zookeeper.Watcher.Event.KeeperState;
  9 import org.apache.zookeeper.ZooKeeper;
 10 
 11 /**
 12  *同步删除节点中存取的值
 13  * @author sxf
 14  *
 15  */
 16 public class TestDeleteNodeSyn implements Watcher{
 17 
 18     private static ZooKeeper zooKeeper;
 19     
 20     
 21     public static void main(String[] args) throws IOException, InterruptedException {
 22                 //实例化zooKeeper链接
 23                 zooKeeper=new ZooKeeper("10.151.30.75:2181", 5000,new TestDeleteNodeSyn());
 24                 //创建链接
 25                 Thread.sleep(Integer.MAX_VALUE);
 26     }
 27 
 28     /**
 29      * event
 30      * (1)zookeeper链接状态 event.getState()
 31      * (2)zookeeper事件类型 event.getType()
 32      * (3)zookeeper事件触发的节点路径 event.getPath()
 33      */
 34     @Override
 35     public void process(WatchedEvent event) {
 36         try{
 37             //已经连接做一件事情
 38             if(event.getState()==KeeperState.SyncConnected){
 39                 
 40                 if(event.getType()==EventType.None&&event.getPath()==null){
 41                     //删除一个节点
 42                     //第一个参数:删除节点的全路径
 43                     //第二个参素:节点的版本(类似乐观锁),当为-1时,对版本无限制
 44                     zooKeeper.delete("/node_127", -1);
 45                 }else if(event.getType()==EventType.NodeDataChanged){
 46                     
 47                 }
 48                 
 49             }
 50             
 51         }catch(Exception e){
 52             e.printStackTrace();
 53         }
 54         
 55         
 56         
 57     }
 58     
 59     
 60 }
 61 
 62 
 63 
 64 
 65 package com.yeepay.sxf.deleteNode;
 66 
 67 import java.io.IOException;
 68 
 69 import org.apache.zookeeper.AsyncCallback;
 70 import org.apache.zookeeper.WatchedEvent;
 71 import org.apache.zookeeper.Watcher;
 72 import org.apache.zookeeper.Watcher.Event.EventType;
 73 import org.apache.zookeeper.Watcher.Event.KeeperState;
 74 import org.apache.zookeeper.ZooKeeper;
 75 
 76 /**
 77  *异步删除节点
 78  * @author sxf
 79  *
 80  */
 81 public class TestDeleteNodeAsyn implements Watcher{
 82 
 83     private static ZooKeeper zooKeeper;
 84     
 85     
 86     public static void main(String[] args) throws IOException, InterruptedException {
 87                 //实例化zooKeeper链接
 88                 zooKeeper=new ZooKeeper("10.151.30.75:2181", 5000,new TestDeleteNodeAsyn());
 89                 //创建链接
 90                 Thread.sleep(Integer.MAX_VALUE);
 91     }
 92 
 93     /**
 94      * event
 95      * (1)zookeeper链接状态 event.getState()
 96      * (2)zookeeper事件类型 event.getType()
 97      * (3)zookeeper事件触发的节点路径 event.getPath()
 98      */
 99     @Override
100     public void process(WatchedEvent event) {
101         try{
102             //已经连接做一件事情
103             if(event.getState()==KeeperState.SyncConnected){
104                 
105                 if(event.getType()==EventType.None&&event.getPath()==null){
106                     //第一个参数:要删除的节点的全路径
107                     //第二个参数:要删除节点的版本号(类似乐观锁)
108                     //第三个参数:异步删除的回调实现类
109                     //第四个参数:删除传入的上下文
110                     zooKeeper.delete("/node_126", -1, new IsDeleteCallBack(), "删除节点sxf");
111                     
112                 }
113                 
114             }
115             
116         }catch(Exception e){
117             e.printStackTrace();
118         }
119         
120         
121         
122     }
123 
124     /**
125      * 异步删除回调接口实现类
126      * @author sxf
127      *
128      */
129     static class IsDeleteCallBack implements AsyncCallback.VoidCallback{
130         
131         //第一个参数:返回删除成功rc
132         @Override
133         public void processResult(int rc, String path, Object ctx) {
134             StringBuffer sb=new StringBuffer();
135             sb.append("rc="+rc).append("\n");
136             sb.append("path="+path).append("\n");
137             sb.append("ctx="+ctx).append("\n");
138             System.out.println(sb.toString());
139             /**
140              * rc=0
141              *    path=/node_126
142              *    ctx=删除节点sxf
143              */
144         }
145         
146     }
147     
148 }
View Code

四:客户端判断节点是否存在测试

技术分享
  1 package com.yeepay.sxf.existsNode;
  2 
  3 import java.io.IOException;
  4 
  5 import org.apache.zookeeper.WatchedEvent;
  6 import org.apache.zookeeper.Watcher;
  7 import org.apache.zookeeper.Watcher.Event.EventType;
  8 import org.apache.zookeeper.Watcher.Event.KeeperState;
  9 import org.apache.zookeeper.ZooKeeper;
 10 import org.apache.zookeeper.data.Stat;
 11 
 12 /**
 13  *同步判断节点是否存在
 14  * @author sxf
 15  *
 16  */
 17 public class TestExistsNodeSyn implements Watcher{
 18 
 19     private static ZooKeeper zooKeeper;
 20     
 21     
 22     public static void main(String[] args) throws IOException, InterruptedException {
 23                 //实例化zooKeeper链接
 24                 zooKeeper=new ZooKeeper("10.151.30.75:2181", 5000,new TestExistsNodeSyn());
 25                 //创建链接
 26                 Thread.sleep(Integer.MAX_VALUE);
 27     }
 28 
 29     /**
 30      * event
 31      * (1)zookeeper链接状态 event.getState()
 32      * (2)zookeeper事件类型 event.getType()
 33      * (3)zookeeper事件触发的节点路径 event.getPath()
 34      */
 35     @Override
 36     public void process(WatchedEvent event) {
 37         try{
 38             //已经连接做一件事情
 39             if(event.getState()==KeeperState.SyncConnected){
 40                 
 41                 if(event.getType()==EventType.None&&event.getPath()==null){
 42                     //第一个参数:判断节点的全路径
 43                     //第二个参数:是否注册监听器
 44                     Stat stat=zooKeeper.exists("/node_123",true);
 45                     System.out.println(stat);
 46                 }else if(event.getType()==EventType.NodeDataChanged){
 47                     //节点数据改变
 48                     Stat stat=zooKeeper.exists(event.getPath(),true);
 49                     System.out.println("节点数据改变=>"+stat);
 50                 }else if(event.getType()==EventType.NodeCreated){
 51                     //节点被创建
 52                     Stat stat=zooKeeper.exists(event.getPath(),true);
 53                     System.out.println("节点被创建=>"+stat);
 54                 }else if(event.getType()==EventType.NodeDeleted){
 55                     //节点被删除
 56                     Stat stat=zooKeeper.exists(event.getPath(),true);
 57                     System.out.println("节点被删除=>"+stat);
 58                 }
 59                 
 60             }
 61             
 62         }catch(Exception e){
 63             e.printStackTrace();
 64         }
 65         
 66         
 67         
 68     }
 69     
 70     
 71 }
 72 
 73 
 74 
 75 
 76 package com.yeepay.sxf.existsNode;
 77 
 78 import java.io.IOException;
 79 
 80 import org.apache.zookeeper.AsyncCallback;
 81 import org.apache.zookeeper.WatchedEvent;
 82 import org.apache.zookeeper.Watcher;
 83 import org.apache.zookeeper.Watcher.Event.EventType;
 84 import org.apache.zookeeper.Watcher.Event.KeeperState;
 85 import org.apache.zookeeper.ZooKeeper;
 86 import org.apache.zookeeper.data.Stat;
 87 
 88 /**
 89  *异步判断节点是否存在
 90  * @author sxf
 91  *
 92  */
 93 public class TestExistsNodeAsyn implements Watcher{
 94 
 95     private static ZooKeeper zooKeeper;
 96     
 97     
 98     public static void main(String[] args) throws IOException, InterruptedException {
 99                 //实例化zooKeeper链接
100                 zooKeeper=new ZooKeeper("10.151.30.75:2181", 5000,new TestExistsNodeAsyn());
101                 //创建链接
102                 Thread.sleep(Integer.MAX_VALUE);
103     }
104 
105     /**
106      * event
107      * (1)zookeeper链接状态 event.getState()
108      * (2)zookeeper事件类型 event.getType()
109      * (3)zookeeper事件触发的节点路径 event.getPath()
110      */
111     @Override
112     public void process(WatchedEvent event) {
113         try{
114             //已经连接做一件事情
115             if(event.getState()==KeeperState.SyncConnected){
116                 
117                 if(event.getType()==EventType.None&&event.getPath()==null){
118                         zooKeeper.exists("/node_123", true, new IsStatCallBack(), "sxf判断节点是否存在");
119                 }else if(event.getType()==EventType.NodeDataChanged){
120                     //节点数据改变
121                     zooKeeper.exists("/node_123", true, new IsStatCallBack(), "sxf判断节点是否存在");
122                     System.out.println("TestExistsNodeAsyn.process(节点数据改变)");
123                 }else if(event.getType()==EventType.NodeCreated){
124                     //节点被创建
125                     zooKeeper.exists("/node_123", true, new IsStatCallBack(), "sxf判断节点是否存在");
126                     System.out.println("TestExistsNodeAsyn.process(节点被创建)");
127                 }else if(event.getType()==EventType.NodeDeleted){
128                     //节点被删除
129                     zooKeeper.exists("/node_123", true, new IsStatCallBack(), "sxf判断节点是否存在");
130                     System.out.println("TestExistsNodeAsyn.process(节点被删除)");
131                 }
132                 
133             }
134             
135         }catch(Exception e){
136             e.printStackTrace();
137         }
138         
139         
140         
141     }
142     
143     
144     //节点是否存在的异步回调接口实现类
145     static class IsStatCallBack implements AsyncCallback.StatCallback{
146 
147         @Override
148         public void processResult(int rc, String path, Object ctx, Stat stat) {
149             StringBuffer sb=new StringBuffer();
150             sb.append("rc="+rc).append("\n");
151             sb.append("path="+path).append("\n");
152             sb.append("ctx="+ctx).append("\n");
153             sb.append("stat="+stat).append("\n");
154             System.out.println(sb.toString());
155         }
156         
157     }
158     
159 }
View Code

五:客户端获取节点列表测试

技术分享
  1 package com.yeepay.sxf.getNode;
  2 
  3 import java.io.IOException;
  4 import java.util.List;
  5 
  6 import org.apache.zookeeper.WatchedEvent;
  7 import org.apache.zookeeper.Watcher;
  8 import org.apache.zookeeper.Watcher.Event.EventType;
  9 import org.apache.zookeeper.Watcher.Event.KeeperState;
 10 import org.apache.zookeeper.ZooKeeper;
 11 
 12 /**
 13  * 同步获取节点列表node
 14  * @author sxf
 15  *
 16  */
 17 public class TestGetNodeSyn implements Watcher{
 18 
 19     private static ZooKeeper zooKeeper;
 20     
 21     public static void main(String[] args) throws IOException, InterruptedException {
 22                 //实例化zooKeeper链接
 23                 zooKeeper=new ZooKeeper("10.151.30.75:2181", 5000,new TestGetNodeSyn());
 24                 //创建链接
 25                 Thread.sleep(Integer.MAX_VALUE);
 26     }
 27 
 28     /**
 29      * event
 30      * (1)zookeeper链接状态 event.getState()
 31      * (2)zookeeper事件类型 event.getType()
 32      * (3)zookeeper事件触发的节点路径 event.getPath()
 33      */
 34     @Override
 35     public void process(WatchedEvent event) {
 36         try{
 37             //已经连接做一件事情
 38             if(event.getState()==KeeperState.SyncConnected){
 39                 
 40                 if(event.getType()==EventType.None&&event.getPath()==null){
 41                     //获取制定节点的子节点列表
 42                     //第一参数:节点路径
 43                     //第二个参数:是否需要关注该节点的子节点列表变化。关注:true  不关注:false  (事件监听器)
 44                     List<String> nodesList=zooKeeper.getChildren("/", true);
 45                     System.out.println("/ 下的子节点列表==>"+nodesList);
 46                     /**
 47                      * / 下的子节点列表==>[node_123, node_124, zookeeper]
 48                      */
 49                 }else if(event.getType()==EventType.NodeChildrenChanged){
 50                     //如果节点发生变化,则会触发该事件.当别的地方添加一个节点,则该方法被调用
 51                     System.out.println("TestGetNodeSyn.process()变化的地址:"+event.getPath());//TestGetNodeSyn.process()变化的地址:/
 52                     List<String> nodesList=zooKeeper.getChildren(event.getPath(), true);
 53                     System.out.println("TestGetNodeSyn.process(节点发生变化==>)"+nodesList);
 54                     /**
 55                      * TestGetNodeSyn.process(节点发生变化==>)[node_123, node_124, zookeeper, node_125]
 56                      */
 57                 }
 58                 
 59             }
 60             
 61         }catch(Exception e){
 62             e.printStackTrace();
 63         }
 64         
 65         
 66         
 67     }
 68     
 69     
 70 }
 71 
 72 
 73 
 74 package com.yeepay.sxf.getNode;
 75 
 76 import java.io.IOException;
 77 import java.util.List;
 78 
 79 import org.apache.zookeeper.AsyncCallback;
 80 import org.apache.zookeeper.WatchedEvent;
 81 import org.apache.zookeeper.Watcher;
 82 import org.apache.zookeeper.Watcher.Event.EventType;
 83 import org.apache.zookeeper.Watcher.Event.KeeperState;
 84 import org.apache.zookeeper.ZooKeeper;
 85 import org.apache.zookeeper.data.Stat;
 86 
 87 /**
 88  * 异步获取节点列表
 89  * @author sxf
 90  *
 91  */
 92 public class TestGetNodeAsyn implements Watcher {
 93 
 94     private static ZooKeeper zooKeeper;
 95     
 96     public static void main(String[] args) throws IOException, InterruptedException {
 97         //实例化zooKeeper链接
 98         zooKeeper=new ZooKeeper("10.151.30.75:2181", 5000,new TestGetNodeAsyn());
 99         //创建链接
100         Thread.sleep(Integer.MAX_VALUE);
101     }
102 
103     /**
104      * event
105      * (1)zookeeper链接状态 event.getState()
106      * (2)zookeeper事件类型 event.getType()
107      * (3)zookeeper事件触发的节点路径 event.getPath()
108      */
109     
110     @Override
111     public void process(WatchedEvent event) {
112         try{
113             //已经连接做一件事情
114             if(event.getState()==KeeperState.SyncConnected){
115                 
116                 if(event.getType()==EventType.None&&event.getPath()==null){
117                     //获取制定节点的子节点列表
118                     //第一参数:节点路径
119                     //第二个参数:是否需要关注该节点的子节点列表变化。关注:true  不关注:false  (事件监听器)
120                     //第三个参数:异步回调函数
121                     //第四个参数:异步获取传入上下文参数
122                     zooKeeper.getChildren("/", true, new ISChildrenCallback(), "SXF获取子节点列表");
123                     /**    
124                      * / 下的子节点列表==>[node_123, node_124, zookeeper]
125                      */
126                 }else if(event.getType()==EventType.NodeChildrenChanged){
127                     zooKeeper.getChildren(event.getPath(), true, new ISChildrenCallback(), "SXF获取子节点列表");
128                 }
129                 
130             }
131             
132         }catch(Exception e){
133             e.printStackTrace();
134         }
135     }
136     
137     
138     //异步获取子节点列表的回调
139     static class ISChildrenCallback implements AsyncCallback.Children2Callback{
140 
141         @Override
142         public void processResult(int rc, String path, Object ctx,List<String> children, Stat stat) {
143             StringBuffer sb=new StringBuffer();
144             sb.append("rc="+rc).append("\n");
145             sb.append("path="+path).append("\n");
146             sb.append("ctx="+ctx).append("\n");
147             sb.append("children="+children).append("\n");
148             sb.append("stat="+stat).append("\n");
149             System.out.println(sb.toString());
150             /**
151              * rc=0
152              *    path=/
153              *    ctx=SXF获取子节点列表
154              *    children=[node_126, node_127, node_123, node_124, zookeeper, node_125]
155              *    stat=0,0,0,0,0,8,0,0,0,6,4294967354
156              */
157             
158         }
159         
160     }
161     
162 }
View Code

六:客户端获取节点值测试

技术分享
  1 package com.yeepay.sxf.getNodeValue;
  2 
  3 import java.io.IOException;
  4 
  5 import org.apache.zookeeper.WatchedEvent;
  6 import org.apache.zookeeper.Watcher;
  7 import org.apache.zookeeper.Watcher.Event.EventType;
  8 import org.apache.zookeeper.Watcher.Event.KeeperState;
  9 import org.apache.zookeeper.ZooKeeper;
 10 import org.apache.zookeeper.data.Stat;
 11 
 12 /**
 13  *同步获取节点中存取的值
 14  * @author sxf
 15  *
 16  */
 17 public class TestGetNodeValueSyn implements Watcher{
 18 
 19     private static ZooKeeper zooKeeper;
 20     
 21     
 22     public static void main(String[] args) throws IOException, InterruptedException {
 23                 //实例化zooKeeper链接
 24                 zooKeeper=new ZooKeeper("10.151.30.75:2181", 5000,new TestGetNodeValueSyn());
 25                 //创建链接
 26                 Thread.sleep(Integer.MAX_VALUE);
 27     }
 28 
 29     /**
 30      * event
 31      * (1)zookeeper链接状态 event.getState()
 32      * (2)zookeeper事件类型 event.getType()
 33      * (3)zookeeper事件触发的节点路径 event.getPath()
 34      */
 35     @Override
 36     public void process(WatchedEvent event) {
 37         try{
 38             //已经连接做一件事情
 39             if(event.getState()==KeeperState.SyncConnected){
 40                 
 41                 if(event.getType()==EventType.None&&event.getPath()==null){
 42                     //如果没有权限,可以注册权限
 43                     zooKeeper.addAuthInfo("digest", "shangxiaofei:shangxiaofei".getBytes());
 44                     
 45                     
 46                     //第一个参数:要获取节点存储值的全路径
 47                     //第二个参数:是否注册一个事件监听器
 48                     //第三个参数:一个状态实例
 49                     byte[] nodeValueByte=zooKeeper.getData("/node_128", true, new Stat());
 50                     System.out.println("TestGetNodeValueSyn.process(获取/node_128的值:)"+new String(nodeValueByte));
 51                     //TestGetNodeValueSyn.process(获取/node_127的值:)aaaaaaaa
 52                 }else if(event.getType()==EventType.NodeDataChanged){
 53                     //当关注的节点的值发生了变化则通知
 54                     byte[] nodeValueByte=zooKeeper.getData(event.getPath(), true, new Stat());
 55                     System.out.println("TestGetNodeValueSyn.process(/node_127的值发生了变化:)"+new String(nodeValueByte));
 56                     //TestGetNodeValueSyn.process(/node_127的值发生了变化:)shuihongjie
 57                 }
 58                 
 59             }
 60             
 61         }catch(Exception e){
 62             e.printStackTrace();
 63         }
 64         
 65         
 66         
 67     }
 68     
 69     
 70 }
 71 
 72 
 73 
 74 
 75 package com.yeepay.sxf.getNodeValue;
 76 
 77 import java.io.IOException;
 78 
 79 import org.apache.zookeeper.AsyncCallback;
 80 import org.apache.zookeeper.WatchedEvent;
 81 import org.apache.zookeeper.Watcher;
 82 import org.apache.zookeeper.Watcher.Event.EventType;
 83 import org.apache.zookeeper.Watcher.Event.KeeperState;
 84 import org.apache.zookeeper.ZooKeeper;
 85 import org.apache.zookeeper.data.Stat;
 86 
 87 /**
 88  * 异步获取节点中存取的值
 89  * @author sxf
 90  *
 91  */
 92 public class TestGetNodeValueASyn implements Watcher{
 93 
 94     private static ZooKeeper zooKeeper;
 95     
 96     
 97     public static void main(String[] args) throws IOException, InterruptedException {
 98                 //实例化zooKeeper链接
 99                 zooKeeper=new ZooKeeper("10.151.30.75:2181", 5000,new TestGetNodeValueASyn());
100                 //创建链接
101                 Thread.sleep(Integer.MAX_VALUE);
102     }
103 
104     /**
105      * event
106      * (1)zookeeper链接状态 event.getState()
107      * (2)zookeeper事件类型 event.getType()
108      * (3)zookeeper事件触发的节点路径 event.getPath()
109      */
110     @Override
111     public void process(WatchedEvent event) {
112         try{
113             //已经连接做一件事情
114             if(event.getState()==KeeperState.SyncConnected){
115                 
116                 if(event.getType()==EventType.None&&event.getPath()==null){
117                     //异步获取节点中的值
118                     zooKeeper.getData("/node_127", true, new IsDataCallBack(), "sxf修改");
119                 }else if(event.getType()==EventType.NodeDataChanged){
120                     zooKeeper.getData(event.getPath(), true, new IsDataCallBack(), "sxf修改");
121                 }
122                 
123             }
124             
125         }catch(Exception e){
126             e.printStackTrace();
127         }
128         
129         
130         
131     }
132     
133     
134     //异步获取数据的回调函数
135     static class IsDataCallBack implements  AsyncCallback.DataCallback{
136 
137         @Override
138         public void processResult(int rc, String path, Object ctx, byte[] data,
139                 Stat stat) {
140             StringBuffer sb=new StringBuffer();
141             sb.append("rc="+rc).append("\n");
142             sb.append("path="+path).append("\n");
143             sb.append("ctx="+ctx).append("\n");
144             sb.append("data="+new String(data)).append("\n");
145             sb.append("stat="+stat).append("\n");
146             System.out.println(sb.toString());
147             /**
148              * rc=0
149              *path=/node_127
150              *ctx=sxf修改
151              *data=tianxiabangchang
152              *stat=4294967354,4294967367,1456371531643,1456373368574,4,0,0,0,16,0,4294967354
153              */
154         }
155         
156     }
157     
158 }
View Code

七:客户端修改节点数据测试

技术分享
  1 package com.yeepay.sxf.updateNodeData;
  2 
  3 import java.io.IOException;
  4 
  5 import org.apache.zookeeper.WatchedEvent;
  6 import org.apache.zookeeper.Watcher;
  7 import org.apache.zookeeper.Watcher.Event.EventType;
  8 import org.apache.zookeeper.Watcher.Event.KeeperState;
  9 import org.apache.zookeeper.ZooKeeper;
 10 import org.apache.zookeeper.data.Stat;
 11 
 12 /**
 13  *同步修改节点数据
 14  * @author sxf
 15  *
 16  */
 17 public class TestUpdateNodeValueSyn implements Watcher{
 18 
 19     private static ZooKeeper zooKeeper;
 20     
 21     
 22     public static void main(String[] args) throws IOException, InterruptedException {
 23                 //实例化zooKeeper链接
 24                 zooKeeper=new ZooKeeper("10.151.30.75:2181", 5000,new TestUpdateNodeValueSyn());
 25                 //创建链接
 26                 Thread.sleep(Integer.MAX_VALUE);
 27     }
 28 
 29     /**
 30      * event
 31      * (1)zookeeper链接状态 event.getState()
 32      * (2)zookeeper事件类型 event.getType()
 33      * (3)zookeeper事件触发的节点路径 event.getPath()
 34      */
 35     @Override
 36     public void process(WatchedEvent event) {
 37         try{
 38             //已经连接做一件事情
 39             if(event.getState()==KeeperState.SyncConnected){
 40                 
 41                 if(event.getType()==EventType.None&&event.getPath()==null){
 42                     //第一个参数:要修改的节点的全路径
 43                     //第二个参数:要修改的节点的数据内容
 44                     //第三个参数:版本号
 45                     Stat stat=zooKeeper.setData("/node_124", "sxfupdatenodedata".getBytes(), -1);
 46                     System.out.println("TestUpdateNodeValueSyn.process()"+stat);
 47                 }
 48                 
 49             }
 50             
 51         }catch(Exception e){
 52             e.printStackTrace();
 53         }
 54         
 55         
 56         
 57     }
 58     
 59     
 60 }
 61 
 62 
 63 package com.yeepay.sxf.updateNodeData;
 64 
 65 import java.io.IOException;
 66 
 67 import org.apache.zookeeper.AsyncCallback;
 68 import org.apache.zookeeper.WatchedEvent;
 69 import org.apache.zookeeper.Watcher;
 70 import org.apache.zookeeper.Watcher.Event.EventType;
 71 import org.apache.zookeeper.Watcher.Event.KeeperState;
 72 import org.apache.zookeeper.ZooKeeper;
 73 import org.apache.zookeeper.data.Stat;
 74 
 75 /**
 76  *异步修改节点数据
 77  * @author sxf
 78  *
 79  */
 80 public class TestUpdateNodeValueAsyn implements Watcher{
 81 
 82     private static ZooKeeper zooKeeper;
 83     
 84     
 85     public static void main(String[] args) throws IOException, InterruptedException {
 86                 //实例化zooKeeper链接
 87                 zooKeeper=new ZooKeeper("10.151.30.75:2181", 5000,new TestUpdateNodeValueAsyn());
 88                 //创建链接
 89                 Thread.sleep(Integer.MAX_VALUE);
 90     }
 91 
 92     /**
 93      * event
 94      * (1)zookeeper链接状态 event.getState()
 95      * (2)zookeeper事件类型 event.getType()
 96      * (3)zookeeper事件触发的节点路径 event.getPath()
 97      */
 98     @Override
 99     public void process(WatchedEvent event) {
100         try{
101             //已经连接做一件事情
102             if(event.getState()==KeeperState.SyncConnected){
103                 
104                 if(event.getType()==EventType.None&&event.getPath()==null){
105                     //第一个参数:要修改的节点的全路径
106                     //第二个参数:要修改的节点的数据内容
107                     //第三个参数:版本号
108                     zooKeeper.setData("/node_124", "yangkai".getBytes(), -1, new ISupdateDataCallBack(), "sxf修改节点数据");
109                 }
110                 
111             }
112             
113         }catch(Exception e){
114             e.printStackTrace();
115         }
116         
117         
118         
119     }
120     
121     /**
122      * 异步修改节点数据的回调接口的实现
123      * @author sxf
124      *
125      */
126     static class ISupdateDataCallBack implements AsyncCallback.StatCallback{
127 
128         @Override
129         public void processResult(int rc, String path, Object ctx, Stat stat) {
130             StringBuffer sb=new StringBuffer();
131             sb.append("rc="+rc).append("\n");
132             sb.append("path="+path).append("\n");
133             sb.append("ctx="+ctx).append("\n");
134             sb.append("stat="+stat).append("\n");
135             System.out.println(sb.toString());
136         }
137         
138     }
139 }
View Code

 

六:ZooKeeper的java客户端api的使用

标签:

原文地址:http://www.cnblogs.com/shangxiaofei/p/5218078.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!