码迷,mamicode.com
首页 > 其他好文 > 详细

zookeeper - 客户端源码分析

时间:2016-08-22 00:14:18      阅读:219      评论:0      收藏:0      [点我收藏+]

标签:

  zookeeper客户端的主类入口是Zookeeper类,负责与zookeeper server端的通信以及触发watcher等。

下文主要分析zookeeper客户端的工作流程。

1 zookeeper构造函数

主要分两类,一是不带sessionid的,这是客户端第一次连接server时采用

public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher,
            boolean canBeReadOnly)
        throws IOException
    {
        LOG.info("Initiating client connection, connectString=" + connectString
                + " sessionTimeout=" + sessionTimeout + " watcher=" + watcher);
        watchManager.defaultWatcher = watcher;
        ConnectStringParser connectStringParser = new ConnectStringParser(
                connectString);
        HostProvider hostProvider = new StaticHostProvider(
                connectStringParser.getServerAddresses());
        cnxn = new ClientCnxn(connectStringParser.getChrootPath(),
                hostProvider, sessionTimeout, this, watchManager,
                getClientCnxnSocket(), canBeReadOnly);
        cnxn.start();
    }

另一类是带有sessionid的,这主要是当发生connect掉线时,客户端发起连接用的,这些连接对用户是透明的,基本过程是相似的。

2 Zookeeper主类包含的主要成员

  主要包含一个ClientCnxn用来管理与服务器的网络通信,一个watchManager用来管理watcher;

  此外定义了内部类 ZKWatchManager 和 WatchRegistration,后者用来注册watcher到相应的watchset。

  以及供用户使用的API create exists setData getData 和getChildren以及相应的异步方法等。

  zookeeper的启动是cnxn.start()。

3 ClientCnxn类

此类的启动代码如下:

1  public void start() {
2         sendThread.start();
3         eventThread.start();
4     }

 

可以看到,这个类开启了两个线程,一个是SendThread、一个是EventThread。

其中SendThread负责发送 数据请求/ping等信息给服务端以及负责读取服务端发送回来的信息。

EventThread负责出来服务端发来的事件信息。

详细得说,SendThrad的运行代码是:

1 while(state.isAlive()){
2   如果没有连接:发起连接 -- 此处会调用 clientCnxnSocket的connect方法
3 如果是连接状态,处理sasl事情
4 使用to表示客户端距离timeout还剩多少时间,准备发起ping连接
5 调用clientCnxnSocket.doTransport(to, pendingQueue, outgoingQueue, ClientCnxn.this);
6 }

注意到,最后他会调用doTransport方法 。其中 pendingQueue是一个用来存放已经发送、等待回应的Packet队列,outgoingQueue是一个存放等待发送的Packet的队列。

connect方法会做些什么呢?

 a 首先会创建一个sock

 b  将sock注册到selector,如果立刻连接上了,调用 sendThread.primeConnection()

primeConnection 主要是用来注册zookeeper之前所有的watcher,以及认证信息。

接下来看下doTransport方法会干些什么:

synchronized (this) {
selected = selector.selectedKeys();
}

它会选择已经就绪的事情,如果是connect成功时间,调用primeConnection;
如果是读写事件,调用doIO(pendingQueue, outgoingQueue, cnxn)

此外,如果发现outgoingQueue里 findSendablePacket,还需要 enableWrite

接下来,我们看下doIo都会做些什么:

如果是读取事件,那么一次会从incomingBuffer里读取rc、长度或者 信息 或者是因为没有初始化。

  a 如果读取的是长度信息,那么 incomingBuffer = ByteBuffer.allocate(len);

  b 如果不是长度信息,而且没有初始化,那么进行初始化,初始化会干这么几件事情:

      一 读取connect结果,这会做下面几件事:

    a1 从 incoming读取连接信息,初始化一些变量

1 readTimeout = negotiatedSessionTimeout * 2 / 3;
2 connectTimeout = negotiatedSessionTimeout / hostProvider.size(); 

    b1 发送一个连接成功的事件 

eventThread.queueEvent(new WatchedEvent(Watcher.Event.EventType.None,eventState, null));

      二 enableRead ,如果findSendablePacket 则 enableWrite

  三 lenBuffer.clear(), incomingBuff = lenBuffer 

      四 设置初始化变量为true

 c 使用sendThread读取信息

1    sendThread.readResponse(incomingBuffer);
2    lenBuffer.clear();
3    incomingBuffer = lenBuffer;
4    updateLastHeard();

接下来,我们观察  readResponse会做些什么:

首先读取header,如果其xid == -2,表明是一个ping的response,return

如果xid是 -4 ,表明是一个AuthPacket的response return

如果xid是 -1,表明是一个notification,此时要继续读取并构造一个enent,通过EventThread.queueEvent发送,return

其它情况下:

