标签:
public class DistributedQueueExample
{
private static final String PATH = "/example/queue";
public static void main(String[] args) throws Exception
{
CuratorFramework clientA = CuratorFrameworkFactory.newClient("127.0.0.1:2181", new ExponentialBackoffRetry(1000, 3));
clientA.start();
CuratorFramework clientB = CuratorFrameworkFactory.newClient("127.0.0.1:2181", new ExponentialBackoffRetry(1000, 3));
clientB.start();
DistributedQueue<String> queueA = null;
QueueBuilder<String> builderA = QueueBuilder.builder(clientA, createQueueConsumer("A"), createQueueSerializer(), PATH);
queueA = builderA.buildQueue();
queueA.start();
DistributedQueue<String> queueB = null;
QueueBuilder<String> builderB = QueueBuilder.builder(clientB, createQueueConsumer("B"), createQueueSerializer(), PATH);
queueB = builderB.buildQueue();
queueB.start();
for (int i = 0; i < 100; i++)
{
queueA.put(" test-A-" + i);
Thread.sleep(10);
queueB.put(" test-B-" + i);
}
Thread.sleep(1000 * 10);// 等待消息消费完成
queueB.close();
queueA.close();
clientB.close();
clientA.close();
System.out.println("OK!");
}
/** 队列消息序列化实现类 */
private static QueueSerializer<String> createQueueSerializer()
{
return new QueueSerializer<String>()
{
@Override
public byte[] serialize(String item)
{
return item.getBytes();
}
@Override
public String deserialize(byte[] bytes)
{
return new String(bytes);
}
};
}
/** 定义队列消费者 */
private static QueueConsumer<String> createQueueConsumer(final String name)
{
return new QueueConsumer<String>()
{
@Override
public void stateChanged(CuratorFramework client, ConnectionState newState)
{
System.out.println("连接状态改变: " + newState.name());
}
@Override
public void consumeMessage(String message) throws Exception
{
System.out.println("消费消息(" + name + "): " + message);
}
};
}
}
消费消息(A): test-A-0
消费消息(A): test-B-0
......
消费消息(B): test-A-51
消费消息(B): test-B-51
消费消息(B): test-A-52
消费消息(B): test-B-52
消费消息(B): test-A-53
消费消息(B): test-B-54
消费消息(B): test-A-55
......
消费消息(A): test-A-99
消费消息(A): test-B-99
OK!
public class DistributedIdQueueExample
{
private static final String PATH = "/example/queue";
public static void main(String[] args) throws Exception
{
CuratorFramework client = CuratorFrameworkFactory.newClient("127.0.0.1:2181", new ExponentialBackoffRetry(1000, 3));
client.start();
DistributedIdQueue<String> queue = null;
QueueConsumer<String> consumer = createQueueConsumer("A");
QueueBuilder<String> builder = QueueBuilder.builder(client, consumer, createQueueSerializer(), PATH);
queue = builder.buildIdQueue();
queue.start();
for (int i = 0; i < 10; i++)
{
queue.put(" test-" + i, "Id" + i);
Thread.sleep((long) (50 * Math.random()));
queue.remove("Id" + i);
}
Thread.sleep(1000 * 3);
queue.close();
client.close();
System.out.println("OK!");
}
......
}
消费消息(A): test-2
消费消息(A): test-3
消费消息(A): test-4
消费消息(A): test-7
OK!
public class DistributedPriorityQueueExample
{
private static final String PATH = "/example/queue";
public static void main(String[] args) throws Exception
{
CuratorFramework client = CuratorFrameworkFactory.newClient("127.0.0.1:2181", new ExponentialBackoffRetry(1000, 3));
client.start();
DistributedPriorityQueue<String> queue = null;
QueueConsumer<String> consumer = createQueueConsumer("A");
QueueBuilder<String> builder = QueueBuilder.builder(client, consumer, createQueueSerializer(), PATH);
queue = builder.buildPriorityQueue(0);
queue.start();
for (int i = 0; i < 5; i++)
{
int priority = (int) (Math.random() * 100);
System.out.println("test-" + i + " 优先级:" + priority);
queue.put("test-" + i, priority);
Thread.sleep((long) (50 * Math.random()));
}
Thread.sleep(1000 * 2);
queue.close();
client.close();
}
......
}
test-0 优先级:34
test-1 优先级:51
test-2 优先级:63
test-3 优先级:45
test-4 优先级:36
消费消息(A): test-0
消费消息(A): test-4
消费消息(A): test-3
消费消息(A): test-1
消费消息(A): test-2
OK!
public class DistributedDelayQueueExample
{
private static final String PATH = "/example/queue";
public static void main(String[] args) throws Exception
{
CuratorFramework client = CuratorFrameworkFactory.newClient("127.0.0.1:2181", new ExponentialBackoffRetry(1000, 3));
client.start();
DistributedDelayQueue<String> queue = null;
QueueConsumer<String> consumer = createQueueConsumer("A");
QueueBuilder<String> builder = QueueBuilder.builder(client, consumer, createQueueSerializer(), PATH);
queue = builder.buildDelayQueue();
queue.start();
for (int i = 0; i < 10; i++)
{
queue.put("test-" + i, System.currentTimeMillis() + 3000);
}
System.out.println("put 完成!");
Thread.sleep(1000 * 5);
queue.close();
client.close();
System.out.println("OK!");
}
......
}
put 完成!
消费消息(A): test-0
消费消息(A): test-3
消费消息(A): test-1
消费消息(A): test-2
消费消息(A): test-6
消费消息(A): test-4
消费消息(A): test-5
消费消息(A): test-7
消费消息(A): test-8
消费消息(A): test-9
OK!
// 创建
public SimpleDistributedQueue(CuratorFramework client, String path)
// 增加元素
public boolean offer(byte[] data) throws Exception
// 删除元素
public byte[] take() throws Exception
// 另外还提供了其它方法
public byte[] peek() throws Exception
public byte[] poll(long timeout, TimeUnit unit) throws Exception
public byte[] poll() throws Exception
public byte[] remove() throws Exception
public byte[] element() throws Exception
标签:
原文地址:http://www.cnblogs.com/LiZhiW/p/4951529.html