标签:keep 回滚 操作 one worker ati statement bool 初始
https://www.cnblogs.com/lay2017/p/12485081.html
在阅读seata自动配置相关的内容的时候,我们说过。客户端会初始化一个RMClient的RPC客户端,且同时会添加一个监听器RmMessageListener,监听器将监听来自seata的server发送的RPC消息。我们再回顾一下这段代码
public static void init(String applicationId, String transactionServiceGroup) { RmRpcClient rmRpcClient = RmRpcClient.getInstance(applicationId, transactionServiceGroup); rmRpcClient.setResourceManager(DefaultResourceManager.get()); rmRpcClient.setClientMessageListener(new RmMessageListener(DefaultRMHandler.get())); rmRpcClient.init(); }
RmMessageListener监听到的消息将交付给TransactionMessageHandler处理
这里的DefaultRMHandler.get()将返回一个DefaultRMHandler的单例对象,DefaultRMHandler简介实现TransactionMessageHandler,并且组合了三种handler
1)RMHandlerAT
2) RMHandlerTCC
3) RMHandlerSaga
分别对应不同的事务模式,三种处理器和DefaultRMHandler一样都继承了AbstractRMHandler。而AbstractRMHandler包含了两个核心方法,doBranchCommit和doBranchRollback,分别用于二阶段分支事务的提交和分支事务的回滚。
跟进AbstractRMHandler的doBranchCommit方法
protected void doBranchCommit(BranchCommitRequest request, BranchCommitResponse response) throws TransactionException { String xid = request.getXid(); long branchId = request.getBranchId(); String resourceId = request.getResourceId(); String applicationData = request.getApplicationData(); // 选择对应的ResourceManager,调用commit BranchStatus status = getResourceManager().branchCommit(request.getBranchType(), xid, branchId, resourceId, applicationData); response.setXid(xid); response.setBranchId(branchId); response.setBranchStatus(status); }
doBranchCommit方法将会获取到对应的ResourceManager,我们以AT模式为例。AT模式将会获取到DataSourceManager这个ResourceManager。
我们跟进DataSourceManager的branchCommit方法
private ResourceManagerInbound asyncWorker; @Override public BranchStatus branchCommit(BranchType branchType, String xid, long branchId, String resourceId, String applicationData) throws TransactionException { return asyncWorker.branchCommit(branchType, xid, branchId, resourceId, applicationData); }
branchCommit方法中交付给了一个异步线程处理,我们跟进AsyncWorker的branchCommit
@Override public BranchStatus branchCommit(BranchType branchType, String xid, long branchId, String resourceId, String applicationData) throws TransactionException { if (!ASYNC_COMMIT_BUFFER.offer(new Phase2Context(branchType, xid, branchId, resourceId, applicationData))) { LOGGER.warn("Async commit buffer is FULL. Rejected branch [" + branchId + "/" + xid + "] will be handled by housekeeping later."); } return BranchStatus.PhaseTwo_Committed; }
异步线程中使用了一个BlockQueue来排队处理,将会有一个Scheduler定时从BlockQueue中获取poll出来,然后进行undoLog的批量删除
UndoLogManagerFactory.getUndoLogManager(dataSourceProxy.getDbType()).batchDeleteUndoLog(xids, branchIds, conn);
小结:所谓分支事务的二阶段提交,其实就是异步删除undoLog。因为一阶段的时候已经提交了本地事务,所以二阶段就非常地快速。
和doBranchCommit,先跟进AbstractRMHandler的doBranchRollback方法
protected void doBranchRollback(BranchRollbackRequest request, BranchRollbackResponse response) throws TransactionException { String xid = request.getXid(); long branchId = request.getBranchId(); String resourceId = request.getResourceId(); String applicationData = request.getApplicationData(); // 选择ResourceManager,调用rollback BranchStatus status = getResourceManager().branchRollback(request.getBranchType(), xid, branchId, resourceId, applicationData); response.setXid(xid); response.setBranchId(branchId); response.setBranchStatus(status); }
可以看到,一样是调用ResourceManager的方法。我们同样以DataSourceManager为例,跟进DataSourceManager的branchRollback方法
@Override public BranchStatus branchRollback(BranchType branchType, String xid, long branchId, String resourceId, String applicationData) throws TransactionException { // 选择Resource DataSourceProxy dataSourceProxy = get(resourceId); try { // undo补偿 UndoLogManagerFactory.getUndoLogManager(dataSourceProxy.getDbType()).undo(dataSourceProxy, xid, branchId); } catch (TransactionException te) { if (te.getCode() == TransactionExceptionCode.BranchRollbackFailed_Unretriable) { return BranchStatus.PhaseTwo_RollbackFailed_Unretryable; } else { return BranchStatus.PhaseTwo_RollbackFailed_Retryable; } } return BranchStatus.PhaseTwo_Rollbacked; }
在阅读DataSourceProxy的时候说过,DataSourceProxy将会作为一个Resource注册到ResourceManager当中。
而在分支事务回滚的时候,将会获取到该Resource,也就是DataSourceProxy。并且执行对应数据源的undo补偿操作。
我们跟进undo方法,看看补偿操作,方法较长,这里缩减掉一些内容
@Override public void undo(DataSourceProxy dataSourceProxy, String xid, long branchId) throws TransactionException { // ... for (; ; ) { try { //... // 查找undoLog selectPST = conn.prepareStatement(SELECT_UNDO_LOG_SQL); selectPST.setLong(1, branchId); selectPST.setString(2, xid); rs = selectPST.executeQuery(); boolean exists = false; while (rs.next()) { // ... // 反编码 UndoLogParser parser = serializer == null ? UndoLogParserFactory.getInstance() : UndoLogParserFactory.getInstance(serializer); BranchUndoLog branchUndoLog = parser.decode(rollbackInfo); try { List<SQLUndoLog> sqlUndoLogs = branchUndoLog.getSqlUndoLogs(); for (SQLUndoLog sqlUndoLog : sqlUndoLogs) { //... // 执行器处理undo AbstractUndoExecutor undoExecutor = UndoExecutorFactory.getUndoExecutor(dataSourceProxy.getDbType(), sqlUndoLog); undoExecutor.executeOn(conn); } } } if (exists) { // 删除undoLog deleteUndoLog(xid, branchId, conn); conn.commit(); } else { //... } return; } // ... } }
我们看到,首先是查询出分支事务的undoLog。然后反序列化出undoLog数据对象,且丢给执行器去执行。执行完毕删除undoLog,且提交事务。
由此可见,rollback就是把StatementProxy准备地undoLog拿出来,然后进行反向地补偿操作。
本文我们简单地阅读了一下seata客户端在二阶段分支事务的commit和rollback操作做了啥。commit主要就是把undoLog删除,rollback则是获取了undoLog然后对数据进行反向生成。
到这里,seata的客户端代码阅读部分就结束了。我们从自动配置 -> 切面 -> 数据源代理 -> 监听器这么一个流程阅读下来可以发现AT模式的核心要点就是在于数据源代理,由undoLog做反向补偿操作。
后续,将开始server端的代码阅读...
标签:keep 回滚 操作 one worker ati statement bool 初始
原文地址:https://www.cnblogs.com/lay2017/p/12491982.html