从pendingQueue拿出一个Packet,校验后更新packet信息:

 1           try {
 2                 if (packet.requestHeader.getXid() != replyHdr.getXid()) {
 3                     packet.replyHeader.setErr(
 4                             KeeperException.Code.CONNECTIONLOSS.intValue());
 5                     throw new IOException("Xid out of order. Got Xid "
 6                             + replyHdr.getXid() + " with err " +
 7                             + replyHdr.getErr() +
 8                             " expected Xid "
 9                             + packet.requestHeader.getXid()
10                             + " for a packet with details: "
11                             + packet );
12                 }
13 
14                 packet.replyHeader.setXid(replyHdr.getXid());
15                 packet.replyHeader.setErr(replyHdr.getErr());
16                 packet.replyHeader.setZxid(replyHdr.getZxid());
17                 if (replyHdr.getZxid() > 0) {
18                     lastZxid = replyHdr.getZxid();
19                 }
20                 if (packet.response != null && replyHdr.getErr() == 0) {
21                     packet.response.deserialize(bbia, "response");
22                 }
23 
24                 if (LOG.isDebugEnabled()) {
25                     LOG.debug("Reading reply sessionid:0x"
26                             + Long.toHexString(sessionId) + ", packet:: " + packet);
27                 }
28             } finally {
29                 finishPacket(packet);
30             }

 最后他会调用 finishPacket方法,此方法主要会做下面的事情:

 1     private void finishPacket(Packet p) {
 2         if (p.watchRegistration != null) {
 3             p.watchRegistration.register(p.replyHeader.getErr());
 4         }
 5 
 6         if (p.cb == null) {
 7             synchronized (p) {
 8                 p.finished = true;
 9                 p.notifyAll(); -- 主要是用来通知同步调用的API方法
10             }
11         } else { -- 处理异步调用
12             p.finished = true;
13             eventThread.queuePacket(p);
14         }
15     }

至此,读事件就结束了。 

如果是一个写事件,接下来会做下面的事情:

 a 锁定outgoingQueue,发现一个可以发送的packet,调用createBB创建packet的bb,使用sock.write写入,并且将此packet从outgoing对列移动到pending队列。

 b 做一些开关控制

 1   if (outgoingQueue.isEmpty()) {
 7                     disableWrite();
 8                 } else if (!initialized && p != null && !p.bb.hasRemaining()) {
18                     disableWrite();
19                 } else {
20                     // Just in case
21                     enableWrite();
22                 }

写事件就此结束。

可以看到sendThread中调用了eventThread的queuePacket和queueEvent方法,下面就分析一下EventThread:

此类中主要的数据结构是

private final LinkedBlockingQueue<Object> waitingEvents =
new LinkedBlockingQueue<Object>();

首先,看下其run方法:

 1         public void run() {
 2            try {
 3               isRunning = true;
 4               while (true) {
 5                  Object event = waitingEvents.take();
 6                  if (event == eventOfDeath) {
 7                     wasKilled = true;
 8                  } else {
 9                     processEvent(event);
10                  }
11                  if (wasKilled)
12                     synchronized (waitingEvents) {
13                        if (waitingEvents.isEmpty()) {
14                           isRunning = false;
15                           break;
16                        }
17                     }
18               }
19            } catch (InterruptedException e) {
20               LOG.error("Event thread exiting due to interruption", e);
21            }
22 
23             LOG.info("EventThread shut down for session: 0x{}",
24                      Long.toHexString(getSessionId()));
25         }

 

其中最主要的方法是processEvent方法,下面看下它都做了些什么:

a 如果 event instanceof WatcherSetEventPair,那么分别调用这些watcher的process方法

b 将event转型为Packet,判断p.cb!=null,然后分类: 

if (p.response instanceof ExistsResponse
|| p.response instanceof SetDataResponse
|| p.response instanceof SetACLResponse)
StatCallback cb = (StatCallback) p.cb;
cb.processResult;

p.response instanceof GetDataResponse => DataCallback cb = (DataCallback) p.cb;
p.response instanceof CreateResponse => StringCallback cb = (StringCallback) p.cb

然后我们去分析在sendThread中调用的两个方法:

 1         public void queueEvent(WatchedEvent event) {
 2             if (event.getType() == EventType.None
 3                     && sessionState == event.getState()) {
 4                 return;
 5             }
 6             sessionState = event.getState();
 7 
 8             // materialize the watchers based on the event
 9             WatcherSetEventPair pair = new WatcherSetEventPair(
10                     watcher.materialize(event.getState(), event.getType(),
11                             event.getPath()),
12                             event);
13             // queue the pair (watch set & event) for later processing
14             waitingEvents.add(pair);
15         }
16 
17        public void queuePacket(Packet packet) {
18           if (wasKilled) {
19              synchronized (waitingEvents) {
20                 if (isRunning) waitingEvents.add(packet);
21                 else processEvent(packet);
22              }
23           } else {
24              waitingEvents.add(packet);
25           }
26        }

 

zookeeper - 客户端源码分析

标签:

原文地址:http://www.cnblogs.com/cycc/p/5793866.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!