码迷,mamicode.com
首页 > 其他好文 > 详细

RocketMQ概览

时间:2021-04-19 14:11:21      阅读:0      评论:0      收藏:0      [点我收藏+]

标签:开始   直接   conf   异步   更新   https   注意   进制   failed   

核心概念

Topic:消息主题,一级消息类型,通过Topic对消息进行分类。

Tag:消息标签,二级消息类型,用来进一步区分某个Topic下的消息分类。

Message ID:消息的全局唯一标识,由消息队列RocketMQ版系统自动生成,唯一标识某条消息。

Message Key:消息的业务标识,由消息生产者(Producer)设置,唯一标识某个业务逻辑。

Producer:消息生产者,也称为消息发布者,负责生产并发送消息。

Consumer:消息消费者,也称为消息订阅者,负责接收并消费消息。可分为两类:

  • Push Consumer:消息由消息队列RocketMQ版推送至Consumer。
  • Pull Consumer:该类Consumer主动从消息队列RocketMQ版拉取消息。目前仅TCP Java SDK支持该类Consumer。

Group:一类Producer或Consumer,这类Producer或Consumer通常生产或消费同一类消息,且消息发布或订阅的逻辑一致。

集群消费:一个Group ID所标识的所有Consumer平均分摊消费消息。

                 例如某个Topic有9条消息,一个Group ID有3个Consumer实例,那么在集群消费模式下每个实例平均分摊,只消费其中的3条消息。

 

广播消费:一个Group ID所标识的所有Consumer都会各自消费某条消息一次。

                 例如某个Topic有9条消息,一个Group ID有3个Consumer实例,那么在广播消费模式下每个实例都会各自消费9条消息。

 

消息收发模型

消息队列RocketMQ版支持发布和订阅模型,消息生产者应用创建Topic并将消息发送到Topic。消费者应用创建对Topic的订阅以便从其接收消息。通信可以是一对多(扇出)、多对一(扇入)和多对多。

技术图片

生产者集群:用来表示发送消息应用,一个生产者集群下包含多个生产者实例,可以是多台机器,也可以是一台机器的多个进程,或者一个进程的多个生产者对象。

一个生产者集群可以发送多个Topic消息。发送分布式事务消息时,如果生产者中途意外宕机,消息队列RocketMQ版服务端会主动回调生产者集群的任意一台机器来确认事务状态。

 

消费者集群:用来表示消费消息应用,一个消费者集群下包含多个消费者实例,可以是多台机器,也可以是多个进程,或者是一个进程的多个消费者对象。

一个消费者集群下的多个消费者以均摊方式消费消息。如果设置的是广播方式,那么这个消费者集群下的每个实例都消费全量数据。

一个消费者集群对应一个Group ID,一个Group ID可以订阅多个Topic

 

集群消费和广播消费

消息队列RocketMQ版是基于发布或订阅模型的消息系统。消费者,即消息的订阅方订阅关注的Topic,以获取并消费消息。由于消费者应用一般是分布式系统,以集群方式部署,因此消息队列RocketMQ版约定以下概念:

集群:使用相同Group ID的消费者属于同一个集群。同一个集群下的消费者消费逻辑必须完全一致(包括Tag的使用)。
集群消费:当使用集群消费模式时,消息队列RocketMQ版认为任意一条消息只需要被集群内的任意一个消费者处理即可。
广播消费:当使用广播消费模式时,消息队列RocketMQ版会将每条消息推送给集群内所有注册过的消费者,保证消息至少被每个消费者消费一次。

 

集群消费模式

适用场景
适用于消费端集群化部署,每条消息只需要被处理一次的场景。此外,由于消费进度在服务端维护,可靠性更高。具体消费示例如下图所示。

技术图片

 

注意事项

  • 集群消费模式下,每一条消息都只会被分发到一台机器上处理。如果需要被集群下的每一台机器都处理,请使用广播模式。
  • 集群消费模式下,不保证每一次失败重投的消息路由到同一台机器上。

 

 

广播消费模式

适用场景
适用于消费端集群化部署,每条消息需要被集群下的每个消费者处理的场景。具体消费示例如下图所示。

技术图片

 

