一、搭建RocketMQ集群
我搭建的是2-master no slave模式,所以在${rocketmq}/conf/2m-noslave/下的 brokder-*.properties 中添加 filterServerNums=1
二、依次启动namesrv和broker
在broker-*.properties中配置了filterServerNums=1后当你启动broker后,会自动启动filter
三、代码部分
3.1 Producer部分
package org.hope.lee.filter; import com.alibaba.rocketmq.client.exception.MQClientException; import com.alibaba.rocketmq.client.producer.DefaultMQProducer; import com.alibaba.rocketmq.client.producer.SendResult; import com.alibaba.rocketmq.common.message.Message; public class FilterProducer { public static void main(String[] args) throws MQClientException { String group_name = "filter_producer"; DefaultMQProducer producer = new DefaultMQProducer(group_name); producer.setNamesrvAddr("xxx.xxx.xx.176:9876;xxx.xx.xx.165:9876"); producer.start(); try { for (int i = 0; i < 100; i++) { Message msg = new Message("TopicFilter7",// topic "TagA",// tag "OrderID001",// key ("Hello MetaQ" + i).getBytes());// body msg.putUserProperty("SequenceId", String.valueOf(i)); SendResult sendResult = producer.send(msg); System.out.println(sendResult); } } catch (Exception e) { e.printStackTrace(); } producer.shutdown(); } }
3.2 Customer部分
package org.hope.lee.filter; import java.io.UnsupportedEncodingException; import java.util.List; import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer; import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently; import com.alibaba.rocketmq.client.exception.MQClientException; import com.alibaba.rocketmq.common.MixAll; import com.alibaba.rocketmq.common.message.MessageExt; public class FilterCustomer { public static void main(String[] args) throws MQClientException { String group_name = "filter_consumer"; DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(group_name); consumer.setNamesrvAddr("xxx.xxx.xx.176:9876;xxx.xxx.xx.165:9876"); // 使用Java代码,在服务器做消息过滤 String filterCode = MixAll.file2String("E:\\code-on-oschina\\hzjsd1108sohu\\RocketMQ-learn\\rocketmq-api\\src\\main\\java\\org\\hope\\lee\\filter\\MessageFilterImpl.java"); System.out.println(filterCode); consumer.subscribe("TopicFilter7", "org.hope.lee.filter.MessageFilterImpl", filterCode); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { //System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs); try { System.out.println(new String(msgs.get(0).getBody(),"utf-8")); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); System.out.println("Consumer Started."); } }
3.3 实现MessageFilter接口
注意:这个类中不能有任何的中文,包括注释中也不能有。否则在Customer启动的时候是找不到这个文件的。
package org.hope.lee.filter; import com.alibaba.rocketmq.common.filter.MessageFilter; import com.alibaba.rocketmq.common.message.MessageExt; public class MessageFilterImpl implements MessageFilter { @Override public boolean match(MessageExt msg) { // NO Chinese System.out.println("-------------"); String property = msg.getUserProperty("SequenceId"); System.out.println("---------" + property); if (property != null) { int id = Integer.parseInt(property); if((id % 2) == 0) { //if ((id % 3) == 0 && (id > 10)) { return true; } } return false; } }
四、测试:
4.1 运行Customer端
4.2运行Producer端
https://gitee.com/huayicompany/RocketMQ-learn/tree/master/rocketmq-api
参考:
[1] 白贺翔博客,https://www.cnblogs.com/baihexiang/articles/5307073.html
[2] 考拉哥博客,http://lifestack.cn/archives/371.html