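To use a Watcher, first implement the Watcher interface, then pass an instance of the implementation to a method that accepts one, such as getChildren or exists. ZooKeeper also lets you specify a default Watcher when constructing the ZooKeeper object. The getChildren and exists methods can use this default Watcher, or you can pass them a different Watcher.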
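The choice between the default Watcher and an explicitly supplied one can be illustrated with a toy model. This is only a sketch: MiniWatcher, MiniClient and resolve are hypothetical names invented for this example, not ZooKeeper classes. It assumes a boolean argument means "use the default Watcher" and a Watcher argument means "use this one".

```java
import java.util.ArrayList;
import java.util.List;

class DefaultWatcherDemo {
    // Runs both selection paths and records which watcher received the event.
    static List<String> run() {
        List<String> log = new ArrayList<>();
        MiniWatcher dflt = e -> log.add("default:" + e);
        MiniWatcher custom = e -> log.add("custom:" + e);
        MiniClient client = new MiniClient(dflt);
        client.resolve(true).process("NodeChildrenChanged");
        client.resolve(custom).process("NodeChildrenChanged");
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}

// Hypothetical stand-in for the Watcher callback interface.
interface MiniWatcher {
    void process(String event);
}

// Hypothetical client that remembers the watcher given at construction time.
class MiniClient {
    private final MiniWatcher defaultWatcher;

    MiniClient(MiniWatcher defaultWatcher) {
        this.defaultWatcher = defaultWatcher;
    }

    // Boolean form: true selects the default watcher, false selects none.
    MiniWatcher resolve(boolean watch) {
        return watch ? defaultWatcher : null;
    }

    // Watcher form: the caller's watcher is used as given.
    MiniWatcher resolve(MiniWatcher watcher) {
        return watcher;
    }
}
```

In this model the first call reaches the default watcher and the second reaches the custom one, which mirrors the two ways of calling getChildren or exists described above.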
Code 1: The Watcher interface
public interface Watcher {
    /**
     * Event state
     */
    public interface Event {
        /**
         * The state of ZooKeeper at the time the event occurred
         */
        public enum KeeperState {
            @Deprecated
            Unknown (-1),
            Disconnected (0),
            @Deprecated
            NoSyncConnected (1),
            SyncConnected (3),
            AuthFailed (4),
            ConnectedReadOnly (5),
            SaslAuthenticated(6),
            Expired (-112);

            private final int intValue;

            KeeperState(int intValue) {
                this.intValue = intValue;
            }
            ......
        }

        /**
         * The events in ZooKeeper
         */
        public enum EventType {
            None (-1),
            NodeCreated (1),
            NodeDeleted (2),
            NodeDataChanged (3),
            NodeChildrenChanged (4);

            // Integer representation of value for sending over wire
            private final int intValue;

            EventType(int intValue) {
                this.intValue = intValue;
            }
            ......
        }
    }

    // The Watcher callback method
    abstract public void process(WatchedEvent event);
}
Code 2: The ZooKeeper.getChildren(final String, Watcher) method
public List<String> getChildren(final String path, Watcher watcher)
        throws KeeperException, InterruptedException {
    final String clientPath = path;
    PathUtils.validatePath(clientPath);

    WatchRegistration wcb = null;
    // If watcher is not null, build a WatchRegistration object,
    // which describes the relationship between the watcher and the path
    if (watcher != null) {
        wcb = new ChildWatchRegistration(watcher, clientPath);
    }

    // Prepend the root path to the given path to form the absolute server-side path
    final String serverPath = prependChroot(clientPath);

    // Build the RequestHeader object
    RequestHeader h = new RequestHeader();
    // Set the operation type to OpCode.getChildren
    h.setType(ZooDefs.OpCode.getChildren);
    // Build the GetChildrenRequest object
    GetChildrenRequest request = new GetChildrenRequest();
    // Set the path
    request.setPath(serverPath);
    // Set whether a watcher is used
    request.setWatch(watcher != null);
    // Build the GetChildrenResponse object
    GetChildrenResponse response = new GetChildrenResponse();
    // Submit the request and block until the result arrives
    ReplyHeader r = cnxn.submitRequest(h, request, response, wcb);
    if (r.getErr() != 0) {
        throw KeeperException.create(KeeperException.Code.get(r.getErr()), clientPath);
    }
    return response.getChildren();
}
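The step that turns clientPath into serverPath can be sketched in isolation. The version below is an assumption about what such a helper might do, not the client's actual code: it takes the chroot as an explicit parameter (the real method takes only the client path), and it maps the client-side root "/" onto the chroot itself.

```java
class ChrootDemo {
    // Hypothetical helper: builds the server-side path from a chroot and a client path.
    static String prependChroot(String chrootPath, String clientPath) {
        // No chroot configured: the client path is already the server path.
        if (chrootPath == null) {
            return clientPath;
        }
        // The client-side root "/" corresponds to the chroot node itself.
        if (clientPath.length() == 1) {
            return chrootPath;
        }
        return chrootPath + clientPath;
    }

    public static void main(String[] args) {
        System.out.println(prependChroot("/app", "/workers"));
    }
}
```

Note that the WatchRegistration in Code 2 is built from clientPath, while the request carries serverPath, so the watcher is keyed by the path the caller used.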
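The FinalRequestProcessor.processRequest method ultimately calls the read methods of ZKDatabase (such as statNode and getData). Those ZKDatabase methods in turn call methods of the DataTree class, which fetch the znode information for the given path so it can be returned to the client and, at the same time, set the Watcher.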
Code 3: How FinalRequestProcessor handles an OpCode.getData request
case OpCode.getData: {
    lastOp = "GETD";
    GetDataRequest getDataRequest = new GetDataRequest();
    ByteBufferInputStream.byteBuffer2Record(request.request, getDataRequest);
    // Get the znode object
    DataNode n = zks.getZKDatabase().getNode(getDataRequest.getPath());
    // If n is null, throw NoNodeException
    if (n == null) {
        throw new KeeperException.NoNodeException();
    }
    Long aclL;
    synchronized (n) {
        aclL = n.acl;
    }
    // Check for read permission
    PrepRequestProcessor.checkACL(zks, zks.getZKDatabase().convertLong(aclL),
            ZooDefs.Perms.READ, request.authInfo);
    // Build the Stat object
    Stat stat = new Stat();
    // Get the data of the znode at the given path.
    // If GetDataRequest.getWatch() returns true, pass in cnxn, the ServerCnxn object.
    // ServerCnxn implements the Watcher interface.
    byte b[] = zks.getZKDatabase().getData(getDataRequest.getPath(), stat,
            getDataRequest.getWatch() ? cnxn : null);
    // Build the GetDataResponse object
    rsp = new GetDataResponse(b, stat);
    break;
}
Code 4: The DataTree.getData() method
public byte[] getData(String path, Stat stat, Watcher watcher)
        throws KeeperException.NoNodeException {
    // Get the DataNode for the given path from the nodes map
    DataNode n = nodes.get(path);
    // If n is null, throw NoNodeException
    if (n == null) {
        throw new KeeperException.NoNodeException();
    }
    synchronized (n) {
        // Copy the state of n into stat
        n.copyStat(stat);
        // If watcher is not null, register the (path, watcher) pair in dataWatches
        if (watcher != null) {
            dataWatches.addWatch(path, watcher);
        }
        // Return the node data
        return n.data;
    }
}
Code 5: The DataTree.setData() method
public Stat setData(String path, byte data[], int version, long zxid, long time)
        throws KeeperException.NoNodeException {
    Stat s = new Stat();
    // Get the DataNode n for the given path
    DataNode n = nodes.get(path);
    // If n is null, throw NoNodeException
    if (n == null) {
        throw new KeeperException.NoNodeException();
    }
    byte lastdata[] = null;
    synchronized (n) {
        lastdata = n.data;
        n.data = data;
        n.stat.setMtime(time);
        n.stat.setMzxid(zxid);
        n.stat.setVersion(version);
        n.copyStat(s);
    }
    // now update if the path is in a quota subtree.
    String lastPrefix = getMaxPrefixWithQuota(path);
    if (lastPrefix != null) {
        this.updateBytes(lastPrefix, (data == null ? 0 : data.length)
                - (lastdata == null ? 0 : lastdata.length));
    }
    // Trigger the Watchers
    dataWatches.triggerWatch(path, EventType.NodeDataChanged);
    return s;
}
Code 6: The WatchManager.triggerWatch() method, which triggers Watchers
Set<Watcher> triggerWatch(String path, EventType type, Set<Watcher> supress) {
    WatchedEvent e = new WatchedEvent(type, KeeperState.SyncConnected, path);
    HashSet<Watcher> watchers;
    synchronized (this) {
        // Remove the watchers registered for path from watchTable
        watchers = watchTable.remove(path);
        if (watchers == null || watchers.isEmpty()) {
            if (LOG.isTraceEnabled()) {
                ZooTrace.logTraceMessage(LOG, ZooTrace.EVENT_DELIVERY_TRACE_MASK,
                        "No watchers for " + path);
            }
            return null;
        }
        for (Watcher w : watchers) {
            HashSet<String> paths = watch2Paths.get(w);
            if (paths != null) {
                paths.remove(path);
            }
        }
    }
    // Process every Watcher registered for path.
    // The Watcher objects here are actually ServerCnxn objects.
    for (Watcher w : watchers) {
        if (supress != null && supress.contains(w)) {
            continue;
        }
        w.process(e);
    }
    return watchers;
}
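The bookkeeping in triggerWatch relies on two maps: watchTable (path to watchers) and watch2Paths (watcher to paths). A minimal standalone sketch of that structure is given below. MiniWatchManager and Listener are hypothetical names, the suppress set is left out, and the addWatch body is an assumption, since the source shows only the call site.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

class WatchTableDemo {
    // Hypothetical stand-in for the Watcher callback.
    interface Listener {
        void process(String path, String type);
    }

    static final class MiniWatchManager {
        // path -> listeners, plus the reverse index listener -> paths
        private final Map<String, Set<Listener>> watchTable = new HashMap<>();
        private final Map<Listener, Set<String>> watch2Paths = new HashMap<>();

        synchronized void addWatch(String path, Listener l) {
            watchTable.computeIfAbsent(path, k -> new HashSet<>()).add(l);
            watch2Paths.computeIfAbsent(l, k -> new HashSet<>()).add(path);
        }

        // Returns how many listeners were notified.
        int triggerWatch(String path, String type) {
            Set<Listener> fired;
            synchronized (this) {
                // Detach the listeners first, so each one fires at most once.
                fired = watchTable.remove(path);
                if (fired == null || fired.isEmpty()) {
                    return 0;
                }
                for (Listener l : fired) {
                    Set<String> paths = watch2Paths.get(l);
                    if (paths != null) {
                        paths.remove(path);
                    }
                }
            }
            // Callbacks run outside the lock, as in Code 6.
            for (Listener l : fired) {
                l.process(path, type);
            }
            return fired.size();
        }
    }

    // Returns {listeners fired first time, listeners fired second time, events logged}.
    static int[] run() {
        List<String> log = new ArrayList<>();
        MiniWatchManager manager = new MiniWatchManager();
        manager.addWatch("/a", (path, type) -> log.add("w1 " + type + " " + path));
        manager.addWatch("/a", (path, type) -> log.add("w2 " + type + " " + path));
        int first = manager.triggerWatch("/a", "NodeDataChanged");
        int second = manager.triggerWatch("/a", "NodeDataChanged");
        return new int[] { first, second, log.size() };
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(run()));
    }
}
```

The second trigger notifies nobody because the first one already removed the entry for the path. The reverse index makes it cheap to find every path a given watcher is attached to, for example when its connection goes away.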
Code 7: The NIOServerCnxn.process method, which sends the notification to the client
synchronized public void process(WatchedEvent event) {
    ReplyHeader h = new ReplyHeader(-1, -1L, 0);
    if (LOG.isTraceEnabled()) {
        ZooTrace.logTraceMessage(LOG, ZooTrace.EVENT_DELIVERY_TRACE_MASK,
                "Deliver event " + event + " to 0x"
                + Long.toHexString(this.sessionId)
                + " through " + this);
    }

    // Convert WatchedEvent to a type that can be sent over the wire
    WatcherEvent e = event.getWrapper();

    // Send the notification to the client
    sendResponse(h, e, "notification");
}
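The ReplyHeader built in Code 7 carries -1 as its first argument, the xid. Because a notification is not the answer to any pending request, a receiver can use that value to route it separately from ordinary replies. The sketch below assumes exactly that and nothing more: dispatch and the pending queue are hypothetical, and any other reserved xid values are ignored.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

class NotificationDispatchDemo {
    // The xid used for notifications in Code 7.
    static final int NOTIFICATION_XID = -1;

    // Classifies each incoming xid as a watch event or a reply to the oldest pending request.
    static List<String> dispatch(int[] incomingXids, Deque<Integer> pendingXids) {
        List<String> out = new ArrayList<>();
        for (int xid : incomingXids) {
            if (xid == NOTIFICATION_XID) {
                // Not tied to any request: hand it to the watcher machinery.
                out.add("watch event");
            } else if (!pendingXids.isEmpty() && pendingXids.peekFirst() == xid) {
                // Replies are expected in the order the requests were sent.
                pendingXids.removeFirst();
                out.add("reply to request " + xid);
            } else {
                out.add("unexpected xid " + xid);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        Deque<Integer> pending = new ArrayDeque<>(Arrays.asList(1, 2));
        System.out.println(dispatch(new int[] { 1, -1, 2 }, pending));
    }
}
```

In this model a notification can arrive between two replies without disturbing the matching of requests to responses.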
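A Watcher is a one-time trigger. The code shows this as well: in Code 6, triggerWatch removes the watchers for a path from watchTable as soon as the event fires, before it calls process on them.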
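A practical consequence of the one-time trigger is that a watcher which wants to keep observing a node has to register again after each event. The toy model below illustrates the difference. MiniStore is a hypothetical in-memory stand-in for the server, and Consumer<String> stands in for Watcher.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

class OneShotWatchDemo {
    // Hypothetical in-memory store with one-shot data watches.
    static final class MiniStore {
        private final Map<String, String> data = new HashMap<>();
        private final Map<String, List<Consumer<String>>> watches = new HashMap<>();

        // Reads a value and, if a watcher is given, registers it for the next change only.
        String getData(String path, Consumer<String> watcher) {
            if (watcher != null) {
                watches.computeIfAbsent(path, k -> new ArrayList<>()).add(watcher);
            }
            return data.get(path);
        }

        // Writes a value, then fires and forgets every watcher on the path.
        void setData(String path, String value) {
            data.put(path, value);
            List<Consumer<String>> fired = watches.remove(path);
            if (fired != null) {
                for (Consumer<String> w : fired) {
                    w.accept(path);
                }
            }
        }
    }

    // Returns {events seen by the plain watcher, events seen by the re-registering watcher}.
    static int[] run() {
        MiniStore store = new MiniStore();
        int[] counts = new int[2];

        // Registers once and never again.
        store.getData("/cfg", p -> counts[0]++);

        // Registers itself again every time it is notified.
        Consumer<String> rearming = new Consumer<String>() {
            @Override
            public void accept(String p) {
                counts[1]++;
                store.getData(p, this);
            }
        };
        store.getData("/cfg", rearming);

        store.setData("/cfg", "v1");
        store.setData("/cfg", "v2");
        store.setData("/cfg", "v3");
        return counts;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(run()));
    }
}
```

After three updates the plain watcher has seen one event and the re-registering watcher has seen all three. In this single-threaded model nothing can change between an event and the re-registration; a real client has no such guarantee, so it should treat the value read while re-registering as the current state.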