ScalikeJDBC在覆盖JDBC基本功能上是比较完整的,而且实现这些功能的方式比较简洁,运算效率方面自然会稍高一筹了。理论上用ScalikeJDBC作为一种JDBC-Engine还是比较理想的:让它处于各种JDBC工具库和数据库实例之间接收JDBC运算指令然后连接目标数据库进行相关运算后返回结果。一般来说,各种JDBC工具库如ORM,FRM软件通过各自的DSL在复杂的数据库表关系环境内进行数据管理编程,最终产生相关的SQL语句即(prepared)statement+parameters传递给指定类型的数据库JDBC驱动程序去运算并产生结果。如果这样描述,那么JDBC-Engine主要的功能就是支持下面这个函数:
jdbcRunSQL(context: JDBCContext): JDBCResultSet
这个函数的用户提供一个JDBCContext类型值,然后由jdbcRunSQL进行接下来的运算并返回结果。从这个角度分析,JDBCContext最起码需要提供下面的属性:
1、数据库连接:选择数据库连接池
2、运算参数:fetchSize, queryTimeout,queryTag。这几个参数都针对当前运算的SQL
3、Query参数:
Query类型:select/execute/update、单条/成批、前置/后置query、generateKey
SQL语句:statements: Seq[String]、parameters: Seq[Seq[Any]]
下面就是JDBCContext类型定义:
import java.sql.PreparedStatement
import scala.collection.generic.CanBuildFrom
import scalikejdbc._
/** Constants and small helpers that accompany the `JDBCContext` case class. */
object JDBCContext {
  /** Integer code identifying the kind of SQL operation. */
  type SQLTYPE = Int

  val SQL_SELECT: Int = 0
  val SQL_EXECUTE: Int = 1
  val SQL_UPDATE: Int = 2

  /** Wraps a column index in `Some`. */
  def returnColumnByIndex(idx: Int): Some[Int] = Some[Int](idx)

  /** Wraps a column name in `Some`. */
  def returnColumnByName(col: String): Some[String] = Some[String](col)
}
/**
 * Describes one SQL operation for the JDBC engine: which pool to use, what to run, and how.
 *
 * @param dbName             name of the connection pool to run against
 * @param statements         SQL statement texts
 * @param parameters         parameter groups for the statements; defaults to none
 * @param fetchSize          fetch size for the query; defaults to 100
 * @param queryTimeout       optional query timeout (presumably seconds — TODO confirm)
 * @param queryTags          tags attached to the query; defaults to none
 * @param sqlType            one of the `JDBCContext.SQL_*` codes; defaults to `SQL_SELECT`
 * @param batch              batch flag; NOTE(review): no consumer is visible in this file
 * @param returnGeneratedKey no return: None, return by index: Some(1), by name: Some("id")
 * @param preAction          optional hook receiving the `PreparedStatement`
 *                           (presumably run before execution — TODO confirm)
 * @param postAction         optional hook receiving the `PreparedStatement`
 *                           (presumably run after execution — TODO confirm)
 */
case class JDBCContext(
    dbName: Symbol,
    statements: Seq[String],
    parameters: Seq[Seq[Any]] = Nil,
    fetchSize: Int = 100,
    queryTimeout: Option[Int] = None,
    queryTags: Seq[String] = Nil,
    sqlType: JDBCContext.SQLTYPE = JDBCContext.SQL_SELECT,
    batch: Boolean = false,
    // no return: None, return by index: Some(1), by name: Some("id")
    returnGeneratedKey: Option[Any] = None,
    preAction: Option[PreparedStatement => Unit] = None,
    postAction: Option[PreparedStatement => Unit] = None
)
重新考虑了一下,觉得把JDBC的读和写分成两个函数来实现更容易使用,因为这样更符合通常的编程模式和习惯。所以最好把sqlType=SQL_SELECT类型的SQL独立成一个函数来运算:
/**
 * Runs the first statement of `ctx` as a SELECT and collects the converted rows into a `C[A]`.
 *
 * Only `ctx.statements.head` and the first parameter group of `ctx.parameters` are used;
 * any further entries are ignored.
 *
 * @tparam C result collection type
 * @tparam A row type produced by `rowConverter`
 * @param ctx          query description; `sqlType` must be `SQL_SELECT`
 * @param rowConverter maps each result-set row to an `A`
 * @param cbf          builder used to assemble the `C[A]`
 * @return the converted rows
 * @throws IllegalStateException  if `ctx.sqlType` is not `SQL_SELECT`
 * @throws NoSuchElementException if `ctx.statements` is empty
 */
