标签:return quic 注意 端口 except push 异常 png 发送
准备两台机器,一主一从:
机器IP | hostname | 角色 |
---|---|---|
192.168.243.169 | rocketmq01 | master |
192.168.243.170 | rocketmq02 | slave |
我这里事先在两台机器上安装好了RocketMQ,关于RocketMQ的安装可以参考如下文章:
接下来,我们开始搭建RocketMQ主从集群。首先,配置两台机器的hosts
:
$ vim /etc/hosts
192.168.243.169 rocketmq-nameserver1 rocketmq-master1
192.168.243.170 rocketmq-nameserver2 rocketmq-slave1
修改master节点的配置文件:
[root@rocketmq01 /usr/local/rocketmq-4.7.1]# echo "" > conf/2m-2s-async/broker-a.properties
[root@rocketmq01 /usr/local/rocketmq-4.7.1]# vim conf/2m-2s-async/broker-a.properties
#节点所属的集群名称
brokerClusterName=rocketmq-cluster
#broker 名字,注意此处不同的配置文件填写的不一样
brokerName=broker-a
#0 表示 Master,>0 表示 Slave
brokerId=0
#nameServer 地址,分号分割
namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876
#在发送消息时,自动创建服务器不存在的 topic,默认创建的队列数
defaultTopicQueueNums=4
#是否允许 Broker 自动创建 Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
#Broker 对外服务的监听端口
listenPort=10911
#删除文件时间点,默认凌晨 4 点
deleteWhen=04
#文件保留时间,默认 48 小时
fileReservedTime=120
#commitLog 每个文件的大小默认 1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue 每个文件默认存 30W 条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
#存储路径
storePathRootDir=/usr/local/rocketmq-4.7.1/store
#commitLog 存储路径
storePathCommitLog=/usr/local/rocketmq-4.7.1/store/commitlog
#消费队列存储路径存储路径
storePathConsumeQueue=/usr/local/rocketmq-4.7.1/store/consumequeue
#消息索引存储路径
storePathIndex=/usr/local/rocketmq-4.7.1/store/index
#checkpoint 文件存储路径
storeCheckpoint=/usr/local/rocketmq-4.7.1/store/checkpoint
#abort 文件存储路径
abortFile=/usr/local/rocketmq-4.7.1/store/abort
#限制的消息大小
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000
#Broker 的角色
#- ASYNC_MASTER 异步复制 Master
#- SYNC_MASTER 同步双写 Master
#- SLAVE
brokerRole=ASYNC_MASTER
#刷盘方式
#- ASYNC_FLUSH 异步刷盘
#- SYNC_FLUSH 同步刷盘
flushDiskType=ASYNC_FLUSH
#checkTransactionMessageEnable=false
#发消息线程池数量
#sendMessageThreadPoolNums=128
#拉消息线程池数量
#pullMessageThreadPoolNums=128
修改slave节点的配置文件:
[root@rocketmq01 /usr/local/rocketmq-4.7.1]# echo "" > conf/2m-2s-async/broker-a-s.properties
[root@rocketmq01 /usr/local/rocketmq-4.7.1]# vim conf/2m-2s-async/broker-a-s.properties
#节点所属的集群名称
brokerClusterName=rocketmq-cluster
#broker 名字,注意此处不同的配置文件填写的不一样
brokerName=broker-a
#0 表示 Master,>0 表示 Slave
brokerId=1
#nameServer 地址,分号分割
namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876
#在发送消息时,自动创建服务器不存在的 topic,默认创建的队列数
defaultTopicQueueNums=4
#是否允许 Broker 自动创建 Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
#Broker 对外服务的监听端口
listenPort=10911
#删除文件时间点,默认凌晨 4 点
deleteWhen=04
#文件保留时间,默认 48 小时
fileReservedTime=120
#commitLog 每个文件的大小默认 1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue 每个文件默认存 30W 条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
#存储路径
storePathRootDir=/usr/local/rocketmq-4.7.1/store
#commitLog 存储路径
storePathCommitLog=/usr/local/rocketmq-4.7.1/store/commitlog
#消费队列存储路径存储路径
storePathConsumeQueue=/usr/local/rocketmq-4.7.1/store/consumequeue
#消息索引存储路径
storePathIndex=/usr/local/rocketmq-4.7.1/store/index
#checkpoint 文件存储路径
storeCheckpoint=/usr/local/rocketmq-4.7.1/store/checkpoint
#abort 文件存储路径
abortFile=/usr/local/rocketmq-4.7.1/store/abort
#限制的消息大小
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000
#Broker 的角色
#- ASYNC_MASTER 异步复制 Master
#- SYNC_MASTER 同步双写 Master
#- SLAVE
brokerRole=SLAVE
#刷盘方式
#- ASYNC_FLUSH 异步刷盘
#- SYNC_FLUSH 同步刷盘
flushDiskType=ASYNC_FLUSH
#checkTransactionMessageEnable=false
#发消息线程池数量
#sendMessageThreadPoolNums=128
#拉消息线程池数量
#pullMessageThreadPoolNums=128
然后将这两个配置文件拷贝到Slave节点上:
[root@rocketmq01 /usr/local/rocketmq-4.7.1]# scp conf/2m-2s-async/broker-a.properties rocketmq-slave1:/usr/local/rocketmq-4.7.1/conf/2m-2s-async/broker-a.properties
[root@rocketmq01 /usr/local/rocketmq-4.7.1]# scp conf/2m-2s-async/broker-a-s.properties rocketmq-slave1:/usr/local/rocketmq-4.7.1/conf/2m-2s-async/broker-a-s.properties
完成配置后,就可以启动RocketMQ了,在master节点上执行如下命令:
[root@rocketmq01 ~]# nohup sh mqnamesrv &
[root@rocketmq01 ~]# nohup sh mqbroker -c /usr/local/rocketmq-4.7.1/conf/2m-2s-async/broker-a.properties >/dev/null 2>&1 &
在slave节点上执行如下命令:
[root@rocketmq02 ~]# nohup sh mqnamesrv &
[root@rocketmq02 ~]# nohup sh mqbroker -c /usr/local/rocketmq-4.7.1/conf/2m-2s-async/broker-a-s.properties >/dev/null 2>&1 &
启动完成后,分别在两个节点上检查下服务的进程和端口是否正常:
[root@rocketmq01 ~]# jps
1942 Jps
1739 NamesrvStartup
1775 BrokerStartup
[root@rocketmq01 ~]# netstat -lntp |grep java
tcp6 0 0 :::10909 :::* LISTEN 1775/java
tcp6 0 0 :::10911 :::* LISTEN 1775/java
tcp6 0 0 :::10912 :::* LISTEN 1775/java
tcp6 0 0 :::9876 :::* LISTEN 1739/java
[root@rocketmq01 ~]#
修改RocketMQ的管控台配置,并启动:
[root@rocketmq01 ~]# cd /usr/local/src/rocketmq-externals/rocketmq-console/
[root@rocketmq01 /usr/local/src/rocketmq-externals/rocketmq-console]# vim src/main/resources/application.properties
# 增加nameserver的地址
rocketmq.config.namesrvAddr=192.168.243.169:9876;192.168.243.170:9876
[root@rocketmq01 /usr/local/src/rocketmq-externals/rocketmq-console]# java -jar target/rocketmq-console-ng-2.0.0.jar
此时在管控台中可以看到有两个节点了:
在Dashboard中也可以看到有两个Broker:
创建一个普通的Maven项目,pom
文件添加rocketmq-client
依赖如下:
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.7.1</version>
</dependency>
生产者代码示例:
package com.zj.rocketmq.learn.quickstart;
import com.zj.rocketmq.learn.constant.Constants;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;
import java.util.UUID;
/**
* rocketmq - 生产者
*
* @author 01
* @date 2020-11-30
**/
public class Producer {
public static void main(String[] args) throws Exception {
// 在rocketmq中生产者必须在一个生产者组内
String producerGroup = "quickstart_producer_group";
DefaultMQProducer producer = new DefaultMQProducer(producerGroup);
// 设置nameserver的地址
producer.setNamesrvAddr(Constants.NAME_SERVER_ADDRESSES);
// 启动生产者
producer.start();
// 消息投递的目标主题
String topic = "quickstart_topic";
// 给消息打一个标签,标签的主要作用是用来过滤的
String tag = "quickstart_tag";
// 给消息设置一个key,是消息的唯一标识
String key = UUID.randomUUID().toString();
// 消息体,即具体的消息内容
String body = "this is quickstart message!";
Message message = new Message(topic, tag, key, body.getBytes());
// 发送消息
SendResult sendResult = producer.send(message);
System.out.println("消息发送结果:" + sendResult);
producer.shutdown();
}
}
常量类代码:
public class Constants {
public static final String NAME_SERVER_ADDRESSES = "192.168.243.169:9876;192.168.243.170:9876";
}
运行生产者代码发送一条消息,控制台输出如下:
消息发送结果:SendResult [sendStatus=SEND_OK, msgId=C0A8010B36502437C6DC998FAEE00000, offsetMsgId=C0A8F3A900002A9F0000000000033234, messageQueue=MessageQueue [topic=quickstart_topic, brokerName=broker-a, queueId=1], queueOffset=0]
此时将主节点给停掉,模拟宕机:
[root@rocketmq01 ~]# mqshutdown broker
然后编写消费者端,代码如下:
package com.zj.rocketmq.learn.quickstart;
import com.zj.rocketmq.learn.constant.Constants;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
/**
* rocketmq - 消费者
*
* @author 01
* @date 2020-11-30
**/
public class Consumer {
public static void main(String[] args) throws Exception {
// 定义消费者组
String consumerGroup = "quickstart_consumer_group";
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroup);
// 设置nameserver的地址
consumer.setNamesrvAddr(Constants.NAME_SERVER_ADDRESS);
// 设置从哪个位置开始消费
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
// 从哪个主题消费数据
String topic = "quickstart_topic";
// 用于匹配消息标签的表达式
String subExpression = "*";
// 订阅主题
consumer.subscribe(topic, subExpression);
// 注册消息监听器,在监听器中实现消息的处理逻辑
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
System.out.println("------------- 接收到消息,开始进行业务处理 -------------");
for (MessageExt msg : msgs) {
try {
System.out.printf("topic: %s, tags: %s, keys: %s, body: %s%n",
msg.getTopic(), msg.getTags(), msg.getKeys(), new String(msg.getBody()));
if ("0".equals(msg.getKeys())) {
throw new RuntimeException("模拟业务处理发生异常");
}
} catch (Exception e) {
e.printStackTrace();
int reconsumeTimes = msg.getReconsumeTimes();
System.err.println("reconsumeTimes: " + reconsumeTimes);
if (reconsumeTimes == 3) {
// TODO 重试次数达到阈值,放弃重试,记录日志后续做补偿...
System.out.println("重试次数达到阈值,放弃重试!");
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
// 消息处理失败时返回,由于Broker的重试机制,会重新消费该消息
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}
// 消息处理成功时返回
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
// 启动消费者
consumer.start();
System.out.println("consumer started...");
}
}
运行消费者,正常情况下,该消费者依旧能够消费到数据:
重新启动master节点,让其重新加入集群:
[root@rocketmq01 ~]# nohup sh mqbroker -c /usr/local/rocketmq-4.7.1/conf/2m-2s-async/broker-a.properties >/dev/null 2>&1 &
在此过程注意查看消费者的控制台,正常情况下,master重新加入集群,消费者也不会重复消费,因为master会和slave同步offset进度。
标签:return quic 注意 端口 except push 异常 png 发送
原文地址:https://blog.51cto.com/zero01/2557571