标签:方式 row cache opera dma 监听器 route 方便 讲解
摘要: 理解 Ack,设置为手动 Ack,如何在异常时,进行数据回返,我们再次不理解基础的发送和接受的功能,官网的实例已经很满足学习的要求了,其实在队列的配置中,最复杂的也就是消费者的逻辑,我这边讲解的适用于开发大型网站,对数据的处理要非常的谨慎的,如果是简单学习,不建议看。
首先啊,有的人不是太理解这个Ack是什么,讲的接地气一点,其实就是一个通知,怎么说呢,当我监听消费者,正常情况下,不会出异常,但是如果是出现了异常,甚至是没有获取的异常,那是不是这条数据就会作废,但是我们肯定不希望这样的情况出现,我们想要的是,如果在出现异常的时候,我们识别到,如果确实是一个不良异常,肯定希望数据重新返回队列中,再次执行我们的业务逻辑代码,此时我就需要一个Ack的通知,告诉队列服务,我是否已经成功处理了这条数据,而如果不配置Ack的话呢,我测试过他会自动的忽略,也就是说此时的服务是no_ack=true的模式,就是说只要我发现你是消费了这个数据,至于异常不异常的,我不管了。通知Ack机制就是这么来的,更加灵活的,我们需要Ack不自动,而是手动,这样做的好处,就是使得我们开发人员更加人性化或者灵活的来处理我们的业务罗杰代码,更加方便的处理异常的问题以及数据的返回处理等。下面是通话机制的四条原则:
A. 在消费者端的mq配置文件上添加,配置 关键代码为 acknowledeg = "manual",意为表示该消费者的ack方式为手动(此时的queue已经和生产者的exchange通过某个routeKey绑定了)
<rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual">
<rabbit:listener queues="queue_xxx" ref="MqConsumer"/>
<rabbit:listener queues="queue_xxx" ref="MqConsumer2"/>
</rabbit:listener-container>
B. 新建一个类 MqConsumer ,并实现接口 ChannelAwareMessageListener ,实现onMessage方法,不需要指定方法。
springAMQP中已经实现了一个功能,如果该监听器已经实现了下面2个接口,则直接调用onMessage方法
C. 关键点在实现了ChannelAwareMessageListener的onMessage方法后,会有2个参数。
一个是message(消息实体),一个是channel就是当前的通道,很多地方都没有说清楚怎么去手动ack,其实手动ack就是在当前channel里面调用basicAsk的方法,并传入当前消息的tagId就可以了。
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
其中deliveryTag是tag的id,由生产者生成。第二个参数我其实也没理解用途,暂时还没有模拟出场景,所以先不讨论。
同样的,如果要Nack或者拒绝消息(reject)的时候,也是调用channel里面的basicXXX方法就可以了(当然要制定tagId)。注意如果抛异常或Nack(并且requeue为true),消息会一直重新入队列,一不小心就会重复一大堆消息不断出现~。
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); // 消息的标识,false只确认当前一个消息收到,true确认所有consumer获得的消息
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true); // ack返回false,并重新回到队列,api里面解释得很清楚
channel.basicReject(message.getMessageProperties().getDeliveryTag(), true); // 拒绝消息
D. 针对上面所描述的情况,我们在搭建一个消息队列时候,我们的思路应该是这样的,首先,我们要启动ack的手动方式,紧接着,我们处理代码逻辑,如果发生了异常信息,我们首先通知到ack,我已经表示接受到这条数据了,你可以进行删除了,不需要让他自动的重新进入队列中,然后,我们启用一个错误处理,手动将其重新插入队列中,在此之前,有几个类和Api一起来看一下。
1. SimpleMessageListenerContainer
这个是我们的基础监听,他的作用就是队列的总监听,可以为其配置ack模式,异常处理类等。。
2. org.springframework.amqp.support.converter.SimpleMessageConverter
这个类和下面的Converter类很容易搞混淆,这个类的作用是可以解析队列中的 message 转 obj
3. org.springframework.amqp.rabbit.retry.MessageRecoverer
这个接口,需要我们开发者自定义实现,其中的一个方法recover(Message message, Throwable cause),就可以看出来他是干嘛的,就是说在监听出错,也就是没有抓取的异常而是抛出的异常会触发该方法,我们就会在这个接口的实现中,将消息重新入队列
4. org.springframework.util.ErrorHandler
这个接口也是在出现异常时候,会触发他的方法
E. 完整实例
1. spring配置队列xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:aop="http://www.springframework.org/schema/aop" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-3.1.xsd
http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd
http://www.springframework.org/schema/aop
http://www.springframework.org/schema/aop/spring-aop-3.1.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context-3.1.xsd">
<!-- 连接服务配置 -->
<rabbit:connection-factory id="connectionFactory"
host="${rabbitmq.host}" port="${rabbitmq.port}" username="${rabbitmq.username}"
password="${rabbitmq.password}" channel-cache-size="${rabbitmq.channel.cache.size}" />
<!-- 设置Ack模式为手动 -->
<bean id="ackManual"
class="org.springframework.beans.factory.config.FieldRetrievingFactoryBean">