标签:开始 temp pytho 故障 高并发 技术 bool 成功 通信
在消息队列选型时,我们调研了市场上比较常用ActiveMQ,RabbitMQ,RocketMQ,Kafka。
优点有:
缺点有:
这种模式下一条消息只能由一个消费者进行消费,默认情况下,每个消费者是轮询消费的。
这种模型中生产者发送的消息所有消费者都可以消费。
这种模型消费者发送的消息,不同类型的消息可以由不同的消费者去消费。
这种模型和direct模型一样,都是可以根据routing key将消息路由到不同的队列,只不过这种模型可以让队列绑定routing key 的时候使用通配符。这种类型的routing key都是由一个或多个单词组成,多个单词之间用.
分割。
通配符介绍:
*
:只匹配一个单词
#
:匹配一个或多个单词
一条消息从生产到消费经历了三个阶段,分别是生产者,MQ和消费者,对于RabbitMQ来说,消息的传递还涉及到交换机。因此RabbitMQ出现消息丢失的情况有四个
分别是
针对上面提到的四种情况,分别进行处理
# 开启生产者确认机制,
# 注意这里确认的是是否到达交换机
spring.rabbitmq.publisher-confirm-type=correlated
@RestController
public class Producer {
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping("send")
public void sendMessage(){
/**
* 生产者确认消息
*/
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
System.out.println(correlationData);
System.out.println(ack);
System.out.println(cause);
}
});
rabbitTemplate.convertAndSend("s","error","这是一条错误日志!!!");
}
}
spring.rabbitmq.publisher-returns=true
@RestController
public class Producer {
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping("send")
public void sendMessage(){
/**
* 消息未达队列时返回该条消息
*/
rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
@Override
public void returnedMessage(ReturnedMessage returnedMessage) {
System.out.println(returnedMessage);
}
});
rabbitTemplate.convertAndSend("s","error","这是一条错误日志!!!");
}
}
/**
* 定义一个持久化的topic交换机
* durable 持久化
* @return
*/
@Bean
public Exchange exchangeJavatrip(){
return ExchangeBuilder.topicExchange(EXCHANGE).durable(true).build();
}
/**
* 定义一个持久化的队列
* durable 持久化
* @return
*/
@Bean
public Queue queueJavatrip(){
return QueueBuilder.durable(QUEUE).build();
}
spring.rabbitmq.listener.simple.acknowledge-mode=manual
@RabbitListener(queues = MqConfig.QUEUE)
public void receive(String body, Message message, Channel channel) throws Exception{
long deliveryTag = message.getMessageProperties().getDeliveryTag();
System.out.println(deliveryTag);
// 系统业务逻辑判断是否签收
if(deliveryTag % 2 == 0){
channel.basicAck(deliveryTag,false);
}else{
// 第二个参数是否批量确认,第三个参数是否重新回队列
channel.basicNack(deliveryTag,false,true);
}
}
消息重复的原因有两个:
生产时消息重复
由于生产者发送消息给MQ,在MQ确认的时候出现了网络波动,生产者没有收到确认,实际上MQ已经接收到了消息。这时候生产者就会重新发送一遍这条消息。
消费时消息重复。
消费者消费成功后,在给MQ确认的时候出现了网络波动,MQ没有接收到确认,为了保证消息被消费,MQ就会继续给消费者投递之前的消息。这时候消费者就接收到了两条一样的消息。
由于消息重复是网络波动等原因造成的,无法避免,我们能做的的就是保证消息的幂等性,以防业务重复处理。具体处理方案为:
让每个消息携带一个全局的唯一ID,即可保证消息的幂等性,具体消费过程为:
@RabbitListener(queues = MqConfig.QUEUE)
public void receive(Message message, Channel channel){
String messageId = message.getMessageProperties().getMessageId();
String body = new String(message.getBody());
String redisId = redisTemplate.opsForValue().get(messageId)+"";
// 如果redis中存有当前消息的消息id
// 则证明消费过
if(messageId.equals(redisId)){
return;
}
redisTemplate.opsForValue().set(messageId, UUID.randomUUID());
}
消息堆积的原因有两个
解决方案如下:
当一条消息在队列中出现以下三种情况的时候,该消息就会变成一条死信。
当消息在一个队列中变成一个死信之后,如果配置了死信队列,它将被重新publish到死信交换机,死信交换机将死信投递到一个队列上,这个队列就是死信队列。
一条消息成为死信后,一般会通过死信队列进行存库,然后定时将库中的死信进行重新投递到消息队列上。
RabbitMQ可以使用死信队列来实现延时消费,用户下单之后,将订单信息投递到消息队列中,并且设置消息过期时常为30分钟。如果用户支付则正常关闭订单,如果用户未支付,消息达到过期时间,消息会进入死信交换,由消费者进行消费死信队列来关闭订单。
RabbitMQ有两种集群模式,分别是普通集群和镜像集群,普通模式无法保证RabbitMQ的高可用。
假如有三个节点,rabbitmq1、rabbitmq2、rabbitmq3,消息实际上只存在于其中一个节点,三个节点仅有相同的元数据,即队列的结构,当消息进入rabbitmq2节点的queue后,consumer从rabbitmq1的节点进行消费,rabbitmq1和rabbitmq2会进行临时通信,从rabbitmq2中获取消息然后返回给consumer。
这种模式存在以下两个问题:
当rabbitmq2宕机后,消息无法正常消费,没有做到真正的高可用
实际数据还是在单个实例上,存在瓶颈问题
假如有三个节点,rabbitmq1、rabbitmq2、rabbitmq3,每个实例之间都可以相互通信,每次生产者写消息到queue的时候,每个rabbitmq节点上都有queue的消息数据和元数据。这种模式使用于可靠性要求较高的场景。
如果觉得文章不错,欢迎关注、点赞、收藏,你们的支持是我创作的动力,感谢大家。
如果文章写的有问题,请不要吝惜文笔,欢迎留言指出,我会及时核查修改。
如果你还想看到更多别的东西,可以微信搜索「Java旅途」进行关注。回复“手册”领取Java面试手册!
标签:开始 temp pytho 故障 高并发 技术 bool 成功 通信
原文地址:https://www.cnblogs.com/zhixie/p/14918619.html