标签:配置 clu 网络传输 大量 最大 常见错误 setname 设置 const
Topic 下队列的奇偶数会影响 Customer 个数里面的消费数量
集群模式(默认):
广播模式:
怎么切换模式:通过 setMessageModel()
一个 Message 只有一个 Tag,Tag 是二级分类。过滤分为 Broker 端和 Consumer 端过滤。
一般是监听 * ,或者指定 tag,|| 运算,SLQ92,FilterServer 等;
生产者
@RequestMapping("/api/v1/pay_cb") public Object callback( String tag, String amount) throws Exception { Message message = new Message(JmsConfig.TOPIC,tag, "",tag.getBytes()); // 设置属性,用于sql过滤 message.putUserProperty("amount",amount); SendResult sendResult = payProducer.getProducer().send(message); System.out.printf("发送结果=%s, sendResult=%s \n", sendResult.getSendStatus(), sendResult.toString()); return new HashMap<>(); }
消费者
package net.xdclass.xdclassmq.jms; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.MessageSelector; import org.apache.rocketmq.client.consumer.listener.*; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.consumer.ConsumeFromWhere; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; import org.springframework.stereotype.Component; import java.io.UnsupportedEncodingException; import java.util.List; @Component public class PayConsumer { private DefaultMQPushConsumer consumer; private String consumerGroup = "pay_consumer_group"; public PayConsumer() throws MQClientException { consumer = new DefaultMQPushConsumer(consumerGroup); consumer.setNamesrvAddr(JmsConfig.NAME_SERVER); consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); //默认是集群方式,可以更改为广播,但是广播方式不支持重试 consumer.setMessageModel(MessageModel.CLUSTERING); //多标签订阅 //consumer.subscribe(JmsConfig.TOPIC, "order_pay || order_finish || order_create"); //根据sql语法进行过滤消息 consumer.subscribe(JmsConfig.TOPIC, MessageSelector.bySql(" amount > 5 ")); consumer.registerMessageListener( new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { MessageExt msg = msgs.get(0); try { System.out.printf("%s 2 Receive New Messages: %s %n", Thread.currentThread().getName(), new String(msgs.get(0).getBody())); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } catch (Exception e) { System.out.println("消费异常"); e.printStackTrace(); return ConsumeConcurrentlyStatus.RECONSUME_LATER; } } }); consumer.start(); System.out.println("consumer start ..."); } }
注意:消费者订阅关系要一致,不然会消费混乱,甚至消息丢失。订阅关系一致:订阅关系由 Topic 和 Tag 组成,同一个 group name,订阅的 Topic 和 Tag 必须是一样的。
在 Broker 端进行 MessageTag 过滤原理:遍历 message queue 存储的 message tag 和 订阅传递的 tag 的 hashcode 是否一样,不一样则跳过,符合的则传输给 Consumer,在 consumer queue 存储的是对应的 hashcode,对比也是通过 hashcode 对比;Consumer 收到过滤消息后也会进行匹配操作,但是是对比真实的 message tag 而不是 hashcode。
建议:单一职责,多个队列;如果想使用多个 Tag,可以使用 sql 表达式,但是不建议。
常见错误:
The broker does not support consumer to filter message by SQL92 解决:broker.conf 里面配置如下 enablePropertyFilter=true 备注,修改之后要重启 Broker master 节点配置:vim conf/2m-2s-async/broker-a.properties slave 节点配置:vim conf/2m-2s-async/broker-a-s.properties
Push 和 Pull 优缺点分析
PushConsumer 本质是长轮训
PullConsumer 需要自己维护 Offset(参考官方例子)
标签:配置 clu 网络传输 大量 最大 常见错误 setname 设置 const
原文地址:https://www.cnblogs.com/jwen1994/p/12364557.html