码迷,mamicode.com
首页 > 系统相关 > 详细

RocketMQ在linux下安装部署

时间:2019-09-07 22:50:12      阅读:231      评论:0      收藏:0      [点我收藏+]

标签:apach   环境   命名   override   too   博客   ati   test   tac   

本博客以当前RocketMQ最新版介绍:v4.4.0

环境要求

  1. 64位JDK 1.8+;
  2. Maven 3.2.x; // 源码编译时需要用到

二进制文件安装

  1. 下载二进制文件:http://mirrors.tuna.tsinghua.edu.cn/apache/rocketmq/4.4.0/rocketmq-all-4.4.0-bin-release.zip
  2. 二进制文件放到任意目录(由于是纯净的ubuntu镜像,docker环境,因此我放在/root/xxx目录下)
  3. 解压zip包,并重命名文件名
> unzip  rocketmq-all-4.4.0-bin-release.zip && mv rocketmq-all-4.4.0-bin-release rocketmq
  1. 启动server
> cd /root/rocketmq
> nohup sh bin/mqnamesrv & // 第一次安装时,可执行sh bin/mqnamesrv观察是否能够启动
> tailf -f ~/logs/rocketmqlogs/namesrv.log 
// 观察到以下日志时,server已启动成功
2019-09-07 18:06:13 INFO main - The Name Server boot success. serializeType=JSON
  1. 启动broker
> nohup sh bin/mqbroker -n localhost:9876
> tailf -f ~/logs/rocketmqlogs/broker.log 
// 观察到以下日志时,server已启动成功
2019-09-07 20:40:06 INFO main - The broker[0daf9bd41237, 172.17.0.2:10911] boot success. serializeType=JSON and name server is 172.17.0.2:9876

注:broker启动如果过一会直接退出,无任何日志或报错的话,检查一下机子的内存是否充足。RocketMQ的broker默认内存为8g。
修改文件:/root/rocketmq/bin/runbroker.sh
JAVA_OPT="${JAVA_OPT} -server -Xms1g -Xmx1g -Xmn512m"

  1. 测试生产者和消费者
// 在测试之前,我们需要先设置环境变量:export NAMESRV_ADDR=localhost:9876
> sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
 SendResult [sendStatus=SEND_OK, msgId= ...

 > sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
 ConsumeMessageThread_%d Receive New Messages: [MessageExt...

Producer的源码

public class Producer {
    public static void main(String[] args) throws MQClientException, InterruptedException {
        // 默认消费组
        DefaultMQProducer producer = new DefaultMQProducer("default");

        producer.start();

        for (int i = 0; i < 1000; i++) {
            try {
                Message msg = new Message("TopicTest" /* Topic */,
                    "TagA" /* Tag */,
                    ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
                );
                // 消息发送
                SendResult sendResult = producer.send(msg);
                System.out.printf("%s%n", sendResult);
            } catch (Exception e) {
                e.printStackTrace();
                Thread.sleep(1000);
            }
        }
        // 关闭生产者
        producer.shutdown();
    }
}

Consumer的源码

public class Consumer {

    public static void main(String[] args) throws InterruptedException, MQClientException {
        // 指定消费组
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("default");
       // 设置消费偏移点
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        // 订阅topic,以及tag
        consumer.subscribe("TopicTest", "*");
        // 注册消息监听器
        consumer.registerMessageListener(new MessageListenerConcurrently() {

            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                ConsumeConcurrentlyContext context) {
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                // 收到数据后,返回ack确认
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();

        System.out.printf("Consumer Started.%n");
    }
}
  1. 关闭server和broker
> sh bin/mqshutdown broker
The mqbroker(36695) is running...
Send shutdown request to mqbroker(36695) OK

> sh bin/mqshutdown namesrv
The mqnamesrv(36664) is running...
Send shutdown request to mqnamesrv(36664) OK

RocketMQ在linux下安装部署

标签:apach   环境   命名   override   too   博客   ati   test   tac   

原文地址:https://www.cnblogs.com/chenkaiyin1201/p/11483286.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!