码迷,mamicode.com
首页 > 编程语言 > 详细

Spring框架之事务处理源码分析

时间:2018-08-16 13:52:23      阅读:196      评论:0      收藏:0      [点我收藏+]

标签:dos   session   tor   res   idt   统一   local   ida   工作   

1、事务就是以可控的方式对数据资源(数据库,文件系统)进行访问的一组操作。为了保证事务执行前后,数据资源所承载的系统状态处于“正确”状态,事务本身有4个限定属性(ACID):原子性,一致性,隔离性,持久性。

      原子性:事务包含的全部操作是一个不可分割的整体,要么全部提交成功,要么全部失败。

      一致性:一致性要求事务所包含的操作不能违反数据资源的一致性检查。

      隔离性:各个事务之间相互影响的程度,不同的隔离级别决定了各个事物对该数据资源访问的不同行为。隔离性是面向数据资源的并发访问。

        四种隔离级别,由弱到强分别为Read Uncommitted,Read Commited,Repeatable Read,Serializable

           Read Uncommitted: 一个事务可以读取另一个事务没有提交的更新结果,以低的隔离度来寻求较高的性能。事务回滚后,另一个事务看到的就是脏数据。

      Read Committed:默认级别。

      Repeatable Read: 保证在整个事务过程中,对同一笔数据的读取结果是相同的。不管其他事务是否同时在对同一笔数据进行更新,也不管其他事务对同一笔数

                据更新与否。

      Serializable:最严格的隔离级别。所有事务都必须按顺序执行,可以避免所有问题,但是是性能最差的隔离级别,很少场景会使用。通常会使用其他隔离级别

              加上相应的并发锁的机制来控制对数据的访问,这样既保证了系统性能不会损失太大,也能够在一定程度上保证数据的一致性。

      Oracle只支持:Read Committed、Serializable。

      EJB、Spring、JDBC等数据库访问方式,都提供4种隔离级别,但是最终是否以指定的隔离执行,由底层的数据资源来决定。

      持久性:一旦事务操作成功提交,对数据所做的变更将被记载并不可逆转。

2、事务管理代码

        Connection connection = null;
        boolean roolback = false;
        DataSource dataSource=null;
        try {
            connection = dataSource.getConnection();
            connection.setAutoCommit(false);
            //jdbc 操作。。。。。。
            connection.commit();
        } catch (SQLException e) {
            roolback = true;
            e.printStackTrace();
        }finally {
            if(connection!=null){
                if (roolback){
                    connection.rollback();
                }
                connection.close();
            }
        }

 Hibernate API的事务管理代码

        try {
            session = sessionFactory.openSession();
            transaction = session.beginTransaction();
            //数据库访问
            session.flush();
            transaction.commit();
        }catch (HibernateException e){
            transaction.rollback();
        }finally {
            session.close();
        }

     

3、JDBC的局部事务控制(事务只涉及到一个数据源)是由同一个java.sql.Connection来完成的,所以要保证两个DAO的数据访问方法会处于一个事务中,我们就得保证它们使用的是同一个java.sql.Connection。

Spring 的事务狂阶设计理念的基本原则是:让事务的关注点和数据访问的关注点相分离。

  当在业务层使用事务的抽象API进行事务界定时,不需要关心事务将要加诸于上的事务资源是什么,对不同的事务资源的管理将由响应的框架实现类来操心。
  当在数据访问层对可能参与事务的数据资源进行访问的时候,只需要使用响应的数据访问API进行数据访问,而不需要关心当前事务资源如何参与事务或是是否需要参与事务。这同样将由事务框架类来打理。

import org.springframework.dao.DataAccessException;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.TransactionDefinition;
import org.springframework.transaction.TransactionStatus;
public class Service {
    private PlatformTransactionManager transactionManager;
    public  void serviceMethod(){
        TransactionDefinition definition=null;
        TransactionStatus txStatus = getTransactionManager().getTransaction(definition);
        try {
            //数据库操作  dao1.doDataAccess(); dao2.doDataAccess();
        }catch (DataAccessException e){
            getTransactionManager().rollback(txStatus);
            throw e;
        }catch (Exception e){
            getTransactionManager().rollback(txStatus);
            throw e;
        }
        getTransactionManager().commit(txStatus);
    }
    private PlatformTransactionManager getTransactionManager() {
        return  transactionManager;
    }
}

  

public interface PlatformTransactionManager {
    TransactionStatus getTransaction(TransactionDefinition var1) throws TransactionException;

    void commit(TransactionStatus var1) throws TransactionException;

    void rollback(TransactionStatus var1) throws TransactionException;
}

  

技术分享图片

 技术分享图片

