码迷,mamicode.com
首页 > 其他好文 > 详细

zookeeper应用 - FIFO 队列 分布式队列

时间:2015-06-07 20:13:55      阅读:125      评论:0      收藏:0      [点我收藏+]

标签:

使用ZooKeeper实现的FIFO队列,这个队列是分布式的。

package fifo;
import java.nio.charset.Charset;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
/**
 * 使用ZooKeeper实现的FIFO队列
 * @author lisg
 *
 */
public class ZKFIFO {
	private static final String HOSTS = "vm1";
	private ZooKeeper zk = null;
	private static final String PARENT_PATH = "/fifo";
	private static final String SEQ_PREFIX = "seq-";
	
	public ZKFIFO() {
		try {
			final CountDownLatch cdl = new CountDownLatch(1);
			zk = new ZooKeeper(HOSTS, 5000, new Watcher() {
				@Override
				public void process(WatchedEvent event) {
					if(KeeperState.SyncConnected.equals(event.getState())) {
						cdl.countDown();
					}
				}
			});
			
			cdl.await();
			
			//创建父节点
			Stat stat = zk.exists(PARENT_PATH, false);
			if(stat == null) {
				zk.create(PARENT_PATH, "".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
			}
		} catch (Exception e) {
			System.out.println("zookeeper集群连接失败!");
			e.printStackTrace();
		}
	}
	
	/**
	 * 在父节点下创建顺序子节点
	 * @param data
	 */
	public void push(String data) {
		if(data == null) {
			data = "";
		}
		
		try {
			zk.create(PARENT_PATH + "/" + SEQ_PREFIX, 
					data.getBytes("UTF-8"), 
					ZooDefs.Ids.OPEN_ACL_UNSAFE, 
					CreateMode.PERSISTENT_SEQUENTIAL);
		} catch (Exception e) {
			e.printStackTrace();
		}
	}
	
	/**
	 * 删除字第一个子节点,并返回它的值
	 * @return
	 */
	public String pop() {
		try {
			final List<String> children = zk.getChildren(PARENT_PATH, false);
			if(children.isEmpty()) {
				return null;
			}
			
			Collections.sort(children);
			
			String firstChildPath = PARENT_PATH + "/" + children.get(0);
			
			final byte[] data = zk.getData(firstChildPath, false, null);
			zk.delete(firstChildPath, -1);
			
			return new String(data, "UTF-8");
		} catch (Exception e) {
			e.printStackTrace();
		}
		
		return null;
	}
	
	public void close() {
		try {
			this.zk.close();
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	}
	
	public static void main(String[] args) {
		final ZKFIFO fifo = new ZKFIFO();
		
		/*
		for(int i=0; i<10; i++) {
			new Thread() {
				public void run() {
					fifo.push("data-" + UUID.randomUUID().toString().replace("-", ""));
				};
			}.start();
		}
		*/
		
		System.out.println(fifo.pop());
		
		fifo.close();
	}
}

  

需要改进的地方:
1)zookeeper异常处理、重试

zookeeper应用 - FIFO 队列 分布式队列

标签:

原文地址:http://www.cnblogs.com/lishouguang/p/4558977.html

(0)
(0)
   
举报
评论 一句话评论(0)
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!