码迷,mamicode.com
首页 > 其他好文 > 详细

zookeeper 实现队列_Queue

时间:2015-03-18 16:09:35      阅读:117      评论:0      收藏:0      [点我收藏+]

标签:

zookeeper 实现队列_Queue


根据zookeeper的官方文档改写的demo,加了详细的注释。

Queue.java

package com.usfot;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;

/**
 * Created by liyanxin on 2015/3/17.
 */
public class Queue implements Watcher {

    private static final String addr = "127.0.0.1:2181";
    private String root;
    private ZooKeeper zk = null;
    private Integer mutex;

    /**
     * Constructor of producer-consumer queue
     *
     * @param root
     */
    public Queue(String root) {
        this.root = root;
        try {
            //连接zk服务器
            zk = new ZooKeeper(addr, 10 * 10000, this);
        } catch (IOException e) {
            e.printStackTrace();
        }
        mutex = new Integer(-1);
        // Create ZK node name
        if (zk != null) {
            try {
                //建立根目录节点
                Stat s = zk.exists(root, false);
                if (s == null) {
                    zk.create(root, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,
                            CreateMode.PERSISTENT);
                }
            } catch (KeeperException e) {
                System.out
                        .println("Keeper exception when instantiating queue: "
                                + e.toString());
            } catch (InterruptedException e) {
                System.out.println("Interrupted exception");
            }
        }
    }

    /**
     * 当znode上事件触发,唤醒相应的等待线程
     *
     * @param event
     */
    public void process(WatchedEvent event) {
        synchronized (mutex) {
            //System.out.println("Process: " + event.getType());
            mutex.notify();
        }
    }

    /**
     * Add element to the queue.
     *
     * @param i
     * @return
     */
    boolean produce(int i) throws KeeperException, InterruptedException {
        ByteBuffer b = ByteBuffer.allocate(4);
        byte[] value;

        // Add child with value i
        b.putInt(i);
        value = b.array();
        zk.create(root + "/element" + i, value, ZooDefs.Ids.OPEN_ACL_UNSAFE,
                CreateMode.PERSISTENT);

        System.out.println("=========Produce znode=========");
        return true;
    }


    /**
     * Remove first element from the queue.
     *
     * @return
     * @throws KeeperException
     * @throws InterruptedException
     */
    int consume() throws KeeperException, InterruptedException {
        int retvalue = -1;
        Stat stat = null;

        // Get the first element available
        while (true) {
            synchronized (mutex) {
                List<String> list = zk.getChildren(root, true);
                if (list.size() == 0) {
                    System.out.println("Going to wait");
                    mutex.wait();
                } else {
                    Integer min = new Integer(list.get(0).substring(7));
                    for (String s : list) {
                        Integer tempValue = new Integer(s.substring(7));
                        //System.out.println("Temporary value: " + tempValue);
                        if (tempValue < min) min = tempValue;
                    }
                    System.out.println("Temporary value: " + root + "/element" + min);
                    byte[] b = zk.getData(root + "/element" + min,
                            false, stat);
                    zk.delete(root + "/element" + min, 0);
                    ByteBuffer buffer = ByteBuffer.wrap(b);
                    retvalue = buffer.getInt();

                    return retvalue;
                }
            }
        }
    }
}


QueueTest.java

package com.usfot;

import org.apache.zookeeper.KeeperException;

/**
 * 测试基于zookeeper的生产者和消费者队列
 * 测试过程:
 * 一个线程充当生产者,一个线程充当消费者
 * Created by liyanxin on 2015/3/17.
 */
public class QueueTest {

    public static void main(String args[]) {
        Producer producer = new Producer(new Queue("/app1"));
        //生产者线程启动
        producer.start();

        // 启动多个线程作为消费者,但在当前的分布式环境中,消费者之间会竞争某些资源,也是说任务不是
        // 同步进行的,导致一个消费者想要得到某个节点的数据时,而这个节点却被另一个消费者删除了,
        // 导致KeeperException$NoNodeException。线程是不安全的。
//        for (int i = 0; i < 100; i++) {
//            Consumer consumer = new Consumer(new Queue("/app1"));
//            consumer.start();
//        }

        // 消费者线程
        Consumer consumer = new Consumer(new Queue("/app1"));
        consumer.start();
    }
}

/**
 * 生产者线程
 */
class Producer extends Thread {

    private Queue queue;

    public Producer(Queue queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        // 该生产者线程创建1000个znode
        for (int i = 0; i < 100; i++) {
            try {
                queue.produce(i);
                Thread.sleep(1000);
            } catch (KeeperException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

/**
 * 消费者线程
 */
class Consumer extends Thread {
    private Queue queue;

    public Consumer(Queue queue) {
        this.queue = queue;
    }

    // 消费者线程
    // 不断的从zk服务器中读取znode,进行操作。
    @Override
    public void run() {
        try {
            while (true) {
                int retvalue = queue.consume();
                System.out.println("thread_name=" +
                        Thread.currentThread().getName() + ":" + retvalue);
                Thread.sleep(1000);
            }
        } catch (KeeperException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

运行结果也是我想象中的,如下,

...............................
=========Produce znode=========
Temporary value: /app1/element97
thread_name=Thread-1:97
=========Produce znode=========
Temporary value: /app1/element98
thread_name=Thread-1:98
=========Produce znode=========
Temporary value: /app1/element99
thread_name=Thread-1:99
Going to wait
最后消费者线程Going to wait,只有当有新的znode创建后或事件发生后触发才会唤醒消费者线程。


====================================END====================================


zookeeper 实现队列_Queue

标签:

原文地址:http://my.oschina.net/xinxingegeya/blog/388301

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!