
Getting Started with RocketMQ: Sending and Receiving Messages

Date: 2017-06-12 22:24:35

Tags: context   list   tags   cep   srv   logs   term   java   rac

1. Producing and sending messages

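import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
import com.alibaba.rocketmq.client.producer.SendResult;
import com.alibaba.rocketmq.common.message.Message;

import java.util.Date;

public class Producer {
    public static void main(String[] args) throws MQClientException {
        DefaultMQProducer producer = new DefaultMQProducer("rmq-group");
        producer.setNamesrvAddr("172.18.4.114:9876"); // name server address
        producer.setInstanceName("producer");
        producer.start();
        try {
            for (int i = 0; i < 10; i++) {
                Thread.sleep(5000); // send one message every 5 seconds
                Message msg = new Message("TopicA-test", // topic
                    "TagA", // tag
                    (new Date() + " Hello RocketMQ ,QuickStart" + i)
                        .getBytes() // body
                );
                // Synchronous send: blocks until the broker responds
                SendResult sendResult = producer.send(msg);
                System.out.println(sendResult);
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // Always release the producer's resources, even on failure
            producer.shutdown();
        }
    }
}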

2. Receiving and consuming messages

import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.common.message.MessageExt;

import java.util.List;

public class Consumer {
    public static void main(String[] args) throws MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("rmq-group");

        consumer.setNamesrvAddr("172.18.4.114:9876"); // name server address
        consumer.setInstanceName("consumer");
        // Subscribe to TopicA-test and receive only messages tagged TagA
        consumer.subscribe("TopicA-test", "TagA");

        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(
                List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs) {
                    System.out.println(msg.getTopic());
                    System.out.println(msg.getTags());
                    System.out.println("=== " + new String(msg.getBody()));
                }

                // Acknowledge the batch so it is not redelivered
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
        System.out.println("Consumer Started.");
    }
}
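
A note on the message body: the producer builds it with getBytes() and the consumer reads it back with new String(msg.getBody()). Both calls use the JVM's default charset, so the text could be garbled if the two processes run with different defaults. The sketch below shows one possible way to avoid this by naming the charset on both sides. BodyCodec, encode and decode are hypothetical names introduced for illustration; they are not part of the RocketMQ client, and the sample string is made up.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper (not part of RocketMQ): converts message text to and
// from bytes with an explicit charset instead of the platform default.
public class BodyCodec {

    // Producer side: text to bytes, always UTF-8.
    public static byte[] encode(String text) {
        return text.getBytes(StandardCharsets.UTF_8);
    }

    // Consumer side: bytes back to text, always UTF-8.
    public static String decode(byte[] body) {
        return new String(body, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // Sample text only; includes non-ASCII characters to exercise the charset.
        String original = "Hello RocketMQ 你好 0";
        byte[] body = encode(original);
        String restored = decode(body);
        System.out.println(restored.equals(original));
    }
}
```

Under this assumption, the producer would pass BodyCodec.encode(text) as the body argument of Message, and the listener would call BodyCodec.decode(msg.getBody()) instead of new String(msg.getBody()).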

 




Original article: http://www.cnblogs.com/yoyotl/p/6993787.html
