标签:body 需求 前缀 dbi 回调 region ado 分析 生效
15分钟内未支付的订单过期失效。
这个太简单了,就是在查询的时候判断是否失效,如果失效了就给他设置失效状态。但是弊端也很明显,每次查询都要对未失效的订单做判断,如果用户不查询,订单就不失效,那么如果有类似统计失效状态个数的功能,将会受到影响,所以只能适用于简单独立的场景。简直low爆了。
这种是常见的方法,利用一个定时器,在设置的周期内轮询检查并处理需要过期的订单。
具体实现有基于Timer
的,有基于Quartz
,还有Scheduler
,实现起来比较简单。
过期时间加索引,然后定时任务去处理,每次更新固定条数就好了。比如定时脚本每次启动,根据条件去查询,每次查1000条,更新状态为过期未付款,如果结果不足1000条或是没有结果,就说明不用查下一次了。
弊端
1.基于MQ实现 阿里的RocketMQ是支持延迟消息
2.基于Redis实现 参考https://blog.csdn.net/yinpeng186/article/details/104193976
顾名思义,首先它要具有队列的特性,再给它附加一个延迟消费队列消息的功能,也就是说可以指定队列中的消息在哪个时间点被消费。
延时队列多用于需要延时工作的场景。最常见的是以下场景:
延迟消费,比如:
1 ,订单成功后,在 30 分钟内没有支付,自动取消订单
2 ,如果订单一直处于某一个未完结状态时,及时处理关单,并退还库存
3 ,支付成功后, 2 秒后查询支付结果
将未支付的订单放到一个有序的队列中,程序会自动依次取出过期的订单。
如果当前没有过期的订单,就会阻塞,直至有过期的订单。由于每次只处理过期的订单,并且处理的时间也很精准,不存在定时调度方案的那两个弊端。
由于.net没有DelayQueue 病毒哥写了一个 https://github.com/virus611/TimeQueue
java处理方案
实现:
1.首先创建一个订单类OrderDelayDto
需要实现Delayed
接口。然后重写getDelay()
方法和compareTo()
方法,只加了订单编号和过期时间两个属性。
这两个方法很重要,getDelay()
方法实现过期的策略,比如,订单的过期时间等于当前时间就是过期,返回负数就代表需要处理。否则不处理。compareTo()
方法实现订单在队列中的排序规则,这样即使后面加入的订单,也能加入到排序中,我这里写的规则是按照过期时间排序,最先过期的排到最前面,这一点很重要,因为排在最前面的如果没有被处理,就会进入阻塞状态,后面的不会被处理。
import lombok.Data; import java.util.Date; import java.util.concurrent.Delayed; import java.util.concurrent.TimeUnit; /** * @author mashu * Date 2020/5/17 16:25 */ @Data public class OrderDelayDto implements Delayed { /** * 订单编号 */ private String orderCode; /** * 过期时间 */ private Date expirationTime; /** * 判断过期的策略:过期时间大于等于当前时间就算过期 * * @param unit * @return */ @Override public long getDelay(TimeUnit unit) { return unit.convert(this.expirationTime.getTime() - System.currentTimeMillis(), TimeUnit.NANOSECONDS); } /** * 订单加入队列的排序规则 * * @param o * @return */ @Override public int compareTo(Delayed o) { OrderDelayDto orderDelayDto = (OrderDelayDto) o; long time = orderDelayDto.getExpirationTime().getTime(); long time1 = this.getExpirationTime().getTime(); return time == time1 ? 0 : time < time1 ? 1 : -1; } }
写个main 方法测试一下,创建两个订单o1和o2,放入到延时队列中,然后while()方法不断的去取。
在此方法内通过队列的take()
方法获得已过期的订单,然后做出相应的处理。
public static void main(String[] args) { DelayQueue<OrderDelayDto> queue = new DelayQueue<>(); OrderDelayDto o1 = new OrderDelayDto(); //第一个订单,过期时间设置为一分钟后 o1.setOrderCode("1001"); Calendar calendar = Calendar.getInstance(); calendar.add(Calendar.MINUTE, 1); o1.setExpirationTime(calendar.getTime()); OrderDelayDto o2 = new OrderDelayDto(); //第二个订单,过期时间设置为现在 o2.setOrderCode("1002"); o2.setExpirationTime(new Date()); //往队列中放入数据 queue.offer(o1); queue.offer(o2); // 延时队列 while (true) { try { OrderDelayDto take = queue.take(); System.out.println("订单编号:" + take.getOrderCode() + " 过期时间:" + take.getExpirationTime()); } catch (InterruptedException e) { e.printStackTrace(); } } }
我故意把第二个订单的过期时间设置为第一个订单之前,从结果可以看出,他们已经自动排序把最先过期的排到了最前面。
第一个订单的失效时间是当前时间的后一分钟,结果也显示一分钟后处理了第一条订单。
2.然而通常情况下,我们会使用多线程去取延时队列中的数据,这样即使线程启动之后也能动态的向队列中添加订单。
创建一个线程类OrderCheckScheduler
实现Runnable
接口,
添加一个延时队列属性,重写run()
方法,在此方法内通过队列的take()
方法获得已过期的订单,然后做出相应的处理
import java.util.concurrent.DelayQueue; /** * @author mashu * Date 2020/5/17 14:27 */ public class OrderCheckScheduler implements Runnable { // 延时队列 private DelayQueue<OrderDelayDto> queue; public OrderCheckScheduler(DelayQueue<OrderDelayDto> queue) { this.queue = queue; } @Override public void run() { while (true) { try { OrderDelayDto take = queue.take(); System.out.println("订单编号:" + take.getOrderCode() + " 过期时间:" + take.getExpirationTime()); } catch (InterruptedException e) { e.printStackTrace(); } } } }
好了,写个方法测试一下:
public static void main(String[] args) { // 创建延时队列 DelayQueue<OrderDelayDto> queue = new DelayQueue<>(); OrderDelayDto o1 = new OrderDelayDto(); //第一个订单,过期时间设置为一分钟后 o1.setOrderCode("1001"); Calendar calendar = Calendar.getInstance(); calendar.add(Calendar.MINUTE, 1); o1.setExpirationTime(calendar.getTime()); OrderDelayDto o2 = new OrderDelayDto(); //第二个订单,过期时间设置为现在 o2.setOrderCode("1002"); o2.setExpirationTime(new Date()); //运行线程 ExecutorService exec = Executors.newFixedThreadPool(1); exec.execute(new OrderCheckScheduler(queue)); //往队列中放入数据 queue.offer(o1); queue.offer(o2); exec.shutdown(); }
结果和上面的一样
基于redis的过期提醒功能,听名字就知道这个方案最是纯真、最直接的,就是单纯处理过期的订单。
修改个redis的配置吧先,因为redis默认不开启过期提醒。notify-keyspace-events
改为notify-keyspace-events "Ex"
1.支付业务中,未支付订单自动关闭 2.缓存过期提醒
一般情况下,对于未支付订单自动关闭的处理中,我们可以使用定时服务来实现,比如每分钟调用一次接口来处理未支付并且已经过期的订单,但是这样的话,一是会比较损耗计算机性能,即使没有订单的时候也会每分钟进行处理,二是订单处理时间的精确度最大延迟会是 59s ,而且也要保证定时服务一直可用
那我们想只针对当有未支付并过期的订单并且低延迟处理要怎么办呢,我们可以通过 redis 的缓存过期机制来进行订阅推送处理
一 .Redis配置
原理:利用 Keyspace Notifications 功能,允许用户订阅 Sub/Pub 频道 ,当 key键 过期的时候触发事件通知,故需要订阅 _keyevent@0_:expired( 通道0表示 db0,可根据 dbindex 选择合适的值)
实现:将配置文件进行修改
取消 notify-keyspace-events Ex 的注释,对 notify-keyspace-events “” 进行注释
(Ex 为配置参数,意思是监听某个key的失效事件,可参考如下表格参数说明,其中,参数中至少要有一个 K 或者 E , 否则的话, 不管其余的参数是什么, 都不会有任何通知被分发)
字符 | 发送通知 |
---|---|
K | 键空间通知,所有通知以 keyspace@ 为前缀,针对Key |
E | 键事件通知,所有通知以 keyevent@ 为前缀,针对event |
g | DEL 、 EXPIRE 、 RENAME 等类型无关的通用命令的通知 |
$ | 字符串命令的通知 |
l | 列表命令的通知 |
s | 集合命令的通知 |
h | 哈希命令的通知 |
z | 有序集合命令的通知 |
x | 过期事件:每当有过期键被删除时发送 |
e | 驱逐(evict)事件:每当有键因为 maxmemory 政策而被删除时发送 |
A | 参数 g$lshzxe 的别名,相当于是All |
二.程序实现
redis 数据库初始化
private static RedisHelper redisHelper1= new RedisHelper(0); private static RedisHelper redisHelper2= new RedisHelper(1);
添加有过期时间的数据
public string AddData() { try { for (int i = 1; i <= 5; i++) { //在 redis db0 中添加五条数据,过期时间为10s redisHelper1.StringSet("order_" + i, i, TimeSpan.FromSeconds(10)); } return "ok"; } catch (Exception e) { return e.Message; } }
过期数据通知处理
public static void OrderData() { try { //监听过期Key键,固定值。 const string ChannelName = "__keyevent@0__:expired"; //进行对 redis db0 订阅,并获取 Key 的值,但是无法监听到 Key 所对应的 Value 值,所以需要将必要数据作为 key 进行存入。 redisHelper1.Subscribe(ChannelName, (channel, key) => { //将 Key 值写入 redis db1 中作为生效依据 redisHelper2.StringSet(key, ""); }); } catch (Exception) { throw; } }
为了保证 OrderData 方法在web程序启动的时候就进行处理,我们将该方法在 Global.asax 进行调用
最后,我们调用 AddData 接口看看效果
数据失效前效果:
数据失效后效果:
不足:
Redis pub/sub 是一种并不可靠的消息机制,他不会做信息的存储,只是在线转发,肯定也没有 ack 确认机制,另外只有订阅段监听才会转发,所以 Keyspace Notifications 也是不可靠的通知系统,如果我们的业务是需要很好的可靠性,那么这还重方式就不是最好的选择。一般我们更加推荐 RabbitMQ 的 DLX(Dead-Letter-Exchange) 来实现,也就是延迟队列功能。只不过 redis 的这种方案更加容易实现,操作成本较低。对于可靠性要求不高的业务还是很方便。
用户前台下单之后,如果用户未支付,30分钟后订单会自动取消,订单状态和库存变回原来状态和库存
简单的逻辑就是 用户下单会把一条消息插入生产队列中,当然消息队列的配置是30分钟,30分钟之内如果用户支付,就会调用消费者接口,将消息消费掉,如果30分钟没有支付,超时消息会到死信队列中,然后后台任务会检查到死信队列中的消息,将消息消费掉,过程中会改订单状态等
简单代码
//声明队列 channel.QueueDeclare ( queue: QueueName, //队列名称 durable: false, //队列是否持久化.false:队列在内存中,服务器挂掉后,队列就没了;true:服务器重启后,队列将会重新生成.注意:只是队列持久化,不代表队列中的消息持久化!!!! exclusive: false, //队列是否专属,专属的范围针对的是连接,也就是说,一个连接下面的多个信道是可见的.对于其他连接是不可见的.连接断开后,该队列会被删除.注意,不是信道断开,是连接断开.并且,就算设置成了持久化,也会删除. autoDelete: true, //如果所有消费者都断开连接了,是否自动删除.如果还没有消费者从该队列获取过消息或者监听该队列,那么该队列不会删除.只有在有消费者从该队列获取过消息后,该队列才有可能自动删除(当所有消费者都断开连接,不管消息是否获取完) arguments: null //队列的配置 );
1 //加载消息队列(订单超时) 2 //定时任务触发器 3 services.AddHostedService<DeadListener>(); 4 或者 5 services.AddTransient<IHostedService, DeadListener>();
/// <summary> /// 订单超时未处理消息队列(生产者) /// </summary> /// <param name="routeKey"></param> /// <returns></returns> public Task PublisherOrder(string routeKey) { const string routingKeyDead = "queue-dead-routing-jd"; //死信队列路由 var routingKeyDelay = "queue-delay-" + routeKey;//消息队列路由 const string orderQueueName = "zzhelloJd"; //定义消息队列名 const string orderQueueDeadName = "zzhello_dead_Jd"; //定义一个死信消息队列名 var factory = new ConnectionFactory { UserName = _configuration["RabbitMQConfig:RabbitUserName"],//用户名 Password = _configuration["RabbitMQConfig:RabbitPassword"],//密码 HostName = _configuration["RabbitMQConfig:RabbitHost"],//rabbitmq ip }; using (var connection = factory.CreateConnection()) { using (var channel = connection.CreateModel()) { //定义死信交换机 channel.ExchangeDeclare("exchange-D", ExchangeType.Direct, true, false, null); //创建一个名叫"zzhello_dead"的消息队列 channel.QueueDeclare(orderQueueDeadName, true, false, false, null); //将死信队列绑定到死信交换机 channel.QueueBind(orderQueueDeadName, "exchange-D", routingKeyDead); var dic = new Dictionary<string, object> { {"x-message-ttl", 1800000},//队列上消息过期时间,应小于队列过期时间 60000 1800000 //{"x-message-ttl", 120000},//队列上消息过期时间,应小于队列过期时间 60000 1800000 {"x-dead-letter-exchange", "exchange-D"},//过期消息转向路由 {"x-dead-letter-routing-key", routingKeyDead}//过期消息转向路由相匹配routingkey }; channel.ExchangeDeclare("exchange-L", ExchangeType.Direct, true, false, null);//定义一个Direct类型交换机 //创建一个名叫"zzhello"的消息队列 channel.QueueDeclare(orderQueueName, true, false, false, dic); //将队列绑定到交换机 channel.QueueBind(orderQueueName, "exchange-L", routingKeyDelay, dic); var body = Encoding.UTF8.GetBytes(routeKey.ToString()); //向该消息队列发送消息message channel.BasicPublish("exchange-L", routingKeyDelay, null, body); } } return Task.CompletedTask; }
/// <summary> /// 支付成功后处理消费者 /// </summary> /// <returns></returns> [Obsolete] public Task ConsumerOrder(string routeKey) { const string orderQueueName = "zzhelloJd"; //定义消息队列名 var routingKeyDelay = "queue-delay-" + routeKey;//消息队列路由 const string routingKeyDead = "queue-dead-routing-jd"; //死信队列路由 var factory = new ConnectionFactory { UserName = _configuration["RabbitMQConfig:RabbitUserName"],//用户名 Password = _configuration["RabbitMQConfig:RabbitPassword"],//密码 HostName = _configuration["RabbitMQConfig:RabbitHost"],//rabbitmq ip }; using (var connection = factory.CreateConnection()) { using (var channel = connection.CreateModel()) { var dic = new Dictionary<string, object> { {"x-message-ttl", 1800000},//队列上消息过期时间,应小于队列过期时间 60000 1800000 {"x-dead-letter-exchange", "exchange-D"},//过期消息转向路由 {"x-dead-letter-routing-key", routingKeyDead}//过期消息转向路由相匹配routingkey }; channel.ExchangeDeclare("exchange-L", ExchangeType.Direct, true, false, null);//定义一个Direct类型交换机 //创建一个名叫"zzhello"的消息队列 channel.QueueDeclare(orderQueueName, true, false, false, dic); //将队列绑定到交换机 channel.QueueBind(orderQueueName, "exchange-L", routingKeyDelay, dic); //回调,当consumer收到消息后会执行该函数 //var consumer = new EventingBasicConsumer(channel); //consumer.Received += (model, ea) => //{ // var body = ea.Body; // var message = Encoding.UTF8.GetString(body); //}; ////消费队列"hello"中的消息 //channel.BasicConsume(queue: name, // autoAck: true, // consumer: consumer); var consumer = new QueueingBasicConsumer(channel); //消费队列,并设置应答模式为程序主动应答 channel.BasicConsume(orderQueueName, false, consumer); //阻塞函数,获取队列中的消息 var ea = consumer.Queue.Dequeue(); var bytes = ea.Body; var str = Encoding.UTF8.GetString(bytes); Console.WriteLine("队列消息:" + str); //回复确认 channel.BasicAck(ea.DeliveryTag, false); } } return Task.CompletedTask; }
public class DeadListener : RabbitListener { #region Fileds // 因为Process函数是委托回调,直接将其他Service注入的话两者不在一个scope, // 这里要调用其他的Service实例只能用IServiceProvider CreateScope后获取实例对象 private readonly IServiceProvider _services; private readonly ILogger<RabbitListener> _logger; #endregion #region Ctors public DeadListener(IServiceProvider services, IConfiguration configuration, ILogger<RabbitListener> logger) : base(configuration) { RouteKey = "queue-dead-routing-jd"; QueueName = "zzhello_dead_Jd"; _logger = logger; _services = services; } #endregion #region Methods protected override bool Process(string message) { var taskMessage = message; if (taskMessage == null) { // 返回false 的时候回直接驳回此消息,表示处理不了 return false; } try { using (var scope = _services.CreateScope()) { var xxxService = scope.ServiceProvider.GetRequiredService<IOrderService>(); //_logger.LogInformation($"开始更新订单状态:UpdateOrderCancel,message:{message}"); //LoggerHelper.Write($"开始更新订单状态:UpdateOrderCancel,message:{message}"); var re= xxxService.UpdateOrderCancel(Guid.Parse(taskMessage)).Result; //_logger.LogInformation($"结束更新订单状态:UpdateOrderCancel,message:{message},result:{re}"); //LoggerHelper.Write($"结束更新订单状态:UpdateOrderCancel,message:{message},result:{re}"); if (re) { return true; } else { return false; } } } catch (Exception ex) { _logger.LogInformation($"Process fail,error:{ex.Message},stackTrace:{ex.StackTrace},message:{message}"); _logger.LogError(-1, ex, "Process fail"); LoggerHelper.Write($"DeadListener 自动更新订单状态报错,错误提示 :{ex}"); return false; } } #endregion } public class RabbitListener : IHostedService { private readonly IConnection _connection; private readonly IModel _channel; protected RabbitListener(IConfiguration configuration) { try { var factory = new ConnectionFactory { // 这是我这边的配置,自己改成自己用就好 UserName = configuration["RabbitMQConfig:RabbitUserName"],//用户名 Password = configuration["RabbitMQConfig:RabbitPassword"],//密码 HostName = configuration["RabbitMQConfig:RabbitHost"]//rabbitmq ip //Port = options.Value.RabbitPort, }; _connection = factory.CreateConnection(); _channel = _connection.CreateModel(); } catch (Exception ex) { Console.WriteLine($"RabbitListener init error,ex:{ex.Message}"); } } public Task StartAsync(CancellationToken cancellationToken) { Register(); return Task.CompletedTask; } protected string RouteKey; protected string QueueName; // 处理消息的方法 protected virtual bool Process(string message) { throw new NotImplementedException(); } // 注册消费者监听在这里 private void Register() { Console.WriteLine($"RabbitListener register,routeKey:{RouteKey}"); // channel.ExchangeDeclare(exchange: "exchange-D", type: "topic"); _channel.ExchangeDeclare("exchange-D", ExchangeType.Direct, true, false, null); _channel.QueueDeclare(QueueName, true, false, false, null); _channel.QueueBind(QueueName, "exchange-D", RouteKey); //启用QoS,每次预取10条,避免消费不过来导致消息堆积在本地缓存 _channel.BasicQos(0, 10, false); var consumer = new EventingBasicConsumer(_channel); consumer.Received += (model, ea) => { var body = ea.Body; var message = Encoding.UTF8.GetString(body); var result = Process(message); if (result) { _channel.BasicAck(ea.DeliveryTag, false);//启用手动ack机制后,没有及时ack导致的队列异常(Unacked过多) } else { _channel.BasicNack(ea.DeliveryTag, false, true);// 启用nack+重入队 机制后,导致的死循环(Ready过多) } }; _channel.BasicConsume(queue: QueueName, consumer: consumer); } public void DeRegister() { _connection.Close(); } public Task StopAsync(CancellationToken cancellationToken) { _connection.Close(); return Task.CompletedTask; } }
在这里我先说一下我遇到的问题吧!不知道什么原因会产生异常消息,也就是业务失败产生的unasked消息,这个问题该如何处理
处理方式是启用nack+重入队 机制后,但是这种方式会 导致的死循环(Ready过多),所以要启用Qos和ack机制后,没有及时ack导致的队列堵塞
启用QoS,每次预取5条消息,避免消息处理不过来,全部堆积在本地缓存里
channel.BasicQos(0, 5, false);
开启QoS,当RabbitMQ的队列达到5条Unacked消息时,不会再推送消息给Consumer;
这样问题就解决了!!!!!
标签:body 需求 前缀 dbi 回调 region ado 分析 生效
原文地址:https://www.cnblogs.com/netlock/p/14254720.html