如何传递Connection?图19-2虽然也可以保证Dao使用同一个connection,但是导致事务管理代码和数据访问代码之间通过Connection直接耦合。现在是JDBC使用Connection,如果是Hibernate的话就要声明对Session的依赖了。

正确方法:要传递Connection,可以将整个事务对应的java.sql.Connection实例放到统一的地方去,无论是谁,要使用该资源,都从这一个地方获取。具体:我们在事务开始之前取得一个java.sql.Connection,然后将该Connection绑定到当前的调用线程,之后,数据访问对象在使用Connection时,就可以从当前线程上获取这个事务开始的时候绑定的Connection实例。当所有的数据访问对象全部使用这个绑定当前线程的Connection完成了数据访问工作时,我们就使用这个Connection实例提交或者回滚事务,然后解除它到当前线程的绑定。

原型代码(不用于生产环境):

public class TransactionResourceManager {
    private static ThreadLocal resources = new ThreadLocal();
    public static Object getResource(){
        return resources.get();
    }
    public static void bindResource(Object resource){
        resources.set(resource);
    }
    public static Object unbindResource(){
        Object res = getResource();
        resources.set(null);
        return res;
    }
}

 

public class MyPlatformTransactionManager implements PlatformTransactionManager {
    private DataSource dataSource;
    public MyPlatformTransactionManager(DataSource dataSource){
        this.dataSource = dataSource;
    }
    @Override
    public TransactionStatus getTransaction(TransactionDefinition transactionDefinition) throws TransactionException {
        Connection connection;
        try {
            connection=dataSource.getConnection();
            TransactionResourceManager.bindResource(connection);
            return new DefaultTransactionStatus(connection,true,true,false,true,null);
        } catch (SQLException e) {
            e.printStackTrace();
        }
        return null;
    }

