码迷,mamicode.com
首页 > 其他好文 > 详细

zookeeper watcher学习笔记

时间:2015-03-17 18:20:13      阅读:182      评论:0      收藏:0      [点我收藏+]

标签:

zookeeper watcher学习笔记


All of the read operations in ZooKeeper - getData(), getChildren(), and exists() - have the option of setting a watch as a side effect. Here is ZooKeeper‘s definition of a watch: a watch event is one-time trigger, sent to the client that set the watch, which occurs when the data for which the watch was set changes。


getData,getChildren(),exists()这三个方法可以针对参数中的path设置watcher,当path对应的Node 有相应变化时,server端会给对应的设置了watcher的client 发送一个一次性的触发通知事件。客户端在收到这个触发通知事件后,可以根据自己的业务逻辑进行相应地处理。


注意这个watcher的功能是一次性的,如果还想继续得到watcher通知,在处理完事件后,要重新register。


请求的处理流程:

下面以 public byte[] getData(final String path, Watcher watcher, Stat stat) 接口为例进行说明:


1. 在有Watcher的情况下

对照右上图的类图,把watcher,path包装成一个DataWatchRegistration对象。整个WatchRegistration的结 构主要是利用了继承的多态性,不同子类的getWatches方法返回不同的结果集,而这些对调用方是屏蔽的。


2. 构造请求头

不同方法请求头的主要区别是type不一样


3. 构造request,reponse

从类图中也可以看出,不同的方法对象于不同的request,response对象,对getData方法来说,是对应于 GetDataRequest,GetDataResponse。

GetDataRequest对象中的属性是:path,以及一个boolean 值 watch用来表示这个请求是否有watch。


4. 构造响应头

这时候,ReplyHeader还是一个空对象,这里的内容要等nio返回内容时进行填充。


5. 构造 Packet对象

包含:RequestHeader,ReplyHeader,request,response, watchRegistration

在这里把 requestHeader,request对象进行序列化,放入packet对象的Bytebuffer 属性中。

而replyHeader,response对象目前都是空,内容要等nio返回内容,解释出Bytebuffer中内容进行填充。

同时在一开始就构造的DataWatchRegistration对象赋值给packet对象中的watchRegistration属性,这属性会在收 到packet对象时有作用,下面再介绍。


6. packet对象准备好后,把整个对象放入一个outgoingQueue

LinkedList中,就等着通过nio把packet对象中byteBuffer中的内容 发送给server端面。之所以使用LinkedList,是因为它提供了操作头,尾的方法。


7. packet被放入outgoingQueue中,等待SendThread把packet对应的内容发送给server。


8.如果是带callback的异步调用,则整个调用过程就结束,如果是同步调用的话,判断packet的finished状态是否为true,如果为false,进行wait,等待nio得到response后,把packet的状态改成finished为true,调用notify通知当前等待。


后续NIO操作,和server进行数据传输就交给专门的SendThread来处理。


整个nio主要是是围绕Selector,SocketChannel,SelectionKey,ByteBuffer这四个对象进行操作。


在这里围绕主流程来描述:

1. 当SelectionKey  处于isWritable状态时

A——从 outgoingQueue 中取出一个packet中的byteBuffer内容,写入socketChannel。

B——从outgoingQueue 中remove 第一个packet,

C——如果这个packet对应的头不是ping, auth类型,把这个packet放入pendingQueue。因为这两个请求的返回不需要额外处理,因此也就不需要放在等待返回的对列中。   pendingQueue的作用就是:因为采用了NIO返回是异步的,当结果返回时,要能找到原来请求的对象,所以要维护这么一个列队来保存已经被发送, 但还没收到返回的Packet对象。


响应的流程分析

响应的数据类型:sendthread接收来自server的response类型:

1)针对心跳的ping请求的resp

2)针对auth请求的resp

3)一般接口请求的resp

4)如果接口请求要求了watcher,当watcher关注的内容有变化时的notification


一般接口请求的resp处理:

1)  判断从ByteBuffer中反序列化出来的replyHeader中的xid和 pendingQueue中第一个packet维护的xid是否相同。Zookeeper是保证发发送的packet会发收到response,在这里是对这个有序性进行验证。

2)  反序列化出 response对象的内容。

3)  这个时候已经拿到了packet的响应内容,但为了对callback,watcher功能的支持,还需要额外的处理:

4)  A:如果这个packet包含了watcher,将这个请求对象的watcher注册到watcherManager,这是为了当针对watcher的notification响应到达的时候,能找到对应的watcher。

5)  B: 如果接口是同步调用的话,这时设置packet的finished为ture,并且通过notify进行通知。

6)  C: 如果是带callback的异步调用,则将packet放入eventThread,让eventThread异步调用callback接口。


针对watcher对应notification 的resp处理

如果sendthread分析出当前的response是针对watcher的notification,

1)  将reponse反序列化成WatcherEvent

2)  WatcherEvent转化成WatchedEvent

3)  之前说过如果请求带watcher,在返回时,会在watcherManager中注册对应的watcher。当收到WatchedEvent后,就可以根据event的数据从watcherManger中取到对应的watcher集合。