def jdbcQueryResult[C[_] <: TraversableOnce[_], A](
    ctx: JDBCContext, rowConverter: WrappedResultSet => A)(
    implicit cbf: CanBuildFrom[Nothing, A, C[A]]): C[A] = {
  ctx.sqlType match {
    case SQL_SELECT =>
      // Same exception type as a bare `.head`, but with a message that names the actual problem.
      val statement: String = ctx.statements.headOption.getOrElse(
        throw new NoSuchElementException(
          "JDBCContext.statements is empty; a SELECT statement is required"))
      // No parameter groups means the statement is run without bound parameters.
      val params: Seq[Any] = ctx.parameters.headOption.getOrElse(Nil)
      val rawSql = new SQLToCollectionImpl[A, NoExtractor](statement, params)(noExtractor(""))
      // NOTE(review): the results of these three calls are discarded, so this presumably
      // relies on the setters configuring `rawSql` in place — confirm against ScalikeJDBC.
      ctx.queryTimeout.foreach(rawSql.queryTimeout(_))
      ctx.queryTags.foreach(rawSql.tags(_))
      rawSql.fetchSize(ctx.fetchSize)
      implicit val session: NamedAutoSession = NamedAutoSession(ctx.dbName)
      val sql: SQL[A, HasExtractor] = rawSql.map(rowConverter)
      sql.collection.apply[C]()
    case _ => throw new IllegalStateException("sqlType must be ‘SQL_SELECT‘!")
  }
}
还需要提供noExtractor函数来符合SQLToCollectionImpl类型的参数款式要求:
/** Placeholder row extractor: whatever row it is given, it throws `IllegalStateException(message)`. */
private def noExtractor(message: String): WrappedResultSet => Nothing =
  _ => throw new IllegalStateException(message)
我们来测试一下jdbcQueryResult:
import scalikejdbc._
import JDBCEngine._
import configdbs._
import org.joda.time._
/**
 * Demo: sets up the pools of the "dev" environment and runs the same SELECT
 * against the h2, mysql and postgres pools, converting rows in three different ways.
 */
object JDBCQueryDemo extends App {
  ConfigDBsWithEnv("dev").setupAll()

  // Pool names are written as Symbol("...") so the code compiles regardless of quote rendering.
  val ctx = JDBCContext(
    dbName = Symbol("h2"),
    statements = Seq("select * from members where id = ?"),
    parameters = Seq(Seq(2))
  )

  //data model
  case class Member(
    id: Long,
    name: String,
    description: Option[String] = None,
    birthday: Option[LocalDate] = None,
    createdAt: DateTime)

  //data row converter
  // NOTE(review): jodaLocalDateOpt/jodaDateTime presumably need ScalikeJDBC's joda-time
  // support on the classpath and in scope — confirm for the library version in use.
  val toMember = (rs: WrappedResultSet) => Member(
    id = rs.long("id"),
    name = rs.string("name"),
    description = rs.stringOpt("description"),
    birthday = rs.jodaLocalDateOpt("birthday"),
    createdAt = rs.jodaDateTime("created_at")
  )

  // Full rows as a Vector from h2.
  val vecMember: Vector[Member] = jdbcQueryResult[Vector,Member](ctx,toMember)
  println(s"members in vector: $vecMember")

  // Single column as a List from mysql.
  val ctx1 = ctx.copy(dbName = Symbol("mysql"))
  val names: List[String] = jdbcQueryResult[List,String](ctx1,{rs: WrappedResultSet => rs.string("name")})
  println(s"selected name: $names")

  // Two columns as a List of tuples from postgres.
  val ctx2 = ctx1.copy(dbName = Symbol("postgres"))
  val idname: List[(Long,String)] = jdbcQueryResult[List,(Long,String)](ctx2,{rs: WrappedResultSet => (rs.long("id"),rs.string("name"))})
  println(s"selected id+name: $idname")
}
如果我们使用Slick-DSL进行数据库管理编程,可以按下面的方式与JDBC-Engine对接:
/** Defines a Slick mapping for the COUNTY table and exposes the SQL text of a filtered query. */
object SlickDAO {
  import slick.jdbc.H2Profile.api._

