码迷,mamicode.com
首页 > 其他好文 > 详细

七、seata客户端二阶段分支事务的提交和回滚

时间:2020-03-14 15:00:58      阅读:418      评论:0      收藏:0      [点我收藏+]

标签:keep   回滚   操作   one   worker   ati   statement   bool   初始   

所有文章

https://www.cnblogs.com/lay2017/p/12485081.html

 

正文

在阅读seata自动配置相关的内容的时候,我们说过。客户端会初始化一个RMClient的RPC客户端,且同时会添加一个监听器RmMessageListener,监听器将监听来自seata的server发送的RPC消息。我们再回顾一下这段代码

public static void init(String applicationId, String transactionServiceGroup) {
    RmRpcClient rmRpcClient = RmRpcClient.getInstance(applicationId, transactionServiceGroup);
    rmRpcClient.setResourceManager(DefaultResourceManager.get());
    rmRpcClient.setClientMessageListener(new RmMessageListener(DefaultRMHandler.get()));
    rmRpcClient.init();
}

RmMessageListener监听到的消息将交付给TransactionMessageHandler处理

这里的DefaultRMHandler.get()将返回一个DefaultRMHandler的单例对象,DefaultRMHandler简介实现TransactionMessageHandler,并且组合了三种handler

1)RMHandlerAT

2) RMHandlerTCC

3) RMHandlerSaga

分别对应不同的事务模式,三种处理器和DefaultRMHandler一样都继承了AbstractRMHandler。而AbstractRMHandler包含了两个核心方法,doBranchCommit和doBranchRollback,分别用于二阶段分支事务的提交和分支事务的回滚。

 

分支事务提交doBranchCommit

跟进AbstractRMHandler的doBranchCommit方法

protected void doBranchCommit(BranchCommitRequest request, BranchCommitResponse response) throws TransactionException {
    String xid = request.getXid();
    long branchId = request.getBranchId();
    String resourceId = request.getResourceId();
    String applicationData = request.getApplicationData();
    // 选择对应的ResourceManager,调用commit
    BranchStatus status = getResourceManager().branchCommit(request.getBranchType(), xid, branchId, resourceId, applicationData);
    response.setXid(xid);
    response.setBranchId(branchId);
    response.setBranchStatus(status);
}

doBranchCommit方法将会获取到对应的ResourceManager,我们以AT模式为例。AT模式将会获取到DataSourceManager这个ResourceManager。

我们跟进DataSourceManager的branchCommit方法

private ResourceManagerInbound asyncWorker;
@Override
public BranchStatus branchCommit(BranchType branchType, String xid, long branchId, String resourceId, String applicationData) throws TransactionException {
    return asyncWorker.branchCommit(branchType, xid, branchId, resourceId, applicationData);
}

branchCommit方法中交付给了一个异步线程处理,我们跟进AsyncWorker的branchCommit

@Override
public BranchStatus branchCommit(BranchType branchType, String xid, long branchId, String resourceId,
                                 String applicationData) throws TransactionException {
    if (!ASYNC_COMMIT_BUFFER.offer(new Phase2Context(branchType, xid, branchId, resourceId, applicationData))) {
        LOGGER.warn("Async commit buffer is FULL. Rejected branch [" + branchId + "/" + xid
            + "] will be handled by housekeeping later.");
    }
    return BranchStatus.PhaseTwo_Committed;
}

异步线程中使用了一个BlockQueue来排队处理,将会有一个Scheduler定时从BlockQueue中获取poll出来,然后进行undoLog的批量删除

UndoLogManagerFactory.getUndoLogManager(dataSourceProxy.getDbType()).batchDeleteUndoLog(xids, branchIds, conn);

小结:所谓分支事务的二阶段提交,其实就是异步删除undoLog。因为一阶段的时候已经提交了本地事务,所以二阶段就非常地快速。

 

分支事务回滚doBranchRollback

和doBranchCommit,先跟进AbstractRMHandler的doBranchRollback方法