    @Override
    public void commit(TransactionStatus transactionStatus) throws TransactionException {
        Connection connection = (Connection) TransactionResourceManager.unbindResource();
        try {
            connection.commit();
        } catch (SQLException e) {
            e.printStackTrace();
        }finally {
            try {
                connection.close();
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }
    }

    @Override
    public void rollback(TransactionStatus transactionStatus) throws TransactionException {
        Connection connection = (Connection) TransactionResourceManager.unbindResource();

        try {
            connection.rollback();
        } catch (SQLException e) {
            e.printStackTrace();
        }finally {
            try {
                connection.close();
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }
    }
}

 Spring 框架中的DataSourceUtils工具类的最主要的工作是对connection管理。DataSourceUtils会从TransactionResourceManager(类似TransactionResourceManager类)那里获取Connection资源。如果当前线程没有绑定任何connection,那么就通过数据库访问对象的DataSource引用获取新的connection,否则就必须使用绑定的connection,这就是为什么强调,当我们使用Spring提供的事务支持的时候,必须通过DataSourceUtils来获取连接。而jdbcTemplate等类内部已经使用DataSourceUtils来管理连接了,所以我们不用操心细节。

 

4、Spring的事务抽象包括3个主要接口:即PlatformTransactionManager、TransactionDefinition、TransactionStatus:

技术分享图片

PlatformTransactionManager 负责界定事务边界

TransactionDefinition 负责定义事务相关属性,包括隔离级别、传播行为等。

PlatformTransactionManager 将参照TransactionDefinition 的属性定义来开启相关事务。事务开启之后到事务结束期间的事务状态由TranscationStatus负责,我们也可以通过TransactionStatus对事务进行有限的控制。

 

5、DataSourceTransactionManager 简单介绍:

   技术分享图片

AbstractPlatformTransactionManager:

  • 确定如果有现有的事务;
  • 应用适当的传播行为;
  • 如果有必要暂停和恢复事务;
  • 提交时检查rollback-only标记;
  • 应用适当的修改当回滚(实际回滚或设置rollback-only);
  • 触发同步回调注册(如果事务同步是激活的)

 

public final TransactionStatus getTransaction(TransactionDefinition definition) throws TransactionException {     //doGetTransaction()方法是AbstractPlatformTransactionManager的抽象接口,实现AbstractPlatformTransactionManager抽象类的类实现该方法返回的对象类型并不相同        Object transaction = this.doGetTransaction();

1
    
public final TransactionStatus getTransaction(TransactionDefinition definition) throws TransactionException {
     Object transaction = this.doGetTransaction();  //doGetTransaction()方法是AbstractPlatformTransactionManager的抽象接口,实现AbstractPlatformTransactionManager抽象类的类实现该方法返回的对象类型并不相同  
     boolean debugEnabled = this.logger.isDebugEnabled();//获取Log类的debug信息,避免之后的代码重复
     //如果definition参数为空,则创建一个默认的事务定义数据
        if (definition == null) {
            definition = new DefaultTransactionDefinition();
        }
     //根据先前获取的transaction object 判断是否存在当前事务,根据判定结果采取不同的处理方式
        if (this.isExistingTransaction(transaction)) {
            return this.handleExistingTransaction((TransactionDefinition)definition, transaction, debugEnabled);(2)
        } else if (((TransactionDefinition)definition).getTimeout() < -1) {
            throw new InvalidTimeoutException("Invalid transaction timeout", ((TransactionDefinition)definition).getTimeout());
        } else if (((TransactionDefinition)definition).getPropagationBehavior() == 2) {
            throw new IllegalTransactionStateException("No existing transaction found for transaction marked with propagation ‘mandatory‘");
        } else if (((TransactionDefinition)definition).getPropagationBehavior() != 0 && ((TransactionDefinition)definition).getPropagationBehavior() != 3 && ((TransactionDefinition)definition).getPropagationBehavior() != 6) {
            if (((TransactionDefinition)definition).getIsolationLevel() != -1 && this.logger.isWarnEnabled()) {
                this.logger.warn("Custom isolation level specified but no actual transaction initiated; isolation level will effectively be ignored: " + definition);
            }

            boolean newSynchronization = this.getTransactionSynchronization() == 0;
            return this.prepareTransactionStatus((TransactionDefinition)definition, (Object)null, true, newSynchronization, debugEnabled, (Object)null);
        } else {
            AbstractPlatformTransactionManager.SuspendedResourcesHolder suspendedResources = this.suspend((Object)null);
            if (debugEnabled) {
                this.logger.debug("Creating new transaction with name [" + ((TransactionDefinition)definition).getName() + "]: " + definition);
            }

            try {
                boolean newSynchronization = this.getTransactionSynchronization() != 2;
                DefaultTransactionStatus status = this.newTransactionStatus((TransactionDefinition)definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
                this.doBegin(transaction, (TransactionDefinition)definition);
                this.prepareSynchronization(status, (TransactionDefinition)definition);
                return status;
            } catch (RuntimeException var7) {
                this.resume((Object)null, suspendedResources);
                throw var7;
            } catch (Error var8) {
                this.resume((Object)null, suspendedResources);
                throw var8;
            }
        }
    }

  return this.handleExistingTransaction((TransactionDefinition)definition, transaction, debugEnabled);//  如果已经存在事务,则做的处理:

技术分享图片

2    private TransactionStatus handleExistingTransaction(TransactionDefinition definition, Object transaction, boolean debugEnabled) throws TransactionException {
        if (definition.getPropagationBehavior() == 5) {//如果definition定义的传播行为是propagation_never,则抛出异常并退出
            throw new IllegalTransactionStateException("Existing transaction found for transaction marked with propagation ‘never‘");
        } else {
            AbstractPlatformTransactionManager.SuspendedResourcesHolder suspendedResources;
            boolean newSynchronization;
            if (definition.getPropagationBehavior() == 4) {//如果definition定义的传播行为是PROPAGATION_NOT_SUPPORTED,则挂起当前事务然后返回
                if (debugEnabled) {
                    this.logger.debug("Suspending current transaction");
                }

                suspendedResources = this.suspend(transaction); (3)
                newSynchronization = this.getTransactionSynchronization() == 0;
                return this.prepareTransactionStatus(definition, (Object)null, false, newSynchronization, debugEnabled, suspendedResources);
            } else if (definition.getPropagationBehavior() == 3) {
                if (debugEnabled) {
                    this.logger.debug("Suspending current transaction, creating new transaction with name [" + definition.getName() + "]");
                }

                suspendedResources = this.suspend(transaction);

                try {
                    newSynchronization = this.getTransactionSynchronization() != 2;
                    DefaultTransactionStatus status = this.newTransactionStatus(definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
                    this.doBegin(transaction, definition);
                    this.prepareSynchronization(status, definition);
                    return status;
                } catch (RuntimeException var7) {
                    this.resumeAfterBeginException(transaction, suspendedResources, var7);
                    throw var7;
                } catch (Error var8) {
                    this.resumeAfterBeginException(transaction, suspendedResources, var8);
                    throw var8;
                }
            } else {
                boolean newSynchronization;
                if (definition.getPropagationBehavior() == 6) {
                    if (!this.isNestedTransactionAllowed()) {
                        throw new NestedTransactionNotSupportedException("Transaction manager does not allow nested transactions by default - specify ‘nestedTransactionAllowed‘ property with value ‘true‘");
                    } else {
                        if (debugEnabled) {
                            this.logger.debug("Creating nested transaction with name [" + definition.getName() + "]");
                        }

                        if (this.useSavepointForNestedTransaction()) {
                            DefaultTransactionStatus status = this.prepareTransactionStatus(definition, transaction, false, false, debugEnabled, (Object)null);
                            status.createAndHoldSavepoint();
                            return status;
                        } else {
                            newSynchronization = this.getTransactionSynchronization() != 2;
                            DefaultTransactionStatus status = this.newTransactionStatus(definition, transaction, true, newSynchronization, debugEnabled, (Object)null);
                            this.doBegin(transaction, definition);
                            this.prepareSynchronization(status, definition);
                            return status;
                        }
                    }
                } else {
                    if (debugEnabled) {
                        this.logger.debug("Participating in existing transaction");
                    }

                    if (this.isValidateExistingTransaction()) {
                        if (definition.getIsolationLevel() != -1) {
                            Integer currentIsolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
                            if (currentIsolationLevel == null || currentIsolationLevel != definition.getIsolationLevel()) {
                                Constants isoConstants = DefaultTransactionDefinition.constants;
                                throw new IllegalTransactionStateException("Participating transaction with definition [" + definition + "] specifies isolation level which is incompatible with existing transaction: " + (currentIsolationLevel != null ? isoConstants.toCode(currentIsolationLevel, "ISOLATION_") : "(unknown)"));
                            }
                        }

                        if (!definition.isReadOnly() && TransactionSynchronizationManager.isCurrentTransactionReadOnly()) {
                            throw new IllegalTransactionStateException("Participating transaction with definition [" + definition + "] is not marked as read-only but existing transaction is");
                        }
                    }

                    newSynchronization = this.getTransactionSynchronization() != 2;
                    return this.prepareTransactionStatus(definition, transaction, false, newSynchronization, debugEnabled, (Object)null);
                }
            }
        }
    }

  

事务挂起:如果当前线程存在事务,但事务传播特性又要求开启新事务,需要将已有的事务进行挂起,事务的挂起及涉及线程与事务信息的保存,实现源码如下:

3、    protected final AbstractPlatformTransactionManager.SuspendedResourcesHolder suspend(Object transaction) throws TransactionException {
     //如果事务是激活的,且当前线程事务同步机制也是激活状态 if (TransactionSynchronizationManager.isSynchronizationActive()) {
      //挂起当前线程中所有同步的事务(4) List suspendedSynchronizations = this.doSuspendSynchronization(); try { Object suspendedResources = null; if (transaction != null) { suspendedResources = this.doSuspend(transaction); }           //在线程中保存与事务处理有关的信息,并将线程里有关的线程局部变量重置 String name = TransactionSynchronizationManager.getCurrentTransactionName(); TransactionSynchronizationManager.setCurrentTransactionName((String)null); boolean readOnly = TransactionSynchronizationManager.isCurrentTransactionReadOnly(); TransactionSynchronizationManager.setCurrentTransactionReadOnly(false); Integer isolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel(); TransactionSynchronizationManager.setCurrentTransactionIsolationLevel((Integer)null); boolean wasActive = TransactionSynchronizationManager.isActualTransactionActive(); TransactionSynchronizationManager.setActualTransactionActive(false);
//将当前线程中事务相关信息保存 return new AbstractPlatformTransactionManager.SuspendedResourcesHolder(suspendedResources, suspendedSynchronizations, name, readOnly, isolationLevel, wasActive); } catch (RuntimeException var8) { this.doResumeSynchronization(suspendedSynchronizations); throw var8; } catch (Error var9) { this.doResumeSynchronization(suspendedSynchronizations); throw var9; }
     //如果事务是激活的,但是事务同步机制不是激活的,则只需要保存事务状态,不需要重置事务相关的线程局部变量 } else if (transaction != null) { Object suspendedResources = this.doSuspend(transaction); return new AbstractPlatformTransactionManager.SuspendedResourcesHolder(suspendedResources); } else {///事务和事务同步机制都不是激活的,则不要想处理 return null; } }

  

4    private List<TransactionSynchronization> doSuspendSynchronization() {
        List<TransactionSynchronization> suspendedSynchronizations = TransactionSynchronizationManager.getSynchronizations();
        Iterator var2 = suspendedSynchronizations.iterator();

        while(var2.hasNext()) {
            TransactionSynchronization synchronization = (TransactionSynchronization)var2.next();
            synchronization.suspend();//if (this.holderActive) { TransactionSynchronizationManager.unbindResource(this.resourceKey); }
        }

        TransactionSynchronizationManager.clearSynchronization();
        return suspendedSynchronizations;
    }

  

Spring框架之事务处理源码分析

标签:dos   session   tor   res   idt   统一   local   ida   工作   

原文地址:https://www.cnblogs.com/yaohuiqin/p/9440870.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!