  /** Row model: one county. */
  case class CountyModel(id: Int, name: String)

  /** Slick table mapping for "COUNTY" with columns ID and NAME. */
  case class CountyTable(tag: Tag) extends Table[CountyModel](tag, "COUNTY") {
    def id = column[Int]("ID", O.AutoInc, O.PrimaryKey)
    def name = column[String]("NAME", O.Length(64))
    def * = (id, name) <> (CountyModel.tupled, CountyModel.unapply)
  }

  val CountyQuery = TableQuery[CountyTable]
  val filter = "Kansas"

  // Case-insensitive "contains" match: both sides are upper-cased before the LIKE.
  val qry = CountyQuery.filter { county =>
    county.name.toUpperCase.like(s"%${filter.toUpperCase}%")
  }

  // Only the first generated statement is exposed.
  val statement = qry.result.statements.head
}
import SlickDAO._
// Context carrying the Slick-generated statement; no parameter groups are supplied.
val slickCtx = JDBCContext(
  dbName = Symbol("h2"),
  statements = Seq(statement)
)
// Run the statement through the JDBC engine and rebuild the Slick row model from each row.
val vecCounty: Vector[CountyModel] = jdbcQueryResult[Vector,CountyModel](slickCtx,{
  rs: WrappedResultSet => CountyModel(id=rs.int("id"),name=rs.string("name"))})
vecCounty.foreach(r => println(s"${r.id},${r.name}"))
输出正确。
下面就是本次示范的源代码:
build.sbt
// sbt build definition for the ScalikeJDBC demo project.
name := "learn-scalikeJDBC"

version := "0.1"

scalaVersion := "2.12.4"

// All ScalikeJDBC modules are pinned to the same release.
val scalikejdbcVersion = "3.1.0"

