这次开发功能是OEM统计报表。统计报表的数据由大数据平台部的同事收集,数据的展示由我们部门开发。
大数据那边使用 Kylin 分布式分析引擎(kylin官方文档)。
Kylin 虽比较偏向大数据相关,但最终大佬们决定把访问 Kylin 的 Dubbo 接口也由我们开发,比较坑。
解决方案一:Mybatis
首先我们搭建一个 shop-mod-bigdata (+ shop-mod-bigdata-interface) 模块用于访问 Kylin,暴露 Dubbo 服务。
Kylin 官方文档提供了 JDBC 的方式访问Kylin。
为了长期考虑,是否可以用上连接池?
Kylin 有提供 JDBC 的方式访问,那就应该会遵循 JDBC 的规范,
参考 Mysql 的连接池配置进行配置
<!-- Kylin 数据源(OEM) --> <bean id="kylinOemDataSource" class="org.apache.tomcat.jdbc.pool.DataSource" destroy-method="close"> <property name="driverClassName" value="${ds.kylin.jdbc.Driver}" /> <property name="url" value="${ds.kylin.oem.jdbc.url}" /> <property name="username" value="${ds.kylin.oem.jdbc.username}" /> <property name="password" value="${ds.kylin.oem.jdbc.password}" /> <!-- 连接池配置 --> <property name="testWhileIdle" value="${ds.kylin.testWhileIdle}" /> <property name="testOnBorrow" value="${ds.kylin.testOnBorrow}" /> <property name="testOnReturn" value="${ds.kylin.testOnReturn}" /> <property name="validationQuery" value="${ds.kylin.validationQuery}" /> <property name="validationInterval" value="${ds.kylin.validationInterval}" /> <property name="timeBetweenEvictionRunsMillis" value="${ds.kylin.timeBetweenEvictionRunsMillis}" /> <property name="maxActive" value="${ds.kylin.maxActive}" /> <property name="maxIdle" value="${ds.kylin.maxIdle}" /> <property name="minIdle" value="${ds.kylin.minIdle}" /> <property name="maxWait" value="${ds.kylin.maxWait}" /> <property name="initialSize" value="${ds.kylin.initialSize}" /> <property name="removeAbandonedTimeout" value="${ds.kylin.removeAbandonedTimeout}" /> <property name="removeAbandoned" value="${ds.kylin.removeAbandoned}" /> <property name="logAbandoned" value="${ds.kylin.logAbandoned}" /> <property name="minEvictableIdleTimeMillis" value="${ds.kylin.minEvictableIdleTimeMillis}" /> <property name="jmxEnabled" value="${ds.kylin.jmxEnabled}" /> <property name="jdbcInterceptors" value="${ds.kylin.jdbcInterceptors}" /> </bean>
加上连接池配置后,下面就要验证和测试了。
1:调试,跟源码。
2:为便于观察运行细节是否正常,修改 log4j 日志配置
log4j.logger.org.apache.tomcat.jdbc=DEBUG log4j.logger.org.springframework.jdbc=DEBUG log4j.logger.org.apache.kylin.jdbc=DEBUG log4j.logger.org.apache.ibatis=DEBUG log4j.logger.org.mybatis.spring=DEBUG
3:测试有没有内存泄漏,长时间运行时是否有问题:CPU 是否异常,线程数是否异常,连接池是否异常,JVM垃圾回收是否异常,
刚好用用学到的 JVM 相关的东东,这里我就直接使用 JDK 自带的工具(jvisualvm),
入口的话就直接用单元测试+线程池来跑
/** * Kylin 访问压测 * * @author zhangjl * @date 2018年3月9日 */ public class KylinPressureTest extends TestBase { @Resource private PersonFacade personFacade; @Test public void pressureTest() { ExecutorService threadPool = Executors.newFixedThreadPool(10); final AtomicInteger counter = new AtomicInteger(); final SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss"); // 生成垃圾 Executors.newScheduledThreadPool(1).scheduleAtFixedRate(new Runnable() { @Override public void run() { int index = generateGarbage(); LOGGER.info("index={}", index); } }, 5, 5, TimeUnit.SECONDS); // 访问 Kylin for (int i = 0; i < 10000; i++) { LOGGER.info("第" + (i + 1) + "轮开始"); for (int j = 0; j < 10; j++) { threadPool.execute(new Runnable() { @Override public void run() { int currNum = counter.addAndGet(1); try { PageInfo page = new PageInfo(); page.setPagination(true); PersonDto condition = new PersonDto(); condition.setGender(counter.get()); ApiResult<PageResult<PersonDto>> result = personFacade.page(condition, page); LOGGER.info("Kylin访问结果, result={}", result); } catch (Exception e) { LOGGER.error("Kylin访问异常", e); } LOGGER.info("Kylin访问,当前为第{}个,time={}", currNum, sdf.format(new Date())); } }); } try { Thread.sleep(1L * 60L * 1000L); } catch (InterruptedException e) { LOGGER.error("线程中断", e); } } // 等待 waitingUserInputExit(); } private void waitingUserInputExit() { Scanner scanner = null; try { scanner = new Scanner(System.in); while (true) { String input = scanner.next(); if ("exit".equals(input)) { break; } } } catch (Exception e) { LOGGER.error("输出异常", e); } finally { if (null != scanner) { scanner.close(); } } } private int generateGarbage() { int index = 0; byte[] array1 = new byte[100 * 1024 * 1024]; // 100M // byte[] array2 = new byte[100 * 1024 * 1024]; // 100M array1[index++] = new Byte("1"); // array2[index++] = new Byte("2"); return index; } }
修改 VM arguments 参数:
-Xmx1024m -Xms1024m -XX:+PrintGC -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+PrintGCApplicationStoppedTime -XX:+PrintHeapAtGC -Xloggc:mygc.log
结果图:
根据测试结果,连接池应该没什么大问题。
感觉应该可以用 Mybatis ?那就加上 Mybatis 的配置,再试一波。
简单 SQL 验证没问题。
又遇到一个问题:分页。
Mysql分页使用的是:LIMIT ${pageSize}, ${offset}
Kylin分页使用的是:LIMIT ${pageSize} OFFSET ${offset}
好像区别并不大,嗯嗯,完全可以基于以前的 Mysql Dialect 改代码。
<!-- Kylin jdbcTemplate(OEM) --> <!-- <bean id="kylinOemJdbcTemplate" class="org.springframework.jdbc.core.JdbcTemplate"> <property name="dataSource" ref="kylinOemDataSource"></property> </bean> --> <!-- Kylin 不配置统一事务管理器(OEM) --> <!-- Kylin sqlSessionFactory(OEM) --> <bean id="kylinOemSqlSessionFactory" class="org.mybatis.spring.SqlSessionFactoryBean"> <property name="dataSource" ref="kylinOemDataSource" /> <property name="plugins"> <list> <bean class="yunnex.shop.bigdata.common.kylin.KylinMybatisLogInterceptor"></bean> <bean class="yunnex.shop.bigdata.common.kylin.KylinMybatisPaginationInterceptor"></bean> </list> </property> <property name="mapperLocations"> <array> <value>classpath*:/yunnex/shop/bigdata/oem/**/*.xml</value> </array> </property> </bean> <!-- Kylin MapperScannerConfigurer(OEM) --> <bean class="org.mybatis.spring.mapper.MapperScannerConfigurer"> <property name="basePackage" value="yunnex.shop.bigdata.oem.**.mapper,yunnex.shop.bigdata.oem.**.dao" /> <property name="sqlSessionFactoryBeanName" value="kylinOemSqlSessionFactory"></property> </bean>
/** * 分页拦截器 * * @author zhangjl * @date 2018年3月9日 */ @Intercepts({@Signature(type = Executor.class, method = "query", args = {MappedStatement.class, Object.class, RowBounds.class, ResultHandler.class})}) public class KylinMybatisPaginationInterceptor implements Interceptor { private Logger LOGGER = LoggerFactory.getLogger(KylinMybatisPaginationInterceptor.class); Dialect dialect = new KylinDialect(); @Value("${sql.rows.max.return}") private Integer rowlimit; @SuppressWarnings("unchecked") public Object intercept(Invocation invocation) throws Throwable { MappedStatement mappedStatement = (MappedStatement) invocation.getArgs()[0]; Object parameter = invocation.getArgs()[1]; BoundSql boundSql = mappedStatement.getBoundSql(parameter); String sqlId = mappedStatement.getId(); String originalSql = boundSql.getSql().trim(); RowBounds rowBounds = (RowBounds) invocation.getArgs()[2]; Object parameterObject = boundSql.getParameterObject(); if (boundSql.getSql() == null || "".equals(boundSql.getSql())) { return null; } // 分页参数--上下文传参 PageInfo page = null; // map传参每次都将currentPage重置,先判读map再判断context if (parameterObject instanceof PageInfo) { page = (PageInfo) parameterObject; } else if (parameterObject instanceof Map) { Map<String, Object> map = (Map<String, Object>) parameterObject; if (map.containsKey("page")) { page = (PageInfo) map.get("page"); } } else if (null != parameterObject) { Field pageField = ReflectionUtil.getFieldByFieldName(parameterObject, "page"); if (pageField != null) { page = (PageInfo) ReflectionUtil.getValueByFieldName(parameterObject, "page"); } } // 后面用到了context的东东 if (page != null && page.isPagination() == true) { if (page.getPageSize() > rowlimit) { LOGGER.warn("[toolarge_pagesize] page size greater than {},#sqlid:{}#,#pagesize:{}#,#sql:{}#", rowlimit, sqlId, page.getPageSize(), originalSql); page.setPageSize(rowlimit); } int totalRows = page.getTotalRows(); // 得到总记录数 if (totalRows == 0 && page.isNeedCount()) { StringBuilder countSql = new StringBuilder(); countSql.append(KylinPageHepler.getCountString(originalSql)); Connection connection = mappedStatement.getConfiguration().getEnvironment().getDataSource().getConnection(); PreparedStatement countStmt = connection.prepareStatement(countSql.toString()); BoundSql countBS = new BoundSql(mappedStatement.getConfiguration(), countSql.toString(), boundSql.getParameterMappings(), parameterObject); Field metaParamsField = ReflectionUtil.getFieldByFieldName(boundSql, "metaParameters"); if (metaParamsField != null) { MetaObject mo = (MetaObject) ReflectionUtil.getValueByFieldName(boundSql, "metaParameters"); ReflectionUtil.setValueByFieldName(countBS, "metaParameters", mo); } setParameters(countStmt, mappedStatement, countBS, parameterObject); ResultSet rs = countStmt.executeQuery(); if (rs.next()) { totalRows = rs.getInt(1); } rs.close(); countStmt.close(); connection.close(); } // 分页计算 page.init(totalRows, page.getPageSize(), page.getCurrentPage()); if (rowBounds == null || rowBounds == RowBounds.DEFAULT) { rowBounds = new RowBounds(page.getPageSize() * (page.getCurrentPage() - 1), page.getPageSize()); } // 分页查询 本地化对象 修改数据库注意修改实现 String pagesql = dialect.getLimitString(originalSql, rowBounds.getOffset(), rowBounds.getLimit()); invocation.getArgs()[2] = new RowBounds(RowBounds.NO_ROW_OFFSET, RowBounds.NO_ROW_LIMIT); BoundSql newBoundSql = new BoundSql(mappedStatement.getConfiguration(), pagesql, boundSql.getParameterMappings(), boundSql.getParameterObject()); Field metaParamsField = ReflectionUtil.getFieldByFieldName(boundSql, "metaParameters"); if (metaParamsField != null) { MetaObject mo = (MetaObject) ReflectionUtil.getValueByFieldName(boundSql, "metaParameters"); ReflectionUtil.setValueByFieldName(newBoundSql, "metaParameters", mo); } MappedStatement newMs = copyFromMappedStatement(mappedStatement, new BoundSqlSqlSource(newBoundSql)); invocation.getArgs()[0] = newMs; } return invocation.proceed(); } public static class BoundSqlSqlSource implements SqlSource { BoundSql boundSql; public BoundSqlSqlSource(BoundSql boundSql) { this.boundSql = boundSql; } public BoundSql getBoundSql(Object parameterObject) { return boundSql; } } public Object plugin(Object arg0) { return Plugin.wrap(arg0, this); } public void setProperties(Properties arg0) { } /** * 对SQL参数(?)设值,参考org.apache.ibatis.executor.parameter.DefaultParameterHandler * * @param ps * @param mappedStatement * @param boundSql * @param parameterObject * @throws SQLException */ @SuppressWarnings({"rawtypes", "unchecked"}) private void setParameters(PreparedStatement ps, MappedStatement mappedStatement, BoundSql boundSql, Object parameterObject) throws SQLException { ErrorContext.instance().activity("setting parameters").object(mappedStatement.getParameterMap().getId()); List<ParameterMapping> parameterMappings = boundSql.getParameterMappings(); if (parameterMappings != null) { Configuration configuration = mappedStatement.getConfiguration(); TypeHandlerRegistry typeHandlerRegistry = configuration.getTypeHandlerRegistry(); MetaObject metaObject = parameterObject == null ? null : configuration.newMetaObject(parameterObject); for (int i = 0; i < parameterMappings.size(); i++) { ParameterMapping parameterMapping = parameterMappings.get(i); if (parameterMapping.getMode() != ParameterMode.OUT) { Object value; String propertyName = parameterMapping.getProperty(); PropertyTokenizer prop = new PropertyTokenizer(propertyName); if (parameterObject == null) { value = null; } else if (typeHandlerRegistry.hasTypeHandler(parameterObject.getClass())) { value = parameterObject; } else if (boundSql.hasAdditionalParameter(propertyName)) { value = boundSql.getAdditionalParameter(propertyName); } else if (propertyName.startsWith(ForEachSqlNode.ITEM_PREFIX) && boundSql.hasAdditionalParameter(prop.getName())) { value = boundSql.getAdditionalParameter(prop.getName()); if (value != null) { value = configuration.newMetaObject(value).getValue(propertyName.substring(prop.getName().length())); } } else { value = metaObject == null ? null : metaObject.getValue(propertyName); } TypeHandler typeHandler = parameterMapping.getTypeHandler(); if (typeHandler == null) { throw new ExecutorException( "There was no TypeHandler found for parameter " + propertyName + " of statement " + mappedStatement.getId()); } typeHandler.setParameter(ps, i + 1, value, parameterMapping.getJdbcType()); } } } } private MappedStatement copyFromMappedStatement(MappedStatement ms, SqlSource newSqlSource) { Builder builder = new MappedStatement.Builder(ms.getConfiguration(), ms.getId(), newSqlSource, ms.getSqlCommandType()); builder.resource(ms.getResource()); builder.fetchSize(ms.getFetchSize()); builder.statementType(ms.getStatementType()); builder.keyGenerator(ms.getKeyGenerator()); builder.keyProperty(buildKeyProperty(ms.getKeyProperties())); builder.timeout(ms.getTimeout()); builder.parameterMap(ms.getParameterMap()); builder.resultMaps(ms.getResultMaps()); builder.useCache(ms.isUseCache()); builder.cache(ms.getCache()); MappedStatement newMs = builder.build(); return newMs; } private static String buildKeyProperty(String[] props) { if (null != props && props.length > 0) { StringBuilder sb = new StringBuilder(); for (String p : props) { sb.append(p).append(","); } return sb.substring(0, sb.length() - 1); } return null; } }
/** * Kylin Dialect * * @author zhangjl * @date 2018年3月9日 */ public class KylinDialect extends Dialect { protected static final String SQL_END_DELIMITER = ";"; public String getLimitString(String sql, boolean hasOffset) { return KylinPageHepler.getLimitString(sql, -1, -1); } @Override public String getLimitString(String sql, int offset, int limit) { return KylinPageHepler.getLimitString(sql, offset, limit); } @Override public boolean supportsLimit() { return true; } }
/** * 分页帮助类 * * @author zhangjl * @date 2018年3月9日 */ public class KylinPageHepler { /** * 得到查询总数的sql */ public static String getCountString(String querySelect) { querySelect = getLineSql(querySelect); int orderIndex = getLastOrderInsertPoint(querySelect); int formIndex = getAfterFormInsertPoint(querySelect); String select = querySelect.substring(0, formIndex); // 如果SELECT 中包含 DISTINCT 只能在外层包含COUNT if (select.toLowerCase().indexOf("select distinct") != -1 || querySelect.toLowerCase().indexOf("group by") != -1) { return new StringBuffer(querySelect.length()).append("select count(1) from (").append(querySelect.substring(0, orderIndex)) .append(" ) t").toString(); } else { return new StringBuffer(querySelect.length()).append("select count(1) ").append(querySelect.substring(formIndex, orderIndex)) .toString(); } } /** * 得到最后一个Order By的插入点位置 * * @return 返回最后一个Order By插入点的位置 */ private static int getLastOrderInsertPoint(String querySelect) { int orderIndex = querySelect.toLowerCase().lastIndexOf("order by"); if (orderIndex == -1 || !isBracketCanPartnership(querySelect.substring(orderIndex, querySelect.length()))) { throw new RuntimeException("Kylin SQL 分页必须要有Order by 语句!"); } return orderIndex; } /** * 得到分页的SQL * * @param offset 偏移量 * @param limit 位置 * @return 分页SQL */ public static String getLimitString(String querySelect, int offset, int limit) { querySelect = getLineSql(querySelect); // String sql = querySelect + " limit " + offset + " ," + limit; String sql = querySelect + " limit " + limit + " offset " + offset; return sql; } /** * 将SQL语句变成一条语句,并且每个单词的间隔都是1个空格 * * @param sql SQL语句 * @return 如果sql是NULL返回空,否则返回转化后的SQL */ private static String getLineSql(String sql) { return sql.replaceAll("[\r\n]", " ").replaceAll("\\s{2,}", " "); } /** * 得到SQL第一个正确的FROM的的插入点 */ private static int getAfterFormInsertPoint(String querySelect) { String regex = "\\s+FROM\\s+"; Pattern pattern = Pattern.compile(regex, Pattern.CASE_INSENSITIVE); Matcher matcher = pattern.matcher(querySelect); while (matcher.find()) { int fromStartIndex = matcher.start(0); String text = querySelect.substring(0, fromStartIndex); if (isBracketCanPartnership(text)) { return fromStartIndex; } } return 0; } /** * 判断括号"()"是否匹配,并不会判断排列顺序是否正确 * * @param text 要判断的文本 * @return 如果匹配返回TRUE,否则返回FALSE */ private static boolean isBracketCanPartnership(String text) { if (text == null || (getIndexOfCount(text, ‘(‘) != getIndexOfCount(text, ‘)‘))) { return false; } return true; } /** * 得到一个字符在另一个字符串中出现的次数 * * @param text 文本 * @param ch 字符 */ private static int getIndexOfCount(String text, char ch) { int count = 0; for (int i = 0; i < text.length(); i++) { count = (text.charAt(i) == ch) ? count + 1 : count; } return count; } }
OK,改造完成,又得一波验证跟测试。
解决方案二:Freemarker
解决方案验证没问题,其实有一个更简洁的方案。
1 使用 Freemarker 的渲染功能 ,渲染SQL模板(达到类似 Mybatis 的能力)
2 使用数据库保存 SQL 模板;
个人觉得这个方案有比用 Mybatis 更好的优势,SQL 有问题了,用 DML 改下 SQL 模板,都不用改代码了!
下面放出一个 Freemaker 渲染 SQL 模板示例代码:
import java.io.StringWriter; import java.util.HashMap; import java.util.Map; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import freemarker.cache.StringTemplateLoader; import freemarker.template.Configuration; import freemarker.template.Template; /** * Freemarker 测试 * @author zhangjl * @date 2018年3月9日 */ public class FreemarkerTest { private static final Logger LOG = LoggerFactory.getLogger(FreemarkerTest.class); @Test public void test1() { try { /** * <pre> * SELECT * FROM shop WHERE 1 = 1 <#if id??>AND id = ${id?c} </#if> <#if order??>ORDER BY ${order} <#if sort??> ${sort} </#if> </#if> * </pre> */ String sqlTemplate = "SELECT * FROM shop WHERE 1 = 1 <#if id??>AND id = ${id?c} </#if> <#if order??>ORDER BY ${order} <#if sort??> ${sort} </#if> </#if>"; String sqlId = "shop.list"; Map<String, Object> paramMap = new HashMap<>(); paramMap.put("id", 1L); String sql = getSql(sqlId, sqlTemplate, paramMap); LOG.info("解析后的SQL={}", sql); } catch (Exception e) { LOG.error("解析SQL异常", e); } } public String getSql(String sqlId, String sqlTemplate, Map<String, Object> paramMap) throws Exception { StringTemplateLoader loader = new StringTemplateLoader(); loader.putTemplate(sqlId, sqlTemplate); Configuration conf = new Configuration(); conf.setTemplateLoader(loader); Template template = conf.getTemplate(sqlId, "utf-8"); StringWriter writer = new StringWriter(); template.process(paramMap, writer); String sql = writer.toString(); writer.close(); return sql; } }