标签:默认 复杂度 中间 unicode caff read 增加 公司 还原
1.背景上篇文章介绍了RocketMQ整体架构和原理有兴趣的可以阅读一下,在这篇文章中的延时消息部分,我写道开源版的RocketMQ只提供了18个层级的消息队列延时,这个功能在开源版中显得特别鸡肋,但是在阿里云中的RocketMQ却提供了支持40天之内任意秒级延时队列,果然有些功能你只能充钱才能拥有。当然你或许想换一个开源的消息队列,在开源社区中消息队列延时消息很多都没有被支持比如:RabbitMQ,Kafka等,都只能通过一些特殊方法才能完成延时的功能。为什么这么多都没有实现这个功能呢?是因为技术难度比较复杂吗?接下来我们分析一下如何才能实现一个延时消息。
在实现分布式消息队列的延时消息之前,我们想想我们平时是如何在自己的应用程序上实现一些延时功能的?在Java中可以通过下面的方式来完成我们延时功能:
ScheduledThreadPoolExecutor:ScheduledThreadPoolExecutor继承了ThreadPoolExecutor,我们提交任务的时候,会将任务首先提交到DelayedWorkQueue一个优先级队列中,按照过期时间进行排序,这个优先级队列也就是我们堆结构,每次提交任务排序的复杂度是O(logN)。然后取任务的时候就会从堆顶取出我们的任务,也就是我们延迟时间最小的任务。ScheduledThreadPoolExecutor有个好处是执行延时任务可以支持多线程并行执行,因为他继承的是ThreadPoolExecutor。
我们实现本地延时比较简单,直接使用Java中现成的即可,那我们分布式消息队列的实现有哪些难点呢?
有很多同学首先会想到我们实现分布式消息队列的延时任务,可不可以直接使用本地的那一套,用ScheduledThreadPoolExecutor,Timer,当然这是可以的,前提是你的消息量很小,但是我们分布式消息队列往往都是企业级别的中间件,数据量都是非常的大,那么我们纯内存的方案肯定是行不通的。所以我们就有了下面这几个方案来解决我们这个问题。
数据库一般来说是我们很容易想到的一个办法,我们通常可以建立下面这样一个表:
CREATE TABLE `delay_message` (
`id` bigint(20) unsigned NOT NULL AUTO_INCREMENT,
`excute_time` bigint(16) DEFAULT NULL COMMENT ‘执行时间,ms级别‘,
`body` varchar(4096) COLLATE utf8mb4_unicode_ci DEFAULT NULL COMMENT ‘消息体‘,
PRIMARY KEY (`id`),
KEY `time_index` (`excute_time`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci;
这个表中我们使用excute_time代表我们真实的执行时间,并且对其建立索引,然后在我们的消息服务中,启动一个定时任务,定时从数据库中扫描已经可以执行的消息,然后开始执行,具体流程如下面所示:
使用数据库的方法是一个比较原始的方法,在没有延时消息这个概念之前,要做一个订单多少分钟过期的这种功能,通常使用这个方法去完成。而这个方法通常也比较局限于我们单个业务,如果想扩展为我们企业级的一个中间件的话是不行的,因为mysql由于BTree的特性,会随着维护二级索引的开销越来越大,导致写入会越来越慢,所以这个方案通常不会被考虑。
我们之前介绍RocketMQ在开源版本中只实现了18个Level的延时消息,但是有很多公司基于RocketMQ做了自己的一套支持任意时间的延时消息,在美团内部封装了RocketMQ使用LevelDB做了对延时消息的封装,在滴滴开源的DDMQ中,使用了RocksDB对RocketMQ的延时消息部分进行了封装。
其原理基本和Mysql类似,如下图所示:
为什么同样是数据库RocksDB会比Mysql更加合适呢?因为RocksDB的特性是LSM树,其使用场景适用于大量写入,和消息队列的场景更加契合,所以这个也是滴滴和美团选择其作为延时消息封装的存储介质。
再说时间轮之前,让我们再次回到我们的实现本地延时的时候使用的ScheduledThreadPoolExecutor还有Timer,他们都是使用的优先级队列完成的,优先级队列本质上也就是堆结构,堆结构的插入的时间复杂度是O(LogN),如果未来我们的内存可以做到无限,我们使用使用优先级队列去做延时消息的存储,但是随着消息的增多,我们的插入消息的效率也会越来越低,那么怎么才能让我们的插入消息的效率不随着消息的增多而变低呢?答案就是时间轮。
什么是时间轮呢?其实我们可以简单的将其看做是一个多维数组。在很多框架中都使用了时间轮来做一些定时的任务,用来替代我们的Timer,比如我之前讲过的有关本地缓存Caffeine一篇文章,在Caffeine中是一个二层时间轮,也就是二维数组,其一维的数据表示较大的时间维度比如,秒,分,时,天等,其二维的数据表示该时间维度较小的时间维度,比如秒内的某个区间段。当定位到一个TimeWhile[i][j]之后,其数据结构其实是一个链表,记录着我们的Node。在Caffeine利用时间轮记录我们在某个时间过期的数据,然后去处理。
由于时间轮是一个数组的结构,那么其插入复杂度是O(1)。我们解决了效率之后,但是我们的内存依旧不是无限的,我们时间轮如何使用呢?答案当然就是磁盘,在去哪儿开源的QMQ中已经实现了时间轮+磁盘存储,这里为了方便描述我将其转化为RocketMQ中的结构来进行讲解,实现图如下:
时间轮+磁盘存储我个人觉得比上面的RocksDB要更加正统一点,不依赖其他的中间件就可以完成,可用性自然也就更高,当然阿里云的RocketMQ具体怎么实现的这个两种方案都有可能。
在社区中也有很多公司使用的Redis做的延时消息,在Redis中有一个数据结构是Zest,也就是有序集合,他可以实现类似我们的优先级队列的功能,同样的他也是堆结构,所以插入算法复杂度依然是O(logN),但是由于Redis足够快,所以这一块可以忽略。(这块没有做对比的基准测试,只是猜测)。有同学会问,redis不是纯内存的k,v吗,同样的应该也会受到内存限制啊,为什么还会选择他呢?
其实在这个场景中,Redis是很容易水平扩展的当一个Redis内存不够,这里可以使用两个甚至更多,来满足我们的需要,redis延时消息的原理图(原图出自:https://www.cnblogs.com/lylife/p/7881950.html)如下:
我们怎么才能知道Delayed Queue中的消息到期了呢?这里有两种方法:
本文介绍了三种方式实现分布式延时消息,希望能在你实现自己的延迟消息的时候提供一点思路。总的来说可能前两种方法来说适用面更加广一点,毕竟在RocketMQ这些大型的消息队列中间件,还有一些其他的集成功能,比如顺序消息,事务消息等,延时消息可能更加倾向于是分布式消息队列中的一个功能,而不是作为一个独立的组件存在。当然其中还有一些细节并没有一一介绍,具体细节可以去参考QMQ和DDMQ的源码。
如果大家觉得这篇文章对你有帮助,你的关注和转发是对我最大的支持,O(∩_∩)O:
标签:默认 复杂度 中间 unicode caff read 增加 公司 还原
原文地址:https://blog.51cto.com/14980978/2544637