标签:code ber RoCE abstract exists efault 源码 步骤 类型
1.watcher原理框架
由图看出,zk的watcher由客户端,客户端WatchManager,zk服务器组成。整个过程涉及了消息通信及数据存储。
有木有看到小红旗?加入小红旗是一个watcher,当小红旗被创建并注册到node1节点(会有相应的API实现)后,就会监听node1+node_a+node_b或node_a+node_b。这里两种情况是因为在创建watcher注册时会有多种途径。并且watcher不能监听到孙节点。注意,watcher设置后,一旦触发一次后就会失效,如果要想一直监听,需要在process回调函数里重新注册相同的 watcher。
2.通知状态与事件
public class WatcherTest implements Watcher { @Override public void process(WatchedEvent event) { // TODO Auto-generated method stub WatcherTest w = new WatcherTest(); ZooKeeper zk = new ZooKeeper(wx.getZkpath(),10000, w); } public static void main(String[] args){ WatcherTest w = new WatcherTest(); ZooKeeper zk = new ZooKeeper(wx.getZkpath(), 10000, w); } }
上面例子是把异常处理,逻辑处理等都省掉。watcher的应用很简单,主要有两步:继承 Watcher 接口,重写 process 回调函数。
当然注册方式有很多,有默认和重新覆盖方式,可以一次触发失效也可以一直有效触发。这些都可以通过代码实现。
2.1 KeeperStatus通知状态
KeeperStatus完整的类名是org.apache.zookeeper.Watcher.Event.KeeperState。
2.2 EventType事件类型
EventType完整的类名是org.apache.zookeeper.Watcher.Event.EventType。
此图是zookeeper常用的通知状态与对应事件类型的对应关系。除了客户端与服务器连接状态下,有多种事件的变化,其他状态的事件都是None。这也是符合逻辑的,因为没有连接服务器肯定不能获取获取到当前的状态,也就无法发送对应的事件类型了。
这里重点说下几个重要而且容易迷惑的事件:
客户端只能收到服务器发过来的相关事件通知,并不能获取到对应数据节点的原始数据及变更后的新数据。因此,如果业务需要知道变更前的数据或者变更后的新数据,需要业务保存变更前的数据(本机数据结构、文件等)和调用接口获取新的数据
3.watcher注册过程
3.1涉及接口
创建zk客户端对象实例时注册:
通过这种方式注册的watcher将会作为整个zk会话期间的默认watcher,会一直被保存在客户端ZK WatchManager 的 defaultWatcher 中,如果这个被创建的节点在其它时候被创建watcher并注册,则这个默认的watcher会被覆盖。注意,watcher触发一次就会失效,不管是创建节点时的 watcher 还是以后创建的 watcher。
其他注册watcher的API:
举栗子
这就是watcher的简单例子,zk的实际应用集群管理,发布订阅等复杂功能其实就在这个小例子上拓展的。
3.2客户端注册
这里的客户端注册主要是把上面第一点的zookeeper原理框架的注册步骤展开,简单来说就是zk客户端在注册时会先向zk服务器请求注册,服务器会返回请求响应,如果响应成功则zk服务端把watcher对象放到客户端的WatchManager管理并返回响应给客户端。
3.3服务器端注册
FinalRequestProcessor:
/** * This Request processor actually applies any transaction associated with a * request and services any queries. It is always at the end of a * RequestProcessor chain (hence the name), so it does not have a nextProcessor * member. * * This RequestProcessor counts on ZooKeeperServer to populate the * outstandingRequests member of ZooKeeperServer. */ public class FinalRequestProcessor implements RequestProcessor
由源码注释得知,FinalRequestProcessor类实际是任何事务请求和任何查询的的最终处理类。也就是我们客户端对节点的set/get/delete/create/exists等操作最终都会运行到这里。
以exists函数为例子:
case OpCode.exists: { lastOp = "EXIS"; // TODO we need to figure out the security requirement for this! ExistsRequest existsRequest = new ExistsRequest(); ByteBufferInputStream.byteBuffer2Record(request.request, existsRequest); String path = existsRequest.getPath(); if (path.indexOf(‘\0‘) != -1) { throw new KeeperException.BadArgumentsException(); } Stat stat = zks.getZKDatabase().statNode(path, existsRequest .getWatch() ? cnxn : null); rsp = new ExistsResponse(stat); break; }
existsRequest.getWatch() ? cnxn : null此句是在调用exists API时,判断是否注册watcher,若是就返回 cnxn,cnxn是由此句代码ServerCnxn cnxn = request.cnxn;创建的。
/** * Interface to a Server connection - represents a connection from a client * to the server. */ public abstract class ServerCnxn implements Stats, Watcher
通过ServerCnxn类的源码注释得知,ServerCnxn是维持服务器与客户端的tcp连接与实现了 watcher。总的来说,ServerCnxn类创建的对象cnxn即包含了连接信息又包含watcher信息。
同时仔细看ServerCnxn类里面的源码,发现有以下这个函数,process函数正是watcher的回调函数啊。
public abstract class ServerCnxn implements Stats, Watcher { . . public abstract void process(WatchedEvent event); Stat stat = zks.getZKDatabase().statNode(path, existsRequest.getWatch() ? cnxn : null); //getZKDatabase实际上是获取是在zookeeper运行时的数据库。请看下面 . . }
ZKDatabase:
/** * This class maintains the in memory database of zookeeper * server states that includes the sessions, datatree and the * committed logs. It is booted up after reading the logs * and snapshots from the disk. */ public class ZKDatabase
通过源码注释得知ZKDatabase是在zookeeper运行时的数据库,在FinalRequestProcessor的case exists中会把existsRequest(exists请求传递给ZKDatabase)。
/** * the datatree for this zkdatabase * @return the datatree for this zkdatabase */ public DataTree getDataTree() { return this.dataTree; }
ZKDatabase里面有这关键的一个函数是从zookeeper运行时展开的节点数型结构中搜索到合适的节点返回。
watchManager
class WatchManager { private final Map<String, Set<Watcher>> watchTable = new HashMap<String, Set<Watcher>>(); private final Map<Watcher, Set<String>> watch2Paths = new HashMap<Watcher, Set<String>>(); Set<Watcher> triggerWatch(String path, EventType type) { return triggerWatch(path, type, null);} }
watcher触发
public Stat setData(String path, byte data[], int version, long zxid,long time) throws KeeperException.NoNodeException { Stat s = new Stat(); DataNode n = nodes.get(path); if (n == null) { throw new KeeperException.NoNodeException(); } byte lastdata[] = null; synchronized (n) { lastdata = n.data; n.data = data; n.stat.setMtime(time); n.stat.setMzxid(zxid); n.stat.setVersion(version); n.copyStat(s); } // now update if the path is in a quota subtree. String lastPrefix = getMaxPrefixWithQuota(path); if(lastPrefix != null) { this.updateBytes(lastPrefix, (data == null ? 0 : data.length) - (lastdata == null ? 0 : lastdata.length)); } dataWatches.triggerWatch(path, EventType.NodeDataChanged); //触发事件 return s; }
客户端回调watcher步骤:
标签:code ber RoCE abstract exists efault 源码 步骤 类型
原文地址:https://www.cnblogs.com/ericz2j/p/11168669.html