标签:esc author 最好 再处理 ima bytes png 监听器 HERE
部分内容出处 https://www.jianshu.com/p/453c6e7ff81c
rocketmq内部有4个默认的队里,在发送消息时,同一组的消息需要按照顺序,发送到相应的mq中,同一组消息按照顺序进行消费,不同组的消息可以并行的进行消费。
下面看一下producer的代码:
package com.alibaba.rocketmq.example.message.order; import com.alibaba.rocketmq.client.exception.MQBrokerException; import com.alibaba.rocketmq.client.exception.MQClientException; import com.alibaba.rocketmq.client.producer.DefaultMQProducer; import com.alibaba.rocketmq.client.producer.MessageQueueSelector; import com.alibaba.rocketmq.client.producer.SendResult; import com.alibaba.rocketmq.common.message.Message; import com.alibaba.rocketmq.common.message.MessageQueue; import com.alibaba.rocketmq.remoting.exception.RemotingException; import java.text.SimpleDateFormat; import java.util.Date; import java.util.List; /** * @author : Jixiaohu * @Date : 2018-04-19. * @Time : 9:20. * @Description : */ public class Producer { public static void main(String[] args) throws MQClientException, InterruptedException, MQBrokerException { String groupName = "order_producer"; DefaultMQProducer producer = new DefaultMQProducer(groupName); producer.setNamesrvAddr("192.168.1.114:9876;192.168.2.2:9876"); producer.start(); SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd"); String dateStr = sdf.format(new Date()); try { for (int i = 1; i <= 3; i++) { String body = dateStr + "Hello RoctetMq : " + i; Message msg = new Message("Topic1", "Tag1", "Key" + i, body.getBytes()); SendResult sendResult = producer.send(msg, new MessageQueueSelector() { @Override public MessageQueue select(List<MessageQueue> list, Message message, Object o) { Integer id = (Integer) o; return list.get(id); } }, 0); //0是队列的下标 System.out.println(sendResult); } for (int i = 1; i <= 3; i++) { String body = dateStr + "Hello RoctetMq : " + i; Message msg = new Message("Topic1", "Tag1", "Key" + i, body.getBytes()); SendResult sendResult = producer.send(msg, new MessageQueueSelector() { @Override public MessageQueue select(List<MessageQueue> list, Message message, Object o) { Integer id = (Integer) o; return list.get(id); } }, 1); //1是队列的下标 System.out.println(sendResult); } for (int i = 1; i <= 3; i++) { String body = dateStr + "Hello RoctetMq : " + i; Message msg = new Message("Topic1", "Tag1", "Key" + i, body.getBytes()); SendResult sendResult = producer.send(msg, new MessageQueueSelector() { @Override public MessageQueue select(List<MessageQueue> list, Message message, Object o) { Integer id = (Integer) o; return list.get(id); } }, 2); //2是队列的下标 System.out.println(sendResult); } for (int i = 1; i <= 3; i++) { String body = dateStr + "Hello RoctetMq : " + i; Message msg = new Message("Topic1", "Tag1", "Key" + i, body.getBytes()); SendResult sendResult = producer.send(msg, new MessageQueueSelector() { @Override public MessageQueue select(List<MessageQueue> list, Message message, Object o) { Integer id = (Integer) o; return list.get(id); } }, 3); //3是队列的下标 System.out.println(sendResult); } for (int i = 1; i <= 3; i++) { String body = dateStr + "Hello RoctetMq : " + i; Message msg = new Message("Topic1", "Tag1", "Key" + i, body.getBytes()); SendResult sendResult = producer.send(msg, new MessageQueueSelector() { @Override public MessageQueue select(List<MessageQueue> list, Message message, Object o) { Integer id = (Integer) o; return list.get(id); } }, 4); //4是队列的下标 System.out.println(sendResult); } for (int i = 1; i <= 3; i++) { String body = dateStr + "Hello RoctetMq : " + i; Message msg = new Message("Topic1", "Tag1", "Key" + i, body.getBytes()); SendResult sendResult = producer.send(msg, new MessageQueueSelector() { @Override public MessageQueue select(List<MessageQueue> list, Message message, Object o) { Integer id = (Integer) o; return list.get(id); } }, 5); //5是队列的下标 System.out.println(sendResult); } } catch (RemotingException e) { e.printStackTrace(); Thread.sleep(1000); } producer.shutdown(); } }
这边发送多组消息,每组消息的顺序分别为1,2,3,
下面查看consumer1,和consumer2,因为要顺序消费,需要注意的是,这两个消费者的监听器是MessageListenerOrderly,两个的代码一样,我这边就只展示一个consumer的代码
package com.alibaba.rocketmq.example.message.order; import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer; import com.alibaba.rocketmq.client.consumer.listener.ConsumeOrderlyContext; import com.alibaba.rocketmq.client.consumer.listener.ConsumeOrderlyStatus; import com.alibaba.rocketmq.client.consumer.listener.MessageListenerOrderly; import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere; import com.alibaba.rocketmq.common.message.MessageExt; import java.util.List; import java.util.Random; import java.util.concurrent.TimeUnit; /** * @author : Jixiaohu * @Date : 2018-04-23. * @Time : 9:35. * @Description : 顺序消息消费 */ public class Consumer1 { public Consumer1() throws Exception { String groupName = "order_producer"; DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(groupName); consumer.setNamesrvAddr("192.168.1.114:9876;192.168.2.2:9876"); /** * 设置Consumer第一次启动是从队列头开始消费还是队列尾开始消费 * 非第一次启动,那么按照上次消费的位置继续消费 */ consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); //订阅的主题,以及过滤的标签内容 consumer.subscribe("Topic1", "*"); //注册监听 consumer.registerMessageListener(new Listener()); consumer.start(); System.out.println("Consumer1 Started."); } class Listener implements MessageListenerOrderly { private Random random = new Random(); @Override public ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext context) { // 设置自动提交 context.setAutoCommit(true); for (MessageExt msg : list) { System.out.println(msg + ",context" + new String(msg.getBody())); } try { TimeUnit.SECONDS.sleep(random.nextInt(1)); } catch (InterruptedException e) { e.printStackTrace(); } return ConsumeOrderlyStatus.SUCCESS; } } public static void main(String[] args) throws Exception { new Consumer1(); } }
还是按照先启动consumer的顺序,在启动producer的顺序。
这边看一下控制台的信息
总共6组消息,broker-a上接收到4组消息,broker-b上接收到2组消息,同一组的3条消息会发送到同一个broker的同一个队列中,这样才能保证顺序消费,
下面看一下consumer1和consumer2的打印结果
由于顺序消费只能单线程,
可以看到,这边的queueid都是3个 3个打印,不会出现交替,下面看一下一组消息的消费顺序,
可以看到,消息是按照发送的顺序,进行消费,consumer2的打印结果也是类似的,不过consumer2消费了6条消息。
这样就实现了rocket的顺序消费,虽然实现了顺序消费,由于网络通信,会存在着重复数据的问题,
重复数据的问题,rocket不提供解决方案,让业务方自行解决,主要有两个方法:
第1条很好理解,只要保持幂等性,不管来多少条重复消息,最后处理的结果都一样。第2条原理就是利用一张日志表来记录已经处理成功的消息的ID,如果新到的消息ID已经在日志表中,那么就不再处理这条消息。
第1条解决方案,很明显应该在消费端实现,不属于消息系统要实现的功能。第2条可以消息系统实现,也可以业务端实现。正常情况下出现重复消息的概率其实很小,如果由消息系统来实现的话,肯定会对消息系统的吞吐量和高可用有影响,所以最好还是由业务端自己处理消息重复的问题,这也是RocketMQ不解决消息重复的问题的原因。
RocketMQ不保证消息不重复,如果你的业务需要保证严格的不重复消息,需要你自己在业务端去重。
标签:esc author 最好 再处理 ima bytes png 监听器 HERE
原文地址:https://www.cnblogs.com/shmilyToHu/p/8933500.html