码迷,mamicode.com
首页 > 其他好文 > 详细

activemq-重发、去重

时间:2017-09-05 15:44:51      阅读:181      评论:0      收藏:0      [点我收藏+]

标签:fetch   etc   span   cti   消费   通信   class   broker   logs   

activemq的consumer端也有窗口机制,通过prefetchSize就可以设置窗口大小。加入窗口是为了批量获取数据,同时可以设置optimizeAcknowledge来优化确认回复,优化确认一方面可以减轻client负担(不需要频繁的确认消息)、减少通信开销,另一方面由于延迟了确认(默认ack了0.65*prefetchSize个消息才确认),broker再次发送消息时又可以批量发送,如果只是开启了prefetchSize,每条消息都去确认的话,broker在收到确认后也只是发送一条消息,当然也可以手动延迟确认。

consumer会维护两个队列,pendingList和dispatchedList,前者存放从broker已接受但未消费(未回调onMessage)的message,后者用于存放已消费但未确认的message(可用于recover,即redelivery)。

activemq的重发机制是session为单位的,并且重发只发生在client端,并不会向broker请求重发消息,只会在重发后向broker发送一个redelivered命令,如果某消息的redelivered次数达到阈值,这条消息就会被清除并送入DLQ。

 1 public void recover() throws JMSException {
 2 
 3   checkClosed();
 4   if (getTransacted()) {
 5     throw new IllegalStateException("This session is transacted");
 6   }
 7    //该session的每个consumer都会recover
 8   for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) {
 9     ActiveMQMessageConsumer c = iter.next();
10     c.rollback();
11   }
12 
13 }

同样的,message的确认也是session级别的

1 public void acknowledge() throws JMSException {
2   for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) {
3     ActiveMQMessageConsumer c = iter.next();
4     c.acknowledge();
5   }
6 }

 

通过持久化、确认机制,broker可以保证消息不丢失,即如果consumer未确认消息,consumer都可以再次得到该消息,但broker并不担保消息被client唯一消费。onMessage处理消息时出错,consumer会自动发起recover;重启consumer后,consumer会得到之前未确认的消息;consumer回复了确认,但确认命令还未得到broker处理时,broker挂掉了,broker重启后,consumer依旧会收到之前确认过的消息。这些情况都会产生重复消息,消息的去重需要client自己保证,最简单直接的方式就是处理完消息时,将消息业务唯一标识符入库,每次处理消息时都检查是否存在该标识符。

 

参考:http://activemq.apache.org/message-redelivery-and-dlq-handling.html

  http://activemq.apache.org/redelivery-policy.html

activemq-重发、去重

标签:fetch   etc   span   cti   消费   通信   class   broker   logs   

原文地址:http://www.cnblogs.com/holoyong/p/7474257.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!