// Scala 2.10, 2.11, 2.12
// NOTE(review): the demo calls rs.jodaDateTime / rs.jodaLocalDateOpt; in ScalikeJDBC 3.x these
// presumably live in a separate joda-time module that is not listed here — confirm.
libraryDependencies ++= Seq(
  // ScalikeJDBC core, test support and Typesafe-config integration
  "org.scalikejdbc" %% "scalikejdbc" % scalikejdbcVersion,
  "org.scalikejdbc" %% "scalikejdbc-test" % scalikejdbcVersion % "test",
  "org.scalikejdbc" %% "scalikejdbc-config" % scalikejdbcVersion,
  // JDBC drivers
  "com.h2database" % "h2" % "1.4.196",
  "mysql" % "mysql-connector-java" % "6.0.6",
  "org.postgresql" % "postgresql" % "42.2.0",
  // Connection pool implementations
  "commons-dbcp" % "commons-dbcp" % "1.4",
  "org.apache.tomcat" % "tomcat-jdbc" % "9.0.2",
  "com.zaxxer" % "HikariCP" % "2.7.4",
  "com.jolbox" % "bonecp" % "0.8.0.RELEASE",
  // Slick (used to generate SQL text) and logging
  "com.typesafe.slick" %% "slick" % "3.2.1",
  "ch.qos.logback" % "logback-classic" % "1.2.3"
)
resources/application.conf 包括H2,MySQL,PostgreSQL
# JDBC settings for the "test" environment
test {
  db {
    h2 {
      driver = "org.h2.Driver"
      url = "jdbc:h2:tcp://localhost/~/slickdemo"
      user = ""
      password = ""
      poolInitialSize = 5
      poolMaxSize = 7
      poolConnectionTimeoutMillis = 1000
      poolValidationQuery = "select 1 as one"
      poolFactoryName = "commons-dbcp2"
    }
    mysql {
      driver = "com.mysql.cj.jdbc.Driver"
      url = "jdbc:mysql://localhost:3306/testdb"
      user = "root"
      password = "123"
      poolInitialSize = 5
      poolMaxSize = 7
      poolConnectionTimeoutMillis = 1000
      poolValidationQuery = "select 1 as one"
      poolFactoryName = "bonecp"
    }
  }
  # ScalikeJDBC global settings
  scalikejdbc.global.loggingSQLAndTime {
    enabled = true
    logLevel = info
    warningEnabled = true
    warningThresholdMillis = 1000
    warningLogLevel = warn
    singleLineMode = false
    printUnprocessedStackTrace = false
    stackTraceDepth = 10
  }
}
# JDBC settings for the "dev" environment
dev {
  db {
    h2 {
      driver = "org.h2.Driver"
      url = "jdbc:h2:tcp://localhost/~/slickdemo"
      user = ""
      password = ""
      poolFactoryName = "hikaricp"
      numThreads = 10
      maxConnections = 12
      minConnections = 4
      # NOTE(review): keepAliveConnection is not among the keys hikariCPConfig reads — confirm it has any effect
      keepAliveConnection = true
    }
    mysql {
      driver = "com.mysql.cj.jdbc.Driver"
      url = "jdbc:mysql://localhost:3306/testdb"
      user = "root"
      password = "123"
      poolInitialSize = 5
      poolMaxSize = 7
      poolConnectionTimeoutMillis = 1000
      poolValidationQuery = "select 1 as one"
      poolFactoryName = "bonecp"
    }
    postgres {
      driver = "org.postgresql.Driver"
      url = "jdbc:postgresql://localhost:5432/testdb"
      user = "root"
      password = "123"
      poolFactoryName = "hikaricp"
      numThreads = 10
      maxConnections = 12
      minConnections = 4
      keepAliveConnection = true
    }
  }
  # ScalikeJDBC global settings
  scalikejdbc.global.loggingSQLAndTime.enabled = true
  scalikejdbc.global.loggingSQLAndTime.logLevel = info
  scalikejdbc.global.loggingSQLAndTime.warningEnabled = true
  scalikejdbc.global.loggingSQLAndTime.warningThresholdMillis = 1000
  scalikejdbc.global.loggingSQLAndTime.warningLogLevel = warn
  scalikejdbc.global.loggingSQLAndTime.singleLineMode = false
  scalikejdbc.global.loggingSQLAndTime.printUnprocessedStackTrace = false
  scalikejdbc.global.loggingSQLAndTime.stackTraceDepth = 10
}
HikariConfig.scala HikariCP连接池实现
package configdbs
import scala.collection.mutable
import scala.concurrent.duration.Duration
import scala.language.implicitConversions
import com.typesafe.config._
import java.util.concurrent.TimeUnit
import java.util.Properties
import scalikejdbc.config._
import com.typesafe.config.Config
import com.zaxxer.hikari._
import scalikejdbc.ConnectionPoolFactoryRepository
/** Extension methods to make Typesafe Config easier to use */
class ConfigExtensionMethods(val c: Config) extends AnyVal {
  import scala.collection.JavaConverters._

  /** Applies `read` to `path` when `c` has that path; otherwise evaluates `fallback`. */
  private def readOr[A](path: String, fallback: => A)(read: String => A): A =
    if (c.hasPath(path)) read(path) else fallback

  def getBooleanOr(path: String, default: => Boolean = false): Boolean =
    readOr[Boolean](path, default)(p => c.getBoolean(p))

  def getIntOr(path: String, default: => Int = 0): Int =
    readOr[Int](path, default)(p => c.getInt(p))

  def getStringOr(path: String, default: => String = null): String =
    readOr[String](path, default)(p => c.getString(p))

  def getConfigOr(path: String, default: => Config = ConfigFactory.empty()): Config =
    readOr[Config](path, default)(p => c.getConfig(p))

  /** Duration at `path` expressed in milliseconds. */
  def getMillisecondsOr(path: String, default: => Long = 0L): Long =
    readOr[Long](path, default)(p => c.getDuration(p, TimeUnit.MILLISECONDS))

