标签:
1.介绍
RocketMq是一个纯java、分布式、队列模型的的开源的消息中间件,具有以下特点
1.能够保证严格的消息顺序
2.提供丰富的消息拉取模式
3.高效的消息订阅机制
4.实时的消息订阅机制
5.亿级消息的堆积能力
2.安装(以虚拟机参考)
RocketMq是java实现的,因此安装的前提必须有java环境,配置好jdk环境,在此就不多说了
把下载好的alibaba-rocketmq-3.1.1.tar.gz上传到linux服务器,解压:
tar -zxvf alibaba-rocketmq-3.1.1.tar.gz
提升操作的权限 chmod +x ./alibaba-rocketmq/bin/*
由于我虚拟机的内存比较小,因此在运行过程中会报内存的异常信息,因此需要修改RocketMq启动时的虚拟机参数配置
vi ./alibaba-rocketmq/bin/runserver.sh #nameserver 内存
vi ./alibaba-rocketmq/bin/runbroker.sh #broke内存
JAVA_OPT_1="-server -Xms256m -Xmx256m -Xmn128m -XX:PermSize=64m -XX:MaxPermSize=128m"(参考你自己的机器内存)
以上就是配置好的RocketMq的相关信息,下面开始启动RocketMq
启动nameserver:nohup ./bin/mqnamesrv >/dev/nameserver.log 2>&1 & #默认端口9876
关闭nameserver:./bin/mqshutdown namesrv
启动mqbroker :nohup ./bin/mqbroker -n 100.66.51.152:9876 >/dev/broker.log 2>&1 & #默认端口10911(100.66.51.152:9876为nameserver,链接进行注册)
关闭mqbroker :./bin/mqshutdown broker
3.概念介绍
下面来通过看上图来了解一下RocketMq中的基本组件
上图中的Consumer和Producer是属于client组件模块中的,主要面对的是开发的模块。主要提供了consumer订阅消息和producer发布消息。
broker:是每个RocketMQ中最核心的部分,该组件提供消息的存储和分发。用于producer存储消息和consumer中订阅该存储中的消息。
namesrv:是一个注册中心,每个broker启动则将会将字节的信息发布到namesrv,发布到namesrv的信息包括broker提供的信息。那么client启动的时候,就可以将自己所需要的topic信息向namesrv订阅,然后namesrv通过存储的broker获得信息,直接返回给client端。
实例讲解
下面通过一个官方的实例开运行一下,上面搭建好的RocketMq
生产者
/**
* @FileName: Producer.java
* @Package:com.test
* @Description: TODO
* @author: LUCKY
* @date:2015年12月28日 下午2:32:22
* @version V1.0
*/
package com.test;
import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
import com.alibaba.rocketmq.client.producer.SendResult;
import com.alibaba.rocketmq.common.message.Message;
/**
* @ClassName: Producer
* @Description: 模拟生产者
* @author: LUCKY
* @date:2015年12月28日 下午2:32:22
*/
public class Producer1 {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("Producer");
// 必须要设置nameserver地址
producer.setNamesrvAddr("100.66.154.81:9876");
try {
producer.start();
for(long i=0l;i<3;i++){
Message msg = new Message("topic"+i, "push"+i, "1",
("第"+i+"内容").getBytes());
SendResult result = producer.send(msg);
System.out.println(result);
}
} catch (Exception e) {
e.printStackTrace();
} finally {
}
}
}
消费者
/**
* @FileName: Consumer.java
* @Package:com.test
* @Description: TODO
* @author: LUCKY
* @date:2015年12月28日 下午2:43:23
* @version V1.0
*/
package com.test;
import java.util.List;
import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
import com.alibaba.rocketmq.common.message.Message;
import com.alibaba.rocketmq.common.message.MessageExt;
/**
* @ClassName: Consumer
* @Description: 模拟消费者
* @author: LUCKY
* @date:2015年12月28日 下午2:43:23
*/
public class Consumer4 {
public static void main(String[] args) {
DefaultMQPushConsumer consumer=new DefaultMQPushConsumer("broker-a");
consumer.setNamesrvAddr("100.66.154.81:9876");
try {
// 订阅PushTopic下Tag为push的消息,都订阅消息
consumer.subscribe("TopicTest", "TagA");
// 程序第一次启动从消息队列头获取数据
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.registerMessageListener(new MessageListenerConcurrently() {
public ConsumeConcurrentlyStatus consumeMessage(
List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
// msgs中只收集同一个topic,同一个tag,并且key相同的message
// 会把不同的消息分别放置到不同的队列中
for(Message msg:msgs){
System.out.println(new String(msg.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.resume();
consumer.start();
} catch (Exception e) {
e.printStackTrace();
}
}
}
标签:
原文地址:http://blog.csdn.net/luckyzhoustar/article/details/50453963