注意事项

  • 广播消费模式下不支持顺序消息。
  • 广播消费模式下不支持重置消费位点。
  • 每条消息都需要被相同订阅逻辑的多台机器处理。
  • 消费进度在客户端维护,出现重复消费的概率稍大于集群模式。
  • 广播模式下,消息队列RocketMQ版保证每条消息至少被每台客户端消费一次,但是并不会重投消费失败的消息,因此业务方需要关注消费失败的情况。
  • 广播模式下,客户端每一次重启都会从最新消息消费。客户端在被停止期间发送至服务端的消息将会被自动跳过,请谨慎选择。
  • 广播模式下,每条消息都会被大量的客户端重复处理,因此推荐尽可能使用集群模式。
  • 广播模式下服务端不维护消费进度,所以消息队列RocketMQ版控制台不支持消息堆积查询、消息堆积报警和订阅关系查询功能。

 

系统部署架构

技术图片

图中所涉及到的概念如下所述:

  • Name Server:是一个几乎无状态节点,可集群部署,在消息队列RocketMQ版中提供命名服务,更新和发现Broker服务。
  • Broker:消息中转角色,负责存储消息,转发消息。分为Master Broker和Slave Broker,一个Master Broker可以对应多个Slave Broker,但是一个Slave Broker只能对应一个Master Broker。Broker启动后需要完成一次将自己注册至Name Server的操作;随后每隔30s定期向Name Server上报Topic路由信息。
  • 生产者:与Name Server集群中的其中一个节点(随机)建立长链接(Keep-alive),定期从Name Server读取Topic路由信息,并向提供Topic服务的Master Broker建立长链接,且定时向Master Broker发送心跳。
  • 消费者:与Name Server集群中的其中一个节点(随机)建立长连接,定期从Name Server拉取Topic路由信息,并向提供Topic服务的Master Broker、Slave Broker建立长连接,且定时向Master Broker、Slave Broker发送心跳。Consumer既可以从Master Broker订阅消息,也可以从Slave Broker订阅消息,订阅规则由Broker配置决定。

 

消息类型

  

普通消息

消息队列RocketMQ版中无特性的消息,区别于有特性的定时和延时消息、顺序消息和事务消息。

定时和延时消息

允许消息生产者对指定消息进行定时(延时)投递,最长支持40天。

顺序消息

允许消息消费者按照消息发送的顺序对消息进行消费。

事务消息

实现类似X或Open XA的分布事务功能,以达到事务最终一致性状态。

 

普通消息是指消息队列RocketMQ版中无特性的消息,区别于有特性的定时和延时消息、顺序消息和事务消息。

定时消息:Producer将消息发送到消息队列RocketMQ版服务端,但并不期望立马投递这条消息,而是推迟到在当前时间点之后的某一个时间投递到Consumer进行消费,该消息即定时消息。

延时消息:Producer将消息发送到消息队列RocketMQ版服务端,但并不期望立马投递这条消息,而是延迟一定时间后才投递到Consumer进行消费,该消息即延时消息。

 

顺序消息(FIFO消息)是消息队列RocketMQ版提供的一种严格按照顺序来发布和消费的消息。顺序发布和顺序消费是指对于指定的一个Topic,生产者按照一定的先后顺序发布消息;消费者按照既定的先后顺序订阅消息,即先发布的消息一定会先被客户端接收到。

顺序消息分为全局顺序消息与分区顺序消息,全局顺序是指某个Topic下的所有消息都要保证顺序;部分顺序消息只要保证每一组消息被顺序消费即可。

  • 全局顺序 对于指定的一个 Topic,所有消息按照严格的先入先出(FIFO)的顺序进行发布和消费。 适用场景:性能要求不高,所有的消息严格按照 FIFO 原则进行消息发布和消费的场景

技术图片

  • 分区顺序 对于指定的一个 Topic,所有消息根据 sharding key 进行区块分区。 同一个分区内的消息按照严格的 FIFO 顺序进行发布和消费。 Sharding key 是顺序消息中用来区分不同分区的关键字段,和普通消息的 Key 是完全不同的概念。 适用场景:性能要求高,以 sharding key 作为分区字段,在同一个区块中严格的按照 FIFO 原则进行消息发布和消费的场景。

技术图片

对于指定的一个Topic,所有消息根据Sharding Key进行区块分区。同一个分区内的消息按照严格的FIFO顺序进行发布和消费。Sharding Key是顺序消息中用来区分不同分区的关键字段,和普通消息的Key是完全不同的概念。

  • 用户注册需要发送发验证码,以用户ID作为Sharding Key,那么同一个用户发送的消息都会按照发布的先后顺序来消费。
  • 电商的订单创建,以订单ID作为Sharding Key,那么同一个订单相关的创建订单消息、订单支付消息、订单退款消息、订单物流消息都会按照发布的先后顺序来消费。