  /** Duration at `path`, read with millisecond resolution. */
  def getDurationOr(path: String, default: => Duration = Duration.Zero): Duration =
    readOr[Duration](path, default) { p =>
      Duration(c.getDuration(p, TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS)
    }

  /** Sub-config at `path` converted with [[toProperties]]. */
  def getPropertiesOr(path: String, default: => Properties = null): Properties =
    readOr[Properties](path, default)(p => new ConfigExtensionMethods(c.getConfig(p)).toProperties)

  /**
   * Converts the whole config into `Properties`. Nested objects become nested `Properties`
   * values, other values are stored as their `toString`, and null values are skipped.
   */
  def toProperties: Properties = {
    def collect(entries: mutable.Map[String, ConfigValue]): Properties = {
      val result = new Properties(null)
      for ((key, cv) <- entries) {
        val converted: Option[AnyRef] =
          if (cv.valueType() == ConfigValueType.OBJECT)
            Some(collect(cv.asInstanceOf[ConfigObject].asScala))
          else
            Option(cv.unwrapped).flatMap(raw => Option(raw.toString))
        converted.foreach(value => result.put(key, value))
      }
      result
    }
    collect(c.root.asScala)
  }

  def getBooleanOpt(path: String): Option[Boolean] =
    readOr[Option[Boolean]](path, None)(p => Some(c.getBoolean(p)))

  def getIntOpt(path: String): Option[Int] =
    readOr[Option[Int]](path, None)(p => Some(c.getInt(p)))

  // The two below rely on the `null` default of the corresponding *Or method.
  def getStringOpt(path: String): Option[String] = Option(getStringOr(path, null))

  def getPropertiesOpt(path: String): Option[Properties] = Option(getPropertiesOr(path, null))
}
/** Holds the implicit conversion that adds the extension methods to a `Config`. */
object ConfigExtensionMethods {
  /** Wraps `c` in a [[ConfigExtensionMethods]]; import this member to enable the syntax. */
  @inline implicit def configExtensionMethods(c: Config): ConfigExtensionMethods = {
    new ConfigExtensionMethods(c)
  }
}
/**
 * Reads the pool factory name and HikariCP settings of a database from the
 * `<envPrefix>db.<dbName>` section of the Typesafe config.
 */
trait HikariConfigReader extends TypesafeConfigReader {
  self: TypesafeConfig => // with TypesafeConfigReader => //NoEnvPrefix =>

  import ConfigExtensionMethods.configExtensionMethods

  /** Config section `<envPrefix>db.<dbName>` that both readers below work from. */
  private def dbSettingsFor(dbName: Symbol): Config =
    config.getConfig(envPrefix + "db." + dbName.name)

  /**
   * Pool factory configured for `dbName` under `poolFactoryName`.
   *
   * @return the configured name, or `ConnectionPoolFactoryRepository.COMMONS_DBCP` when absent
   */
  def getFactoryName(dbName: Symbol): String =
    dbSettingsFor(dbName).getStringOr("poolFactoryName", ConnectionPoolFactoryRepository.COMMONS_DBCP)

