标签:add body eth bat tco bytes sql语句 语句 property
今天我们再来看看RocketMQ的另外两个小功能,消息的批量发送和过滤。这两个小功能提升了我们使用RocketMQ的效率。
以前我们发送消息的时候,都是一个一个的发送,这样效率比较低下。能不能一次发送多个消息呢?当然是可以的,RocketMQ为我们提供了这样的功能。但是它也有一些使用的条件:
好了,只要我们满足上面的这些限制,就可以使用批量发送了,我们来看看发送端的代码吧,
@Test
public void producerBatch() throws Exception {
List<Message> messages = new ArrayList<>();
for (int i = 0;i<3;i++) {
MessageExt message = new MessageExt();
message.setTopic("cluster-topic");
message.setKeys("key-"+i);
message.setBody(("this is batchMQ,my NO is "+i+"---"+new Date()).getBytes());
messages.add(message);
}
SendResult sendResult = defaultMQProducer.send(messages);
System.out.println("sendResult:" + sendResult.getSendStatus().toString());
}
再来看看消费端的代码,
@Bean(initMethod = "start",destroyMethod = "shutdown")
public DefaultMQPushConsumer pushConsumer() {
try {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("DefaultMQPushConsumer");
consumer.setNamesrvAddr("192.168.73.130:9876;192.168.73.131:9876;192.168.73.132:9876;");
consumer.subscribe("cluster-topic", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
System.out.println("msgs.size():"+msgs.size());
if (msgs != null && msgs.size() > 0) {
for (MessageExt msg : msgs) {
System.out.println(new String(msg.getBody()));
}
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
return consumer;
}catch (Exception e) {
e.printStackTrace();
}
return null;
}
我们启动项目,批量发送一下,看看效果吧,
发送端的日志如下:
sendResult:SEND_OK
发送成功,看来我们批量发送的3个消息都进入到了队列中,再看看消费端,是一次消费一个,还是一次消费3个,如下:
msgs.size():1
this is batchMQ,my NO is 0---Mon Jun 15 09:31:04 CST 2020
msgs.size():1
this is batchMQ,my NO is 1---Mon Jun 15 09:31:04 CST 2020
msgs.size():1
this is batchMQ,my NO is 2---Mon Jun 15 09:31:04 CST 2020
看样子是一次只消费了一个消息,那么能不能一次消费3个消息呢?当然是可以的,不过要进行特殊的设置,
consumer.setConsumeMessageBatchMaxSize(5);
在消费端,我们设置批量消费消息的数量是5,这个值默认是1。我们再看看消费端的日志,
msgs.size():3
this is batchMQ,my NO is 0---Mon Jun 15 09:35:47 CST 2020
this is batchMQ,my NO is 1---Mon Jun 15 09:35:47 CST 2020
this is batchMQ,my NO is 2---Mon Jun 15 09:35:47 CST 2020
这次一次消费了3个消息,如果消息比较多的话,最大一次能消费5个。这就是RocketMQ的批量发送和批量消费。
其实我们在大多数情况下,使用tag标签就能够很好的实现消息过滤。虽然tag标签咱们并没有过多的介绍,其实也很好理解,就是一个子Topic的概念,咱们在构建消息message的时候,message.setTags("xxx")
。然后在消费的时候,订阅Topic的时候,也可以指定订阅的tag,
consumer.subscribe("cluster-topic", "*");
看到那个"*"了吗?它就是订阅的tag,"*"代表全部的tag,如果您想订阅其中的一个或几个,可以使用这种方式"tagA || tagB || tagC",这是订阅了cluster-topic下的3个tag,其他的tag是不会被消费的。
这里我们所说的消息过滤比tag要高级很多,是可以支持sql的,怎么样?高级吧。比如:我们订阅"a > 5 and b = ‘abc‘"的消息,如下图:
但是,RocketMQ毕竟不是数据库,它只能支持一些基础的SQL语句,并不是所有的SQL都支持,
数字型的支持,>
, >=
, <
, <=
, BETWEEN
, =
字符串支持,=,
<>,
IN
IS NULL
或者IS NOT NULL
逻辑判断,AND
,OR
,NOT
;
字段的类型也只是简单的几种,
并且对消费者的类型也有一定的限制,只能使用push consumer才可以进行消息过滤。好了,说了这么多了,我们看看怎么使用吧,消费端和生产端都要进行相应的改造,先看看生产端吧,
@Test
public void producerBatch() throws Exception {
List<Message> messages = new ArrayList<>();
for (int i = 0;i<3;i++) {
MessageExt message = new MessageExt();
message.setTopic("cluster-topic");
message.setKeys("key-"+i);
message.setBody(("this is batchMQ,my NO is "+i+"---"+new Date()).getBytes());
int a = i+4;
message.putUserProperty("a",String.valueOf(a));
messages.add(message);
}
SendResult sendResult = defaultMQProducer.send(messages);
System.out.println("sendResult:" + sendResult.getSendStatus().toString());
}
我们在之前批量发送的基础上进行了修改,定义了a的值,等于i+4,这样循环3次,a的值就是4,5,6。然后调用message.putUserProperty("a",String.valueOf(a))
,注意,在使用消息过滤的时候,这些附加的条件属性都是通过putUserProperty方法进行设置。这里,我们设置了a的值。再看看消费端,
consumer.subscribe("cluster-topic", MessageSelector.bySql("a > 5"));
消费端,整体上没有变化,只是在订阅的方法中,使用MessageSelector.bySql("a > 5")
,进行了条件的过滤。有的小伙伴可能会有疑问,我既想用sql过滤又想用tag过滤怎么办?当然也是可以,我们可以使用MessageSelector.bySql("a > 5").byTag("xx)
,byTag和bySql不分前后,怎么样,很强大吧。我们运行一下程序,看看效果吧。
我们启动一下服务,报错了,怎么回事?错误信息如下:
The broker does not support consumer to filter message by SQL92
队列不支持过滤消息,我们查询了RocketMQ源码中的BrokerConfig类,这个类就是对broker的一些设置,其中发现了这两个属性,
// whether do filter when retry.
private boolean filterSupportRetry = false;
private boolean enablePropertyFilter = false;
我们把这两个属性在broker的配置文件改为true吧,如下:
filterSupportRetry=true
enablePropertyFilter=true
然后,再重新部署一下我们两主两从的集群环境。环境部署完以后,我们再重启应用,没有报错。在生产端发送一下消息看看吧,
sendResult:SEND_OK
生产端发送消息没有问题,说明3个消息都发送成功了。再看看消费端的日志,
msgs.size():1
this is batchMQ,my NO is 2---Mon Jun 15 10:59:37 CST 2020
只消费了一个消息,并且这个消息中i的值是2,那么a的值就是2+4=6,它是>5的,满足SQL的条件,所以被消费掉了。这完全符合我们的预期。
今天的两个小功能还是比较有意思的,但里边也有需要注意的地方,
我们在使用的时候,多加留意就可以了,有问题,评论区留言吧~
标签:add body eth bat tco bytes sql语句 语句 property
原文地址:https://www.cnblogs.com/boboooo/p/13129755.html