标签:
1.API
2.API 示例
ZooKeeper中的组成员关系
理解ZooKeeper的一种方法就是将其看作一个具有高可用性的文件系统。但这个文件系统中没有文件和目录,而是统一使用“节点”(node)的概念,称为znode。znode既可以作为保存数据的容器(如同文件),也可以作为保存其他znode的容器(如同目录)。所有的znode构成一个层次化的命名空间。一种自然的建立组成员列表的方式就是利用这种层次结构,创建一个以组名为节点名的znode作为父节点,然后以组成员名(服务器名)为节点名来创建作为子节点的znode。如下图给出了一组具有层次结构的znode。
创建组示例代码:
package org.zk; import java.io.IOException; import java.util.concurrent.CountDownLatch; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.Watcher.Event.KeeperState; import org.apache.zookeeper.ZooDefs.Ids; import org.apache.zookeeper.ZooKeeper; public class CreateGroup implements Watcher{ private static final int SESSION_TIMEOUT=5000; private ZooKeeper zk; private CountDownLatch connectedSignal=new CountDownLatch(1); @Override public void process(WatchedEvent event) { if(event.getState()==KeeperState.SyncConnected){ connectedSignal.countDown(); } } public static void main(String[] args) throws IOException, InterruptedException, KeeperException { CreateGroup createGroup = new CreateGroup(); createGroup.connect(args[0]); createGroup.create(args[1]); createGroup.close(); } private void close() throws InterruptedException { zk.close(); } private void create(String groupName) throws KeeperException, InterruptedException { String path="/"+groupName; if(zk.exists(path, false)== null){ zk.create(path, null/*data*/, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } System.out.println("Created:"+path); } private void connect(String hosts) throws IOException, InterruptedException { zk = new ZooKeeper(hosts, SESSION_TIMEOUT, this); connectedSignal.await(); } }
在上面代码中,main()方法执行时,创建了一个CreateGroup的实例并且调用了这个实例的connect()方法。connect方法实例化了一个新的ZooKeeper类的对象,这个类是客户端API中的主要类,并且负责维护客户端和ZooKeeper服务之间的连接。ZooKeeper类的构造函数有三个参数:
第一个是:ZooKeeper服务的主机地址,可指定端口,默认端口是2181。
Watcher对象接收来自于ZooKeeper的回调,以获得各种事件的通知。在这个例子中,CreateGroup是一个Watcher对象,因此我们将它传递给ZooKeeper的构造函数。
当一个ZooKeeper的实例被创建时,会启动一个线程连接到ZooKeeper服务。由于对构造函数的调用是立即返回的,因此在使用新建的ZooKeeper对象之前一定要等待其与ZooKeeper服务之间的连接建立成功。我们使用Java的CountDownLatch类来阻止使用新建的ZooKeeper对象,直到这个ZooKeeper对象已经准备就绪。这就是Watcher类的
用途,在它的接口中只有一个方法:
public void process(WatcherEvent event);
客户端已经与ZooKeeper建立连接后,Watcher的process()方法会被调用,参数是一个表示该连接的事件。在接收到一个连接事件(由 Watcher.Event.KeeperState的枚举型值SyncConnected来表示)时,我们通过调用CountDownLatch的countDown()方法来递减它的计数器。锁存器(latch)被创建时带有一个值为1的计数器,用于表示在它释放所有等待线程之前需要发生的事件数。在调用一欢countDown()方法之后,计数器的值变为0,则await()方法返回。
现在connect()方法已经返回,下一个执行的是CreateGroup的create()方法。在这个方法中,我们使用ZooKeeper实例中的create()方法来创建一个新的ZooKeeper的znode。所需的参数包括:
路径:用字符串表示。
znode的内容:字节数组,本例中使用空值。
访问控制列表:简称ACL,本例中使用了完全开放的ACL,允许任何客户端对znode进行读写。
创建znode的类型:有两种类型的znode:短暂的和持久的。
创建znode的客户端断开连接时,无论客户端是明确断开还是因为任何原因而终止,短暂znode都会被ZooKeeper服务删除。与之相反,当客户端断开连接时,持久znode不会被删除。我们希望代表一个组的znode存活的时间应当比创建程序的生命周期要长,因此在本例中我们创建了一个持久的znode。
create()方法的返回值是ZooKeeper所创建的路径,我们用这个返回值来打印一条表示路径成功创建的消息。当我们查看“顺序znode”(sequential znode)时.会发现create()方法返回的路径与传递给该方法的路径不同。
加入组
package org.zk; import java.io.IOException; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.ZooDefs.Ids; public class JoinGroup extends ConnectionWatcher{ public void join(String groupName,String memberName) throws KeeperException, InterruptedException{ String path="/"+groupName+"/"+memberName; String createdPath=zk.create(path, null, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); System.out.println("Created:"+createdPath); } public static void main(String[] args) throws InterruptedException, IOException, KeeperException { JoinGroup joinGroup = new JoinGroup(); joinGroup.connect(args[0]); joinGroup.join(args[1], args[2]); //stay alive until process is killed or thread is interrupted Thread.sleep(Long.MAX_VALUE); } }
列出组成员
package org.zk; import java.io.IOException; import java.util.List; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.ZooKeeper; public class ListGroup extends ConnectionWatcher { public void list(String groupNmae) throws KeeperException, InterruptedException{ String path ="/"+groupNmae; try { List<String> children = zk.getChildren(path, false); if(children.isEmpty()){ System.out.printf("No memebers in group %s\n",groupNmae); System.exit(1); } for(String child:children){ System.out.println(child); } } catch (KeeperException.NoNodeException e) { System.out.printf("Group %s does not exist \n", groupNmae); System.exit(1); } } public static void main(String[] args) throws IOException, InterruptedException, KeeperException { ListGroup listGroup = new ListGroup(); listGroup.connect(args[0]); listGroup.list(args[1]); listGroup.close(); } }
下面来看如何删除一个组。ZooKeeper类提供了一个delete()方法,该方法有两个参数:
1. 路径
2. 版本号
如果所提供的版本号与znode的版本号一致,ZooKeeper会删除这个znode。这是一种乐观的加锁机制,使客户端能够检测出对znode的修改冲突。通过将版本号设置为-1,可以绕过这个版本检测机制,不管znode的版本号是什么而直接将其删除。ZooKeeper不支持递归的删除操作,因此在删除父节点之前必须先删除子节点。
在代码3.5中,DeleteGroup类用于删除一个组及其所有成员。
代码3.5用于删除一个组及其所有成员的程序
package org.zk; import java.io.IOException; import java.util.List; import org.apache.zookeeper.KeeperException; public class DeleteGroup extends ConnectionWatcher{ public void delete(String groupName) throws InterruptedException, KeeperException{ String path="/"+groupName; List<String> children; try { children = zk.getChildren(path, false); for(String child:children){ zk.delete(path+"/"+child, -1); } zk.delete(path, -1);//删除父节点本身 } catch (KeeperException.NoNodeException e) { System.out.printf("Group %s does not exist\n", groupName); System.exit(1); } } public static void main(String[] args) throws InterruptedException, IOException, KeeperException { DeleteGroup deleteGroup = new DeleteGroup(); deleteGroup.connect(args[0]); deleteGroup.delete(args[1]); deleteGroup.close(); } }
标签:
原文地址:http://www.cnblogs.com/chenyansong/p/5529670.html