  /**
   * Builds a `HikariConfig` from the settings of `dbName`.
   *
   * @param dbName database section to read; also used as the default pool name
   * @return a populated, not yet started, HikariCP configuration
   */
  def hikariCPConfig(dbName: Symbol): HikariConfig = {
    val hconf = new HikariConfig()
    val c: Config = dbSettingsFor(dbName)

    // Connection settings
    if (c.hasPath("dataSourceClass")) {
      hconf.setDataSourceClassName(c.getString("dataSourceClass"))
    } else {
      // "driverClassName" wins over "driver"; nothing is set when neither key is present.
      Option(c.getStringOr("driverClassName", c.getStringOr("driver"))).foreach(hconf.setDriverClassName)
    }
    hconf.setJdbcUrl(c.getStringOr("url", null))
    c.getStringOpt("user").foreach(hconf.setUsername)
    c.getStringOpt("password").foreach(hconf.setPassword)
    c.getPropertiesOpt("properties").foreach(hconf.setDataSourceProperties)

    // Pool configuration
    hconf.setConnectionTimeout(c.getMillisecondsOr("connectionTimeout", 1000))
    hconf.setValidationTimeout(c.getMillisecondsOr("validationTimeout", 1000))
    hconf.setIdleTimeout(c.getMillisecondsOr("idleTimeout", 600000))
    hconf.setMaxLifetime(c.getMillisecondsOr("maxLifetime", 1800000))
    hconf.setLeakDetectionThreshold(c.getMillisecondsOr("leakDetectionThreshold", 0))
    // setInitializationFailFast is deprecated in HikariCP; the equivalent timeout is
    // 1 ms for fail-fast and -1 (skip the initial connection attempt) otherwise.
    hconf.setInitializationFailTimeout(if (c.getBooleanOr("initializationFailFast", false)) 1L else -1L)
    c.getStringOpt("connectionTestQuery").foreach(hconf.setConnectionTestQuery)
    c.getStringOpt("connectionInitSql").foreach(hconf.setConnectionInitSql)
    // Pool bounds default to multiples of numThreads when not set explicitly.
    val numThreads = c.getIntOr("numThreads", 20)
    hconf.setMaximumPoolSize(c.getIntOr("maxConnections", numThreads * 5))
    hconf.setMinimumIdle(c.getIntOr("minConnections", numThreads))
    hconf.setPoolName(c.getStringOr("poolName", dbName.name))
    hconf.setRegisterMbeans(c.getBooleanOr("registerMbeans", false))

    // Equivalent of ConnectionPreparer
    hconf.setReadOnly(c.getBooleanOr("readOnly", false))
    c.getStringOpt("isolation").map("TRANSACTION_" + _).foreach(hconf.setTransactionIsolation)
    hconf.setCatalog(c.getStringOr("catalog", null))

    hconf
  }
}
import scalikejdbc._
/**
 * Registers ScalikeJDBC connection pools from configuration, using HikariCP when a
 * database's factory name is "hikaricp" and the settings-based pool otherwise.
 */
trait ConfigDBs {
  self: TypesafeConfigReader with TypesafeConfig with HikariConfigReader =>

  /** Loads the driver class when a non-null, non-blank class name is given. */
  private def loadDriver(className: String): Unit =
    Option(className).filter(_.trim.nonEmpty).foreach(name => Class.forName(name))

  /** Registers a pool backed by a `HikariDataSource` built from `hikariCPConfig`. */
  private def setupHikari(dbName: Symbol): Unit = {
    val hikariSettings = hikariCPConfig(dbName)
    val dataSource = new HikariDataSource(hikariSettings)
    loadDriver(hikariSettings.getDriverClassName)
    ConnectionPool.add(dbName, new DataSourceConnectionPool(dataSource))
  }

  /** Registers a pool from the JDBC and connection-pool settings read for `dbName`. */
  private def setupFromSettings(dbName: Symbol): Unit = {
    val JDBCSettings(jdbcUrl, userName, secret, driverClass) = readJDBCSettings(dbName)
    val poolSettings = readConnectionPoolSettings(dbName)
    loadDriver(driverClass)
    ConnectionPool.add(dbName, jdbcUrl, userName, secret, poolSettings)
  }

  /** Registers the connection pool for `dbName`, choosing the implementation by factory name. */
  def setup(dbName: Symbol = ConnectionPool.DEFAULT_NAME): Unit =
    if (getFactoryName(dbName) == "hikaricp") setupHikari(dbName)
    else setupFromSettings(dbName)

  /** Loads the global settings, then sets up every database listed in `dbNames`. */
  def setupAll(): Unit = {
    loadGlobalSettings()
    for (name <- dbNames) setup(Symbol(name))
  }

  /** Closes the pool registered under `dbName`. */
  def close(dbName: Symbol = ConnectionPool.DEFAULT_NAME): Unit =
    ConnectionPool.close(dbName)

  /** Closes all registered pools. */
  def closeAll(): Unit =
    ConnectionPool.closeAll
}
/** `ConfigDBs` wired to the standard Typesafe config, with no environment override. */
object ConfigDBs
  extends ConfigDBs
    with TypesafeConfigReader
    with StandardTypesafeConfig
    with HikariConfigReader
/**
 * `ConfigDBs` bound to one environment: `env` is overridden with `envValue`.
 *
 * @param envValue environment name, e.g. "dev" as used by the demo
 */
case class ConfigDBsWithEnv(envValue: String)
  extends ConfigDBs
    with TypesafeConfigReader
    with StandardTypesafeConfig
    with HikariConfigReader
    with EnvPrefix {

  override val env: Option[String] = Option(envValue)
}
JDBCEngine.scala jdbcQueryResult函数实现
import java.sql.PreparedStatement
import scala.collection.generic.CanBuildFrom
import scalikejdbc._
/** Constants and small helpers that accompany the `JDBCContext` case class. */
object JDBCContext {
  /** Integer code identifying the kind of SQL operation. */
  type SQLTYPE = Int

