标签:run alt set images ica publish executors task 调度
1.工作队列(Work Queue)又叫任务队列(Task Queue)指将任务分发个多个消费者。
2.实际操作:
这里使用一个生产者产生多条数据提供给3个消费者
生产者代码:
public class Producter { |
消费者代码
public class Consumer { //队列名称 private final static String QUEUE_NAME = "Work_Queue"; public static void main(String[] args) throws IOException, InterruptedException, TimeoutException { //创建连接和通道 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); final Connection connection = factory.newConnection(); ExecutorService service = Executors.newFixedThreadPool(10); for(int i=0;i<3;i++){ final int cur = i; service.submit(new Runnable() { Channel channel = connection.createChannel(); public void run() { //创建队列消费者 QueueingConsumer consumer = new QueueingConsumer(channel); //指定消费队列 try { channel.basicConsume(QUEUE_NAME, true, consumer); while (true) { //nextDelivery是一个阻塞方法(内部实现其实是阻塞队列的take方法) QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println("线程 "+cur+" 获取到消息 " + message + "开始处理"); Thread.sleep(1000*(cur+5)*2); System.out.println("线程 "+cur+" "+message + "处理完成"); } } catch (Exception e) { e.printStackTrace(); } } }); } service.shutdown(); } } |
运行效果:(消费者循环调度)
3.消息确认:
在处理一个比较耗时的任务的时候,如果消费者在中途崩溃掉,则对应的这条数据就丢失了,为了避免消息丢失的情况,RabbitMQ提供了消息确认
使用两个消费者进行演示,调用方法
public void getConsum() throws IOException, TimeoutException, InterruptedException { |
手动关掉一个消费者
消息被另一个消费者继续进行处理;
4.公平调度:
channel.basicQos(1);//保证一次只分发一个
5.持久化: 保证当RabbitMQ服务器崩溃关机也不会造成消息丢失
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
第二个参数改为true
标签:run alt set images ica publish executors task 调度
原文地址:http://www.cnblogs.com/starktan/p/RabbitMQ.html