标签:
zookeeper 实现屏障_Barrier
根据官网的demo,自己的理解,加了相应的注释,这里把代码贴出来,如下,
Barrier
package com.usfot; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooDefs; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.data.Stat; import java.util.List; /** * 继承watcher,实现分布式环境中不同任务之间的同步处理(利用了Watcher机制的反向推送)。 * 针对事件的触发使线程做出相应的处理,从而避免无谓的while(true),导致cpu空转。 */ public class Barrier implements Watcher { private static final String addr = "127.0.0.1:2181"; private ZooKeeper zk = null; private Integer mutex; private int size = 0; private String root; public Barrier(String root, int size) { this.root = root; this.size = size; try { zk = new ZooKeeper(addr, 10 * 1000, this); mutex = new Integer(-1); Stat s = zk.exists(root, false); if (s == null) { zk.create(root, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } } catch (Exception e) { e.printStackTrace(); } } /** * 当触发事件后,唤醒在mutex上等待的线程 * 只要是zk服务器上节点的数据发生改变(不管哪个zk client改变了数据), * 这里都会接收到相应的事件,从而唤醒相应的线程,做出相应的处理 * * @param event */ public synchronized void process(WatchedEvent event) { synchronized (mutex) { mutex.notify(); } } /** * 当新建znode时,首先持有mutex监视器才能进入同步代码块。 * 当znode发生事件后,会触发process,从而唤醒在mutex上等待的线程。 * 通过while循环判断创建的节点个数,当节点个数大于设定的值时,这个enter方法才执行完成。 * * @param name * @return * @throws Exception */ public boolean enter(String name) throws Exception { zk.create(root + "/" + name, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); while (true) { synchronized (mutex) { List<String> list = zk.getChildren(root, true); if (list.size() < size) { mutex.wait(); } else { return true; } } } } /** * 同理。对于leave方法,当delete znode时,触发事件,从而唤醒mutex上等待的线程,通过while循环 * 判断节点的个数,当节点全部删除后,leave方法结束。 * 从而使整个添加删除znode的线程结束 * * @param name * @return * @throws KeeperException * @throws InterruptedException */ public boolean leave(String name) throws KeeperException, InterruptedException { zk.delete(root + "/" + name, 0); while (true) { synchronized (mutex) { List<String> list = zk.getChildren(root, true); if (list.size() > 0) { mutex.wait(); } else { return true; } } } } }
BarrierTest
package com.usfot; import java.util.Random; public class BarrierTest { /** * 启动三个线程,也就对应着三个zk客户端 * * @param args * @throws Exception */ public static void main(String args[]) throws Exception { for (int i = 0; i < 3; i++) { Process p = new Process("Thread-" + i, new Barrier("/test_node", 3)); p.start(); } } } class Process extends Thread { private String name; private Barrier barrier; public Process(String name, Barrier barrier) { this.name = name; this.barrier = barrier; } @Override public void run() { try { barrier.enter(name); System.out.println(name + " enter"); Thread.sleep(1000 + new Random().nextInt(2000)); barrier.leave(name); System.out.println(name + " leave"); } catch (Exception e) { e.printStackTrace(); } } }
执行这段程序,如下,
Thread-1 enter Thread-2 enter Thread-0 enter Thread-0 leave Thread-1 leave Thread-2 leave Process finished with exit code 0
打开zk的client,如下,
[zk: localhost:2181(CONNECTED) 8] ls / [testRootPath, test_node, mynode, zookeeper, zk_test0000000005, zk_test] [zk: localhost:2181(CONNECTED) 9] get /test_node cZxid = 0x800000051 ctime = Tue Mar 17 19:08:49 CST 2015 mZxid = 0x800000051 mtime = Tue Mar 17 19:08:49 CST 2015 pZxid = 0x800000062 cversion = 12 dataVersion = 0 aclVersion = 0 ephemeralOwner = 0x0 dataLength = 0 numChildren = 0 [zk: localhost:2181(CONNECTED) 10] ls /test_node [] [zk: localhost:2181(CONNECTED) 11]
===============================END===============================
标签:
原文地址:http://my.oschina.net/xinxingegeya/blog/388105