码迷,mamicode.com
首页 > 其他好文 > 详细

RabbitMQ总结

时间:2020-03-22 09:14:28      阅读:108      评论:0      收藏:0      [点我收藏+]

标签:hang   发送消息   new   code   订单   git   ttl   长度   删除   

1.RabbitMQ简介

  RabbitMQ是一个用Erlang语言实现了AMQP(Advanced Message Queuing Protocol)协议的消息队列服务。支持多种客户端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。

2.Rabbit基础概念

2.1 RabbitMQ模型

 

 

技术图片

Message (消息): 消息是不具名的,它由消息头和消息体组成。消息体是不透明的, 而消息头则由一系列可选属性组成,这些属性包括 routing-key (路由键)、 priority (相 对于其他消息的优先级〉、 delivery咀 od巳(指出该消息可能需要持久化存储)等。

Publisher (消息生产者): 一个向交换器发布消息的 客户端应用程序。

Exchange (交换器):用来接收生产者发送的消息,并将这些消息路由给服务器中的队列。

Queue (消息队列):用来保存消息直到发送给消费者。它是消息的容器,也是消息的终点。一条消息可被投入一个或多个队列中 。消息一直在队列里面等待消费者连接到这个队列将其取走 。

Binding (绑定): 用于消息队列和交换器之间的关联。一个绑定就是基于路由键将交 换器和消息队列连接起来的路由规则,所以可以将交换器理解成 一个由绑定构成的路
由表。

Connection (网络连接〉:比如一个TCP连接。

Channel (信道): 多路复用连接中的一条独立的双向数据流通道。信道是建立在真实 的TCP连接内的虚拟连接, AMQP命令都是通过信道发送出去的,不管是发布消息、 订阅队列还是接收消息,这些动作都是通过信道完成的。 因为对于操作系统来说, 建 立和销毁 TCP 连接都是非常昂贵的开销,所以引入了信道的概念,以复用一个 TCP 连接。
Consumer (消息消费者): 表示一个从消息队列中取得消息的客户端应用程序。

Virtual Host (虚拟主机, 在 RabbitMQ 中 Qlj vhosD: 表示一批交换器 、 消息队列和相关 对象。虚拟主机是共享相同的身份认证和加密环境的独立服务器域。本质上每个 vhost 就是一台缩小版的 RabbitMQ服务器,它拥有自己的队列、 交换器、绑定和权限机制。 vhost是 AMQP概念的基础,必须在连接时指定, RabbitMQ默认的 vhost是“/”。

Broker: 表示消息队列服务器实体。

3.入门示例

源码地址  https://github.com/chensanzui/rabbitmq-example

public class QuickStart {
    private static final String QUEUE_NAME ="queueName";
    private static final String ROUTEING_KEY = "routeingKey";
    public static void main(String[] args) throws Exception {
        Publisher(); // 推送消息
        Consumer(); // 消费消息
    }
    public static Connection createConntection() throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("127.0.0.1");
        Connection connection = factory.newConnection();
        return connection;
    }
    /**
     * 推送消息
     */
    public static void Publisher() throws Exception{
        // 创建一个连接
        Connection conn = createConntection();
        if (conn != null) {
            try {
                // 创建通道
                Channel channel = conn.createChannel();
                // 声明队列【参数说明:参数一:队列名称,参数二:是否持久化;参数三:是否独占模式;参数四:消费者断开连接时是否删除队列;参数五:消息其他参数】
                channel.queueDeclare(QUEUE_NAME, false, false, false, null);
                String content = String.format("当前时间:%s", System.currentTimeMillis());
                // 发送内容【参数说明:参数一:交换机名称;参数二:队列名称,参数三:消息的其他属性-routing headers,此属性为MessageProperties.PERSISTENT_TEXT_PLAIN用于设置纯文本消息存储到硬盘;参数四:消息主体】
                channel.basicPublish("", ROUTEING_KEY, null, content.getBytes("UTF-8"));
                System.out.println("已发送消息:" + content);
                // 关闭连接
                channel.close();
                conn.close();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
    /**
     * 消费消息
     */
    public static void Consumer() throws Exception {
        // 创建一个连接
        Connection conn = createConntection();
        if (conn != null) {
            try {
                // 创建通道
                Channel channel = conn.createChannel();
                // 声明队列【参数说明:参数一:队列名称,参数二:是否持久化;参数三:是否独占模式;参数四:消费者断开连接时是否删除队列;参数五:消息其他参数】
                channel.queueDeclare(QUEUE_NAME, false, false, false, null);

                // 创建订阅器,并接受消息
                channel.basicConsume(QUEUE_NAME, false, "", new DefaultConsumer(channel) {
                    @Override
                    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                                               byte[] body) throws IOException {
                        // 队列名称
                        String routingKey = envelope.getRoutingKey();
                        // 内容类型
                        String contentType = properties.getContentType();
                        // 消息正文
                        String content = new String(body, "utf-8");
                        System.out.println("消息正文:" + content);
                        // 手动确认消息【参数说明:参数一:该消息的index;参数二:是否批量应答,true批量确认小于index的消息】
                        channel.basicAck(envelope.getDeliveryTag(), false);
                    }
                });
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}

4.理解Exchange

那么为什么我们需要 Exchange 而不是直接将消息发送至队列呢?

AMQP 协议中的核心思想就是生产者和消费者的解耦,生产者从不直接将消息发送给队列。生产者通常不知道是否一个消息会被发送到队列中,只是将消息发送到一个交换机。先由 Exchange 来接收,然后 Exchange 按照特定的策略转发到 Queue 进行存储。Exchange 就类似于一个交换机,将各个消息分发到相应的队列中。

技术图片

在实际应用中我们只需要定义好 Exchange 的路由策略,而生产者则不需要关心消息会发送到哪个 Queue 或被哪些 Consumer 消费。在这种模式下生产者只面向 Exchange 发布消息,消费者只面向 Queue 消费消息,Exchange 定义了消息路由到 Queue 的规则,将各个层面的消息传递隔离开,使每一层只需要关心自己面向的下一层,降低了整体的耦合度。

Exchange 收到消息时,他是如何知道需要发送至哪些 Queue 呢?这里就需要了解 Binding 和 RoutingKey 的概念:

Binding 表示 Exchange 与 Queue 之间的关系,我们也可以简单的认为队列对该交换机上的消息感兴趣,绑定可以附带一个额外的参数 RoutingKey。Exchange 就是根据这个 RoutingKey 和当前 Exchange 所有绑定的 Binding 做匹配,如果满足匹配,就往 Exchange 所绑定的 Queue 发送消息,这样就解决了我们向 RabbitMQ 发送一次消息,可以分发到不同的 Queue。RoutingKey 的意义依赖于交换机的类型。

4.1Exchang类型

direct:默认的交换器类型,如果路由键匹配的话,消息就投递到相应的队列。

font:fanout有别于direct交换器,fanout是一种发布/订阅模式的交换器,当你发送一条消息的时候,交换器会把消息广播到所有附加到这个交换器的队列上。

topic:Topic Exchange 和 Direct Exchange 类似,也需要通过 RoutingKey 来路由消息,区别在于Direct Exchange 对 RoutingKey 是精确匹配,而 Topic Exchange 支持模糊匹配。分别支持*和#通配符,*表示匹配一个单词,#则表示匹配没有或者多个单词。

headers:和direct交换器完全一致,但性能却很差,几乎用不到。

5.消息ACK机制

5.1什么是消息确认ACK。

  如果在处理消息的过程中,消费者的服务器在处理消息的时候出现异常,那么可能这条正在处理的消息就没有完成消息消费,数据就会丢失。为了确保数据不会丢失,消费者从RabbitMQ收到消息并处理完成后,反馈给RabbitMQ,RabbitMQ收到反馈后才将此消息从队列中删除。这就是消息的ACK机制。
消息的ACK确认机制默认是打开的,消息永远不会从RabbitMQ中删除,只有当消费者正确发送ACK反馈,RabbitMQ确认收到后,消息才会从RabbitMQ服务器的数据中删除。

5.2消息收到未确认会怎么样?

如果应用程序接收了消息,因为bug忘记确认接收的话,消息在队列的状态会从“Ready”变为“Unacked”,如果消息收到却未确认,Rabbit将不会再给这个应用程序发送更多的消息了,这是因为Rabbit认为你没有准备好接收下一条消息。此条消息会一直保持Unacked的状态,直到你确认了消息,或者断开与Rabbit的连接,Rabbit会自动把消息改完Ready状态,分发给其他订阅者。 当然你可以利用这一点,让你的程序延迟确认该消息,直到你的程序处理完相应的业务逻辑,这样可以有效的防治Rabbit给你过多的消息,导致程序崩溃。

5.3消息拒绝

消息在确认之前,可以有两个选择:

选择1:断开与Rabbit的连接,这样Rabbit会重新把消息分派给另一个消费者;

选择2:拒绝Rabbit发送的消息使用channel.basicReject(long deliveryTag, boolean requeue),参数1:消息的id;参数2:处理消息的方式,如果是true,Rabbib会重新分配这个消息给其他订阅者,如果设置成false的话,Rabbit会把消息发送到一个特殊的“死信”队列,用来存放被拒绝而不重新放入队列的消息。

6.事物消息和和Confirm发送方消息

正常情况下,如果消息经过交换器进入队列就可以完成消息的持久化,但如果消息在没有到达broker之前出现意外,那就造成消息丢失,通过事物消息和Confirm可以解决这个问题。其中事物消息有严重的性能问题。一般选择confirm发送方消息方式。

6.1事物消息

事务的实现主要是对信道(Channel)的设置,主要的方法有三个:

  1. channel.txSelect()声明启动事务模式;

  2. channel.txComment()提交事务;

  3. channel.txRollback()回滚事务;

  try {
            long start=System.currentTimeMillis();
            int max = 10000;
            for(int i=0;i<max;i++) {
                //声明事物
                channel.txSelect();
                //发布消息
                channel.basicPublish(exchangeName, routingKey, null, message.getBytes());
                System.out.println("发送消息:" + message);
                //提交消息
               channel.txCommit();
            }
            long end = System.currentTimeMillis();
            System.out.println("发送"+max+"事物模式消息,耗时为:"+(end-start));

        } catch (Exception e) {
            //回滚消息
            channel.txRollback();
            e.printStackTrace();
        }

6.2confirm发送方确认模式

Confirm发送方确认模式使用和事务类似,也是通过设置Channel进行发送方确认的。

Confirm的三种实现方式:

方式一:channel.waitForConfirms()普通发送方确认模式;

方式二:channel.addConfirmListener()异步监听发送方确认模式;

方式三:channel.waitForConfirmsOrDie()批量确认模式;

6.2.1普通发送方确认模式

        try {
            long start=System.currentTimeMillis();
            int max = 10000;
            //开启发送方确认模式
            channel.confirmSelect();
            for(int i=0;i<max;i++) {
                //发布消息
                channel.basicPublish(exchangeName, routingKey, null, message.getBytes());
                if(channel.waitForConfirms()) {
                    System.out.println("发送消息:" + message);
                }
            }
            long end = System.currentTimeMillis();
            System.out.println("发送"+max+"发送方确认模式消息,耗时为:"+(end-start));

        } catch (Exception e) {
            e.printStackTrace();
        }

6.2.2 异步监听模式

        try {
            long start=System.currentTimeMillis();
            int max = 10000;
            //开启发送方确认模式
            channel.confirmSelect();
            for(int i=0;i<max;i++) {
                //发布消息
                channel.basicPublish(exchangeName, routingKey, null, message.getBytes());
            }
            channel.addConfirmListener(new ConfirmListener() {
                @Override
                public void handleAck(long deliveryTag, boolean multiple) throws IOException {
                    System.out.println(String.format("已确认消息,标识:%d,多个消息:%b", deliveryTag, multiple));
                }

                @Override
                public void handleNack(long deliveryTag, boolean multiple) throws IOException {
                    System.out.println("未确认消息,标识:" + deliveryTag);
                }
            });
            long end = System.currentTimeMillis();
            System.out.println("发送"+max+"发送方确认模式消息,耗时为:"+(end-start));
            Thread.sleep(10000);
        } catch (Exception e) {
            e.printStackTrace();
        }

6.2.3 批量确认模式

try {
            long start=System.currentTimeMillis();
            int max = 10000;
            //开启发送方确认模式
            channel.confirmSelect();
            for(int i=0;i<max;i++) {
                //发布消息
                channel.basicPublish(exchangeName, routingKey, null, message.getBytes());
                System.out.println("发送消息:" + message);

            }
            long end = System.currentTimeMillis();
            //直到所有信息都发布,只要有一个未确认就会IOException
            channel.waitForConfirmsOrDie();
            System.out.println("发送"+max+"批量确认模式消息,耗时为:"+(end-start));

        } catch (Exception e) {
            e.printStackTrace();
        }

7.消息的持久化

持久化是为提高rabbitmq消息的可靠性,防止在异常情况(重启,关闭,宕机)下数据的丢失,rabbitmq持久化分为三个部分: 交换器的持久化、队列的持久化和消息的持久化。

7.1交换器的持久化

交换器的持久化是通过声明队列时,将durable参数设置为true实现的。如果交换器不设置持久化,那么rabbitmq服务重启之后,相关的交换器元数据将会丢失,不过消息不会丢失,只是不能将消息发送到这个交换器中了,建议将交换器设置为持久化。

queueDeclare(queue, durable, exclusive, autoDelete,  arguments)
channel.queueDeclare(q_name, true, false, false, map);

7.2队列的持久化

队列的持久化是通过声明队列时,将durable参数设置为true实现的。如果队列不设置持久化,那么rabbitmq服务重启之后,相关的队列元数据将会丢失,而消息是存储在队列中的,所以队列中的消息也会被丢失

7.3消息的持久化

队列的持久化只能保证其队列本身的元数据不会被丢失,但是不能保证消息不会被丢失。所以消息本身也需要被持久化,可以在投递消息前设置AMQP.BasicProperties的属性deliveryMode为2即可:

AMQP.BasicProperties low = new AMQP.BasicProperties
                .Builder()
                .deliveryMode(2)
                .build();

8.死信队列

当消息在一个队列中变成死信(dead message)之后,它能被重新publish到另一个Exchange,这个Exchange就是DLX。消息变成死信一向有一下几种情况:

1.消息被拒绝(basic.reject/ basic.nack)并且requeue=false
2.消息TTL过期(参考:RabbitMQ之TTL(Time-To-Live 过期时间))
3。队列达到最大长度
DLX也是一个正常的Exchange,和一般的Exchange没有区别,它能在任何的队列上被指定,实际上就是设置某个队列的属性,当这个队列中有死信时,RabbitMQ就会自动的将这个消息重新发布到设置的Exchange上去,进而被路由到另一个队列,可以监听这个队列中消息做相应的处理。

9.mandatory和immediate

mandatory
当mandatory标志位设置为true时,如果exchange根据自身类型和消息routeKey无法找到一个符合条件的queue,那么会调用basic.return方法将消息返回给生产者(Basic.Return + Content-Header + Content-Body);当mandatory设置为false时,出现上述情形broker会直接将消息扔掉。

immediate
当immediate标志位设置为true时,如果exchange在将消息路由到queue(s)时发现对于的queue上么有消费者,那么这条消息不会放入队列中。当与消息routeKey关联的所有queue(一个或者多个)都没有消费者时,该消息会通过basic.return方法返还给生产者。

概括来说,mandatory标志告诉服务器至少将该消息route到一个队列中,否则将消息返还给生产者;immediate标志告诉服务器如果该消息关联的queue上有消费者,则马上将消息投递给它,如果所有queue都没有消费者,直接把消息返还给生产者,不用将消息入队列等待消费者了。

10.惰性队列

RabbitMQ从3.6.0版本开始引入了惰性队列(Lazy Queue)的概念。惰性队列会尽可能的将消息存入磁盘中,而在消费者消费到相应的消息时才会被加载到内存中,它的一个重要的设计目标是能够支持更长的队列,即支持更多的消息存储。当消费者由于各种各样的原因(比如消费者下线、宕机亦或者是由于维护而关闭等)而致使长时间内不能消费消息造成堆积时,惰性队列就很有必要了。

默认情况下,当生产者将消息发送到RabbitMQ的时候,队列中的消息会尽可能的存储在内存之中,这样可以更加快速的将消息发送给消费者。即使是持久化的消息,在被写入磁盘的同时也会在内存中驻留一份备份。当RabbitMQ需要释放内存的时候,会将内存中的消息换页至磁盘中,这个操作会耗费较长的时间,也会阻塞队列的操作,进而无法接收新的消息。虽然RabbitMQ的开发者们一直在升级相关的算法,但是效果始终不太理想,尤其是在消息量特别大的时候。

惰性队列会将接收到的消息直接存入文件系统中,而不管是持久化的或者是非持久化的,这样可以减少了内存的消耗,但是会增加I/O的使用,如果消息是持久化的,那么这样的I/O操作不可避免,惰性队列和持久化消息可谓是“最佳拍档”。注意如果惰性队列中存储的是非持久化的消息,内存的使用率会一直很稳定,但是重启之后消息一样会丢失。

11.消息丢失

技术图片

生产者弄丢了数据

生产者将数据发送到 RabbitMQ 的时候,可能数据就在半路给搞丢了,因为网络问题啥的,都有可能。

此时可以选择用 RabbitMQ 提供的事务功能,就是生产者发送数据之前开启 RabbitMQ 事务channel.txSelect,然后发送消息,如果消息没有成功被 RabbitMQ 接收到,那么生产者会收到异常报错,此时就可以回滚事务channel.txRollback,然后重试发送消息;如果收到了消息,那么可以提交事务channel.txCommit

但是问题是,RabbitMQ 事务机制(同步)一搞,基本上吞吐量会下来,因为太耗性能。

所以一般来说,如果你要确保说写 RabbitMQ 的消息别丢,可以开启 confirm 模式,在生产者那里设置开启 confirm 模式之后,你每次写的消息都会分配一个唯一的 id,然后如果写入了 RabbitMQ 中,RabbitMQ 会给你回传一个 ack 消息,告诉你说这个消息 ok 了。如果 RabbitMQ 没能处理这个消息,会回调你的一个 nack 接口,告诉你这个消息接收失败,你可以重试。而且你可以结合这个机制自己在内存里维护每个消息 id 的状态,如果超过一定时间还没接收到这个消息的回调,那么你可以重发。

事务机制和 cnofirm 机制最大的不同在于,事务机制是同步的,你提交一个事务之后会阻塞在那儿,但是 confirm 机制是异步的,你发送个消息之后就可以发送下一个消息,然后那个消息 RabbitMQ 接收了之后会异步回调你的一个接口通知你这个消息接收到了。

所以一般在生产者这块避免数据丢失,都是用 confirm 机制的。

RabbitMQ 弄丢了数据

就是 RabbitMQ 自己弄丢了数据,这个你必须开启 RabbitMQ 的持久化,就是消息写入之后会持久化到磁盘,哪怕是 RabbitMQ 自己挂了,恢复之后会自动读取之前存储的数据,一般数据不会丢。除非极其罕见的是,RabbitMQ 还没持久化,自己就挂了,可能导致少量数据丢失,但是这个概率较小。

设置持久化有两个步骤:

  • 创建 queue 的时候将其设置为持久化

    这样就可以保证 RabbitMQ 持久化 queue 的元数据,但是它是不会持久化 queue 里的数据的。
  • 第二个是发送消息的时候将消息的 deliveryMode 设置为 2

    就是将消息设置为持久化的,此时 RabbitMQ 就会将消息持久化到磁盘上去。

必须要同时设置这两个持久化才行,RabbitMQ 哪怕是挂了,再次重启,也会从磁盘上重启恢复 queue,恢复这个 queue 里的数据。

注意,哪怕是你给 RabbitMQ 开启了持久化机制,也有一种可能,就是这个消息写到了 RabbitMQ 中,但是还没来得及持久化到磁盘上,结果不巧,此时 RabbitMQ 挂了,就会导致内存里的一点点数据丢失。

所以,持久化可以跟生产者那边的 confirm 机制配合起来,只有消息被持久化到磁盘之后,才会通知生产者 ack 了,所以哪怕是在持久化到磁盘之前,RabbitMQ 挂了,数据丢了,生产者收不到 ack,你也是可以自己重发的。

消费端弄丢了数据

RabbitMQ 如果丢失了数据,主要是因为你消费的时候,刚消费到,还没处理,结果进程挂了,比如重启了,那么就尴尬了,RabbitMQ 认为你都消费了,这数据就丢了。

这个时候得用 RabbitMQ 提供的 ack 机制,简单来说,就是你必须关闭 RabbitMQ 的自动 ack,可以通过一个 api 来调用就行,然后每次你自己代码里确保处理完的时候,再在程序里 ack 一把。这样的话,如果你还没处理完,不就没有 ack 了?那 RabbitMQ 就认为你还没处理完,这个时候 RabbitMQ 会把这个消费分配给别的 consumer 去处理,消息是不会丢的。

技术图片

12.如何保证消息的顺序性

一个 queue对应一个 consumer,然后这个 consumer 内部用内存队列做排队,然后分发给底层不同的 worker 来处理。

13.怎么保证消息队列消费的幂等性?(如何保证消息不被重复消费)

主要是结合业务来思考,我这里给几个思路:
比如你拿个数据要写库,你先根据主键查一下,如果这数据都有了,你就别插入了,update 一下好吧。
比如你是写 Redis,那没问题了,反正每次都是 set,天然幂等性。
比如你不是上面两个场景,那做的稍微复杂一点,你需要让生产者发送每条数据的时候,里面加一个全局唯一的 id,类似订单 id 之类的东西,然后你这里消费到了之后,先根据这个 id 去比如 Redis 里查一下,之前消费过吗?如果没有消费过,你就处理,然后这个 id 写 Redis。如果消费过了,那你就别处理了,保证别重复处理相同的消息即可。
比如基于数据库的唯一键来保证重复数据不会重复插入多条。因为有唯一键约束了,重复数据插入只会报错,不会导致数据库中出现脏数据。

14.如何解决消息队列的延时以及过期失效问题?消息队列满了以后该怎么处理?有几百万消息持续积压几小时,说说怎么解决?

大量消息在 mq 里积压了几个小时了还没解决

几千万条数据在 MQ 里积压了七八个小时,从下午 4 点多,积压到了晚上 11 点多。这个是我们真实遇到过的一个场景,确实是线上故障了,这个时候要不然就是修复 consumer 的问题,让它恢复消费速度,然后傻傻的等待几个小时消费完毕。这个肯定不能在面试的时候说吧。

一个消费者一秒是 1000 条,一秒 3 个消费者是 3000 条,一分钟就是 18 万条。所以如果你积压了几百万到上千万的数据,即使消费者恢复了,也需要大概 1 小时的时间才能恢复过来。

一般这个时候,只能临时紧急扩容了,具体操作步骤和思路如下:

先修复 consumer 的问题,确保其恢复消费速度,然后将现有 consumer 都停掉。
新建一个 topic,partition 是原来的 10 倍,临时建立好原先 10 倍的 queue 数量。
然后写一个临时的分发数据的 consumer 程序,这个程序部署上去消费积压的数据,消费之后不做耗时的处理,直接均匀轮询写入临时建立好的 10 倍数量的 queue。
接着临时征用 10 倍的机器来部署 consumer,每一批 consumer 消费一个临时 queue 的数据。这种做法相当于是临时将 queue 资源和 consumer 资源扩大 10 倍,以正常的 10 倍速度来消费数据。
等快速消费完积压数据之后,得恢复原先部署的架构,重新用原先的 consumer 机器来消费消息。
mq 中的消息过期失效了

假设你用的是 RabbitMQ,RabbtiMQ 是可以设置过期时间的,也就是 TTL。如果消息在 queue 中积压超过一定的时间就会被 RabbitMQ 给清理掉,这个数据就没了。那这就是第二个坑了。这就不是说数据会大量积压在 mq 里,而是大量的数据会直接搞丢。

这个情况下,就不是说要增加 consumer 消费积压的消息,因为实际上没啥积压,而是丢了大量的消息。我们可以采取一个方案,就是批量重导,这个我们之前线上也有类似的场景干过。就是大量积压的时候,我们当时就直接丢弃数据了,然后等过了高峰期以后,比如大家一起喝咖啡熬夜到晚上12点以后,用户都睡觉了。这个时候我们就开始写程序,将丢失的那批数据,写个临时程序,一点一点的查出来,然后重新灌入 mq 里面去,把白天丢的数据给他补回来。也只能是这样了。

假设 1 万个订单积压在 mq 里面,没有处理,其中 1000 个订单都丢了,你只能手动写程序把那 1000 个订单给查出来,手动发到 mq 里去再补一次。

mq 都快写满了

如果消息积压在 mq 里,你很长时间都没有处理掉,此时导致 mq 都快写满了,咋办?这个还有别的办法吗?没有,谁让你第一个方案执行的太慢了,你临时写程序,接入数据来消费,消费一个丢弃一个,都不要了,快速消费掉所有的消息。然后走第二个方案,到了晚上再补数据吧。

参考资料:

https://www.cnblogs.com/vipstone/p/9350075.html

https://blog.csdn.net/u013256816/article/details/54743481

https://blog.csdn.net/Iperishing/article/details/86674488

 

RabbitMQ总结

标签:hang   发送消息   new   code   订单   git   ttl   长度   删除   

原文地址:https://www.cnblogs.com/chensanzui/p/12543803.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!