当有多个消费者同时收取消息,且每个消费者在接收消息的同时,还要做其它的事情,且会消耗很长的时间,在此过程中可能会出现一些意外,比如消息接收到一半的时候,一个消费者宕掉了,这时候就要使用消息接收确认机制,可以让其它的消费者再次执行刚才宕掉的消费者没有完成的事情。另外,在默认情况下,我们创建的消息队列以及存放在队列里面的消息,都是非持久化的,也就是说当RabbitMQ宕掉了或者是重启了,创建的消息队列以及消息都不会保存,为了解决这种情况,保证消息传输的可靠性,我们可以使用RabbitMQ提供的消息队列的持久化机制。
1 import com.rabbitmq.client.ConnectionFactory;
2 import com.rabbitmq.client.Connection;
3 import com.rabbitmq.client.Channel;
4 import com.rabbitmq.client.MessageProperties;
5 public class ClientSend1 {
6 public static final String queue_name="my_queue";
7 public static final boolean durable=true; //消息队列持久化
8 public static void main(String[] args)
9 throws java.io.IOException{
10 ConnectionFactory factory=new ConnectionFactory(); //创建连接工厂
11 factory.setHost("localhost");
12 factory.setVirtualHost("my_mq");
13 factory.setUsername("zhxia");
14 factory.setPassword("123456");
15 Connection connection=factory.newConnection(); //创建连接
16 Channel channel=connection.createChannel();//创建信道
17 channel.queueDeclare(queue_name, durable, false, false, null); //声明消息队列,且为可持久化的
18 String message="Hello world"+Math.random();
19 //将队列设置为持久化之后,还需要将消息也设为可持久化的,MessageProperties.PERSISTENT_TEXT_PLAIN
20 channel.basicPublish("", queue_name, MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());
21 System.out.println("Send message:"+message);
22 channel.close();
23 connection.close();
24 }
25
26 }
1 import com.rabbitmq.client.ConnectionFactory;
2 import com.rabbitmq.client.Connection;
3 import com.rabbitmq.client.Channel;
4 import com.rabbitmq.client.QueueingConsumer;
5 public class ClientReceive1 {
6 public static final String queue_name="my_queue";
7 public static final boolean autoAck=false;
8 public static final boolean durable=true;
9 public static void main(String[] args)
10 throws java.io.IOException,java.lang.InterruptedException{
11 ConnectionFactory factory=new ConnectionFactory();
12 factory.setHost("localhost");
13 factory.setVirtualHost("my_mq");
14 factory.setUsername("zhxia");
15 factory.setPassword("123456");
16 Connection connection=factory.newConnection();
17 Channel channel=connection.createChannel();
18 channel.queueDeclare(queue_name, durable, false, false, null);
19 System.out.println("Wait for message");
20 channel.basicQos(1); //消息分发处理
21 QueueingConsumer consumer=new QueueingConsumer(channel);
22 channel.basicConsume(queue_name, autoAck, consumer);
23 while(true){
24 Thread.sleep(500);
25 QueueingConsumer.Delivery deliver=consumer.nextDelivery();
26 String message=new String(deliver.getBody());
27 System.out.println("Message received:"+message);
28 channel.basicAck(deliver.getEnvelope().getDeliveryTag(), false);
29 }
30 }
31 }
行22: 设置RabbitMQ调度分发消息的方式,也就是告诉RabbitMQ每次只给消费者处理一条消息,也就是等待消费者处理完并且已经对刚才处理的消息进行确认之后, 才发送下一条消息,防止消费者太过于忙碌。如下图所示: