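This article is rather dry and consists mostly of source code. Everything here comes from stepping through the code in a debugger myself, so corrections are welcome. To keep the flow continuous I have left in many steps that are not strictly necessary. You may want to debug along as you read, and skip anything that seems trivial.
In the earlier chapters we covered how MyBatis is initialized (through afterPropertiesSet() of org.mybatis.spring.SqlSessionFactoryBean).
During initialization MyBatis packs all of its settings into a Configuration object that stays in memory. The benefit is that in-memory access is the fastest option and the configuration files never have to be read again. The Configuration class holds almost all MyBatis configuration. The sqlSessionFactoryBuilder takes it as an argument to create the SqlSessionFactory (the implementation class is DefaultSqlSessionFactory).
You may remember this piece of code from the creation of the SqlSessionFactory: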
public void afterPropertiesSet() throws Exception {
    notNull(dataSource, "Property 'dataSource' is required");
    notNull(sqlSessionFactoryBuilder, "Property 'sqlSessionFactoryBuilder' is required");
    state((configuration == null && configLocation == null) || !(configuration != null && configLocation != null),
            "Property 'configuration' and 'configLocation' can not specified with together");
    this.sqlSessionFactory = buildSqlSessionFactory();
}
protected SqlSessionFactory buildSqlSessionFactory() throws IOException {
    Configuration configuration;
    XMLConfigBuilder xmlConfigBuilder = null;
    if (this.configuration != null) {
        configuration = this.configuration;
        if (configuration.getVariables() == null) {
            configuration.setVariables(this.configurationProperties);
        } else if (this.configurationProperties != null) {
            configuration.getVariables().putAll(this.configurationProperties);
        }
    } else if (this.configLocation != null) {
        xmlConfigBuilder = new XMLConfigBuilder(this.configLocation.getInputStream(), null, this.configurationProperties);
        configuration = xmlConfigBuilder.getConfiguration();
    } else {
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("Property `configuration` or 'configLocation' not specified, using default MyBatis Configuration");
        }
        configuration = new Configuration();
        configuration.setVariables(this.configurationProperties);
    }
    ... ...
    // When no transactionFactory is specified, a SpringManagedTransactionFactory is created.
    // It is the bridge that lets Spring manage MyBatis transactions.
    if (this.transactionFactory == null) {
        this.transactionFactory = new SpringManagedTransactionFactory();
    }
    configuration.setEnvironment(new Environment(this.environment, this.transactionFactory, this.dataSource));
    ... ...
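When no transactionFactory is specified, a default SpringManagedTransactionFactory is created. SpringManagedTransactionFactory is used to create Transaction objects. Transaction is a low-level MyBatis (org.apache.ibatis) wrapper around a database Connection. It manages the lifecycle of the connection: creation, preparation, commit/rollback, and close. MyBatis passes this Transaction as an argument when it creates the Executor, which is the component that actually executes SQL.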
/**
 * Wraps a database connection.
 * Handles the connection lifecycle that comprises: its creation, preparation, commit/rollback and close.
 *
 * @author Clinton Begin
 */
public interface Transaction {
    /**
     * Retrieve inner database connection
     * @return DataBase connection
     * @throws SQLException
     */
    Connection getConnection() throws SQLException;
    // commit(), rollback(), close() and getTimeout() omitted
}
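So how does SpringManagedTransactionFactory flow through the project, and how does it talk to Spring? Let's keep going.
In MyBatis a session is opened through a SqlSession, which is roughly equivalent to a connection. We can either query directly through the SqlSession, or obtain a mapper from the SqlSession and query through a dynamic proxy.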
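The second option, querying through a mapper obtained from the session, can be illustrated with a plain-Java sketch built only on java.lang.reflect.Proxy. This is not MyBatis's actual mapper proxy code; MiniSession, UserMapper and the "Interface.method" statement id format are hypothetical names invented for the illustration.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a session: maps "statementId:arg" to a canned row.
class MiniSession {
    private final Map<String, Object> rows = new HashMap<>();

    void stub(String statement, Object arg, Object row) {
        rows.put(statement + ":" + arg, row);
    }

    // Direct style: the caller names the statement explicitly.
    Object selectOne(String statement, Object arg) {
        return rows.get(statement + ":" + arg);
    }

    // Mapper style: a dynamic proxy turns an interface call into a selectOne call,
    // using "<interface simple name>.<method name>" as the statement id.
    @SuppressWarnings("unchecked")
    <T> T getMapper(Class<T> type) {
        InvocationHandler handler = (proxy, method, args) -> {
            if (method.getDeclaringClass() == Object.class) {
                // toString/equals/hashCode are answered by the session itself
                return method.invoke(this, args);
            }
            return selectOne(type.getSimpleName() + "." + method.getName(), args[0]);
        };
        return (T) Proxy.newProxyInstance(type.getClassLoader(), new Class<?>[] {type}, handler);
    }
}

// Hypothetical mapper interface; no implementation class is ever written for it.
interface UserMapper {
    String getById(Integer id);
}
```

Both styles end in the same selectOne call, which is why the rest of the walkthrough only needs to follow the session.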
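Either way, we first need a SqlSession. From the initialization above we know that the factory created is a DefaultSqlSessionFactory. Let's step into its source and see what is there: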
public class DefaultSqlSessionFactory implements SqlSessionFactory {
    private final Configuration configuration;
    public DefaultSqlSessionFactory(Configuration configuration) {
        this.configuration = configuration;
    }
    @Override
    public SqlSession openSession() {
        return openSessionFromDataSource(configuration.getDefaultExecutorType(), null, false);
    }
    @Override
    public SqlSession openSession(ExecutorType execType) {
        return openSessionFromDataSource(execType, null, false);
    }
    // several overloads omitted ... ...
    @Override
    public SqlSession openSession(ExecutorType execType, TransactionIsolationLevel level) {
        return openSessionFromDataSource(execType, level, false);
    }
    @Override
    public SqlSession openSession(ExecutorType execType, boolean autoCommit) {
        return openSessionFromDataSource(execType, null, autoCommit);
    }
    private SqlSession openSessionFromDataSource(ExecutorType execType, TransactionIsolationLevel level, boolean autoCommit) {
        Transaction tx = null;
        try {
            final Environment environment = configuration.getEnvironment();//1
            final TransactionFactory transactionFactory = getTransactionFactoryFromEnvironment(environment);//2
            tx = transactionFactory.newTransaction(environment.getDataSource(), level, autoCommit);//3
            final Executor executor = configuration.newExecutor(tx, execType);//4
            return new DefaultSqlSession(configuration, executor, autoCommit);//5
        } catch (Exception e) {
            closeTransaction(tx); // may have fetched a connection so lets call close()
            throw ExceptionFactory.wrapException("Error opening session. Cause: " + e, e);
        } finally {
            ErrorContext.instance().reset();
        }
    }
}
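As we can see, this class offers a large number of openSession overloads. The ones that create a session from the data source all end up calling openSessionFromDataSource, which is a simple method:
1. Get the current environment from the global Configuration.
2. From the environment, get the SpringManagedTransactionFactory created during initialization.
3. Use the SpringManagedTransactionFactory to create a Transaction (a SpringManagedTransaction).
4. Create the SQL executor, Executor (a SimpleExecutor by default; when the cacheEnabled setting is on, which is the default, Configuration.newExecutor wraps it in a CachingExecutor, which is why CachingExecutor appears first in the call chain that follows).
5. Create the session (a DefaultSqlSession).
Once the session is created, the actual database work follows: obtaining a connection, running the query, mapping the result, and returning it. DefaultSqlSession has many query methods; MyBatis picks the right one after parsing the call, and we will not analyze that here. Instead we take one example, getById, and trace it briefly. (Note that MyBatis does not establish a database connection at the moment the session is created.)
When a getById query enters the system, after the initialization, session creation, and parameter parsing described above, it reaches DefaultSqlSession's public <T> T selectOne(String statement, Object parameter) method: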
public class DefaultSqlSession implements SqlSession {
    @Override
    public <T> T selectOne(String statement) {
        return this.<T>selectOne(statement, null);
    }
    //1
    @Override
    public <T> T selectOne(String statement, Object parameter) {
        // Popular vote was to return null on 0 results and throw exception on too many.
        List<T> list = this.<T>selectList(statement, parameter);
        if (list.size() == 1) {
            return list.get(0);
        } else if (list.size() > 1) {
            throw new TooManyResultsException("Expected one result (or null) to be returned by selectOne(), but found: " + list.size());
        } else {
            return null;
        }
    }
}
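The three branches of selectOne are easy to misread when stepping through, so here is a minimal stand-alone sketch of the same rule. It is only a model: the list argument stands in for the result of selectList, and a standard IllegalStateException stands in for MyBatis's TooManyResultsException.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

class SelectOneSketch {
    // Exactly one row is returned, zero rows yield null,
    // and more than one row is treated as an error.
    static <T> T selectOne(List<T> rows) {
        if (rows.size() == 1) {
            return rows.get(0);
        } else if (rows.size() > 1) {
            // stand-in for TooManyResultsException
            throw new IllegalStateException("Expected one result (or null), but found: " + rows.size());
        }
        return null;
    }

    public static void main(String[] args) {
        String one = selectOne(Arrays.asList("alice"));
        String none = selectOne(Collections.<String>emptyList());
        System.out.println(one + " / " + none);
        try {
            selectOne(Arrays.asList("alice", "bob"));
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

In practice this means a getById-style statement whose WHERE clause matches more than one row fails at this point, before any result reaches the caller.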
public class DefaultSqlSession implements SqlSession {
    //2
    @Override
    public <E> List<E> selectList(String statement, Object parameter) {
        return this.selectList(statement, parameter, RowBounds.DEFAULT);
    }
    //3
    @Override
    public <E> List<E> selectList(String statement, Object parameter, RowBounds rowBounds) {
        try {
            MappedStatement ms = configuration.getMappedStatement(statement);
            // the executor is what actually runs the SQL
            return executor.query(ms, wrapCollection(parameter), rowBounds, Executor.NO_RESULT_HANDLER);
        } catch (Exception e) {
            throw ExceptionFactory.wrapException("Error querying database. Cause: " + e, e);
        } finally {
            ErrorContext.instance().reset();
        }
    }
}
public class CachingExecutor implements Executor {
    //4
    @Override
    public <E> List<E> query(MappedStatement ms, Object parameterObject, RowBounds rowBounds, ResultHandler resultHandler) throws SQLException {
        BoundSql boundSql = ms.getBoundSql(parameterObject);
        CacheKey key = createCacheKey(ms, parameterObject, rowBounds, boundSql);
        return query(ms, parameterObject, rowBounds, resultHandler, key, boundSql);
    }
    //5
    @Override
    public <E> List<E> query(MappedStatement ms, Object parameterObject, RowBounds rowBounds, ResultHandler resultHandler, CacheKey key, BoundSql boundSql)
            throws SQLException {
        Cache cache = ms.getCache();
        // There is no cache the first time through (cache is null here), so execution falls through to the delegate call at the bottom
        if (cache != null) {
            flushCacheIfRequired(ms);
            if (ms.isUseCache() && resultHandler == null) {
                ensureNoOutParams(ms, boundSql);
                @SuppressWarnings("unchecked")
                List<E> list = (List<E>) tcm.getObject(cache, key);
                if (list == null) {
                    list = delegate.<E> query(ms, parameterObject, rowBounds, resultHandler, key, boundSql);
                    tcm.putObject(cache, key, list); // issue #578 and #116
                }
                return list;
            }
        }
        return delegate.<E> query(ms, parameterObject, rowBounds, resultHandler, key, boundSql);
    }
}
public abstract class BaseExecutor implements Executor {
    //6
    @SuppressWarnings("unchecked")
    @Override
    public <E> List<E> query(MappedStatement ms, Object parameter, RowBounds rowBounds, ResultHandler resultHandler, CacheKey key, BoundSql boundSql) throws SQLException {
        ErrorContext.instance().resource(ms.getResource()).activity("executing a query").object(ms.getId());
        if (closed) {
            throw new ExecutorException("Executor was closed.");
        }
        if (queryStack == 0 && ms.isFlushCacheRequired()) {
            clearLocalCache();
        }
        List<E> list;
        try {
            queryStack++;
            list = resultHandler == null ? (List<E>) localCache.getObject(key) : null;
            if (list != null) {
                handleLocallyCachedOutputParameters(ms, key, parameter, boundSql);
            } else {
                list = queryFromDatabase(ms, parameter, rowBounds, resultHandler, key, boundSql);
            }
        } finally {
            queryStack--;
        }
        if (queryStack == 0) {
            for (DeferredLoad deferredLoad : deferredLoads) {
                deferredLoad.load();
            }
            // issue #601
            deferredLoads.clear();
            if (configuration.getLocalCacheScope() == LocalCacheScope.STATEMENT) {
                // issue #482
                clearLocalCache();
            }
        }
        return list;
    }
    //7
    private <E> List<E> queryFromDatabase(MappedStatement ms, Object parameter, RowBounds rowBounds, ResultHandler resultHandler, CacheKey key, BoundSql boundSql) throws SQLException {
        List<E> list;
        localCache.putObject(key, EXECUTION_PLACEHOLDER);
        try {
            list = doQuery(ms, parameter, rowBounds, resultHandler, boundSql);
        } finally {
            localCache.removeObject(key);
        }
        localCache.putObject(key, list);
        if (ms.getStatementType() == StatementType.CALLABLE) {
            localOutputParameterCache.putObject(key, parameter);
        }
        return list;
    }
}
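Steps 6 and 7 boil down to "look in the executor's local cache, and only hit the database on a miss". The sketch below models just that idea in plain Java. It is a simplification under stated assumptions: a String stands in for CacheKey, a Function stands in for doQuery, and the placeholder, deferred-load and output-parameter handling from the real code are left out. LocalCacheSketch and its members are hypothetical names.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

class LocalCacheSketch {
    private final Map<String, List<String>> localCache = new HashMap<>();
    private final Function<String, List<String>> database; // stands in for doQuery
    int databaseHits = 0;

    LocalCacheSketch(Function<String, List<String>> database) {
        this.database = database;
    }

    // Same shape as query/queryFromDatabase: consult the local cache first,
    // go to the database only on a miss, then remember the result.
    List<String> query(String cacheKey) {
        List<String> list = localCache.get(cacheKey);
        if (list == null) {
            databaseHits++;
            list = database.apply(cacheKey);
            localCache.put(cacheKey, list);
        }
        return list;
    }

    // Counterpart of clearLocalCache(), which the code above calls when
    // flushCacheRequired is set or the local cache scope is STATEMENT.
    void clearLocalCache() {
        localCache.clear();
    }
}
```

Running the same key twice through query increments databaseHits only once, which matches what you see in the debugger when the same statement is repeated within one session.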
public class SimpleExecutor extends BaseExecutor {
    //8
    @Override
    public <E> List<E> doQuery(MappedStatement ms, Object parameter, RowBounds rowBounds, ResultHandler resultHandler, BoundSql boundSql) throws SQLException {
        Statement stmt = null;
        try {
            Configuration configuration = ms.getConfiguration();
            StatementHandler handler = configuration.newStatementHandler(wrapper, ms, parameter, rowBounds, resultHandler, boundSql);
            stmt = prepareStatement(handler, ms.getStatementLog());
            return handler.<E>query(stmt, resultHandler);
        } finally {
            closeStatement(stmt);
        }
    }
    //9
    private Statement prepareStatement(StatementHandler handler, Log statementLog) throws SQLException {
        Statement stmt;
        // obtain the database connection
        Connection connection = getConnection(statementLog);
        stmt = handler.prepare(connection, transaction.getTimeout());
        handler.parameterize(stmt);
        return stmt;
    }
}
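In SimpleExecutor we see a call to getConnection. This is where the database connection is actually obtained. Recall that the fourth step when opening a session was to create the executor, with the SpringManagedTransaction and the executor type (SIMPLE) as arguments. So let's keep following the code and see how SpringManagedTransaction gets hold of the database connection.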
public abstract class BaseExecutor implements Executor {
    //10
    protected Connection getConnection(Log statementLog) throws SQLException {
        // transaction is an interface; the instance supplied when the session was created is a SpringManagedTransaction
        Connection connection = transaction.getConnection();
        if (statementLog.isDebugEnabled()) {
            return ConnectionLogger.newInstance(connection, statementLog, queryStack);
        } else {
            return connection;
        }
    }
}
public class SpringManagedTransaction implements Transaction {
    @Override
    public Connection getConnection() throws SQLException {
        // Return the connection if this Transaction wrapper already holds one; otherwise open it
        if (this.connection == null) {
            openConnection();
        }
        return this.connection;
    }
    private void openConnection() throws SQLException {
        // Obtain the connection and store it in the wrapper
        this.connection = DataSourceUtils.getConnection(this.dataSource);
        this.autoCommit = this.connection.getAutoCommit();
        this.isConnectionTransactional = DataSourceUtils.isConnectionTransactional(this.connection, this.dataSource);
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug(
                "JDBC Connection ["
                    + this.connection
                    + "] will"
                    + (this.isConnectionTransactional ? " " : " not ")
                    + "be managed by Spring");
        }
    }
}
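The getConnection method above is the reason no connection exists when the session is created: the wrapper only asks for one on first use and then keeps it. A minimal plain-Java sketch of that lazy pattern follows. It is a model, not the real class: a String stands in for java.sql.Connection, a Supplier stands in for DataSourceUtils.getConnection(dataSource), and LazyTransactionSketch is a hypothetical name.

```java
import java.util.function.Supplier;

class LazyTransactionSketch {
    private final Supplier<String> dataSource; // stands in for DataSourceUtils.getConnection(dataSource)
    private String connection;                 // stands in for java.sql.Connection
    int opened = 0;                            // how many times a connection was actually requested

    LazyTransactionSketch(Supplier<String> dataSource) {
        // Constructing the wrapper does not touch the data source.
        this.dataSource = dataSource;
    }

    String getConnection() {
        // Open on first use, then keep returning the same connection.
        if (connection == null) {
            connection = dataSource.get();
            opened++;
        }
        return connection;
    }
}
```

Creating the object leaves opened at 0; it becomes 1 on the first getConnection() call and stays there, which mirrors the null check on this.connection in SpringManagedTransaction.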
public abstract class DataSourceUtils {
    public static Connection getConnection(DataSource dataSource) throws CannotGetJdbcConnectionException {
        try {
            return doGetConnection(dataSource);
        }
        catch (SQLException ex) {
            throw new CannotGetJdbcConnectionException("Failed to obtain JDBC Connection", ex);
        }
        catch (IllegalStateException ex) {
            throw new CannotGetJdbcConnectionException("Failed to obtain JDBC Connection: " + ex.getMessage());
        }
    }
    public static Connection doGetConnection(DataSource dataSource) throws SQLException {
        Assert.notNull(dataSource, "No DataSource specified");
        ConnectionHolder conHolder = (ConnectionHolder) TransactionSynchronizationManager.getResource(dataSource);
        if (conHolder != null && (conHolder.hasConnection() || conHolder.isSynchronizedWithTransaction())) {
            conHolder.requested();
            if (!conHolder.hasConnection()) {
                logger.debug("Fetching resumed JDBC Connection from DataSource");
                conHolder.setConnection(fetchConnection(dataSource));
            }
            return conHolder.getConnection();
        }
        // Else we either got no holder or an empty thread-bound holder here.
        logger.debug("Fetching JDBC Connection from DataSource");
        Connection con = fetchConnection(dataSource);
        if (TransactionSynchronizationManager.isSynchronizationActive()) {
            logger.debug("Registering transaction synchronization for JDBC Connection");
            // Use same Connection for further JDBC actions within the transaction.
            // Thread-bound object will get removed by synchronization at transaction completion.
            ConnectionHolder holderToUse = conHolder;
            if (holderToUse == null) {
                holderToUse = new ConnectionHolder(con);
            }
            else {
                holderToUse.setConnection(con);
            }
            holderToUse.requested();
            TransactionSynchronizationManager.registerSynchronization(
                    new ConnectionSynchronization(holderToUse, dataSource));
            holderToUse.setSynchronizedWithTransaction(true);
            if (holderToUse != conHolder) {
                TransactionSynchronizationManager.bindResource(dataSource, holderToUse);
            }
        }
        return con;
    }
    private static Connection fetchConnection(DataSource dataSource) throws SQLException {
        // Get a connection from the given dataSource. What happens inside depends on the connection pool in use:
        // with Druid, read the Druid source; with c3p0, read the c3p0 source.
        Connection con = dataSource.getConnection();
        if (con == null) {
            throw new IllegalStateException("DataSource returned null from getConnection(): " + dataSource);
        }
        return con;
    }
}
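doGetConnection is where Spring and MyBatis actually meet: the connection is looked up in, and bound to, per-thread state keyed by the data source, so every call on the same thread inside one transaction sees the same connection. The sketch below models that mechanism with a ThreadLocal map. It is a deliberately reduced model under stated assumptions: Strings stand in for connections, begin() and complete() are hypothetical stand-ins for the transaction boundaries, and ConnectionHolder reference counting and the synchronization callbacks are omitted.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

class ThreadBoundSketch {
    // Per-thread resource map, playing the role of the resources that
    // TransactionSynchronizationManager.getResource/bindResource work on.
    private static final ThreadLocal<Map<Object, Object>> RESOURCES =
            ThreadLocal.withInitial(HashMap::new);
    // Per-thread flag, playing the role of isSynchronizationActive().
    private static final ThreadLocal<Boolean> ACTIVE = ThreadLocal.withInitial(() -> false);
    // Counts how many connections were fetched from the "pool".
    static final AtomicInteger FETCHED = new AtomicInteger();

    static void begin() {
        ACTIVE.set(true);
    }

    static void complete() {
        // At transaction completion the thread-bound connection is released.
        RESOURCES.get().clear();
        ACTIVE.set(false);
    }

    static String getConnection(Object dataSource) {
        // 1. Reuse the connection already bound to this thread for this data source.
        Object bound = RESOURCES.get().get(dataSource);
        if (bound != null) {
            return (String) bound;
        }
        // 2. Otherwise fetch a new one ...
        String con = "connection-" + FETCHED.incrementAndGet();
        // 3. ... and bind it only if a transaction is active on this thread.
        if (ACTIVE.get()) {
            RESOURCES.get().put(dataSource, con);
        }
        return con;
    }
}
```

Between begin() and complete() two calls with the same data source return the same connection; outside that window each call fetches a fresh one, which corresponds to the "will not be managed by Spring" branch of the log message in SpringManagedTransaction.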
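What dataSource.getConnection() does in detail depends on the connection pool in use (a pool is almost always used). Our own project uses the Druid connection pool. Because of heavy traffic we added multiple data sources with master-slave replication, wrapped them in a class that extends Spring's AbstractRoutingDataSource, and switch between data sources dynamically with an Aspect. So we only need to look at our project's own wrapper class. The source is as follows: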
// Data source wrapper class
public class MultipleDataSource extends AbstractRoutingDataSource {
    private static final ThreadLocal<String> dataSourceKey = new InheritableThreadLocal<>();
    public static void setDataSourceKey(String dataSource) {
        dataSourceKey.set(dataSource);
    }
    // Note this overridden method: it supplies the key that is later used to look up the data source in the Map
    @Override
    protected Object determineCurrentLookupKey() {
        return dataSourceKey.get();
    }
}
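The source of AbstractRoutingDataSource is shown below. AbstractRoutingDataSource implements the InitializingBean interface, so once Spring has created the object and populated its fields it calls afterPropertiesSet(). This is much like the way the Druid data source is created. The method performs some validation and assigns the remaining fields, putting every data source into a Map as key=value pairs.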
public abstract class AbstractRoutingDataSource extends AbstractDataSource implements InitializingBean {
    @Nullable
    private Map<Object, Object> targetDataSources;
    @Nullable
    private Object defaultTargetDataSource;
    @Nullable
    private Map<Object, DataSource> resolvedDataSources;
    @Nullable
    private DataSource resolvedDefaultDataSource;
    @Override
    public void afterPropertiesSet() {
        if (this.targetDataSources == null) {
            throw new IllegalArgumentException("Property 'targetDataSources' is required");
        }
        this.resolvedDataSources = new HashMap<>(this.targetDataSources.size());
        this.targetDataSources.forEach((key, value) -> {
            Object lookupKey = resolveSpecifiedLookupKey(key);
            DataSource dataSource = resolveSpecifiedDataSource(value);
            this.resolvedDataSources.put(lookupKey, dataSource);
        });
        if (this.defaultTargetDataSource != null) {
            this.resolvedDefaultDataSource = resolveSpecifiedDataSource(this.defaultTargetDataSource);
        }
    }
    @Override
    public Connection getConnection() throws SQLException {
        return determineTargetDataSource().getConnection();
    }
    protected DataSource determineTargetDataSource() {
        Assert.notNull(this.resolvedDataSources, "DataSource router not initialized");
        Object lookupKey = determineCurrentLookupKey();
        DataSource dataSource = this.resolvedDataSources.get(lookupKey);
        if (dataSource == null && (this.lenientFallback || lookupKey == null)) {
            dataSource = this.resolvedDefaultDataSource;
        }
        if (dataSource == null) {
            throw new IllegalStateException("Cannot determine target DataSource for lookup key [" + lookupKey + "]");
        }
        return dataSource;
    }
    @Nullable
    protected abstract Object determineCurrentLookupKey();
}
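Back to the question above. Our project's wrapper class MultipleDataSource has no getConnection() method of its own, so the call goes to getConnection() of the parent class AbstractRoutingDataSource. That calls determineTargetDataSource(), which in turn calls determineCurrentLookupKey(), the method overridden in our wrapper class. Its return value is read from the ThreadLocal. Before each thread connects to MySQL, an Aspect stores a data source key in the ThreadLocal; when getConnection() is called, that key is used to get the matching data source out of the data source Map (the one populated in afterPropertiesSet()). This is how a specific data source is selected dynamically at runtime.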
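The routing described above can be modelled without Spring in a few lines. The sketch below is only an illustration under stated assumptions: Strings stand in for DataSource objects, lenient fallback is assumed to be enabled, and RoutingSketch and withKey are hypothetical names. withKey shows the kind of set-then-clear logic an aspect would wrap around a method call; it is not the project's actual aspect, which the article does not show.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

class RoutingSketch {
    // Key for the current thread, like dataSourceKey in MultipleDataSource.
    private static final ThreadLocal<String> KEY = new ThreadLocal<>();

    private final Map<String, String> resolved = new HashMap<>(); // key -> data source (a String stands in)
    private final String defaultTarget;

    RoutingSketch(Map<String, String> targets, String defaultTarget) {
        // Same idea as afterPropertiesSet(): copy every target into the lookup map.
        this.resolved.putAll(targets);
        this.defaultTarget = defaultTarget;
    }

    // Same decision as determineTargetDataSource() with lenient fallback on:
    // use the data source for the current key, else the default, else fail.
    String determineTarget() {
        String key = KEY.get();
        String target = resolved.get(key);
        if (target == null) {
            target = defaultTarget;
        }
        if (target == null) {
            throw new IllegalStateException("Cannot determine target for lookup key [" + key + "]");
        }
        return target;
    }

    // What an aspect would do around a call: set the key, run the body, always clear the key.
    static <T> T withKey(String key, Supplier<T> body) {
        KEY.set(key);
        try {
            return body.get();
        } finally {
            KEY.remove();
        }
    }
}
```

Clearing the key in a finally block is a design choice worth copying: application threads are usually pooled, so a key left behind would silently route the next request on that thread to the wrong data source.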
How Spring manages the data source for MyBatis (the interaction flow between MyBatis and Spring)
Original article: https://www.cnblogs.com/Houz/p/11957908.html