RabbitMQ针对这个问题,提供了两种解决方式;
- 事务机制 :RabbitMQ提供了事务机制保证消息投递,RabbitMQ客户端中与事务机制相关的方法有三个: channel.txSelect 和
channel.txCommit 和channel.txRollback
channel.txSelect : 将当前的channel通道设置为事务模式;
channel.txCommit :用于提交事务;
channel.txRollback :用于事务回滚;
标签:并且 live default code rri 生产 false 失败 事务机制
本章节我们主要聊一聊RabbitMQ使用必须考虑的问题,就是消息可靠性!在生产环境下如何确保消息的可靠性投递,我们首先需要考虑两个问题
1、生产者发送消息,是否发送成功?
2、消费者接收消息如何确认以及拒绝?
当然我们所说的可靠并非一个绝对的概念,因网络、硬件、不可抗因素等;可靠性是一个相对的概念,在条件合理的范围内系统所能确保一切尽可能的趋于完美的消息可靠性;
我们来思考一下需要考虑哪些环节;
Send Massage(消息投递者) 在将消息发送到交换器Exchange的时候,默认RabbitMQ不进行确认投递者是不知道是否投递成功,也就是默认情况下生产者是不知道消息有没有正确地到达服务器,没有到达服务器,如果出现如:网络闪断等因素,则这条消息会无法投递到Exchange
Exchange通过RoutingKey将消息路由至Queue ,这个环节中如果无法路由至Queue队列,如何处理该消息?消息已经路由至Queue队列,却发现没有消费者,又如何处理?,是否也有一样的通知机制告诉我们?
在接收者Receive Message(消息消费者) 在接收到消息后,如何通知RabbitMQ我已经接收到该消息?是否消费者也需要一个确认告知RabbitMQ已经接收到消息?
带着这一系列问题,我们先来看看如何进行保障消息投递的确认;
RabbitMQ针对这个问题,提供了两种解决方式;
channel.txSelect : 将当前的channel通道设置为事务模式;
channel.txCommit :用于提交事务;
channel.txRollback :用于事务回滚;
try { channel.txSelect(); channel.basicPublish(exchange , routingKey , MessageProperties.PERSISTENT_TEXT_PLAIN , msg.getBytes()); int result = 1 / 0 ; channel.txCommit(); }catch (Exception e) { e.printStackTrace(); channel.txRollback(); }
、生产者将Channel设置成Confirm模式,当设置Confirm模式后所有在该信道上面发布的消息都会被指派一个唯一的ID(从1开始,ID在同个Channel范围是唯一的),一旦消息被投递到所有匹配的队列之后Broker就会发送一个确认给生产者(包含消息的唯一ID),这就使得生产者知道消息已经正确到达目的队列了;
2、如果消息和队列是可持久化的,那么确认消息会在消息写入磁盘之后发出;
3、RabbitMQ回调消息的deliveryTag包含了确认消息的ID,此外RabbitMQ也可以设置channel.basicAck 方法中的multiple参数,表示到这个序号之前的所有消息都己经得到了处理;稍后介绍handleNack 和 handleAck两个方法我们再举个说明;
4、confirm的机制是异步的,如果消息成功发送,会返回ack消息供异步处理,如果消息发送失败发生异常,也会返回nack消息,confirm的时间没有明确说明,并且同一个消息只会被confirm一次;
接下来介绍两种confirm方法
//开启confirm模式 channel.confirmSelect(); //模拟发送50条消息 for(int i =0;i<1000;i++){ String message = "Hello World RabbitMQ"; //发送消息 channel.basicPublish(EXCHANGE_NAME,"",MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes()); //每发送2条判断一次是否回复 if(i%2==0){ //waitForConfirms可以换成带有时间参数的方法waitForConfirms(Long mills)指定等待响应时间 if(channel.waitForConfirms()){ System.out.println("Message send success."); } } }
批量的方法从数量级上降低了confirm的性能消耗,提高了效率,但是批量confmn方式的问题在于遇到RabbitMQ服务端返回Basic.Nack 需要重发批量消息而导致的性能降低,也可能导致消息重复消费
依旧还是先看代码:生产者
1 public class ConfirmProducer { 2 3 public static void main(String[] args) throws Exception { 4 //1 创建ConnectionFactory 5 ConnectionFactory connectionFactory = new ConnectionFactory(); 6 connectionFactory.setHost("192.168.1.28"); 7 connectionFactory.setPort(5672); 8 connectionFactory.setVirtualHost("/"); 9 connectionFactory.setUsername("toher"); 10 connectionFactory.setPassword("toher888"); 11 //2 创建Connection 12 Connection connection = connectionFactory.newConnection(); 13 //3 创建Channel 14 Channel channel = connection.createChannel(); 15 //4 指定我们的消息投递模式: 消息的确认模式 16 channel.confirmSelect(); 17 //5 声明交换机 以及 路由KEY 18 String exchangeName = "test_confirm_exchange"; 19 String routingKey = "confirm.send"; 20 //6 发送一条消息 21 String msg = "Test Confirm Message"; 22 channel.basicPublish(exchangeName, routingKey, null, msg.getBytes()); 23 //7 添加确认监听 24 channel.addConfirmListener(new ConfirmListener(){ 25 @Override 26 public void handleNack(long deliveryTag, boolean multiple) throws IOException { 27 System.err.println("收到NACK应答"); 28 } 29 @Override 30 public void handleAck(long deliveryTag, boolean multiple) throws IOException { 31 System.err.println("收到ACK应答"); 32 } 33 }); 34 } 35 36 }
消费者:
public class ConfirmConsumer { public static void main(String[] args) throws Exception { //1 创建ConnectionFactory ConnectionFactory connectionFactory = new ConnectionFactory() ; connectionFactory.setHost("192.168.1.28"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/"); connectionFactory.setUsername("toher"); connectionFactory.setPassword("toher888"); //2 创建Connection Connection connection = connectionFactory.newConnection(); //3 创建Channel Channel channel = connection.createChannel(); //4 声明 String exchangeName = "test_confirm_exchange"; //指定类型为topic String exchangeType = "topic"; String queueName = "test_confirm_queue"; //因为*号代表匹配一个单词,生产者中routingKey3将匹配不到 String routingKey = "confirm.*"; //表示声明了一个交换机 channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null); //表示声明了一个队列 channel.queueDeclare(queueName, true, false, false, null); //建立一个绑定关系: channel.queueBind(queueName, exchangeName, routingKey); //5 创建消费者 Consumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String msg = new String(body, "UTF-8"); System.out.println("消费端:" + msg); } }; //参数:队列名称、是否自动ACK、Consumer channel.basicConsume(queueName, true, consumer); } }
运行效果
从上面代码我们可以看到有重写了ConfirmListener两个方法:handleNack 和 handleAck,分别用来处理RabbitMQ 回传的Basic.Nack和Basic.Ack;
它们都有两个参数:
long deliveryTag : 前面介绍确认消息的ID
boolean multiple : multiple 是否批量 如果是True 则将比该deliveryTag小的所有数据都移除 否则只移除该条;
我们简单的用一个数组来说明 [1,2,3,4]存储着4条消息ID , 此时确认消息返回的是 deliveryTag = 3 ,multiple = true那么RabbitMQ会通知我们小于ID3的消息得到确认了,如果multiple = false, 就通知我们ID3的确认了
我们再用修改一下上面的代码看一下
//声明一个用来记录消息唯一ID的有序集合SortedSet final SortedSet<Long> confirmSet = Collections.synchronizedSortedSet(new TreeSet<Long>()); //开启confirm模式 channel.confirmSelect(); //异步监听方法 处理ack与nack方法 channel.addConfirmListener(new ConfirmListener() { //处理ack multiple 是否批量 如果是批量 则将比该条小的所有数据都移除 否则只移除该条 public void handleAck(long deliveryTag, boolean multiple) throws IOException { if (multiple) { confirmSet.headSet(deliveryTag).clear(); } else { confirmSet.remove(deliveryTag); } } //处理nack 与ack相同 public void handleNack(long deliveryTag, boolean multiple) throws IOException { System.out.println("There is Nack, SeqNo: " + deliveryTag + ", multiple: " + multiple); if (multiple) { confirmSet.headSet(deliveryTag).clear(); } else { confirmSet.remove(deliveryTag); } } });
以上代码按照每一个comfirm的通道维护一个集合,每发送一条数据,集合增加一个元素,每异步响应一条ack或者nack的数据,集合删除一条。SortedSet是一个有序的集合,它的有序是值大小的有序,不是插入时间的有序。JDK中waitForConfirms()方法也是使用了SortedSet集合
————————————————
版权声明:本文为CSDN博主「傲泣龙腾」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
原文链接:https://blog.csdn.net/lhmyy521125/java/article/details/88064322
标签:并且 live default code rri 生产 false 失败 事务机制
原文地址:https://www.cnblogs.com/hup666/p/13290045.html