  val SQL_SELECT: Int = 0
  val SQL_EXECUTE: Int = 1
  val SQL_UPDATE: Int = 2

  /** Wraps a column index in `Some`. */
  def returnColumnByIndex(idx: Int): Some[Int] = Some[Int](idx)

  /** Wraps a column name in `Some`. */
  def returnColumnByName(col: String): Some[String] = Some[String](col)
}
/**
 * Describes one SQL operation for the JDBC engine: which pool to use, what to run, and how.
 *
 * @param dbName             name of the connection pool to run against
 * @param statements         SQL statement texts
 * @param parameters         parameter groups for the statements; defaults to none
 * @param fetchSize          fetch size for the query; defaults to 100
 * @param queryTimeout       optional query timeout (presumably seconds — TODO confirm)
 * @param queryTags          tags attached to the query; defaults to none
 * @param sqlType            one of the `JDBCContext.SQL_*` codes; defaults to `SQL_SELECT`
 * @param batch              batch flag; NOTE(review): no consumer is visible in this file
 * @param returnGeneratedKey no return: None, return by index: Some(1), by name: Some("id")
 * @param preAction          optional hook receiving the `PreparedStatement`
 *                           (presumably run before execution — TODO confirm)
 * @param postAction         optional hook receiving the `PreparedStatement`
 *                           (presumably run after execution — TODO confirm)
 */
case class JDBCContext(
    dbName: Symbol,
    statements: Seq[String],
    parameters: Seq[Seq[Any]] = Nil,
    fetchSize: Int = 100,
    queryTimeout: Option[Int] = None,
    queryTags: Seq[String] = Nil,
    sqlType: JDBCContext.SQLTYPE = JDBCContext.SQL_SELECT,
    batch: Boolean = false,
    // no return: None, return by index: Some(1), by name: Some("id")
    returnGeneratedKey: Option[Any] = None,
    preAction: Option[PreparedStatement => Unit] = None,
    postAction: Option[PreparedStatement => Unit] = None
)
/** Query entry point of the JDBC engine: runs a `JDBCContext` of type `SQL_SELECT`. */
object JDBCEngine {
  import JDBCContext._

  /** Placeholder row extractor: whatever row it is given, it throws `IllegalStateException(message)`. */
  private def noExtractor(message: String): WrappedResultSet => Nothing = { (rs: WrappedResultSet) =>
    throw new IllegalStateException(message)
  }

  /**
   * Runs the first statement of `ctx` as a SELECT and collects the converted rows into a `C[A]`.
   *
   * Only `ctx.statements.head` and the first parameter group of `ctx.parameters` are used;
   * any further entries are ignored.
   *
   * @tparam C result collection type
   * @tparam A row type produced by `rowConverter`
   * @param ctx          query description; `sqlType` must be `SQL_SELECT`
   * @param rowConverter maps each result-set row to an `A`
   * @param cbf          builder used to assemble the `C[A]`
   * @return the converted rows
   * @throws IllegalStateException  if `ctx.sqlType` is not `SQL_SELECT`
   * @throws NoSuchElementException if `ctx.statements` is empty
   */
  def jdbcQueryResult[C[_] <: TraversableOnce[_], A](
      ctx: JDBCContext, rowConverter: WrappedResultSet => A)(
      implicit cbf: CanBuildFrom[Nothing, A, C[A]]): C[A] = {
    ctx.sqlType match {
      case SQL_SELECT =>
        // Same exception type as a bare `.head`, but with a message that names the actual problem.
        val statement: String = ctx.statements.headOption.getOrElse(
          throw new NoSuchElementException(
            "JDBCContext.statements is empty; a SELECT statement is required"))
        // No parameter groups means the statement is run without bound parameters.
        val params: Seq[Any] = ctx.parameters.headOption.getOrElse(Nil)
        val rawSql = new SQLToCollectionImpl[A, NoExtractor](statement, params)(noExtractor("boom!"))
        // NOTE(review): the results of these three calls are discarded, so this presumably
        // relies on the setters configuring `rawSql` in place — confirm against ScalikeJDBC.
        ctx.queryTimeout.foreach(rawSql.queryTimeout(_))
        ctx.queryTags.foreach(rawSql.tags(_))
        rawSql.fetchSize(ctx.fetchSize)
        implicit val session: NamedAutoSession = NamedAutoSession(ctx.dbName)
        val sql: SQL[A, HasExtractor] = rawSql.map(rowConverter)
        sql.collection.apply[C]()
      case _ => throw new IllegalStateException("sqlType must be ‘SQL_SELECT‘!")
    }
  }
}
JDBCQueryDemo.scala 功能测试代码
import scalikejdbc._
import JDBCEngine._
import configdbs._
import org.joda.time._
/**
 * Demo: sets up the pools of the "dev" environment, runs the same SELECT against the
 * h2, mysql and postgres pools, then runs a Slick-generated statement through the engine.
 */
object JDBCQueryDemo extends App {
  ConfigDBsWithEnv("dev").setupAll()

