标签:使用 消费者 比较 怎么办 efault tar http throws 部分
在RocketMQ 重复消费问题 | 订单系统核心流程引入幂等性机制一文中,我们讨论了消息重复消费的问题,比较好的方案是采用在消费侧使用业务判断法来保证接口的幂等性,这样就能避免消息重复消费的问题。
今天要讨论的是消费者代码执行过程中出现异常,我们应该如何处理?
首先来看一段代码,Consumer
类是一个消费者类,它我们为它注册了一个监听器,在处理完消息之后,会将消息的状态返回给 RocketMQ,执行成功返回的是消息状态是 CONSUME_SUCCESS
。
public class Consumer {
public static void main(String[] args) throws MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("test_consumer");
// 设置 NameServer 地址
consumer.setNamesrvAddr("");
// 订阅 Topic
consumer.subscribe("TopicTest", "*");
// 这次回调接口,接收消息
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List <MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
// 对消息的处理,比如发放优惠券、积分等
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
}
}
画一张图来表示向 RocketMQ 提交消息状态的流程,如图所示:
再来看一下消费者的代码中监听器的部分,它说如果消息处理成功,那么就返回消息状态为 CONSUME_SUCCESS
,也有可能发放优惠券、积分等操作出现了异常,比如说数据库挂掉了。这个时候应该怎么处理呢?
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List <MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
// 对消息的处理,比如发放优惠券、积分等
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
我们可以把代码改一改,捕获异常之后返回消息的状态为 RECONSUME_LATER
表示稍后重试。
// 这次回调接口,接收消息
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List <MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
try {
// 对消息的处理,比如发放优惠券、积分等
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
} catch (Exception e) {
// 万一发生数据库宕机等异常,返回稍后重试消息的状态
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}
});
这个时候,消息会进入到 RocketMQ 的重试队列中。
AAAConsumerGroup
%RETRY%AAAConsumerGroup
%DLQ%AAAConsumerGroup
%DLQ%AAAConsumerGroup
,并不停重试本文从消费者的业务代码出现异常讲起,介绍了 RocketMQ 的重试队列和死信队列:
CONSUME_SUCCESS
,执行异常返回RECONSUME_LATER
RECONSUME_LATER
的消息会进入到重试队列,重试队列的名称为 %RETRY% + ConsumerGroupName
;%DLQ% + ConsumerGroupName
;标签:使用 消费者 比较 怎么办 efault tar http throws 部分
原文地址:https://www.cnblogs.com/shuiyj/p/13198497.html