4)  将WatchedEvent和对应的watcher集合封装成WatcherSetEventPair

5)  WatcherSetEventPair放入eventThread中的waitingEvents列表

6)  eventthread在run循环中,取中WatcherSetEventPair,调用其中的watcher接口。

=============================================================

以上是watcher的原理和机制的分析。说实话也没怎么看懂。

下面就做几个demo来体会一下。

首先看一下WatchedEvent,该事件有state、type和path属性,如下,

public class WatchedEvent {
    final private KeeperState keeperState;
    final private EventType eventType;
    private String path;
    ........
}

那么state表示什么含义,type又表示什么含义:

下面来看KeeperState 的注解。

public enum KeeperState {
    /** Unused, this state is never generated by the server */
    @Deprecated
    Unknown (-1),

    /** The client is in the disconnected state - it is not connected
     * to any server in the ensemble. */
    Disconnected (0),

    /** Unused, this state is never generated by the server */
    @Deprecated
    NoSyncConnected (1),

    /** The client is in the connected state - it is connected
     * to a server in the ensemble(全体,总效果) (one of the servers specified
     * in the host connection parameter during ZooKeeper client
     * creation). */
    SyncConnected (3),

    /**
     * Auth failed state
     */
    AuthFailed (4),

    /**
     * The client is connected to a read-only server, that is the
     * server which is not currently connected to the majority.
     * The only operations allowed after receiving this state is
     * read operations.
     * This state is generated for read-only clients only since
     * read/write clients aren‘t allowed to connect to r/o servers.
     */
    ConnectedReadOnly (5),

    /**
     * SaslAuthenticated: used to notify clients that they are SASL-authenticated,
     * so that they can perform Zookeeper actions with their SASL-authorized permissions.
     */
    SaslAuthenticated(6),

    /** The serving cluster has expired this session. The ZooKeeper
     * client connection (the session) is no longer valid. You must
     * create a new client connection (instantiate a new ZooKeeper
     * instance) if you with to access the ensemble. */
    Expired (-112);
    ........
}

再看EventType,

/**
 * Enumeration of types of events that may occur on the ZooKeeper
 */
public enum EventType {
    None(-1),
    NodeCreated(1),
    NodeDeleted(2),
    NodeDataChanged(3),
    NodeChildrenChanged(4);
}

ok,很好明白。下面就做一个示例,

package com.usfot;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;

import java.io.IOException;

/**
 * Created by liyanxin on 2015/3/17.
 */
public class ZookeeperWathcherDemo {

    //互斥锁
    private static Integer mutex = new Integer(-1);

    public static void main(String args[]) throws KeeperException, InterruptedException, IOException {

        ZooKeeper zk = new ZooKeeper("127.0.0.1:2181", 300000, new Watcher() {
            // 监控所有被触发的事件
            public void process(WatchedEvent event) {
                System.out.println("状态:" + event.getState() + "|类型:" + event.getType() +
                        "|Wrapper:" + event.getWrapper() + "|Path:" + event.getPath());
            }
        });

        // 创建一个目录节点
        zk.create("/testRootPath", "testRootData".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,
                CreateMode.PERSISTENT);
        // 创建一个子目录节点
        zk.create("/testRootPath/testChildPathOne", "testChildDataOne".getBytes(),
                ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);


        zk.getData("/testRootPath/testChildPathOne", new Watcher() {
            @Override
            public void process(WatchedEvent event) {

                if (event.getState() == Event.KeeperState.SyncConnected
                        && event.getType() == Event.EventType.NodeDataChanged) {
                    System.out.println("znode:/testRootPath/testChildPathOne data change");
                    synchronized (mutex) {
                        mutex.notify();
                    }
                }
            }
        }, null);

        synchronized (mutex) {
            mutex.wait(); //阻塞直到notify
        }
        zk.close();
    }
}

这段代码对路径为/testRootPath/testChildPathOne的znode注册了一个watcher,该watcher监听的state为SyncConnected,并且类型为NodeDataChanged,当发生这个时间后,打印消息,并且notify条件变量mutex,使程序结束。

是如何使该znode发生改变呢,如下打开zk的客户端,

[zk: localhost:2181(CONNECTED) 26] set /testRootPath/testChildPathOne mydata
cZxid = 0x700000044
ctime = Tue Mar 17 17:46:12 CST 2015
mZxid = 0x700000045
mtime = Tue Mar 17 17:47:49 CST 2015
pZxid = 0x700000044
cversion = 0
dataVersion = 1
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 6
numChildren = 0
[zk: localhost:2181(CONNECTED) 27]

这时那段程序打印消息,如下,

状态:SyncConnected|类型:None|Wrapper:-1,3,
|Path:null
znode:/testRootPath/testChildPathOne data change

Process finished with exit code 0

参考:http://luzengyi.blog.163.com/blog/static/529188201064113744373/

====================================END====================================


zookeeper watcher学习笔记

标签:

原文地址:http://my.oschina.net/xinxingegeya/blog/388088

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!