  // Pool names are written as Symbol("...") so the code compiles regardless of quote rendering.
  val ctx = JDBCContext(
    dbName = Symbol("h2"),
    statements = Seq("select * from members where id = ?"),
    parameters = Seq(Seq(2))
  )

  //data model
  case class Member(
    id: Long,
    name: String,
    description: Option[String] = None,
    birthday: Option[LocalDate] = None,
    createdAt: DateTime)

  //data row converter
  // NOTE(review): jodaLocalDateOpt/jodaDateTime presumably need ScalikeJDBC's joda-time
  // support on the classpath and in scope — confirm for the library version in use.
  val toMember = (rs: WrappedResultSet) => Member(
    id = rs.long("id"),
    name = rs.string("name"),
    description = rs.stringOpt("description"),
    birthday = rs.jodaLocalDateOpt("birthday"),
    createdAt = rs.jodaDateTime("created_at")
  )

  // Full rows as a Vector from h2.
  val vecMember: Vector[Member] = jdbcQueryResult[Vector,Member](ctx,toMember)
  println(s"members in vector: $vecMember")

  // Single column as a List from mysql.
  val ctx1 = ctx.copy(dbName = Symbol("mysql"))
  val names: List[String] = jdbcQueryResult[List,String](ctx1,{rs: WrappedResultSet => rs.string("name")})
  println(s"selected name: $names")

  // Two columns as a List of tuples from postgres.
  val ctx2 = ctx1.copy(dbName = Symbol("postgres"))
  val idname: List[(Long,String)] = jdbcQueryResult[List,(Long,String)](ctx2,{rs: WrappedResultSet => (rs.long("id"),rs.string("name"))})
  println(s"selected id+name: $idname")

  /** Slick mapping for the COUNTY table; only used here to generate SQL text. */
  object SlickDAO {
    import slick.jdbc.H2Profile.api._
    case class CountyModel(id: Int, name: String)
    case class CountyTable(tag: Tag) extends Table[CountyModel](tag,"COUNTY") {
      def id = column[Int]("ID",O.AutoInc,O.PrimaryKey)
      def name = column[String]("NAME",O.Length(64))
      def * = (id,name)<>(CountyModel.tupled,CountyModel.unapply)
    }
    val CountyQuery = TableQuery[CountyTable]
    val filter = "Kansas"
    // Case-insensitive "contains" match: both sides are upper-cased before the LIKE.
    val qry = CountyQuery.filter {_.name.toUpperCase like s"%${filter.toUpperCase}%"}
    val statement = qry.result.statements.head
  }
  import SlickDAO._

  // Context carrying the Slick-generated statement; no parameter groups are supplied.
  val slickCtx = JDBCContext(
    dbName = Symbol("h2"),
    statements = Seq(statement)
  )
  val vecCounty: Vector[CountyModel] = jdbcQueryResult[Vector,CountyModel](slickCtx,{
    rs: WrappedResultSet => CountyModel(id=rs.int("id"),name=rs.string("name"))})
  vecCounty.foreach(r => println(s"${r.id},${r.name}"))
}