标签:
zookeeper 实现队列_Queue
根据zookeeper的官方文档改写的demo,加了详细的注释。
Queue.java
package com.usfot;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;
/**
* Created by liyanxin on 2015/3/17.
*/
public class Queue implements Watcher {
private static final String addr = "127.0.0.1:2181";
private String root;
private ZooKeeper zk = null;
private Integer mutex;
/**
* Constructor of producer-consumer queue
*
* @param root
*/
public Queue(String root) {
this.root = root;
try {
//连接zk服务器
zk = new ZooKeeper(addr, 10 * 10000, this);
} catch (IOException e) {
e.printStackTrace();
}
mutex = new Integer(-1);
// Create ZK node name
if (zk != null) {
try {
//建立根目录节点
Stat s = zk.exists(root, false);
if (s == null) {
zk.create(root, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
}
} catch (KeeperException e) {
System.out
.println("Keeper exception when instantiating queue: "
+ e.toString());
} catch (InterruptedException e) {
System.out.println("Interrupted exception");
}
}
}
/**
* 当znode上事件触发,唤醒相应的等待线程
*
* @param event
*/
public void process(WatchedEvent event) {
synchronized (mutex) {
//System.out.println("Process: " + event.getType());
mutex.notify();
}
}
/**
* Add element to the queue.
*
* @param i
* @return
*/
boolean produce(int i) throws KeeperException, InterruptedException {
ByteBuffer b = ByteBuffer.allocate(4);
byte[] value;
// Add child with value i
b.putInt(i);
value = b.array();
zk.create(root + "/element" + i, value, ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
System.out.println("=========Produce znode=========");
return true;
}
/**
* Remove first element from the queue.
*
* @return
* @throws KeeperException
* @throws InterruptedException
*/
int consume() throws KeeperException, InterruptedException {
int retvalue = -1;
Stat stat = null;
// Get the first element available
while (true) {
synchronized (mutex) {
List<String> list = zk.getChildren(root, true);
if (list.size() == 0) {
System.out.println("Going to wait");
mutex.wait();
} else {
Integer min = new Integer(list.get(0).substring(7));
for (String s : list) {
Integer tempValue = new Integer(s.substring(7));
//System.out.println("Temporary value: " + tempValue);
if (tempValue < min) min = tempValue;
}
System.out.println("Temporary value: " + root + "/element" + min);
byte[] b = zk.getData(root + "/element" + min,
false, stat);
zk.delete(root + "/element" + min, 0);
ByteBuffer buffer = ByteBuffer.wrap(b);
retvalue = buffer.getInt();
return retvalue;
}
}
}
}
}
QueueTest.java
package com.usfot;
import org.apache.zookeeper.KeeperException;
/**
* 测试基于zookeeper的生产者和消费者队列
* 测试过程:
* 一个线程充当生产者,一个线程充当消费者
* Created by liyanxin on 2015/3/17.
*/
public class QueueTest {
public static void main(String args[]) {
Producer producer = new Producer(new Queue("/app1"));
//生产者线程启动
producer.start();
// 启动多个线程作为消费者,但在当前的分布式环境中,消费者之间会竞争某些资源,也是说任务不是
// 同步进行的,导致一个消费者想要得到某个节点的数据时,而这个节点却被另一个消费者删除了,
// 导致KeeperException$NoNodeException。线程是不安全的。
// for (int i = 0; i < 100; i++) {
// Consumer consumer = new Consumer(new Queue("/app1"));
// consumer.start();
// }
// 消费者线程
Consumer consumer = new Consumer(new Queue("/app1"));
consumer.start();
}
}
/**
* 生产者线程
*/
class Producer extends Thread {
private Queue queue;
public Producer(Queue queue) {
this.queue = queue;
}
@Override
public void run() {
// 该生产者线程创建1000个znode
for (int i = 0; i < 100; i++) {
try {
queue.produce(i);
Thread.sleep(1000);
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
/**
* 消费者线程
*/
class Consumer extends Thread {
private Queue queue;
public Consumer(Queue queue) {
this.queue = queue;
}
// 消费者线程
// 不断的从zk服务器中读取znode,进行操作。
@Override
public void run() {
try {
while (true) {
int retvalue = queue.consume();
System.out.println("thread_name=" +
Thread.currentThread().getName() + ":" + retvalue);
Thread.sleep(1000);
}
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
运行结果也是我想象中的,如下,
............................... =========Produce znode========= Temporary value: /app1/element97 thread_name=Thread-1:97 =========Produce znode========= Temporary value: /app1/element98 thread_name=Thread-1:98 =========Produce znode========= Temporary value: /app1/element99 thread_name=Thread-1:99 Going to wait 最后消费者线程Going to wait,只有当有新的znode创建后或事件发生后触发才会唤醒消费者线程。
====================================END====================================
标签:
原文地址:http://my.oschina.net/xinxingegeya/blog/388301