标签:开始 直接 conf 异步 更新 https 注意 进制 failed
Topic:消息主题,一级消息类型,通过Topic对消息进行分类。
Tag:消息标签,二级消息类型,用来进一步区分某个Topic下的消息分类。
Message ID:消息的全局唯一标识,由消息队列RocketMQ版系统自动生成,唯一标识某条消息。
Message Key:消息的业务标识,由消息生产者(Producer)设置,唯一标识某个业务逻辑。
Producer:消息生产者,也称为消息发布者,负责生产并发送消息。
Consumer:消息消费者,也称为消息订阅者,负责接收并消费消息。可分为两类:
Group:一类Producer或Consumer,这类Producer或Consumer通常生产或消费同一类消息,且消息发布或订阅的逻辑一致。
集群消费:一个Group ID所标识的所有Consumer平均分摊消费消息。
例如某个Topic有9条消息,一个Group ID有3个Consumer实例,那么在集群消费模式下每个实例平均分摊,只消费其中的3条消息。
广播消费:一个Group ID所标识的所有Consumer都会各自消费某条消息一次。
例如某个Topic有9条消息,一个Group ID有3个Consumer实例,那么在广播消费模式下每个实例都会各自消费9条消息。
消息队列RocketMQ版支持发布和订阅模型,消息生产者应用创建Topic并将消息发送到Topic。消费者应用创建对Topic的订阅以便从其接收消息。通信可以是一对多(扇出)、多对一(扇入)和多对多。
生产者集群:用来表示发送消息应用,一个生产者集群下包含多个生产者实例,可以是多台机器,也可以是一台机器的多个进程,或者一个进程的多个生产者对象。
一个生产者集群可以发送多个Topic消息。发送分布式事务消息时,如果生产者中途意外宕机,消息队列RocketMQ版服务端会主动回调生产者集群的任意一台机器来确认事务状态。
消费者集群:用来表示消费消息应用,一个消费者集群下包含多个消费者实例,可以是多台机器,也可以是多个进程,或者是一个进程的多个消费者对象。
一个消费者集群下的多个消费者以均摊方式消费消息。如果设置的是广播方式,那么这个消费者集群下的每个实例都消费全量数据。
一个消费者集群对应一个Group ID,一个Group ID可以订阅多个Topic
消息队列RocketMQ版是基于发布或订阅模型的消息系统。消费者,即消息的订阅方订阅关注的Topic,以获取并消费消息。由于消费者应用一般是分布式系统,以集群方式部署,因此消息队列RocketMQ版约定以下概念:
集群:使用相同Group ID的消费者属于同一个集群。同一个集群下的消费者消费逻辑必须完全一致(包括Tag的使用)。
集群消费:当使用集群消费模式时,消息队列RocketMQ版认为任意一条消息只需要被集群内的任意一个消费者处理即可。
广播消费:当使用广播消费模式时,消息队列RocketMQ版会将每条消息推送给集群内所有注册过的消费者,保证消息至少被每个消费者消费一次。
适用场景
适用于消费端集群化部署,每条消息只需要被处理一次的场景。此外,由于消费进度在服务端维护,可靠性更高。具体消费示例如下图所示。
注意事项
适用场景
适用于消费端集群化部署,每条消息需要被集群下的每个消费者处理的场景。具体消费示例如下图所示。
注意事项
图中所涉及到的概念如下所述:
普通消息 |
消息队列RocketMQ版中无特性的消息,区别于有特性的定时和延时消息、顺序消息和事务消息。 |
定时和延时消息 |
允许消息生产者对指定消息进行定时(延时)投递,最长支持40天。 |
顺序消息 |
允许消息消费者按照消息发送的顺序对消息进行消费。 |
事务消息 |
实现类似X或Open XA的分布事务功能,以达到事务最终一致性状态。 |
普通消息是指消息队列RocketMQ版中无特性的消息,区别于有特性的定时和延时消息、顺序消息和事务消息。
定时消息:Producer将消息发送到消息队列RocketMQ版服务端,但并不期望立马投递这条消息,而是推迟到在当前时间点之后的某一个时间投递到Consumer进行消费,该消息即定时消息。
延时消息:Producer将消息发送到消息队列RocketMQ版服务端,但并不期望立马投递这条消息,而是延迟一定时间后才投递到Consumer进行消费,该消息即延时消息。
顺序消息(FIFO消息)是消息队列RocketMQ版提供的一种严格按照顺序来发布和消费的消息。顺序发布和顺序消费是指对于指定的一个Topic,生产者按照一定的先后顺序发布消息;消费者按照既定的先后顺序订阅消息,即先发布的消息一定会先被客户端接收到。
顺序消息分为全局顺序消息与分区顺序消息,全局顺序是指某个Topic下的所有消息都要保证顺序;部分顺序消息只要保证每一组消息被顺序消费即可。
对于指定的一个Topic,所有消息根据Sharding Key进行区块分区。同一个分区内的消息按照严格的FIFO顺序进行发布和消费。Sharding Key是顺序消息中用来区分不同分区的关键字段,和普通消息的Key是完全不同的概念。
producer.start(); for (int i = 0; i < 1000; i++) { String orderId = "biz_" + i % 10; Message msg = new Message( // Message所属的Topic。 "Order_global_topic", // Message Tag,可理解为Gmail中的标签,对消息进行再归类,方便Consumer指定过滤条件在消息队列RocketMQ版的服务器过滤。 "TagA", // Message Body,可以是任何二进制形式的数据,消息队列RocketMQ版不做任何干预,需要Producer与Consumer协商好一致的序列化和反序列化方式。 "send order global msg".getBytes() ); // 设置代表消息的业务关键属性,请尽可能全局唯一。 // 以方便您在无法正常收到消息情况下,可通过控制台查询消息并补发。 // 注意:不设置也不会影响消息正常收发。 msg.setKey(orderId); // 分区顺序消息中区分不同分区的关键字段,Sharding Key与普通消息的key是完全不同的概念。 // 全局顺序消息,该字段可以设置为任意非空字符串。 String shardingKey = String.valueOf(orderId); try { SendResult sendResult = producer.send(msg, shardingKey); // 发送消息,只要不抛异常就是成功。 if (sendResult != null) { System.out.println(new Date() + " Send mq message success. Topic is:" + msg.getTopic() + " msgId is: " + sendResult.getMessageId()); } } catch (Exception e) { // 消息发送失败,需要进行重试处理,可重新发送这条消息或持久化这条数据进行补偿处理。 System.out.println(new Date() + " Send mq message failed. Topic is:" + msg.getTopic()); e.printStackTrace(); } } // 在应用退出前,销毁Producer对象。 // 注意:如果不销毁也没有问题。 producer.shutdown(); |
事务消息适用于所有对数据最终一致性有强需求的场景
事务消息发送步骤如下:
事务消息回查步骤如下:
Topic和Tag的关系如下图
到底什么时候该用Topic,什么时候该用Tag?
建议从以下几个方面进行判断:
总的来说,针对消息分类,您可以选择创建多个Topic,或者在同一个Topic下创建多个Tag。但通常情况下,不同的Topic之间的消息没有必然的联系,而Tag则用来区分同一个Topic下相互关联的消息,例如全集和子集的关系、流程先后的关系。
当出现消费者对某条消息重复消费的情况时,重复消费的结果与消费一次的结果是相同的,并且多次消费并未对业务系统产生任何负面影响,那么这整个过程就可实现消息幂等。
处理方法
因为不同的Message ID对应的消息内容可能相同,有可能出现冲突(重复)的情况,所以真正安全的幂等处理,不建议以Message ID作为处理依据。最好的方式是以业务唯一标识作为幂等处理的关键依据,而业务的唯一标识可以通过消息Key设置。
以支付场景为例,可以将消息的Key设置为订单号,作为幂等处理的依据。具体代码示例如下:
Message message = new Message(); message.setKey("ORDERID_100"); SendResult sendResult = producer.send(message); |
消费者收到消息时可以根据消息的Key,即订单号来实现消息幂等:
consumer.subscribe("ons_test", "*", new MessageListener() { public Action consume(Message message, ConsumeContext context) { String key = message.getKey() // 根据业务唯一标识的Key做幂等处理。 } }); |
订阅关系一致指的是同一个消费者Group ID下所有Consumer实例所订阅的Topic、Group ID、Tag必须完全一致。一旦订阅关系不一致,消息消费的逻辑就会混乱,甚至导致消息丢失。
消息队列RocketMQ版里的一个消费者Group ID代表一个Consumer实例群组。对于大多数分布式应用来说,一个消费者Group ID下通常会挂载多个Consumer实例。
由于消息队列RocketMQ版的订阅关系主要由Topic+Tag共同组成,因此,保持订阅关系一致意味着同一个消费者Group ID下所有的实例需在以下两方面均保持一致:
正确订阅关系
消息处理流程中,如果客户端的消费速度跟不上服务端的发送速度,未处理的消息会越来越多,这部分消息就被称为堆积消息。消息出现堆积进而会造成消息消费延迟。以下场景需要重点关注消息堆积和延迟的问题:
客户端消费原理
通过以上客户端消费原理可以看出,消息堆积的主要瓶颈在于本地客户端的消费能力,即消费耗时和消费并发度。想要避免和解决消息堆积问题,必须合理的控制消费耗时和消息并发度,其中消费耗时的优先级高于消费并发度,必须先保证消费耗时的合理性,再考虑消费并发度问题。
消费耗时
影响消费耗时的消费逻辑主要分为CPU内存计算和外部I/O操作,通常情况下代码中如果没有复杂的递归和循环的话,内部计算耗时相对外部I/O操作来说几乎可以忽略。外部I/O操作通常包括如下业务逻辑:
这类外部调用的逻辑和系统容量您需要提前梳理,掌握每个调用操作预期的耗时,这样才能判断消费逻辑中I/O操作的耗时是否合理。通常消费堆积都是由于这些下游系统出现了服务异常、容量限制导致的消费耗时增加。
例如:某业务消费逻辑中需要写一条数据到数据库,单次消费耗时为1 ms,平时消息量小未出现异常。业务侧进行大促活动时,写数据库TPS爆发式增长,并很快达到数据库容量限制,导致消费单条消息的耗时增加到100 ms,业务侧可以明显感受到消费速度大幅下跌。此时仅通过调整消息队列RocketMQ版SDK的消费并发度并不能解决问题,需要对数据库容量进行升配才能从根本上提高客户端消费能力。
消费并发度
RocketMQ并发度计算方法如下表所示。
消息类型 | 消费并发度 |
---|---|
普通消息 | 单节点线程数*节点数量 |
定时和延时消息 | |
事务消息 | |
顺序消息 | Min(单节点线程数*节点数量,分区数) |
客户端消费并发度由单节点线程数和节点数量共同决定,一般情况下需要优先调整单节点的线程数,若单机硬件资源达到上限,则必须通过扩容节点来提高消费并发度。
如何避免消息堆积和延迟
为了避免在业务使用时出现非预期的消息堆积和延迟问题,您需要在前期设计阶段对整个业务逻辑进行完善的排查和梳理。整理出正常业务运行场景下的性能基线,才能在故障场景下迅速定位到阻塞点。其中最重要的就是梳理消息的消费耗时和消息消费的并发度。
如何处理消息堆积
确认消息的消费耗时是否合理。
查看客户端堆栈信息。只需要关注线程名为ConsumeMessageThread的线程,这些都是业务消费消息的逻辑。
常见的异常堆栈信息如下:
示例一:空闲无堆积的堆栈。
消费空闲情况下消费线程都会处于WAITING状态等待从消费任务队里中获取消息。
示例二:消费逻辑有抢锁休眠等待等情况。
消费线程阻塞在内部的一个睡眠等待上,导致消费缓慢。
示例三:消费逻辑操作数据库等外部存储卡住。
消费线程阻塞在外部的HTTP调用上,导致消费缓慢。
针对某些特殊业务场景,如果消息堆积已经影响到业务运行,且堆积的消息本身可以丢弃,您可以通过重置消费位点跳过这些堆积的消息做到快速恢复。
规范&特性
Group
Topic
死信消息
消息
标签:开始 直接 conf 异步 更新 https 注意 进制 failed
原文地址:https://www.cnblogs.com/erichi101/p/14666135.html