标签:code else ttl queue turn div component framework integer
目前常见的应用软件都有消息的延迟推送的影子,应用也极为广泛,例如:
在上面两种场景中,如果我们使用下面两种传统解决方案无疑大大降低了系统的整体性能和吞吐量:
在 RabbitMQ 3.6.x 之前我们一般采用死信队列+TTL过期时间来实现延迟队列,我们这里不做过多介绍,可以参考之前文章来了解:TTL、死信队列
在 RabbitMQ 3.6.x 开始,RabbitMQ 官方提供了延迟队列的插件,可以下载放置到 RabbitMQ 根目录下的 plugins 下。延迟队列插件下载
首先我们创建交换机和消息队列,application.properties 中配置与上一篇文章相同。
1 import org.springframework.amqp.core.*; 2 import org.springframework.context.annotation.Bean; 3 import org.springframework.context.annotation.Configuration; 4 5 import java.util.HashMap; 6 import java.util.Map; 7 8 @Configuration 9 public class MQConfig { 10 11 public static final String LAZY_EXCHANGE = "Ex.LazyExchange"; 12 public static final String LAZY_QUEUE = "MQ.LazyQueue"; 13 public static final String LAZY_KEY = "lazy.#"; 14 15 @Bean 16 public TopicExchange lazyExchange(){ 17 //Map<String, Object> pros = new HashMap<>(); 18 //设置交换机支持延迟消息推送 19 //pros.put("x-delayed-message", "topic"); 20 TopicExchange exchange = new TopicExchange(LAZY_EXCHANGE, true, false, pros); 21 exchange.setDelayed(true); 22 return exchange; 23 } 24 25 @Bean 26 public Queue lazyQueue(){ 27 return new Queue(LAZY_QUEUE, true); 28 } 29 30 @Bean 31 public Binding lazyBinding(){ 32 return BindingBuilder.bind(lazyQueue()).to(lazyExchange()).with(LAZY_KEY); 33 }
}
我们在 Exchange 的声明中可以设置exchange.setDelayed(true)
来开启延迟队列,也可以设置为以下内容传入交换机声明的方法中,因为第一种方式的底层就是通过这种方式来实现的。
发送消息时我们需要指定延迟推送的时间,我们这里在发送消息的方法中传入参数 new MessagePostProcessor()
是为了获得 Message
对象,因为需要借助 Message
对象的api 来设置延迟时间。
1 import com.anqi.mq.config.MQConfig; 2 import org.springframework.amqp.AmqpException; 3 import org.springframework.amqp.core.Message; 4 import org.springframework.amqp.core.MessageDeliveryMode; 5 import org.springframework.amqp.core.MessagePostProcessor; 6 import org.springframework.amqp.rabbit.connection.CorrelationData; 7 import org.springframework.amqp.rabbit.core.RabbitTemplate; 8 import org.springframework.beans.factory.annotation.Autowired; 9 import org.springframework.stereotype.Component; 10 11 import java.util.Date; 12 13 @Component 14 public class MQSender { 15 16 @Autowired 17 private RabbitTemplate rabbitTemplate; 18 19 //confirmCallback returnCallback 代码省略,请参照上一篇 20 21 public void sendLazy(Object message){ 22 rabbitTemplate.setMandatory(true); 23 rabbitTemplate.setConfirmCallback(confirmCallback); 24 rabbitTemplate.setReturnCallback(returnCallback); 25 //id + 时间戳 全局唯一 26 CorrelationData correlationData = new CorrelationData("12345678909"+new Date()); 27 28 //发送消息时指定 header 延迟时间 29 rabbitTemplate.convertAndSend(MQConfig.LAZY_EXCHANGE, "lazy.boot", message, 30 new MessagePostProcessor() { 31 @Override 32 public Message postProcessMessage(Message message) throws AmqpException { 33 //设置消息持久化 34 message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT); 35 //message.getMessageProperties().setHeader("x-delay", "6000"); 36 message.getMessageProperties().setDelay(6000); 37 return message; 38 } 39 }, correlationData); 40 } 41 }
我们可以观察 setDelay(Integer i)
底层代码,也是在 header 中设置 x-delay。等同于我们手动设置 header
message.getMessageProperties().setHeader("x-delay", "6000");
消费端进行消费
标签:code else ttl queue turn div component framework integer
原文地址:https://www.cnblogs.com/azoveh/p/13444694.html