标签:amp nbsp 分离 异步 gem dea 好的 map value
3.1、Rabbitmq 延时队列
3.2、DelayQueue 延时队列
/**
* 实现 Delayed 定义延时队列
*/
@Data
@NoArgsConstructor
@AllArgsConstructor
public class Sequence implements Delayed {
private Long time;
private String name;
@Override
public long getDelay(TimeUnit unit) {
return time - System.currentTimeMillis();
}
@Override
public int compareTo(Delayed o) {
if (this.getDelay(TimeUnit.MILLISECONDS) > o.getDelay(TimeUnit.MILLISECONDS)) {
return 1;
} else if (this.getDelay(TimeUnit.MILLISECONDS) < o.getDelay(TimeUnit.MILLISECONDS)) {
return -1;
} else {
return 0;
}
}
}
3.3、Scala 的 Await & Future
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.{Await, Future}
object test extends App {
val task = Future{ doSomething() }
Await.result(task, 5 seconds)
}
3.4、Redis 延迟队列
4.1、使用 sortedset 操作元素
4.2、Redis 实现方式
使用sortedset,用时间戳作为score,使用zadd key score1 value1
命令生产消息,使用zrangebysocre key min max withscores limit 0 1消费消息最早的一条消息。
这里选用 Redis 主要的原因就是其支持高性能的 score 排序,同时 Redis 的持久化 bgsave 特性,保证了消息的消费和存贮问题。bgsave 的原理是 fork 和 cow。fork 是指 Redis 通过创建子进程来进行 bgsave 操作, cow 指的是copy on write, 子进程创建后, 父进程通过共享数据段, 父进程继续提供读写服务, 写脏的页面数据会逐渐和子进程分离开来。
4.3、ACK
队列最重要的就是保证消息被成功消费,这里也不可避免的需要考虑这个问题。
前者只需要在业务中处理消费异常的情况,后者则需要维护两个队列。
4.4、多实例问题
多实例是指同一个服务部署在不同的地方,发挥相同的作用,此时就会导致同时消费同一个消息的问题。
一般情况下解决此类问题就需要考虑接入外部应用的辅助。常见的分布式锁的方案有:基于数据库实现分布式锁、基于缓存实现分布式锁、基于 Zookeeper 实现分布式锁,这里使用缓存也就是 Redis 解决问题。
使用 Redis 实现的队列具有很好的扩展性,可以很便捷的应对需求的变更和业务的扩展,但是对于简单的场景直接使用定时任务会更加容易。在有大量的定时任务需要实现的时候,就可以考虑使用延迟队列去实现,让代码更具有扩展性。
Redis 作为消息队列的局限性很大,实现 ack 机制的成本相对较高,然而他的轻量级的特性以及兼容很多的数据结构,Redis 成熟的分布式、持久化、集群等技术体系,让他可以实现一些轻量级的队列。总之没有最好的技术,只有最好的 developer。
5.RabbitMQ(三)RabbitMQ消息过期时间(TTL)
出处:https://zhuanlan.zhihu.com/p/87113913
=================================================================================================================================================
另一篇比较好的博文
在工作中想实现一个延迟功能,一般会借助rocketmq或者kafka的延迟队列功能来实现,但是这俩个消息中间件都有一个弊端,就是很难支持任意时间段的延迟,所以我想借助redis实现一个任意时间段的延迟功能
总体架构图
上述图主要分为5个模块
1 路由模块,为了支持分布式部署,每接收一个延迟消息,都为这个消息生成一个全局唯一的消息ID,根据消息ID和路由算法,决定把延迟消息的消息ID加入到对应的redis.sortSet队列中
2 消息存储,所有的消息元数据存储采用redis.hashmap结构,key为生成的全局消息ID(可以加一个前缀),value为消息的JSON格式,例如{"ttl":600,"topic":"XXX".......}
3 延迟队列,所有的消息的延迟存储在redis.sortSet中,sortSet中的每一个对象为全局生成的消息ID,score为到期时间时间戳
4 定时扫描timer,轮训redis.sortSet队列,使用ZRANGEBYSCORE命令,获取score小于等于当前时间的所有消息ID,然后根据消息ID查询redis.hashmap中的消息元数据,然后根据业务发送到对应的topic下
5 消息中间件,借助消息中间件,订阅者直接消费消息,达到延迟的功能
总结
上述只是整体的罗列了一下借助redis怎么实现任意时间段延迟的功能,一些细节没有详细说明,如果想实现一个比较完美的延迟功能,需要考虑以下几点
a 消息的发送失败如果处理
b redis操作没有事物保证
c 怎么保证hashmap和sortSet中的数据一致性
d 如果消息量大了怎么进行动态的扩展
标签:amp nbsp 分离 异步 gem dea 好的 map value
原文地址:https://www.cnblogs.com/guanbin-529/p/12990114.html