标签:
最近在看消息队列框架 ,alibaba的RocketMQ单机支持1万以上的持久化队列,支持诸多特性,
目前RocketMQ在阿里集团被广泛应用在订单,交易,充值,流计算,消息推送,日志流式处理,binglog分发等场景
比kafka还是有过之无不及,其实kafka文档很丰富
但RocketMQ网上的文章太少,找不到相关的操作教程
于是研究了下源码 做个单机操作的教程,如果你也对此有兴趣不妨共同研究
下载源码的地址 https://github.com/alibaba/RocketMQ/releases
首选通过在醒目里面Maven依赖方式引用RocketMQ Java SDK
<dependency>
<groupId>com.alibaba.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>3.2.6</version>
</dependency>
在linux 下用wget 下载源码然后解压出来
在runserver.sh里面可以配置 jvm启动的参数 JAVA_OPT_1="-server -Xms4g -Xmx4g -Xmn2g -XX:PermSize=128m -XX:MaxPermSize=320m"
可以 vi runserver.sh
分别给 mqnamesrv mqbroker play.sh 执行的权限
chmod +x mqnamersrv
chmod +x mqbroker
chmod +x play.sh
下面红线框的这段 命令输入错误了,忽略不用看
通过 nohup sh mqnamesrv& 启动 RocketMq
目前没看到结束的命令,也没找到相关的介绍,
我这里用的 ps -ef|grep rocketmq 查到进程pid
然后kill pid号
或则pkill -9 java [慎用]
用jps -v 查看下java进程的参数
rocketmq启动后监听 9876端口,这里还是在看源码里面看到的,资料实在是太少了
在防火墙配置里面加上 9876端口,设置iptables对外开放
部署Broker
nohup sh mqbroker -n "127.0.0.1:9876" -c ../conf/2m-2s-async/broker-a.properties &
这里ip换成本机的就是单机实例,如果配置主从 这里可以配其他的ip
Master和Slave的配置文件参考conf目录下的配置文件
Master与Slave通过指定相同的brokerName参数来配对,Master的BrokerId必须是0,Slave的BrokerId必须是大于0的数
一个Master下面可以挂载多个Slave,同一Master下的多个Slave通过指定不同的BrokerId来区分
部署一Master一Slave,集群采用异步复制方式:
Master: nohup sh mqbroker -n "192.168.1.23:9876" -c ../conf/2m-2s-async/broker-a.properties &
Slave: nohup sh mqbroker -n "192.168.1.23:9876" -c ../conf/2m-2s-async/broker-a-s.properties &
package com.pgsqlmybatis.common.rocketmq;/* *************************************************************** * 公司名称 : * 系统名称 :信用管家专业版 * 类 名 称 :Ios渠道idfa统计,推广统计用 * 功能描述 : * 业务描述 : * 作 者 名 :@Author Royal * 开发日期 :2016-05-15 * Created :IntelliJ IDEA *************************************************************** * 修改日期 : * 修 改 者 : * 修改内容 : *************************************************************** */ import com.alibaba.rocketmq.client.producer.DefaultMQProducer; import com.alibaba.rocketmq.client.producer.SendResult; import com.alibaba.rocketmq.common.message.Message; public class Producer { public static void main(String[] args) { DefaultMQProducer producer = new DefaultMQProducer("Producer"); producer.setNamesrvAddr("xxxxxxxxxx:9876"); try { producer.start(); String pushMsg="kafka activeMq rocketMq 消息队列使用1"; Message msg = new Message("PushTopic","push","1", pushMsg.getBytes("UTF-8")); SendResult result = producer.send(msg); System.out.println("id:" + result.getMsgId() + " result:" + result.getSendStatus()); String pushMsg2="海量级消息记录单机测试2"; msg = new Message("PushTopic","push","2",pushMsg2.getBytes("UTF-8")); result = producer.send(msg); System.out.println("id:" + result.getMsgId() + " result:" + result.getSendStatus()); String pushMsg3="海量级消息记录单机测试3"; msg = new Message("PullTopic","pull","1",pushMsg3.getBytes()); result = producer.send(msg); System.out.println("id:" + result.getMsgId() + " result:" + result.getSendStatus()); } catch (Exception e) { e.printStackTrace(); } finally { producer.shutdown(); } } }
启动生成者
启动消费者
package com.pgsqlmybatis.common.rocketmq;/* *************************************************************** * 公司名称 : * 系统名称 :信用管家专业版 * 类 名 称 :Ios渠道idfa统计,推广统计用 * 功能描述 : * 业务描述 : * 作 者 名 :@Author Royal * 开发日期 :2016-05-15 * Created :IntelliJ IDEA *************************************************************** * 修改日期 : * 修 改 者 : * 修改内容 : *************************************************************** */ import java.io.UnsupportedEncodingException; import java.util.List; import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer; import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently; import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere; import com.alibaba.rocketmq.common.message.Message; import com.alibaba.rocketmq.common.message.MessageExt; public class Consumer { public static void main(String[] args){ DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("PushConsumer"); consumer.setNamesrvAddr("xxxxxxxxxxxx:9876"); try { consumer.subscribe("PushTopic", "push"); /** * 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费<br> * 如果非第一次启动,那么按照上次消费的位置继续消费 */ consumer.setConsumeFromWhere( ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); consumer.registerMessageListener( new MessageListenerConcurrently() { public ConsumeConcurrentlyStatus consumeMessage( List<MessageExt> list, ConsumeConcurrentlyContext Context) { Message msg = list.get(0); System.out.println(msg.toString()); String recString= null; try { recString = new String(msg.getBody() ,"UTF-8"); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } System.out.println(recString); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } } ); consumer.start(); } catch (Exception e) { e.printStackTrace(); } } }
以上为单机实例配置
如果你遇到什么问题可以私信我,如果觉得此文对你很有帮助,点下赞推荐下额^_^
参考:http://blog.csdn.net/a19881029/article/details/34446629
标签:
原文地址:http://www.cnblogs.com/fangyuan303687320/p/5495481.html