一:介绍
1.说明原理
A:生产者将信道设置成confirm模式,一旦信道进到confirm模式,所有该信道上发布的消息都会被指派一个唯一的ID(从1开始)。
一旦消息被投递到所有匹配的队列后,broker就会发送一个确认给生产者,并包括了唯一的ID,这样就使得生产者知道消息已经到达目的队列。
B:如果消息和消息队列是可持久化的,那么确认消息会将消息写入磁盘后发出,broker会回传给生产者的确认消息中deliver-tag域包含了确认消息的序列号,并且broker也可以设置basic.ack的multiple域。
用来表示这个序列号之前所有的消息已经得到处理。
二:程序(发送一条数据)
1.生产者
1 package com.mq.confirm; 2 3 import com.mq.utils.ConnectionUtil; 4 import com.rabbitmq.client.Channel; 5 import com.rabbitmq.client.Connection; 6 7 public class Send { 8 private static final String QUEUE_NAME="test_queue_confirm"; 9 public static void main(String[] args)throws Exception{ 10 Connection connection= ConnectionUtil.getConnection(); 11 Channel channel=connection.createChannel(); 12 channel.queueDeclare(QUEUE_NAME,false,false,false,null); 13 14 channel.confirmSelect(); 15 String msg="confirm msg"; 16 channel.basicPublish("",QUEUE_NAME,null,msg.getBytes()); 17 18 if (!channel.waitForConfirms()){ 19 System.out.println("message send failed"); 20 }else{ 21 System.out.println("message send success"); 22 } 23 } 24 }
2.消费者
1 package com.mq.confirm; 2 3 import com.mq.utils.ConnectionUtil; 4 import com.rabbitmq.client.*; 5 6 import java.io.IOException; 7 8 public class Receive { 9 private static final String QUEUE_NAME="test_queue_confirm"; 10 public static void main(String[] args)throws Exception { 11 Connection connection = ConnectionUtil.getConnection(); 12 Channel channel = connection.createChannel(); 13 channel.queueDeclare(QUEUE_NAME, false, false, false, null); 14 channel.basicConsume(QUEUE_NAME,true,new DefaultConsumer(channel){ 15 @Override 16 public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { 17 System.out.println(new String(body,"utf-8")); 18 } 19 }); 20 } 21 }
3.现象
发送成功后,在send端会返回发送成功、
在接收端会接受到信息。
三:程序(发送多条)
1.生产者
1 package com.mq.confirm; 2 3 import com.mq.utils.ConnectionUtil; 4 import com.rabbitmq.client.Channel; 5 import com.rabbitmq.client.Connection; 6 7 public class SendMultipleMessage { 8 private static final String QUEUE_NAME="test_queue_confirm"; 9 public static void main(String[] args)throws Exception{ 10 Connection connection= ConnectionUtil.getConnection(); 11 Channel channel=connection.createChannel(); 12 channel.queueDeclare(QUEUE_NAME,false,false,false,null); 13 14 channel.confirmSelect(); 15 String msg="confirm msg"; 16 17 //一次性发送多条数据 18 for (int i=0;i<10;i++){ 19 channel.basicPublish("",QUEUE_NAME,null,msg.getBytes()); 20 } 21 22 //确认一次 23 if (!channel.waitForConfirms()){ 24 System.out.println("message send failed"); 25 }else{ 26 System.out.println("message send success"); 27 } 28 } 29 }
2.原理‘
原理就是一次发送多条数据,然后一次返回判断。
缺点:如果丢失,就是全部丢失。
3.现象