producer.start();
        for (int i = 0; i < 1000; i++) {
            String orderId = "biz_" + i % 10;
            Message msg = new Message(
                    // Message所属的Topic。
                    "Order_global_topic",
                    // Message Tag,可理解为Gmail中的标签,对消息进行再归类,方便Consumer指定过滤条件在消息队列RocketMQ版的服务器过滤。
                    "TagA",
                    // Message Body,可以是任何二进制形式的数据,消息队列RocketMQ版不做任何干预,需要Producer与Consumer协商好一致的序列化和反序列化方式。
                    "send order global msg".getBytes()
            );
            // 设置代表消息的业务关键属性,请尽可能全局唯一。
            // 以方便您在无法正常收到消息情况下,可通过控制台查询消息并补发。
            // 注意:不设置也不会影响消息正常收发。
            msg.setKey(orderId);
            // 分区顺序消息中区分不同分区的关键字段,Sharding Key与普通消息的key是完全不同的概念。
            // 全局顺序消息,该字段可以设置为任意非空字符串。
            String shardingKey = String.valueOf(orderId);
            try {
                SendResult sendResult = producer.send(msg, shardingKey);
                // 发送消息,只要不抛异常就是成功。
                if (sendResult != null) {
                    System.out.println(new Date() + " Send mq message success. Topic is:" + msg.getTopic() + " msgId is: " + sendResult.getMessageId());
                }
            }
            catch (Exception e) {
                // 消息发送失败,需要进行重试处理,可重新发送这条消息或持久化这条数据进行补偿处理。
                System.out.println(new Date() + " Send mq message failed. Topic is:" + msg.getTopic());
                e.printStackTrace();
            }
        }
        // 在应用退出前,销毁Producer对象。
        // 注意:如果不销毁也没有问题。
 producer.shutdown();

 

事务消息适用于所有对数据最终一致性有强需求的场景

  • 事务消息:消息队列RocketMQ版提供类似X或Open XA的分布式事务功能,通过消息队列RocketMQ版事务消息能达到分布式事务的最终一致。
  • 半事务消息:暂不能投递的消息,发送方已经成功地将消息发送到了消息队列RocketMQ版服务端,但是服务端未收到生产者对该消息的二次确认,此时该消息被标记成“暂不能投递”状态,处于该种状态下的消息即半事务消息。
  • 消息回查:由于网络闪断、生产者应用重启等原因,导致某条事务消息的二次确认丢失,消息队列RocketMQ版服务端通过扫描发现某条消息长期处于“半事务消息”时,需要主动向消息生产者询问该消息的最终状态(Commit或是Rollback),该询问过程即消息回查。

技术图片

事务消息发送步骤如下:

  1. 发送方将半事务消息发送至消息队列RocketMQ版服务端。
  2. 消息队列RocketMQ版服务端将消息持久化成功之后,向发送方返回Ack确认消息已经发送成功,此时消息为半事务消息。
  3. 发送方开始执行本地事务逻辑。
  4. 发送方根据本地事务执行结果向服务端提交二次确认(Commit或是Rollback),服务端收到Commit状态则将半事务消息标记为可投递,订阅方最终将收到该消息;服务端收到Rollback状态则删除半事务消息,订阅方将不会接受该消息。

事务消息回查步骤如下:

  1. 在断网或者是应用重启的特殊情况下,上述步骤4提交的二次确认最终未到达服务端,经过固定时间后服务端将对该消息发起消息回查。
  2. 发送方收到消息回查后,需要检查对应消息的本地事务执行的最终结果。
  3. 发送方根据检查得到的本地事务的最终状态再次提交二次确认,服务端仍按照步骤4对半事务消息进行操作。

 

RocketMQ最佳实践


Topic与Tag

 

Topic和Tag的关系如下图

技术图片

 

到底什么时候该用Topic,什么时候该用Tag?

