What is Zookeeper(官方定义)
Zookeeper is a centralized service for maintaining configuration information, naming, providing distributed synchronization, and providing group services.All of these kinds of services
are used in some form or another by distributed applications.
Zookeeper is a high-performance coordination service for distributed applications.
official site
http://zookeeper.apache.org/
[Zookeeper结构]
Zookeeper是一个提供分布式任务协同调度的服务,它提供了在分布式开发中所需要的一些常见的服务:命名服务、配置管理、任务同步控制,以及分组服务等。Zookeeper自己本身也是可以分布式部署的。
Zookeeper的系统结构如下
1、Zookeeper的各个服务节点之间相互通信来维持全局的信息,数据放在内存中(内存数据库,各个节点之间的数据相同)。通过事务日志和系统快照来进行持久化存储。
2、客户端和Zookeeper之间维持一个TCP的长连接
3、组成Zookeepr的各个服务节点,会选出一个leader server。其它的服务节点成为followers。 所有的写请求会被转到leader server来处理,然后再通知followers节点更新状态;读请求则可由followers直接处理。由于写操作需要进行全局同步,所以Zookeeper更适合读多写少类型的应用。
Zookeeper对分布式任务的协同调度,是以管理分布式任务数据来实现,它以类Unix文件系统的方式提供数据节点,也就是znode。
理解有误?仅通过数据管理能起到协同调度的功能?或者说ZooKeeper应该不是专门用来做数据存储的,分布式应用程序该如何利用Zookeeper来实现任务的协调调度?
同步分布式数据,保证数据对外读写的一致性,只是Zookeeper能够实现分布式任务协调调度的一方面。另一方面,当Zookeeper的数据节点发生变化时,它能够及时通知这个节点上的Watcher,通知Watcher(s)去做处理。当然,由于ZooKeeper支持分布式部署,所以ZooKeeper自己各个服务节点的分布式管理,也是很重要的一方面。
Zookeeper有一套复杂的机制来保证这些分布式数据对外的读写一致性
znode的层次结构如下所示。
对znode具体数据的理解,可以查看官网上的数据使用示例。
[Zookeeper部署]
Zookeeper可部署为standalone模式,只部署一个实例,可在线下测试环境中使用。在正式环境中,需要按照Replicated模式进行部署,部署在多个机器上。
[使用示例]
这里除了对上面的内容有一个简短的翻译总结之外,还给出了Zookeeper所能使用的典型的应用场景,信息量不够,代码不完整。代码示例,可以主要以Zookeeper官网上的为准。
下面是来自Zookeeper官网上的一个例子的完整的源代码,通过这个例子可以了解ZooKeeper工作的一些基本原理
/**
* A simple class that monitors the data and existence of a ZooKeeper
* node. It uses asynchronous ZooKeeper APIs.
*/
import java.util.Arrays;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.AsyncCallback.StatCallback;
import org.apache.zookeeper.KeeperException.Code;
import org.apache.zookeeper.data.Stat;
public class DataMonitor implements Watcher, StatCallback {
ZooKeeper zk;
String znode;
Watcher chainedWatcher;
boolean dead;
DataMonitorListener listener;
byte prevData[];
public DataMonitor(ZooKeeper zk, String znode, Watcher chainedWatcher,
DataMonitorListener listener) {
this.zk = zk;
this.znode = znode;
this.chainedWatcher = chainedWatcher;
this.listener = listener;
// Get things started by checking if the node exists. We are going
// to be completely event driven
zk.exists(znode, true, this, null);
}
/**
* Other classes use the DataMonitor by implementing this method
*/
public interface DataMonitorListener {
/**
* The existence status of the node has changed.
*/
void exists(byte data[]);
/**
* The ZooKeeper session is no longer valid.
*
* @param rc
* the ZooKeeper reason code
*/
void closing(int rc);
}
public void process(WatchedEvent event) {
String path = event.getPath();
if (event.getType() == Event.EventType.None) {
// We are are being told that the state of the
// connection has changed
switch (event.getState()) {
case SyncConnected:
// In this particular example we don‘t need to do anything
// here - watches are automatically re-registered with
// server and any watches triggered while the client was
// disconnected will be delivered (in order of course)
break;
case Expired:
// It‘s all over
dead = true;
listener.closing(KeeperException.Code.SessionExpired);
break;
}
} else {
if (path != null && path.equals(znode)) {
// Something has changed on the node, let‘s find out
zk.exists(znode, true, this, null);
}
}
if (chainedWatcher != null) {
chainedWatcher.process(event);
}
}
public void processResult(int rc, String path, Object ctx, Stat stat) {
boolean exists;
switch (rc) {
case Code.Ok:
exists = true;
break;
case Code.NoNode:
exists = false;
break;
case Code.SessionExpired:
case Code.NoAuth:
dead = true;
listener.closing(rc);
return;
default:
// Retry errors
zk.exists(znode, true, this, null);
return;
}
byte b[] = null;
if (exists) {
try {
b = zk.getData(znode, false, null);
} catch (KeeperException e) {
// We don‘t need to worry about recovering now. The watch
// callbacks will kick off any exception handling
e.printStackTrace();
} catch (InterruptedException e) {
return;
}
}
if ((b == null && b != prevData)
|| (b != null && !Arrays.equals(prevData, b))) {
listener.exists(b);
prevData = b;
}
}
}
/**
* A simple example program to use DataMonitor to start and
* stop executables based on a znode. The program watches the
* specified znode and saves the data that corresponds to the
* znode in the filesystem. It also starts the specified program
* with the specified arguments when the znode exists and kills
* the program if the znode goes away.
*/
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
public class Executor
implements Watcher, Runnable, DataMonitor.DataMonitorListener
{
String znode;
DataMonitor dm;
ZooKeeper zk;
String filename;
String exec[];
Process child;
public Executor(String hostPort, String znode, String filename,
String exec[]) throws KeeperException, IOException {
this.filename = filename;
this.exec = exec;
zk = new ZooKeeper(hostPort, 3000, this);
dm = new DataMonitor(zk, znode, null, this);
}
/**
* @param args
*/
public static void main(String[] args) {
if (args.length < 4) {
System.err
.println("USAGE: Executor hostPort znode filename program [args ...]");
System.exit(2);
}
String hostPort = args[0];
String znode = args[1];
String filename = args[2];
String exec[] = new String[args.length - 3];
System.arraycopy(args, 3, exec, 0, exec.length);
try {
new Executor(hostPort, znode, filename, exec).run();
} catch (Exception e) {
e.printStackTrace();
}
}
/***************************************************************************
* We do process any events ourselves, we just need to forward them on.
*
* @see org.apache.zookeeper.Watcher#process(org.apache.zookeeper.proto.WatcherEvent)
*/
public void process(WatchedEvent event) {
dm.process(event);
}
public void run() {
try {
synchronized (this) {
while (!dm.dead) {
wait();
}
}
} catch (InterruptedException e) {
}
}
public void closing(int rc) {
synchronized (this) {
notifyAll();
}
}
static class StreamWriter extends Thread {
OutputStream os;
InputStream is;
StreamWriter(InputStream is, OutputStream os) {
this.is = is;
this.os = os;
start();
}
public void run() {
byte b[] = new byte[80];
int rc;
try {
while ((rc = is.read(b)) > 0) {
os.write(b, 0, rc);
}
} catch (IOException e) {
}
}
}
public void exists(byte[] data) {
if (data == null) {
if (child != null) {
System.out.println("Killing process");
child.destroy();
try {
child.waitFor();
} catch (InterruptedException e) {
}
}
child = null;
} else {
if (child != null) {
System.out.println("Stopping child");
child.destroy();
try {
child.waitFor();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
try {
FileOutputStream fos = new FileOutputStream(filename);
fos.write(data);
fos.close();
} catch (IOException e) {
e.printStackTrace();
}
try {
System.out.println("Starting child");
child = Runtime.getRuntime().exec(exec);
new StreamWriter(child.getInputStream(), System.out);
new StreamWriter(child.getErrorStream(), System.err);
} catch (IOException e) {
e.printStackTrace();
}
}
}
}