码迷,mamicode.com
首页 > 其他好文 > 详细

RocketMQ 死信队列 | 消费者出现异常如何处理?

时间:2020-06-27 16:08:14      阅读:471      评论:0      收藏:0      [点我收藏+]

标签:使用   消费者   比较   怎么办   efault   tar   http   throws   部分   

RocketMQ 重复消费问题 | 订单系统核心流程引入幂等性机制一文中,我们讨论了消息重复消费的问题,比较好的方案是采用在消费侧使用业务判断法来保证接口的幂等性,这样就能避免消息重复消费的问题。

今天要讨论的是消费者代码执行过程中出现异常,我们应该如何处理?

手动提交 offset

首先来看一段代码,Consumer 类是一个消费者类,它我们为它注册了一个监听器,在处理完消息之后,会将消息的状态返回给 RocketMQ,执行成功返回的是消息状态是 CONSUME_SUCCESS

public class Consumer {
    public static void main(String[] args) throws MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("test_consumer");

        // 设置 NameServer 地址
        consumer.setNamesrvAddr("");
        // 订阅 Topic
        consumer.subscribe("TopicTest", "*");
        // 这次回调接口,接收消息
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List <MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
								// 对消息的处理,比如发放优惠券、积分等
								return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
    }
}

画一张图来表示向 RocketMQ 提交消息状态的流程,如图所示:

技术图片

消息者业务代码出现异常怎么办?

再来看一下消费者的代码中监听器的部分,它说如果消息处理成功,那么就返回消息状态为 CONSUME_SUCCESS,也有可能发放优惠券、积分等操作出现了异常,比如说数据库挂掉了。这个时候应该怎么处理呢?

consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List <MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
								// 对消息的处理,比如发放优惠券、积分等
								return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

我们可以把代码改一改,捕获异常之后返回消息的状态为 RECONSUME_LATER 表示稍后重试。

// 这次回调接口,接收消息
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List <MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                try {
                    // 对消息的处理,比如发放优惠券、积分等
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                } catch (Exception e) {
                    // 万一发生数据库宕机等异常,返回稍后重试消息的状态
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                }

            }
        });

重试队列

这个时候,消息会进入到 RocketMQ 的重试队列中。

  • 比如说消费者所属的消息组名称为AAAConsumerGroup
  • 其重试队列名称就叫做%RETRY%AAAConsumerGroup
  • 重试队列中的消息过一段时间会再次发送给消费者,如果还是无法正常执行会再次进入重试队列
  • 默认重试16次,还是无法执行,消息就会从重试队列进入到死信队列

技术图片

死信队列

  • 重试队列中的消息重试16次任然无法执行,将会进入到死信队列
  • 死信队列的名字是 %DLQ%AAAConsumerGroup
  • 死信队列中的消息可以后台开一个线程,订阅%DLQ%AAAConsumerGroup,并不停重试

技术图片

总结

技术图片

本文从消费者的业务代码出现异常讲起,介绍了 RocketMQ 的重试队列和死信队列:

  1. 代码正常执行返回消息状态为CONSUME_SUCCESS,执行异常返回RECONSUME_LATER
  2. 状态为RECONSUME_LATER的消息会进入到重试队列,重试队列的名称为 %RETRY% + ConsumerGroupName
  3. 重试16次消息任然没有处理成功,消息就会进入到死信队列%DLQ% + ConsumerGroupName;

RocketMQ 死信队列 | 消费者出现异常如何处理?

标签:使用   消费者   比较   怎么办   efault   tar   http   throws   部分   

原文地址:https://www.cnblogs.com/shuiyj/p/13198497.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!