建议从以下几个方面进行判断:

  • 消息类型是否一致:如普通消息、事务消息、定时(延时)消息、顺序消息,不同的消息类型使用不同的Topic,无法通过Tag进行区分。
  • 业务是否相关联:没有直接关联的消息,如淘宝交易消息,京东物流消息使用不同的Topic进行区分;而同样是天猫交易消息,电器类订单、女装类订单、化妆品类订单的消息可以用Tag进行区分。
  • 消息优先级是否一致:如同样是物流消息,盒马必须小时内送达,天猫超市24小时内送达,淘宝物流则相对会慢一些,不同优先级的消息用不同的Topic进行区分。
  • 消息量级是否相当:有些业务消息虽然量小但是实时性要求高,如果跟某些万亿量级的消息使用同一个Topic,则有可能会因为过长的等待时间而“饿死”,此时需要将不同量级的消息进行拆分,使用不同的Topic。

总的来说,针对消息分类,您可以选择创建多个Topic,或者在同一个Topic下创建多个Tag。但通常情况下,不同的Topic之间的消息没有必然的联系,而Tag则用来区分同一个Topic下相互关联的消息,例如全集和子集的关系、流程先后的关系。

 

消息幂等

当出现消费者对某条消息重复消费的情况时,重复消费的结果与消费一次的结果是相同的,并且多次消费并未对业务系统产生任何负面影响,那么这整个过程就可实现消息幂等。

 

处理方法

因为不同的Message ID对应的消息内容可能相同,有可能出现冲突(重复)的情况,所以真正安全的幂等处理,不建议以Message ID作为处理依据。最好的方式是以业务唯一标识作为幂等处理的关键依据,而业务的唯一标识可以通过消息Key设置。

以支付场景为例,可以将消息的Key设置为订单号,作为幂等处理的依据。具体代码示例如下:

Message message = new Message();
message.setKey("ORDERID_100");
SendResult sendResult = producer.send(message);     

 

消费者收到消息时可以根据消息的Key,即订单号来实现消息幂等:

consumer.subscribe("ons_test", "*", new MessageListener() {
    public Action consume(Message message, ConsumeContext context) {
        String key = message.getKey()
        // 根据业务唯一标识的Key做幂等处理。
    }
}); 

 

订阅关系一致

订阅关系一致指的是同一个消费者Group ID下所有Consumer实例所订阅的Topic、Group ID、Tag必须完全一致。一旦订阅关系不一致,消息消费的逻辑就会混乱,甚至导致消息丢失。

消息队列RocketMQ版里的一个消费者Group ID代表一个Consumer实例群组。对于大多数分布式应用来说,一个消费者Group ID下通常会挂载多个Consumer实例。

由于消息队列RocketMQ版的订阅关系主要由Topic+Tag共同组成,因此,保持订阅关系一致意味着同一个消费者Group ID下所有的实例需在以下两方面均保持一致:

  • 订阅的Topic必须一致
  • 订阅的Topic中的Tag必须一致(包括Tag的数量和Tag的顺序)

 

正确订阅关系

技术图片

 

消息堆积和延迟问题

消息处理流程中,如果客户端的消费速度跟不上服务端的发送速度,未处理的消息会越来越多,这部分消息就被称为堆积消息。消息出现堆积进而会造成消息消费延迟。以下场景需要重点关注消息堆积和延迟的问题:

  • 业务系统上下游能力不匹配造成的持续堆积,且无法自行恢复。
  • 业务系统对消息的消费实时性要求较高,即使是短暂的堆积造成的消息延迟也无法接受。

客户端消费原理

技术图片

 

通过以上客户端消费原理可以看出,消息堆积的主要瓶颈在于本地客户端的消费能力,即消费耗时和消费并发度。想要避免和解决消息堆积问题,必须合理的控制消费耗时和消息并发度,其中消费耗时的优先级高于消费并发度,必须先保证消费耗时的合理性,再考虑消费并发度问题。

 

消费耗时

影响消费耗时的消费逻辑主要分为CPU内存计算和外部I/O操作,通常情况下代码中如果没有复杂的递归和循环的话,内部计算耗时相对外部I/O操作来说几乎可以忽略。外部I/O操作通常包括如下业务逻辑:

  • 读写外部数据库,例如Mysql数据库读写。
  • 读写外部缓存等系统,例如Redis读写。
  • 下游系统调用,例如Dubbo调用或者下游HTTP接口调用。

这类外部调用的逻辑和系统容量您需要提前梳理,掌握每个调用操作预期的耗时,这样才能判断消费逻辑中I/O操作的耗时是否合理。通常消费堆积都是由于这些下游系统出现了服务异常、容量限制导致的消费耗时增加。

