标签:
zookeeper 实现屏障_Barrier
根据官网的demo,自己的理解,加了相应的注释,这里把代码贴出来,如下,
Barrier
package com.usfot;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
import java.util.List;
/**
* 继承watcher,实现分布式环境中不同任务之间的同步处理(利用了Watcher机制的反向推送)。
* 针对事件的触发使线程做出相应的处理,从而避免无谓的while(true),导致cpu空转。
*/
public class Barrier implements Watcher {
private static final String addr = "127.0.0.1:2181";
private ZooKeeper zk = null;
private Integer mutex;
private int size = 0;
private String root;
public Barrier(String root, int size) {
this.root = root;
this.size = size;
try {
zk = new ZooKeeper(addr, 10 * 1000, this);
mutex = new Integer(-1);
Stat s = zk.exists(root, false);
if (s == null) {
zk.create(root, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 当触发事件后,唤醒在mutex上等待的线程
* 只要是zk服务器上节点的数据发生改变(不管哪个zk client改变了数据),
* 这里都会接收到相应的事件,从而唤醒相应的线程,做出相应的处理
*
* @param event
*/
public synchronized void process(WatchedEvent event) {
synchronized (mutex) {
mutex.notify();
}
}
/**
* 当新建znode时,首先持有mutex监视器才能进入同步代码块。
* 当znode发生事件后,会触发process,从而唤醒在mutex上等待的线程。
* 通过while循环判断创建的节点个数,当节点个数大于设定的值时,这个enter方法才执行完成。
*
* @param name
* @return
* @throws Exception
*/
public boolean enter(String name) throws Exception {
zk.create(root + "/" + name, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
while (true) {
synchronized (mutex) {
List<String> list = zk.getChildren(root, true);
if (list.size() < size) {
mutex.wait();
} else {
return true;
}
}
}
}
/**
* 同理。对于leave方法,当delete znode时,触发事件,从而唤醒mutex上等待的线程,通过while循环
* 判断节点的个数,当节点全部删除后,leave方法结束。
* 从而使整个添加删除znode的线程结束
*
* @param name
* @return
* @throws KeeperException
* @throws InterruptedException
*/
public boolean leave(String name) throws KeeperException, InterruptedException {
zk.delete(root + "/" + name, 0);
while (true) {
synchronized (mutex) {
List<String> list = zk.getChildren(root, true);
if (list.size() > 0) {
mutex.wait();
} else {
return true;
}
}
}
}
}
BarrierTest
package com.usfot;
import java.util.Random;
public class BarrierTest {
/**
* 启动三个线程,也就对应着三个zk客户端
*
* @param args
* @throws Exception
*/
public static void main(String args[]) throws Exception {
for (int i = 0; i < 3; i++) {
Process p = new Process("Thread-" + i, new Barrier("/test_node", 3));
p.start();
}
}
}
class Process extends Thread {
private String name;
private Barrier barrier;
public Process(String name, Barrier barrier) {
this.name = name;
this.barrier = barrier;
}
@Override
public void run() {
try {
barrier.enter(name);
System.out.println(name + " enter");
Thread.sleep(1000 + new Random().nextInt(2000));
barrier.leave(name);
System.out.println(name + " leave");
} catch (Exception e) {
e.printStackTrace();
}
}
}
执行这段程序,如下,
Thread-1 enter Thread-2 enter Thread-0 enter Thread-0 leave Thread-1 leave Thread-2 leave Process finished with exit code 0
打开zk的client,如下,
[zk: localhost:2181(CONNECTED) 8] ls / [testRootPath, test_node, mynode, zookeeper, zk_test0000000005, zk_test] [zk: localhost:2181(CONNECTED) 9] get /test_node cZxid = 0x800000051 ctime = Tue Mar 17 19:08:49 CST 2015 mZxid = 0x800000051 mtime = Tue Mar 17 19:08:49 CST 2015 pZxid = 0x800000062 cversion = 12 dataVersion = 0 aclVersion = 0 ephemeralOwner = 0x0 dataLength = 0 numChildren = 0 [zk: localhost:2181(CONNECTED) 10] ls /test_node [] [zk: localhost:2181(CONNECTED) 11]
===============================END===============================
标签:
原文地址:http://my.oschina.net/xinxingegeya/blog/388105