标签:The ict opp date long 概念 统一 cas prepare
1. 背景最近一直再做一些系统上的压测,并对一些问题做了优化,从这些里面收获了一些很多好的优化经验,后续的文章都会以这方面为主。
这次打压的过程中收获比较的大的是,对RocketMq的一些优化。最开始我们公司使用的是RabbitMq,在一些流量高峰的场景下,发现队列堆积比较严重,导致RabbitMq挂了。为了应对这个场景,最终我们引入了阿里云的RocketMq,RocketMq可以处理可以处理很多消息堆积,并且服务的稳定不挂也可以由阿里云保证。引入了RocketMq了之后,的确解决了队列堆积导致消息队列宕机的问题。
本来以为使用了RocketMq之后,可以万事无忧,但是其实在打压过程中发现了不少问题,这里先提几个问题,大家带着这几个问题在文中去寻找答案:
在RocketMq中提供了多种消息类型让我们进行配置:
虽然配置种类比较繁多,但是使用的还是普通消息和分区顺序消息。后续主要讲的也是这两种消息。
普通消息的发送的代码比较简单,如下所示:
public static void main(String[] args) throws MQClientException, InterruptedException {
DefaultMQProducer producer = new DefaultMQProducer("test_group_producer");
producer.setNamesrvAddr("127.0.0.1:9876");
producer.start();
Message msg =
new Message("Test_Topic", "test_tag", ("Hello World").getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
producer.shutdown();
}
其内部核心代码为:
private SendResult sendDefaultImpl(Message msg, final CommunicationMode communicationMode, final SendCallback sendCallback, final long timeout
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
// 1. 根据 topic找到publishInfo
TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
if (topicPublishInfo != null && topicPublishInfo.ok()) {
boolean callTimeout = false;
MessageQueue mq = null;
Exception exception = null;
SendResult sendResult = null;
// 如果是同步 就三次 否则就1次
int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
int times = 0;
String[] brokersSent = new String[timesTotal];
// 循环
for (; times < timesTotal; times++) {
String lastBrokerName = null == mq ? null : mq.getBrokerName();
MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
if (mqSelected != null) {
mq = mqSelected;
brokersSent[times] = mq.getBrokerName();
try {
beginTimestampPrev = System.currentTimeMillis();
if (times > 0) {
//Reset topic with namespace during resend.
msg.setTopic(this.defaultMQProducer.withNamespace(msg.getTopic()));
}
long costTime = beginTimestampPrev - beginTimestampFirst;
if (timeout < costTime) {
callTimeout = true;
break;
}
sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
endTimestamp = System.currentTimeMillis();
// 更新延迟
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
switch (communicationMode) {
case ASYNC:
return null;
case ONEWAY:
return null;
case SYNC:
if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {
continue;
}
}
return sendResult;
default:
break;
}
}
} else {
break;
}
}
// 省略
}
主要流程如下:
Step 1:根据Topic 获取TopicPublishInfo,TopicPublishInfo中有我们的Topic发布消息的信息(),这个数据先从本地获取如果本地没有,则从NameServer去拉取,并且定时每隔20s会去获取TopicPublishInfo。
Step 2:获取总共执行次数(用于重试),如果发送方式是同步,那么总共次数会有3次,其他情况只会有1次。
Step 3: 从MessageQueue中选取一个进行发送,MessageQueue的概念可以等同于Kafka的partion分区,看作发送消息的最小粒度。这个选择有两种方式:
Step 4: 将Message发送至选择出来的MessageQueue上的Broker。
Step 5: 更新Broker的延迟。
Step 6: 根据不同的发送方式来处理结果:
可以看见Rocketmq发送普通消息的流程比较清晰简单,下面来看看顺序消息。
顺序消息分为分区顺序消息和全局顺序消息,全局顺序消息比较容易理解,也就是哪条消息先进入,哪条消息就会先被消费,符合我们的FIFO,很多时候全局消息的实现代价很大,所以就出现了分区顺序消息。分区顺序消息的概念可以如下图所示:
我们通过对消息的key,进行hash,相同hash的消息会被分配到同一个分区里面,当然如果要做全局顺序消息,我们的分区只需要一个即可,所以全局顺序消息的代价是比较大的。
对RocketMq熟悉的小伙伴会发现,它其实并没有提供顺序消息发送相关的API,但是在阿里云的RocketMq版本提供了顺序消息的API,原理比较简单,其实也是对现有API的一个封装:
SendResult sendResultRMQ = this.defaultMQProducer.send(msgRMQ, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg,
Object shardingKey) {
int select = Math.abs(shardingKey.hashCode());
if (select < 0) {
select = 0;
}
return mqs.get(select % mqs.size());
}
}, shardingKey);
可以看见顺序消息将MessageQueue的选择交由我们发送方去做,所以我们直接利用我们shardingKey的hashCode进行发送分区。
普通消息使用比较简单,如下面代码所示:
public static void main(String[] args) throws InterruptedException, MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("Test_Consumer");
consumer.subscribe("TopicTest", "*");
consumer.setNamesrvAddr("127.0.0.1:9876");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.setConsumeThreadMin(10);
consumer.setConsumeThreadMax(10);
consumer.start();
System.out.printf("Consumer Started.%n");
}
启动Consumer之后,我们就开始真正的从Broker去进行消费了,但是我们如何从Broker去消费的呢?首先在我们的第一步里面我们订阅了一个Topic,我们就会定时去刷新Topic的相关信息比如MessageQueue的变更,然后将对应的MessageQueue分配给当前Consumer:
// 这个数据 是10s更新一次 从内存中获取
Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
// 这个数据实时去拉取
List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);
if (null == mqSet) {
if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);
}
}
if (null == cidAll) {
log.warn("doRebalance, {} {}, get consumer id list failed", consumerGroup, topic);
}
if (mqSet != null && cidAll != null) {
List<MessageQueue> mqAll = new ArrayList<MessageQueue>();
mqAll.addAll(mqSet);
Collections.sort(mqAll);
Collections.sort(cidAll);
AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;
List<MessageQueue> allocateResult = null;
try {
//通过默认的分配策略进行分配
allocateResult =
strategy.allocate(this.consumerGroup, this.mQClientFactory.getClientId(), mqAll,
cidAll);
} catch (Throwable e) {
log.error(
"AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}",
strategy.getName(), e);
return;
}
Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>();
if (allocateResult != null) {
allocateResultSet.addAll(allocateResult);
}
boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);
这里首先向Broker拿到当前消费所有的ConsumerId默认是对应机器的Ip+实例名字,Broker中的ConsumerId信息是Consumer通过心跳定时进行上报得来的,然后根据消费分配策略将消息分配给Consumer,这里默认是平均分配,将我们分配到的消息队列,记录在
processQueueTable中,如果出现了新增,那么我们需要创建一个PullRequst代表这拉取消息的请求,异步去处理:
List<PullRequest> pullRequestList = new ArrayList<PullRequest>();
for (MessageQueue mq : mqSet) {
if (!this.processQueueTable.containsKey(mq)) {
if (isOrder && !this.lock(mq)) {
log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq);
continue;
}
this.removeDirtyOffset(mq);
ProcessQueue pq = new ProcessQueue();
// 这里就是获取我们第一次应该拿什么offset
long nextOffset = this.computePullFromWhere(mq);
if (nextOffset >= 0) {
ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq);
if (pre != null) {
log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq);
} else {
log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq);
PullRequest pullRequest = new PullRequest();
pullRequest.setConsumerGroup(consumerGroup);
pullRequest.setNextOffset(nextOffset);
pullRequest.setMessageQueue(mq);
pullRequest.setProcessQueue(pq);
pullRequestList.add(pullRequest);
changed = true;
}
} else {
log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq);
}
}
}
this.dispatchPullRequest(pullRequestList);
在PullService中会不断的从PullRequestQueue拿取数据,然后进行拉取数据。
while (!this.isStopped()) {
try {
// rebalance 之后第一次向这个队列放数据 后续消费的时候会继续放
PullRequest pullRequest = this.pullRequestQueue.take();
this.pullMessage(pullRequest);
} catch (InterruptedException ignored) {
} catch (Exception e) {
log.error("Pull Message Service Run Method exception", e);
}
}
拉取数据之后,这里会给PullCallBack进行响应:
PullCallback pullCallback = new PullCallback() {
@Override
public void onSuccess(PullResult pullResult) {
if (pullResult != null) {
pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult,
subscriptionData);
switch (pullResult.getPullStatus()) {
case FOUND:
firstMsgOffset = pullResult.getMsgFoundList().get(0).getQueueOffset();
boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());
DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
pullResult.getMsgFoundList(),
processQueue,
pullRequest.getMessageQueue(),
dispatchToConsume);
if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) {
DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest,
DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval());
} else {
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
}
如果这里成功拉取到消息的话,我们首先将拉取的消息存入到我们的ProcessQueue中,ProcessQueue用于我们消费者处理的状态以及待处理的消息,然后提交到我们的Consumer线程池中进行真正的业务逻辑消费,然后再提交一个PullRequest用于我们下次消费。
大家看到这里有没有发现这个模式和我们的netty中的单线程accpet,多个线程来处理业务逻辑很相似,其原理都是一样,由一个线程不断的去拉取,然后由我们业务上定义的线程池进行处理。如下图所示:
我们发现我们拉取消息其实是一个循环的过程,这里就来到了第一个问题,如果消息队列消费的速度跟不上消息发送的速度,那么就会出现消息堆积,很多同学根据过程来看可能会以为,我们的拉取消息一直在进行,由于我们的消费速度比较慢,会有很多message以队列的形式存在于我们的内存中,那么会导致我们的JVM出现OOM也就是内存溢出。
那么到底会不会出现OOM呢?其实是不会的,RocketMq对安全性方面做得很好,有下面两段代码:
if (cachedMessageCount > this.defaultMQPushConsumer.getPullThresholdForQueue()) {
System.out.println(cachedMessageCount + ":"+pullRequest);
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
return;
}
if (cachedMessageSizeInMiB > this.defaultMQPushConsumer.getPullThresholdSizeForQueue()) {
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
return;
}
首先是会判断当前内存缓存的Message数量是否大于限制的值默认是1000,如果大于则延迟一段时间再次提交pullRequest。
然后判断当前内存缓存的Size大小是否大于了某个值,默认是100M,如果大于也会延迟一段时间再次提交pullRequest。
所以在我们consumer上如果出现消息堆积,基本也没有什么影响。
那我们想想第二个问题应该怎么解决呢?在普通消息的场景下,如何提升消费速度?
在rocketmq中对消息的消费结果处理也比较重要,这里还是先提三个问题:
首先我们来看第一个问题,怎么处理消费结果,在proce***esult中有如下代码:
public void processConsumeResult(
final ConsumeConcurrentlyStatus status,
final ConsumeConcurrentlyContext context,
final ConsumeRequest consumeRequest
) {
int ackIndex = context.getAckIndex();
switch (status) {
case CONSUME_SUCCESS:
int ok = ackIndex + 1;
int failed = consumeRequest.getMsgs().size() - ok;
this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), ok);
this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), failed);
break;
case RECONSUME_LATER:
ackIndex = -1;
this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(),
consumeRequest.getMsgs().size());
break;
default:
break;
}
switch (this.defaultMQPushConsumer.getMessageModel()) {
case BROADCASTING:
for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
MessageExt msg = consumeRequest.getMsgs().get(i);
log.warn("BROADCASTING, the message consume failed, drop it, {}", msg.toString());
}
break;
case CLUSTERING:
List<MessageExt> msgBackFailed = new ArrayList<MessageExt>(consumeRequest.getMsgs().size());
for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
MessageExt msg = consumeRequest.getMsgs().get(i);
boolean result = this.sendMessageBack(msg, context);
if (!result) {
msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);
msgBackFailed.add(msg);
}
}
if (!msgBackFailed.isEmpty()) {
consumeRequest.getMsgs().removeAll(msgBackFailed);
this.submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue());
}
break;
default:
break;
}
long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs());
if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {
this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true);
}
}
在上面的第四步中,如果不深入进去看内部逻辑,这里会误以为,他会将当前消息的offset给更新到最新的消费进度,那问题三中说的中间的offset是有可能被丢失的,但实际上是不会发生的,具体的逻辑保证在removeMessage中:
public long removeMessage(final List<MessageExt> msgs) {
long result = -1;
final long now = System.currentTimeMillis();
try {
this.lockTreeMap.writeLock().lockInterruptibly();
this.lastConsumeTimestamp = now;
try {
if (!msgTreeMap.isEmpty()) {
result = this.queueOffsetMax + 1;
int removedCnt = 0;
for (MessageExt msg : msgs) {
MessageExt prev = msgTreeMap.remove(msg.getQueueOffset());
if (prev != null) {
removedCnt--;
msgSize.addAndGet(0 - msg.getBody().length);
}
}
msgCount.addAndGet(removedCnt);
if (!msgTreeMap.isEmpty()) {
result = msgTreeMap.firstKey();
}
}
} finally {
this.lockTreeMap.writeLock().unlock();
}
} catch (Throwable t) {
log.error("removeMessage exception", t);
}
return result;
}
在removeMessage中通过msgTreeMap去做了一个保证,msgTreeMap是一个TreeMap,根据offset升序排序,如果treeMap中有值的话,他返回的offset就会是当前msgTreeMap中的firstKey,而不是当前的offset,从而就解决了问题三。
上面的过程总结为下图所示:
顺序消息的消费前面过程和普通消息基本一样,这里我们需要关注的是将消息丢给我们消费线程池之后的逻辑:
final Object objLock = messageQueueLock.fetchLockObject(this.messageQueue);
synchronized (objLock) {
// 省略
List<MessageExt> msgs = this.processQueue.takeMessags(consumeBatchSize);
status = messageListener.consumeMessage(Collections.unmodifiableList(msgs), context);
// 省略
}
可以发现这里比普通消息多了一个步骤,那就是加锁,这里会获取到以messageQueue为纬度去加锁,然后去我们的processQueue中获取到我们的Message, 这里也是用的我们的msgTreeMap, 获取的最小offset的Message。
所以我们之前的线程池提高并发速度的策略在这里没有用了,那么应该怎么办呢?既然我们加锁是以messageQueue为纬度,那么增加MessageQueue就好了,所以这里的提升消费速度刚好和普通消息相反,再普通消息中提升Messagequeue可能效果并没有那么大,但是在顺序消息的消费中提升就很大了。
我们在压测的时候,发现顺序消息消费很慢,消息堆积很严重,经过调试发现阿里云上的rocketmq默认读写队列为16,我们consumer机器有10台,每个consumer线程池大小为10,理论并发应该有100,但是由于顺序消息的原因导致实际并发只有16,最后找阿里的技术人员将读写队列扩至100,这样充分利用我们的资源,极大的增加了顺序消息消费的速度,消息基本不会再堆积。
顺序消息的结果处理和普通消息的处理流程,稍有不同,代码如下:
public boolean processConsumeResult(
final List<MessageExt> msgs,
final ConsumeOrderlyStatus status,
final ConsumeOrderlyContext context,
final ConsumeRequest consumeRequest
) {
boolean continueConsume = true;
long commitOffset = -1L;
if (context.isAutoCommit()) {
switch (status) {
case SUCCESS:
commitOffset = consumeRequest.getProcessQueue().commit();
this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), msgs.size());
break;
case SUSPEND_CURRENT_QUEUE_A_MOMENT:
this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), msgs.size());
if (checkReconsumeTimes(msgs)) {
consumeRequest.getProcessQueue().makeMessageToCosumeAgain(msgs);
this.submitConsumeRequestLater(
consumeRequest.getProcessQueue(),
consumeRequest.getMessageQueue(),
context.getSuspendCurrentQueueTimeMillis());
continueConsume = false;
} else {
commitOffset = consumeRequest.getProcessQueue().commit();
}
break;
default:
break;
}
} else {
switch (status) {
case SUCCESS:
this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), msgs.size());
break;
case SUSPEND_CURRENT_QUEUE_A_MOMENT:
this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), msgs.size());
if (checkReconsumeTimes(msgs)) {
consumeRequest.getProcessQueue().makeMessageToCosumeAgain(msgs);
this.submitConsumeRequestLater(
consumeRequest.getProcessQueue(),
consumeRequest.getMessageQueue(),
context.getSuspendCurrentQueueTimeMillis());
continueConsume = false;
}
break;
default:
break;
}
}
if (commitOffset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {
this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), commitOffset, false);
}
return continueConsume;
}
这里回到我们的第三个问题,如何设置消息消费的重试次数呢?由于我们直接使用的阿里云的mq,所以我们又包装了一层,方便接入。在接入层中我们最开始统一配置了最大重试2000次,这里设置2000次的原因主要是想让我们的消息队列尽量无限重试,因为我们默认消息基本最终会成功,但是为了以防万一,所以这里设置了一个较大的数值2000次。设置2000次对于我们的普通消息,基本没什么影响,因为他会重新投递至broker,但是我们的顺序消息是不行的,如果顺序消息设置重试2000次,当遇到了这种不可能成功的消息的时候就会导致消息一直在本地进行重试,并且由于对队列加锁了,所以当前MessageQueue将会一直被阻塞,导致后续消息不会被消费,如果设置2000次那么至少会阻塞半个小时以上。所以这里应该将顺序消息设置一个较小的值,目前我们设置为16。
之前没怎么看过Rocketmq的源码,经过这次打压,从Rocketmq中学习到了很多精妙优秀的设计,将一些经验提炼成了文中的一些问题,希望大家能仔细阅读,找到答案。
如果大家觉得这篇文章对你有帮助,你的关注和转发是对我最大的支持,O(∩_∩)O:
标签:The ict opp date long 概念 统一 cas prepare
原文地址:https://blog.51cto.com/14980978/2544651