例如:某业务消费逻辑中需要写一条数据到数据库,单次消费耗时为1 ms,平时消息量小未出现异常。业务侧进行大促活动时,写数据库TPS爆发式增长,并很快达到数据库容量限制,导致消费单条消息的耗时增加到100 ms,业务侧可以明显感受到消费速度大幅下跌。此时仅通过调整消息队列RocketMQ版SDK的消费并发度并不能解决问题,需要对数据库容量进行升配才能从根本上提高客户端消费能力。

 

消费并发度

RocketMQ并发度计算方法如下表所示。

消息类型消费并发度
普通消息 单节点线程数*节点数量

定时和延时消息
事务消息
顺序消息 Min(单节点线程数*节点数量,分区数)

客户端消费并发度由单节点线程数和节点数量共同决定,一般情况下需要优先调整单节点的线程数,若单机硬件资源达到上限,则必须通过扩容节点来提高消费并发度。

 

如何避免消息堆积和延迟

为了避免在业务使用时出现非预期的消息堆积和延迟问题,您需要在前期设计阶段对整个业务逻辑进行完善的排查和梳理。整理出正常业务运行场景下的性能基线,才能在故障场景下迅速定位到阻塞点。其中最重要的就是梳理消息的消费耗时和消息消费的并发度。

  • 梳理消息的消费耗时
    通过压测获取消息的消费耗时,并对耗时较高的操作的代码逻辑进行分析。梳理消息的消费耗时需要关注以下信息:
    • 消息消费逻辑的计算复杂度是否过高,代码是否存在无限循环和递归等缺陷。
    • 消息消费逻辑中的I/O操作(如:外部调用、读写存储等)是否是必须的,能否用本地缓存等方案规避。
    • 消费逻辑中的复杂耗时的操作是否可以做异步化处理,如果可以是否会造成逻辑错乱(消费完成但异步操作未完成)。
  • 设置消息的消费并发度
    1. 逐步调大线程的单个节点的线程数,并关测节点的系统指标,得到单个节点最优的消费线程数和消息吞吐量。
    2. 得到单个节点的最优线程数和消息吞吐量后,根据上下游链路的流量峰值计算出需要设置的节点数,节点数=流量峰值/单线程消息吞吐量。

 

如何处理消息堆积

确认消息的消费耗时是否合理。

  • 若查看到消费耗时较长,则需要查看客户端堆栈信息排查具体业务逻辑。
  • 若查看到消费耗时正常,则有可能是因为消费并发度不够导致消息堆积,需要逐步调大消费线程或扩容节点来解决。

查看客户端堆栈信息。只需要关注线程名为ConsumeMessageThread的线程,这些都是业务消费消息的逻辑。

常见的异常堆栈信息如下:

示例一:空闲无堆积的堆栈。

消费空闲情况下消费线程都会处于WAITING状态等待从消费任务队里中获取消息。

技术图片

示例二:消费逻辑有抢锁休眠等待等情况。
消费线程阻塞在内部的一个睡眠等待上,导致消费缓慢。

技术图片

示例三:消费逻辑操作数据库等外部存储卡住。
消费线程阻塞在外部的HTTP调用上,导致消费缓慢。

技术图片

 

针对某些特殊业务场景,如果消息堆积已经影响到业务运行,且堆积的消息本身可以丢弃,您可以通过重置消费位点跳过这些堆积的消息做到快速恢复。

 

规范&特性

Group

  • Group ID必须以“GID_”或者“GID-”开头,长度限制为7~64个字符,只能包含英文、数字、短横线(-)以及下划线(_)。
  • 消费者必须有对应的Group ID,生产者不做强制要求。

Topic

  • 同一实例下Topic名称必须唯一,不同实例间的Topic名称可以重复。
  • Topic名称长度限制为3~64个字符,只能包含英文、数字、短横线(-)以及下划线(_)。

死信消息

  • 不会再被消费者正常消费。
  • 有效期与正常消息相同,均为3天,3天后会被自动删除。因此,请在死信消息产生后的3天内及时处理。

消息

  • 普通和顺序消息:4 MB
  • 事务和定时或延时消息:64 KB
  • 消息最多保留3天,超过时间将自动滚动删除。
  • 支持重置消费3天之内任何时间点的消息。
  • 定时和延时消息的延时时长可设置40天内的任何时刻,超过40天消息发送将失败。

 

RocketMQ概览

标签:开始   直接   conf   异步   更新   https   注意   进制   failed   

原文地址:https://www.cnblogs.com/erichi101/p/14666135.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!