码迷,mamicode.com
首页 > 其他好文 > 详细

zookeeper 实现屏障_Barrier

时间:2015-03-17 20:20:51      阅读:133      评论:0      收藏:0      [点我收藏+]

标签:

zookeeper 实现屏障_Barrier


根据官网的demo,自己的理解,加了相应的注释,这里把代码贴出来,如下,

Barrier

package com.usfot;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

import java.util.List;

/**
 * 继承watcher,实现分布式环境中不同任务之间的同步处理(利用了Watcher机制的反向推送)。
 * 针对事件的触发使线程做出相应的处理,从而避免无谓的while(true),导致cpu空转。
 */
public class Barrier implements Watcher {

    private static final String addr = "127.0.0.1:2181";
    private ZooKeeper zk = null;
    private Integer mutex;
    private int size = 0;
    private String root;

    public Barrier(String root, int size) {
        this.root = root;
        this.size = size;

        try {
            zk = new ZooKeeper(addr, 10 * 1000, this);
            mutex = new Integer(-1);
            Stat s = zk.exists(root, false);
            if (s == null) {
                zk.create(root, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            }

        } catch (Exception e) {
            e.printStackTrace();
        }

    }

    /**
     * 当触发事件后,唤醒在mutex上等待的线程
     * 只要是zk服务器上节点的数据发生改变(不管哪个zk client改变了数据),
     * 这里都会接收到相应的事件,从而唤醒相应的线程,做出相应的处理
     *
     * @param event
     */
    public synchronized void process(WatchedEvent event) {
        synchronized (mutex) {
            mutex.notify();
        }
    }

    /**
     * 当新建znode时,首先持有mutex监视器才能进入同步代码块。
     * 当znode发生事件后,会触发process,从而唤醒在mutex上等待的线程。
     * 通过while循环判断创建的节点个数,当节点个数大于设定的值时,这个enter方法才执行完成。
     *
     * @param name
     * @return
     * @throws Exception
     */
    public boolean enter(String name) throws Exception {
        zk.create(root + "/" + name, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
        while (true) {
            synchronized (mutex) {
                List<String> list = zk.getChildren(root, true);
                if (list.size() < size) {
                    mutex.wait();
                } else {
                    return true;
                }
            }
        }
    }

    /**
     * 同理。对于leave方法,当delete znode时,触发事件,从而唤醒mutex上等待的线程,通过while循环
     * 判断节点的个数,当节点全部删除后,leave方法结束。
     * 从而使整个添加删除znode的线程结束
     *
     * @param name
     * @return
     * @throws KeeperException
     * @throws InterruptedException
     */
    public boolean leave(String name) throws KeeperException, InterruptedException {
        zk.delete(root + "/" + name, 0);
        while (true) {
            synchronized (mutex) {
                List<String> list = zk.getChildren(root, true);
                if (list.size() > 0) {
                    mutex.wait();
                } else {
                    return true;
                }
            }
        }
    }
}

BarrierTest

package com.usfot;

import java.util.Random;

public class BarrierTest {

    /**
     * 启动三个线程,也就对应着三个zk客户端
     *
     * @param args
     * @throws Exception
     */
    public static void main(String args[]) throws Exception {
        for (int i = 0; i < 3; i++) {
            Process p = new Process("Thread-" + i, new Barrier("/test_node", 3));
            p.start();
        }
    }
}

class Process extends Thread {

    private String name;
    private Barrier barrier;

    public Process(String name, Barrier barrier) {
        this.name = name;
        this.barrier = barrier;
    }

    @Override
    public void run() {
        try {
            barrier.enter(name);
            System.out.println(name + " enter");
            Thread.sleep(1000 + new Random().nextInt(2000));
            barrier.leave(name);
            System.out.println(name + " leave");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

执行这段程序,如下,

Thread-1 enter
Thread-2 enter
Thread-0 enter
Thread-0 leave
Thread-1 leave
Thread-2 leave

Process finished with exit code 0

打开zk的client,如下,

[zk: localhost:2181(CONNECTED) 8] ls /
[testRootPath, test_node, mynode, zookeeper, zk_test0000000005, zk_test]
[zk: localhost:2181(CONNECTED) 9] get /test_node

cZxid = 0x800000051
ctime = Tue Mar 17 19:08:49 CST 2015
mZxid = 0x800000051
mtime = Tue Mar 17 19:08:49 CST 2015
pZxid = 0x800000062
cversion = 12
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 0
numChildren = 0
[zk: localhost:2181(CONNECTED) 10] ls /test_node
[]
[zk: localhost:2181(CONNECTED) 11]


===============================END===============================


zookeeper 实现屏障_Barrier

标签:

原文地址:http://my.oschina.net/xinxingegeya/blog/388105

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!