码迷,mamicode.com
首页 > 其他好文 > 详细

RocketMq牛刀小试

时间:2016-01-04 11:46:20      阅读:739      评论:0      收藏:0      [点我收藏+]

标签:

 

 1.介绍

 RocketMq是一个纯java、分布式、队列模型的的开源的消息中间件,具有以下特点

 1.能够保证严格的消息顺序

 2.提供丰富的消息拉取模式

 3.高效的消息订阅机制

 4.实时的消息订阅机制

 5.亿级消息的堆积能力


 2.安装(以虚拟机参考)

 RocketMq是java实现的,因此安装的前提必须有java环境,配置好jdk环境,在此就不多说了

 把下载好的alibaba-rocketmq-3.1.1.tar.gz上传到linux服务器,解压:     

 tar -zxvf alibaba-rocketmq-3.1.1.tar.gz

 提升操作的权限 chmod +x ./alibaba-rocketmq/bin/*

 由于我虚拟机的内存比较小,因此在运行过程中会报内存的异常信息,因此需要修改RocketMq启动时的虚拟机参数配置

 

vi  ./alibaba-rocketmq/bin/runserver.sh   #nameserver 内存

vi  ./alibaba-rocketmq/bin/runbroker.sh  #broke内存

JAVA_OPT_1="-server -Xms256m -Xmx256m -Xmn128m -XX:PermSize=64m -XX:MaxPermSize=128m"(参考你自己的机器内存)

 以上就是配置好的RocketMq的相关信息,下面开始启动RocketMq

 

启动nameserver:nohup ./bin/mqnamesrv >/dev/nameserver.log 2>&1 &         #默认端口9876

关闭nameserver:./bin/mqshutdown namesrv

启动mqbroker :nohup ./bin/mqbroker -n 100.66.51.152:9876 >/dev/broker.log 2>&1 &     #默认端口10911(100.66.51.152:9876为nameserver,链接进行注册)

关闭mqbroker :./bin/mqshutdown broker


3.概念介绍

 技术分享

 


  下面来通过看上图来了解一下RocketMq中的基本组件

  上图中的Consumer和Producer是属于client组件模块中的,主要面对的是开发的模块。主要提供了consumer订阅消息和producer发布消息。

 broker:是每个RocketMQ中最核心的部分,该组件提供消息的存储和分发。用于producer存储消息和consumer中订阅该存储中的消息。

 namesrv:是一个注册中心,每个broker启动则将会将字节的信息发布到namesrv,发布到namesrv的信息包括broker提供的信息。那么client启动的时候,就可以将自己所需要的topic信息向namesrv订阅,然后namesrv通过存储的broker获得信息,直接返回给client端。


 实例讲解

 下面通过一个官方的实例开运行一下,上面搭建好的RocketMq

  生产者

 

/**     
 * @FileName: Producer.java   
 * @Package:com.test   
 * @Description: TODO  
 * @author: LUCKY    
 * @date:2015年12月28日 下午2:32:22   
 * @version V1.0     
 */
package com.test;

import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
import com.alibaba.rocketmq.client.producer.SendResult;
import com.alibaba.rocketmq.common.message.Message;

/**
 * @ClassName: Producer
 * @Description: 模拟生产者
 * @author: LUCKY
 * @date:2015年12月28日 下午2:32:22
 */
public class Producer1 {
	public static void main(String[] args) throws Exception {

		DefaultMQProducer producer = new DefaultMQProducer("Producer");
		// 必须要设置nameserver地址
		producer.setNamesrvAddr("100.66.154.81:9876");
		try {
			producer.start();
			for(long i=0l;i<3;i++){
				Message msg = new Message("topic"+i, "push"+i, "1",
						("第"+i+"内容").getBytes());
				SendResult result = producer.send(msg);
				System.out.println(result);
			}
		} catch (Exception e) {
			e.printStackTrace();
		} finally {
	
		}
	}

}

 

 消费者

 

/**     
 * @FileName: Consumer.java   
 * @Package:com.test   
 * @Description: TODO  
 * @author: LUCKY    
 * @date:2015年12月28日 下午2:43:23   
 * @version V1.0     
 */
package com.test;

import java.util.List;

import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
import com.alibaba.rocketmq.common.message.Message;
import com.alibaba.rocketmq.common.message.MessageExt;

/**
 * @ClassName: Consumer
 * @Description: 模拟消费者
 * @author: LUCKY
 * @date:2015年12月28日 下午2:43:23
 */
public class Consumer4 {

	public static void main(String[] args) {
		DefaultMQPushConsumer consumer=new DefaultMQPushConsumer("broker-a");
		consumer.setNamesrvAddr("100.66.154.81:9876");
		try {
			// 订阅PushTopic下Tag为push的消息,都订阅消息
			consumer.subscribe("TopicTest", "TagA");
		
			
		
		
			
			// 程序第一次启动从消息队列头获取数据
			consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
			consumer.registerMessageListener(new MessageListenerConcurrently() {

				public ConsumeConcurrentlyStatus consumeMessage(
						List<MessageExt> msgs,
						ConsumeConcurrentlyContext context) {
					// msgs中只收集同一个topic,同一个tag,并且key相同的message
					// 会把不同的消息分别放置到不同的队列中
					for(Message msg:msgs){
			
						System.out.println(new String(msg.getBody()));
					}
				
					return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
				}
			});
			
			consumer.resume();
			consumer.start();
		} catch (Exception e) {
			e.printStackTrace();
		}
	}
}

RocketMq牛刀小试

标签:

原文地址:http://blog.csdn.net/luckyzhoustar/article/details/50453963

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!