Rocketmq 消费者默认是集群的方式消费的,消费者还可以用广播的模式进行消费。广播模式消费就是所有订阅同一个主题的消费者都会收到消息。代码实现上其实很简单,就是在消费端添加
consumer.setMessageModel(MessageModel.BROADCASTING);
就可以了。我们看实验步骤:
一、启动ConsumerBroadCastMember1
二、启动ConsumerBroadCastMember2
三、运行ProducerBraodCast
四、我们可以看到两个Consumer都收到了同样的消息。
Producer端:
package org.hope.lee.producer; import com.alibaba.rocketmq.client.exception.MQBrokerException; import com.alibaba.rocketmq.client.exception.MQClientException; import com.alibaba.rocketmq.client.producer.DefaultMQProducer; import com.alibaba.rocketmq.client.producer.SendCallback; import com.alibaba.rocketmq.client.producer.SendResult; import com.alibaba.rocketmq.common.message.Message; import com.alibaba.rocketmq.common.message.MessageQueue; import com.alibaba.rocketmq.remoting.exception.RemotingException; public class ProducerBroadCast { public static void main(String[] args) { DefaultMQProducer producer = new DefaultMQProducer("push_consumer"); producer.setNamesrvAddr("192.168.31.176:9876;192.168.31.165:9876"); try { // 设置实例名称 producer.setInstanceName("producer_broadcast"); // 设置重试次数 producer.setRetryTimesWhenSendFailed(3); // 开启生产者 producer.start(); // 创建一条消息 Message msg = new Message("topic_broadcast", "TagA", "OrderID0034", "message_broadcast_test".getBytes()); SendResult send = producer.send(msg); System.out.println("id:--->" + send.getMsgId() + ",result:--->" + send.getSendStatus()); } catch (MQClientException e) { e.printStackTrace(); } catch (RemotingException e) { e.printStackTrace(); } catch (MQBrokerException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } producer.shutdown(); } }
Consumer端:
package org.hope.lee.consumer; import java.util.List; import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer; import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently; import com.alibaba.rocketmq.client.exception.MQClientException; import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere; import com.alibaba.rocketmq.common.message.MessageExt; import com.alibaba.rocketmq.common.protocol.heartbeat.MessageModel; public class ConsumerBroadCastMember1 { public static void main(String[] args) throws MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_broadcast"); consumer.setNamesrvAddr("192.168.31.176:9876;192.168.31.165:9876"); // 批量消费,每次拉取10条 consumer.setConsumeMessageBatchMaxSize(10); //设置广播消费 consumer.setMessageModel(MessageModel.BROADCASTING); //设置集群消费 // consumer.setMessageModel(MessageModel.CLUSTERING); // 如果非第一次启动,那么按照上次消费的位置继续消费 consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); // 订阅PushTopic下Tag为push的消息 consumer.subscribe("topic_broadcast", "TagA || Tag B || Tage C"); consumer.registerMessageListener(new MqBroadCastListener()); consumer.start(); System.out.println("Consumer1 Started."); } } class MqBroadCastListener implements MessageListenerConcurrently{ public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { try { MessageExt msg = msgs.get(0); String msgBody = new String(msg.getBody(), "utf-8"); System.out.println("msgBody:" + msgBody); } catch(Exception e) { e.printStackTrace(); return ConsumeConcurrentlyStatus.RECONSUME_LATER; } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }
package org.hope.lee.consumer; import java.util.List; import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer; import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently; import com.alibaba.rocketmq.client.exception.MQClientException; import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere; import com.alibaba.rocketmq.common.message.MessageExt; import com.alibaba.rocketmq.common.protocol.heartbeat.MessageModel; public class ConsumerBroadCastMember2 { public static void main(String[] args) throws MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_broadcast"); consumer.setNamesrvAddr("192.168.31.176:9876;192.168.31.165:9876"); // 批量消费,每次拉取10条 consumer.setConsumeMessageBatchMaxSize(10); //设置广播消费 consumer.setMessageModel(MessageModel.BROADCASTING); //设置集群消费 // consumer.setMessageModel(MessageModel.CLUSTERING); // 如果非第一次启动,那么按照上次消费的位置继续消费 consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); // 订阅PushTopic下Tag为push的消息 consumer.subscribe("topic_broadcast", "TagA || Tag B || Tage C"); consumer.registerMessageListener(new MqBroadCastListener()); consumer.start(); System.out.println("Consumer2 Started."); } }
结果:
https://gitee.com/huayicompany/RocketMQ-learn/tree/master/rocketmq-api