标签:
在Spring中,声明式事务是通过事务属性(transaction attribute)来定义的。事务属性描述了事务策略如何应用到方法上。事务属性包含5个方面:
<tx:advice id="txAdviceHibernate" transaction-manager="txManagerHibernate"> <tx:attributes> <tx:method name="add*" propagation="REQUIRED" rollback-for="Exception" no-rollback-for="" /> <tx:method name="batch*" propagation="REQUIRED" rollback-for="Exception" /> <tx:method name="delete*" propagation="REQUIRED" rollback-for="Exception" /> <tx:method name="execute*" propagation="REQUIRED" rollback-for="Exception" /> <tx:method name="export*" propagation="REQUIRED" rollback-for="Exception" /> <tx:method name="import*" propagation="REQUIRED" rollback-for="Exception" /> <tx:method name="insert*" propagation="REQUIRED" rollback-for="Exception" /> <tx:method name="save*" propagation="REQUIRED" rollback-for="Exception" /> <tx:method name="update*" propagation="REQUIRED" rollback-for="Exception" /> <tx:method name="count*" propagation="NOT_SUPPORTED" read-only="true" /> <tx:method name="find*" propagation="NOT_SUPPORTED" read-only="true" /> <tx:method name="get*" propagation="NOT_SUPPORTED" read-only="true" /> <tx:method name="list*" propagation="NOT_SUPPORTED" read-only="true" /> <tx:method name="load*" propagation="NOT_SUPPORTED" read-only="true" /> <tx:method name="page*" propagation="NOT_SUPPORTED" read-only="true" /> <tx:method name="query*" propagation="NOT_SUPPORTED" read-only="true" /> </tx:attributes> </tx:advice> <!--此处应想办法实现动态加载--> <aop:config> <aop:advisor pointcut="execution(* com.cetc.datamc.app.collect.bs.*.*(..)) or execution(* com.cetc.datamc.app.mdms.bs.*.*(..))" advice-ref="txAdviceHibernate" /> </aop:config>
<tx:advice id="txAdviceHibernate" transaction-manager="txManager"> ... </tx:advice><tx:advice>只是定义了AOP通知,用于把事务边界通知给方法。但是这只是事务通知,而不是完整的事务性切面。我们在<tx:advice>中没有声明哪些Bean应该被通知——我们需要一个切点来做这件事。为了完整定义事务性切面,我们必须定义一个通知器(advisor)。这就涉及aop命名空间了。通过上面的配置示例可以看到:
public interface IDataCollect { public String addDataItem(String data); } public class DataCollectService implements IDataCollect { private IMetadataService metaService; public void setMetaService(IMetadataService metaService) { this.metaService = metaService; } public String addDataItem(String data) { metaService.deleteMeta(data); return metaService.addMeta(data); } } public interface IMetadataService { public void deleteMeta(String data); public String addMeta(String data); public String queryMetaId(String data); } public class MetadataService implements IMetadataService { public void deleteMeta(String data) { System.out.println("delete metadata"); } public String addMeta(String data) { System.out.println("add metadata"); String id = queryMetaId(data); return data; } public String queryMetaId(String data) { return "id"; } }这样,在DataCollectService中有一个IMetadataService类型的对象。那么在实际中该类型则是被Spring替换为动态代理对象,而不是原来的MetaService对象。所以调用了IMetadataService中的deleteMeta和addMeta方法,这两个方法都是调用的织入事务切面代码的函数。而不是原来的deleteMeta和addMeta方法。
/** * General delegate for around-advice-based subclasses, delegating to several other template * methods on this class. Able to handle {@link CallbackPreferringPlatformTransactionManager} * as well as regular {@link PlatformTransactionManager} implementations. * @param method the Method being invoked * @param targetClass the target class that we're invoking the method on * @param invocation the callback to use for proceeding with the target invocation * @return the return value of the method, if any * @throws Throwable propagated from the target invocation */ protected Object invokeWithinTransaction(Method method, Class targetClass, final InvocationCallback invocation) throws Throwable { // If the transaction attribute is null, the method is non-transactional. //这里返回的即是在tx:method中配置的属性值 //包括propagation, isolation, readonly, timeout, readonly等属性的配置值。 //如果一个方法没有在tx:method中配置,那么txAttr为null final TransactionAttribute txAttr = getTransactionAttributeSource().getTransactionAttribute(method, targetClass); final PlatformTransactionManager tm = determineTransactionManager(txAttr); final String joinpointIdentification = methodIdentification(method, targetClass); if (txAttr == null || !(tm instanceof CallbackPreferringPlatformTransactionManager)) { // Standard transaction demarcation with getTransaction and commit/rollback calls. //该方法主要就是根据配置来采取合适的事务策略。 //理解了该方法的调用原理,基本上spring的配置事务的各属性的涵义也就理解了 TransactionInfo txInfo = createTransactionIfNecessary(tm, txAttr, joinpointIdentification); Object retVal = null; try { // This is an around advice: Invoke the next interceptor in the chain. // This will normally result in a target object being invoked. retVal = invocation.proceedWithInvocation(); } catch (Throwable ex) { // target invocation exception completeTransactionAfterThrowing(txInfo, ex); throw ex; } finally { cleanupTransactionInfo(txInfo); } commitTransactionAfterReturning(txInfo); return retVal; } else { // It's a CallbackPreferringPlatformTransactionManager: pass a TransactionCallback in. try { Object result = ((CallbackPreferringPlatformTransactionManager) tm).execute(txAttr, new TransactionCallback<Object>() { public Object doInTransaction(TransactionStatus status) { TransactionInfo txInfo = prepareTransactionInfo( tm, txAttr, joinpointIdentification, status); try { return invocation.proceedWithInvocation(); } catch (Throwable ex) { if (txAttr.rollbackOn(ex)) { // A RuntimeException: will lead to a rollback. if (ex instanceof RuntimeException) { throw (RuntimeException) ex; } else { throw new ThrowableHolderException(ex); } } else { // A normal return value: will lead to a commit. return new ThrowableHolder(ex); } } finally { cleanupTransactionInfo(txInfo); } } }); // Check result: It might indicate a Throwable to rethrow. if (result instanceof ThrowableHolder) { throw ((ThrowableHolder) result).getThrowable(); } else { return result; } } catch (ThrowableHolderException ex) { throw ex.getCause(); } } }
/** * Create a transaction if necessary based on the given TransactionAttribute. * <p>Allows callers to perform custom TransactionAttribute lookups through * the TransactionAttributeSource. * @param txAttr the TransactionAttribute (may be {@code null}) * @param joinpointIdentification the fully qualified method name * (used for monitoring and logging purposes) * @return a TransactionInfo object, whether or not a transaction was created. * The {@code hasTransaction()} method on TransactionInfo can be used to * tell if there was a transaction created. * @see #getTransactionAttributeSource() */ @SuppressWarnings("serial") protected TransactionInfo createTransactionIfNecessary( PlatformTransactionManager tm, TransactionAttribute txAttr, final String joinpointIdentification) { // If no name specified, apply method identification as transaction name. if (txAttr != null && txAttr.getName() == null) { txAttr = new DelegatingTransactionAttribute(txAttr) { @Override public String getName() { return joinpointIdentification; } }; } TransactionStatus status = null; if (txAttr != null) { if (tm != null) { //重点看此函数 status = tm.getTransaction(txAttr); } else { if (logger.isDebugEnabled()) { logger.debug("Skipping transactional joinpoint [" + joinpointIdentification + "] because no transaction manager has been configured"); } } } //如果没有在tx:method中配置,那么txAttr=null, 这样返回的TransactionStatus也为null return prepareTransactionInfo(tm, txAttr, joinpointIdentification, status); }
/** * This implementation handles propagation behavior. Delegates to * {@code doGetTransaction}, {@code isExistingTransaction} * and {@code doBegin}. * @see #doGetTransaction * @see #isExistingTransaction * @see #doBegin */ public final TransactionStatus getTransaction(TransactionDefinition definition) throws TransactionException { //doGetTransaction在各继承该类的子类中实现。 //这里假设使用的是Hibernate的事务,那么实现类是HibernateTransactionManager类 //这里的object即表示我们通常所说的Transaction,即事务的类的定义 //对于Hibernate Transaction的具体类是HibernateTransactionObject Object transaction = doGetTransaction(); // Cache debug flag to avoid repeated checks. boolean debugEnabled = logger.isDebugEnabled(); if (definition == null) { // Use defaults if no transaction definition given. definition = new DefaultTransactionDefinition(); } //根据得到的transaction对象判断是否当前已经存在事务 //如果存在事务,那么调用handleExistingTransaction来根据配置的策略处理事务 if (isExistingTransaction(transaction)) { // Existing transaction found -> check propagation behavior to find out how to behave. return handleExistingTransaction(definition, transaction, debugEnabled); } // Check definition settings for new transaction. if (definition.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) { throw new InvalidTimeoutException("Invalid transaction timeout", definition.getTimeout()); } //如果当前不存在事务,那么根据不同的策略配置来产生具体的事务行为。 //这里即是各种不同配置策略行为的具体代码实现过程 // No existing transaction found -> check propagation behavior to find out how to proceed. if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) { throw new IllegalTransactionStateException( "No existing transaction found for transaction marked with propagation 'mandatory'"); } else if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED || definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW || definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) { SuspendedResourcesHolder suspendedResources = suspend(null); if (debugEnabled) { logger.debug("Creating new transaction with name [" + definition.getName() + "]: " + definition); } try { boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER); DefaultTransactionStatus status = newTransactionStatus( definition, transaction, true, newSynchronization, debugEnabled, suspendedResources); //开启新事务 doBegin(transaction, definition); prepareSynchronization(status, definition); return status; } catch (RuntimeException ex) { resume(null, suspendedResources); throw ex; } catch (Error err) { resume(null, suspendedResources); throw err; } } else { // Create "empty" transaction: no actual transaction, but potentially synchronization. boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS); return prepareTransactionStatus(definition, null, true, newSynchronization, debugEnabled, null); } }下面我们暂时先抛开如何doGetTransaction以及如何判断isExistingTransaction不谈,先来看看如何doBegin一个新事务。
@Override protected void doBegin(Object transaction, TransactionDefinition definition) { HibernateTransactionObject txObject = (HibernateTransactionObject) transaction; if (txObject.hasConnectionHolder() && !txObject.getConnectionHolder().isSynchronizedWithTransaction()) { throw new IllegalTransactionStateException( "Pre-bound JDBC Connection found! HibernateTransactionManager does not support " + "running within DataSourceTransactionManager if told to manage the DataSource itself. " + "It is recommended to use a single HibernateTransactionManager for all transactions " + "on a single DataSource, no matter whether Hibernate or JDBC access."); } Session session = null; try { //如果当前txObject没有已有的Session,那么打开一个新的Session. //txObject也可能已经有了Session, //例如如果配置了OpenSessionInViewFilter,且当前的线程是一个request线程, //那么就会在每个request线程中就先openSession,并且绑定到当前线程中。 //这样在doGetTransaction就会获取到该Session。详见doGetTransaction函数和OpenSessionInViewFilter实现 if (txObject.getSessionHolder() == null || txObject.getSessionHolder().isSynchronizedWithTransaction()) { Interceptor entityInterceptor = getEntityInterceptor(); //这里是openSession,而不是getCurrentSession Session newSession = (entityInterceptor != null ? getSessionFactory().openSession(entityInterceptor) : getSessionFactory().openSession()); if (logger.isDebugEnabled()) { logger.debug("Opened new Session [" + SessionFactoryUtils.toString(newSession) + "] for Hibernate transaction"); } //设置newSessionHolder和newSession状态为true txObject.setSession(newSession); } session = txObject.getSessionHolder().getSession(); if (this.prepareConnection && isSameConnectionForEntireSession(session)) { // We're allowed to change the transaction settings of the JDBC Connection. if (logger.isDebugEnabled()) { logger.debug( "Preparing JDBC Connection of Hibernate Session [" + SessionFactoryUtils.toString(session) + "]"); } Connection con = session.connection(); Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition); txObject.setPreviousIsolationLevel(previousIsolationLevel); } else { // Not allowed to change the transaction settings of the JDBC Connection. if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT) { // We should set a specific isolation level but are not allowed to... throw new InvalidIsolationLevelException( "HibernateTransactionManager is not allowed to support custom isolation levels: " + "make sure that its 'prepareConnection' flag is on (the default) and that the " + "Hibernate connection release mode is set to 'on_close' (SpringTransactionFactory's default). " + "Make sure that your LocalSessionFactoryBean actually uses SpringTransactionFactory: Your " + "Hibernate properties should *not* include a 'hibernate.transaction.factory_class' property!"); } if (logger.isDebugEnabled()) { logger.debug( "Not preparing JDBC Connection of Hibernate Session [" + SessionFactoryUtils.toString(session) + "]"); } } //如果是一个new Session并且事务是只读的,则设置该Session flushMode为FlushMode.MANUAL if (definition.isReadOnly() && txObject.isNewSession()) { // Just set to MANUAL in case of a new Session for this transaction. session.setFlushMode(FlushMode.MANUAL); } //如果事务不是只读的,且已有一个Session时,重新设置该Session的flushMode if (!definition.isReadOnly() && !txObject.isNewSession()) { // We need AUTO or COMMIT for a non-read-only transaction. FlushMode flushMode = session.getFlushMode(); if (flushMode.lessThan(FlushMode.COMMIT)) { session.setFlushMode(FlushMode.AUTO); txObject.getSessionHolder().setPreviousFlushMode(flushMode); } } Transaction hibTx; //设置timeout 然后beginTransaction // Register transaction timeout. int timeout = determineTimeout(definition); if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) { // Use Hibernate's own transaction timeout mechanism on Hibernate 3.1+ // Applies to all statements, also to inserts, updates and deletes! hibTx = session.getTransaction(); hibTx.setTimeout(timeout); hibTx.begin(); } else { // Open a plain Hibernate transaction without specified timeout. hibTx = session.beginTransaction(); } // Add the Hibernate transaction to the session holder. txObject.getSessionHolder().setTransaction(hibTx); // Register the Hibernate Session's JDBC Connection for the DataSource, if set. if (getDataSource() != null) { Connection con = session.connection(); ConnectionHolder conHolder = new ConnectionHolder(con); if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) { conHolder.setTimeoutInSeconds(timeout); } if (logger.isDebugEnabled()) { logger.debug("Exposing Hibernate transaction as JDBC transaction [" + con + "]"); } //将当前的conHolder绑定到线程上,可以用来判断当前线程是否已有事务 TransactionSynchronizationManager.bindResource(getDataSource(), conHolder); txObject.setConnectionHolder(conHolder); } // Bind the session holder to the thread. if (txObject.isNewSessionHolder()) { //将当前的SessionHolder绑定到当前线程上,可以用来判断当前线程是否已有事务 TransactionSynchronizationManager.bindResource(getSessionFactory(), txObject.getSessionHolder()); } txObject.getSessionHolder().setSynchronizedWithTransaction(true); } catch (Throwable ex) { if (txObject.isNewSession()) { try { if (session.getTransaction().isActive()) { session.getTransaction().rollback(); } } catch (Throwable ex2) { logger.debug("Could not rollback Session after failed transaction begin", ex); } finally { SessionFactoryUtils.closeSession(session); txObject.setSessionHolder(null); } } throw new CannotCreateTransactionException("Could not open Hibernate Session for transaction", ex); } }好了,知道了doBegin函数的实现原理,让我们回过头来看doGetTransaction和isExistingTransaction函数的实现。这样就很容易理解了。
@Override protected Object doGetTransaction() { HibernateTransactionObject txObject = new HibernateTransactionObject(); txObject.setSavepointAllowed(isNestedTransactionAllowed()); //获取当前线程中是否已经有已绑定的SessionHolder。如果当前线程已经存在事务,那么就会存在SessionHolder //具体可见doBegin函数的实现 SessionHolder sessionHolder = (SessionHolder) TransactionSynchronizationManager.getResource(getSessionFactory()); if (sessionHolder != null) { if (logger.isDebugEnabled()) { logger.debug("Found thread-bound Session [" + SessionFactoryUtils.toString(sessionHolder.getSession()) + "] for Hibernate transaction"); } //如果已经有了SessionHolder,那么直接就把当前已有的SessionHolder赋给新的transaction Object txObject.setSessionHolder(sessionHolder); } else if (this.hibernateManagedSession) { try { //当前线程中已经绑定了session,例如openSessionInViewFilter的情况 Session session = getSessionFactory().getCurrentSession(); if (logger.isDebugEnabled()) { logger.debug("Found Hibernate-managed Session [" + SessionFactoryUtils.toString(session) + "] for Spring-managed transaction"); } txObject.setExistingSession(session); } catch (HibernateException ex) { throw new DataAccessResourceFailureException( "Could not obtain Hibernate-managed Session for Spring-managed transaction", ex); } } //如果当前线程已经绑定了ConnectionHolder,如果当前线程中已经开启了事务,就会存在ConnectionHolder //直接将已有的ConnectionHolder赋值给该transaction if (getDataSource() != null) { ConnectionHolder conHolder = (ConnectionHolder) TransactionSynchronizationManager.getResource(getDataSource()); txObject.setConnectionHolder(conHolder); } return txObject; }
@Override protected boolean isExistingTransaction(Object transaction) { HibernateTransactionObject txObject = (HibernateTransactionObject) transaction; //spring管理的事务或者是Hibernate自己管理的事务 return (txObject.hasSpringManagedTransaction() || (this.hibernateManagedSession && txObject.hasHibernateManagedTransaction())); }
public boolean hasSpringManagedTransaction() { //如果当前的transaction Object的sessionHolder不为null,说明当前存在事务 return (this.sessionHolder != null && this.sessionHolder.getTransaction() != null); }hasHibernateManagedTransaction函数实现如下:
public boolean hasHibernateManagedTransaction() { return this.sessionHolder != null && this.sessionHolder.getSession().getTransaction().isActive(); }这两个函数都很简单,结合doGetTransaction函数的实现原理,就很容易理解了。
/** * Create a TransactionStatus for an existing transaction. */ private TransactionStatus handleExistingTransaction( TransactionDefinition definition, Object transaction, boolean debugEnabled) throws TransactionException { //配置的策略为当前方法不应该运行在事务上下文中,如果当前正有一个事务运行,则会抛出异常 if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NEVER) { throw new IllegalTransactionStateException( "Existing transaction found for transaction marked with propagation 'never'"); } //配置的策略为当前方法不应该运行在事务中,如果存在当前事务,在该方法运行期间,当前事务将被挂起 if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NOT_SUPPORTED) { if (debugEnabled) { logger.debug("Suspending current transaction"); } Object suspendedResources = suspend(transaction); boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS); return prepareTransactionStatus( definition, null, false, newSynchronization, debugEnabled, suspendedResources); } //配置的策略为当前方法必须运行在它自己的事务中,一个新的事务将被启动。 //如果存在当前事务,在该方法运行期间,当前事务将被挂起 if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW) { if (debugEnabled) { logger.debug("Suspending current transaction, creating new transaction with name [" + definition.getName() + "]"); } SuspendedResourcesHolder suspendedResources = suspend(transaction); try { boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER); DefaultTransactionStatus status = newTransactionStatus( definition, transaction, true, newSynchronization, debugEnabled, suspendedResources); doBegin(transaction, definition); prepareSynchronization(status, definition); return status; } catch (RuntimeException beginEx) { resumeAfterBeginException(transaction, suspendedResources, beginEx); throw beginEx; } catch (Error beginErr) { resumeAfterBeginException(transaction, suspendedResources, beginErr); throw beginErr; } } //配置的策略为如果当前已经存在一个事务,那么该方法将会在嵌套事务中运行。 //嵌套的事务可以独立于当前事务进行单独地提交或回滚。如果当前事务不存在, //那么其行为与PROPAGATION_REQUIRED一样 if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) { if (!isNestedTransactionAllowed()) { throw new NestedTransactionNotSupportedException( "Transaction manager does not allow nested transactions by default - " + "specify 'nestedTransactionAllowed' property with value 'true'"); } if (debugEnabled) { logger.debug("Creating nested transaction with name [" + definition.getName() + "]"); } if (useSavepointForNestedTransaction()) { // Create savepoint within existing Spring-managed transaction, // through the SavepointManager API implemented by TransactionStatus. // Usually uses JDBC 3.0 savepoints. Never activates Spring synchronization. DefaultTransactionStatus status = prepareTransactionStatus(definition, transaction, false, false, debugEnabled, null); status.createAndHoldSavepoint(); return status; } else { // Nested transaction through nested begin and commit/rollback calls. // Usually only for JTA: Spring synchronization might get activated here // in case of a pre-existing JTA transaction. boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER); DefaultTransactionStatus status = newTransactionStatus( definition, transaction, true, newSynchronization, debugEnabled, null); doBegin(transaction, definition); prepareSynchronization(status, definition); return status; } } // Assumably PROPAGATION_SUPPORTS or PROPAGATION_REQUIRED. if (debugEnabled) { logger.debug("Participating in existing transaction"); } if (isValidateExistingTransaction()) { if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT) { Integer currentIsolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel(); if (currentIsolationLevel == null || currentIsolationLevel != definition.getIsolationLevel()) { Constants isoConstants = DefaultTransactionDefinition.constants; throw new IllegalTransactionStateException("Participating transaction with definition [" + definition + "] specifies isolation level which is incompatible with existing transaction: " + (currentIsolationLevel != null ? isoConstants.toCode(currentIsolationLevel, DefaultTransactionDefinition.PREFIX_ISOLATION) : "(unknown)")); } } if (!definition.isReadOnly()) { if (TransactionSynchronizationManager.isCurrentTransactionReadOnly()) { throw new IllegalTransactionStateException("Participating transaction with definition [" + definition + "] is not marked as read-only but existing transaction is"); } } } boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER); return prepareTransactionStatus(definition, transaction, false, newSynchronization, debugEnabled, null); }
标签:
原文地址:http://blog.csdn.net/wei83523408/article/details/51388343