protected void doBranchRollback(BranchRollbackRequest request, BranchRollbackResponse response) throws TransactionException {
    String xid = request.getXid();
    long branchId = request.getBranchId();
    String resourceId = request.getResourceId();
    String applicationData = request.getApplicationData();
    // 选择ResourceManager,调用rollback
    BranchStatus status = getResourceManager().branchRollback(request.getBranchType(), xid, branchId, resourceId, applicationData);
    response.setXid(xid);
    response.setBranchId(branchId);
    response.setBranchStatus(status);
}

可以看到,一样是调用ResourceManager的方法。我们同样以DataSourceManager为例,跟进DataSourceManager的branchRollback方法

@Override
public BranchStatus branchRollback(BranchType branchType, String xid, long branchId, String resourceId, String applicationData) throws TransactionException {
    // 选择Resource
    DataSourceProxy dataSourceProxy = get(resourceId);

    try {
        // undo补偿
        UndoLogManagerFactory.getUndoLogManager(dataSourceProxy.getDbType()).undo(dataSourceProxy, xid, branchId);
    } catch (TransactionException te) {
        if (te.getCode() == TransactionExceptionCode.BranchRollbackFailed_Unretriable) {
            return BranchStatus.PhaseTwo_RollbackFailed_Unretryable;
        } else {
            return BranchStatus.PhaseTwo_RollbackFailed_Retryable;
        }
    }
    return BranchStatus.PhaseTwo_Rollbacked;
}

在阅读DataSourceProxy的时候说过,DataSourceProxy将会作为一个Resource注册到ResourceManager当中。

而在分支事务回滚的时候,将会获取到该Resource,也就是DataSourceProxy。并且执行对应数据源的undo补偿操作。

我们跟进undo方法,看看补偿操作,方法较长,这里缩减掉一些内容

@Override
public void undo(DataSourceProxy dataSourceProxy, String xid, long branchId) throws TransactionException {
    // ...
    for (; ; ) {
        try {
            //...

            // 查找undoLog
            selectPST = conn.prepareStatement(SELECT_UNDO_LOG_SQL);
            selectPST.setLong(1, branchId);
            selectPST.setString(2, xid);
            rs = selectPST.executeQuery();

            boolean exists = false;
            while (rs.next()) {
                   // ...
                   // 反编码
                UndoLogParser parser = serializer == null ? UndoLogParserFactory.getInstance() : UndoLogParserFactory.getInstance(serializer);
                BranchUndoLog branchUndoLog = parser.decode(rollbackInfo);

                try {
                    List<SQLUndoLog> sqlUndoLogs = branchUndoLog.getSqlUndoLogs();
                    
                    for (SQLUndoLog sqlUndoLog : sqlUndoLogs) {
                        //...
                        // 执行器处理undo
                        AbstractUndoExecutor undoExecutor = UndoExecutorFactory.getUndoExecutor(dataSourceProxy.getDbType(), sqlUndoLog);
                        undoExecutor.executeOn(conn);
                    }
                }
            }

            if (exists) {
                // 删除undoLog
                deleteUndoLog(xid, branchId, conn);
                conn.commit();
            } else {
                //...
            }

            return;
        }
        // ...
    }
}

我们看到,首先是查询出分支事务的undoLog。然后反序列化出undoLog数据对象,且丢给执行器去执行。执行完毕删除undoLog,且提交事务。

由此可见,rollback就是把StatementProxy准备地undoLog拿出来,然后进行反向地补偿操作。

 

总结

本文我们简单地阅读了一下seata客户端在二阶段分支事务的commit和rollback操作做了啥。commit主要就是把undoLog删除,rollback则是获取了undoLog然后对数据进行反向生成。

到这里,seata的客户端代码阅读部分就结束了。我们从自动配置 -> 切面 -> 数据源代理 -> 监听器这么一个流程阅读下来可以发现AT模式的核心要点就是在于数据源代理,由undoLog做反向补偿操作。

后续,将开始server端的代码阅读...

 

七、seata客户端二阶段分支事务的提交和回滚

标签:keep   回滚   操作   one   worker   ati   statement   bool   初始   

原文地址:https://www.cnblogs.com/lay2017/p/12491982.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!