zookeeper自身提供了一个简易的客户端。主要包括一下几个模块:
1.启动模块。
2.核心执行模块。
3.网络通信模块。
启动模块
启动程序,接收和解析命令行。详见zookeeper源码之客户端启动模块。
核心执行模块
客户端操作ZooKeeper服务端的核心类,详见zookeeper源码之客户端核心执行模块。
类图
ZooKeeper
ZooKeeper是客户端操作ZooKeeper服务端的核心类。当用户向ZooKeeperMain执行相关命令时,最终会交给ZooKeeper执行,其会将用户请求封装成对象,然后发送到服务端。内部使用ClientCnxn来提供与服务端的通信。 请求数据会被封装成RequestHeader、Request对象,相应的返回结果会存储在Response,ReplyHeader对象。
public String create(final String path, byte data[], List<ACL> acl, CreateMode createMode) throws KeeperException, InterruptedException { final String clientPath = path; PathUtils.validatePath(clientPath, createMode.isSequential()); final String serverPath = prependChroot(clientPath); RequestHeader h = new RequestHeader(); h.setType(ZooDefs.OpCode.create); CreateRequest request = new CreateRequest(); CreateResponse response = new CreateResponse(); request.setData(data); request.setFlags(createMode.toFlag()); request.setPath(serverPath); if (acl != null && acl.size() == 0) { throw new KeeperException.InvalidACLException(); } request.setAcl(acl); ReplyHeader r = cnxn.submitRequest(h, request, response, null); if (r.getErr() != 0) { throw KeeperException.create(KeeperException.Code.get(r.getErr()), clientPath); } if (cnxn.chrootPath == null) { return response.getPath(); } else { return response.getPath().substring(cnxn.chrootPath.length()); } }
ClientCnxn
为客户端发送请求到服务端,管理底层IO连接。 将用户调用的请求对象(RequestHeader、Request)封装成Packet对象,存入发送队列。内部有一个线程会不断读取发送队列中的Packet对象,通过NIO将Packet对象发送到服务端,然后将Packet对象放入pending队列,该线程会不断读取服务端的返回信息,并且将结果设置到Packet对象的Response,ReplyHeader对象中。
//等待发送的数据包队列
private final LinkedList<Packet> outgoingQueue = new LinkedList<Packet>();
//发送后等待结果的数据包队列
private final LinkedList<Packet> pendingQueue = new LinkedList<Packet>();
class SendThread extends Thread { boolean doIO() throws InterruptedException, IOException { ...if (!outgoingQueue.isEmpty()) { ByteBuffer pbb = outgoingQueue.getFirst().bb; sock.write(pbb); if (!pbb.hasRemaining()) { sentCount++; Packet p = outgoingQueue.removeFirst(); if (p.header != null && p.header.getType() != OpCode.ping && p.header.getType() != OpCode.auth) { pendingQueue.add(p); } } } } ... } ... @Override public void run() { ...while (zooKeeper.state.isAlive()) { ...if (doIO()) { lastHeard = now; } ... }
...
}