标签:自己实现 取消 一般来说 剖析 伪代码 base red 一些事 失败
这是一篇从去年写到今年的文章,希望大家会喜欢
分布式事务一直是一个老生常谈的一个话题,在我的公众号下面下面已经写过很多篇分布式事务相关的文章了,但是依旧没有将其完全剖析。在之前的文章中我也多次提到我们可以使用消息队列来实现我们的分布式事务,但是大多都是一笔带过,很多读者都对这一块产生了很多疑问,希望读完这篇文章能让你理解如何用消息队列实现分布式事务。
当然首先要回顾一下我们的一些基本概念:
CAP定理,又被叫作布鲁尔定理。对于设计分布式系统来说(不仅仅是分布式事务)的架构师来说,CAP就是你的入门理论。
对于CP来说,放弃可用性,追求一致性和分区容错性,我们的zookeeper其实就是追求的强一致。
对于AP来说,放弃一致性(这里说的一致性是强一致性),追求分区容错性和可用性,这是很多分布式系统设计时的选择,后面的BASE也是根据AP来扩展。
顺便一提,CAP理论中是忽略网络延迟,也就是当事务提交时,从节点A复制到节点B,但是在现实中这个是明显不可能的,所以总会有一定的时间是不一致。同时CAP中选择两个,比如你选择了CP,并不是叫你放弃A。因为P出现的概率实在是太小了,大部分的时间你仍然需要保证CA。就算分区出现了你也要为后来的A做准备,比如通过一些日志的手段,让其他机器回复至可用。
BASE 是 Basically Available(基本可用)、Soft state(软状态)和 Eventually consistent (最终一致性)三个短语的缩写。是对CAP中AP的一个扩展
基本可用:分布式系统在出现故障时,允许损失部分可用功能,保证核心功能可用。
软状态:允许系统中存在中间状态,这个状态不影响系统可用性,这里指的是CAP中的不一致。
最终一致:最终一致是指经过一段时间后,所有节点数据都将会达到一致。
BASE解决了CAP中理论没有网络延迟,在BASE中用软状态和最终一致,保证了延迟后的一致性。BASE和 ACID 是相反的,它完全不同于ACID的强一致性模型,而是通过牺牲强一致性来获得可用性,并允许数据在一段时间内是不一致的,但最终达到一致状态。
我们的所有事务消息都可以看作是BASE模型的实现。在业界中有事务消息功能比较有代表性的就是阿里开源的RocketMQ和去哪儿开源的QMQ,他们两个消息队列都实现了事务消息功能,但是实现的方式却各有不同,接下来也会分别剖析这两个消息队列是如何实现事务消息。
RocketMQ事务消息到底是怎么一回事呢?
基本流程如下:
第一阶段Prepared消息,会拿到消息的地址。
第二阶段执行本地事务。
第三阶段通过第一阶段拿到的地址去访问消息,并修改状态。消息接受者就能使用这个消息。
如果确认消息失败,在RocketMq Broker中提供了定时扫描没有更新状态的消息,如果有消息没有得到确认,会向消息发送者发送消息,来判断是否提交,在rocketmq中是以listener的形式给发送者,用来处理。
如果确认消息失败,在RocketMq Broker中提供了定时扫描没有更新状态的消息,如果有消息没有得到确认,会向消息发送者发送消息,来判断是否提交,在rocketmq中是以listener的形式给发送者,用来处理。
如果消费超时,则需要一直重试,消息接收端需要保证幂等。如果消息消费失败,这个就需要人工进行处理,因为这个概率较低,如果为了这种小概率时间而设计这个复杂的流程反而得不偿失
这个图大家想必在其他地方已经看见过很多次了,很多时候从看这个图只能一知半解,那接下来看看代码是如何实现的吧。
在RocketMQ的事务消息中有个很重要的监听器叫TransactionListener,我们需要实现他
其中有两个方法:
executeLocalTransaction:顾名思义执行我们的本地事务方法,一般来说我们的本地事务方法是由上层的业务顺序推进调用,但是在rocketMQ的事务消息中是需要由Listener来进行驱动,如果要使用RocketMQ的事务消息需要对我们的业务进行一定的改造。并且这里还需要注意的是,我们在事务中还需要保存消息的事务ID和当前事务的对应关系。
checkLocalTransaction:根据我们之前的事务ID来检查我们的本地事务状态,这里的状态有三种:
事务消息共有三种状态,提交状态、回滚状态、中间状态:
对于我们的消息发送有如下代码:
我们发现在代码中我们将我们之前的listener以及一个线程池来和我们的producer进行绑定,这里线程池的作用是我们checkLocalTransaction所使用的线程池。
这里的代码比较简单,主要分下面几个步骤
对于checkLocalTransaction:
在RocketMQ中会接收RocketMQ-Broker发送的CHECK_TRANSACTION_STATE请求,来执行检查本地事务状态。
在Broker上会对事务消息进行特殊判断:
如果是事务消息那么就需要走prepareMessage这个逻辑,prepareMessage这个逻辑如下:
主要是将当前消息的topic替换成RMQ_SYS_TRANS_HALF_TOPIC。我们的一阶段发送半消息到这里就完成了,接下来就是Broker处理我们事务的commit或者rollback:
图中红色方框表示我们的核心步骤,对于commit的一共有三步:
对于rollback一共有两步:
对于获取消息这个比较简单,通过记录的offset直接查询就好,对于将消息发送到原来的topic逻辑基本上可以复用,这里要重点讨论的是如何删除半消息,我们都知道RocketMQ是顺序写入,我们不可能去真正的删除消息,那么就只能依靠一些其他的途径,我们可以想到消息消费了之后,只要offset不重置,这个消息就不会再被消费,那么其实就实现了删除的功能。RocketMQ也是通过这样的思路,自己实现了一个消费者,去消费RMQ_SYS_TRANS_HALF_TOPIC这个Topic,如果消息需要删除的话消费了之后就不需要做其他操作,如果不需要删除的话,消费了之后又会重新投递。
那其实核心就在于怎么去记录半消息是否应该删除呢?对于这个问题RocketMQ采用了新的TopicRMQ_SYS_TRANS_OP_HALF_TOPIC来保存半消息是否删除,其实在上面的删除半消息的流程中其实也是对RMQ_SYS_TRANS_OP_HALF_TOPIC投递了一个op_message,然后由后台任务去进行操作。
整个流程原理图如下面所示:
上面已经讲了如何使用RocketMQ的事务消息和实现原理,想必大家已经对RocketMQ事务消息有自己的认识了。但是RocketMQ的事务消息目前在我的一些业务实战中是从来没有使用过的,主要原因有几个方面:
综上所述,RocketMQ的事务消息在我看来的确属于比较鸡肋,很难去适应于老业务。那么怎么去接下来讲一下QMQ的事务消息的解决方案,看看这种方案能否解决我们所说的这种问题呢?
QMQ的事务消息没有RocketMQ那么的复杂,对于消息中间件的本身改造是很小的,其依赖了数据库自身的本地事务,比如一个创建订单,需要发送两种消息,分别是A和B,那么有如下的伪代码:
begin transaction;
createOrder();
commit transaction;
sendMessageA();
snedMessageB();
这个时候我们发现消息A和消息B都在事务之外,其一致性得不到保证,那么其实我们发送消息的时候不一定要真正的和消息中间件打交道,我们可以做一个本地的存储,保存我们的消息:
begin transaction;
createOrder();
saveMessageA();
saveMessageB();
commit transaction;
// 发送消息
sendMessageA();
snedMessageB();
可以看见其实我们只是增加两个保存消息的操作,那么我们是如何保证一致性呢,如果发送MessageA的时候挂了,那么我们就可以通过定时任务去拉去我们数据库中保存的并没有发送的消息,然后再次进行发送。
其实这种方法同样的可以扩展至其他的消息队列,因为对于消息中间件本身是没有***的,如果RocketMQ或者Kafka也想使用这种方法来保证事务消息,也是可以的。
我们来看看这种方法能否解决RocketMQ事务消息带来的问题呢?
begin transaction;
createOrder();
sendMessageA();
snedMessageB();
commit transaction;
这里的send其实内部逻辑是saveMessage,在commit之后会自动进行发送,并且后台有定时任务会补偿发送。
RocketMQ事务消息带来的问题基本可以解决,但是其同样也有缺点,因为其引入了额外的数据库写,如果事务消息较多,那么就会多出很多写数据库的操作,对于响应时间比较敏感的服务需要仔细考虑
介绍了两种事务消息,对于我个人而言,QMQ实现的方案能更加适应于大多数业务。但是这里要注意事务消息并不是所有的分布式一致性都能使用,事务消息使用的场景只能是发出这个消息就能代表这个操作成功的场景,什么意思呢?举个例子,比如我们支付的时候会扣积分,扣券等等,如果我发一个扣积分的消息能代表一定成功吗?这个肯定是不行的,因为用户的积分可能不够,就会导致扣除失败。如果是发送一个赠送积分的消息那么就可以代表成功,因为赠送积分是属于加法,并没有太多的限制。
如果发现事务消息不能很好的满足的满足业务场景,那么你就可以考虑其他的一些事务策略,比如TCC,saga等,这些在我之前的文章都有讲述。
如果大家觉得这篇文章对你有帮助,你的关注和转发是对我最大的支持
标签:自己实现 取消 一般来说 剖析 伪代码 base red 一些事 失败
原文地址:https://blog.51cto.com/14980978/2544634