标签:sem 地方 latest discovery root 代码 ber catch rod
Lagom是出品Akka的Lightbend公司推出的一个微服务框架,目前最新版本为1.6.2。Lagom一词出自瑞典语,意为“适量”。
?? https://www.lagomframework.com/documentation/1.6.x/scala/Home.html
Lagom框架坚持,微服务是按服务边界Boundary将系统切分为若干个组成部分的结果,这意味着要使它们与限界上下文Bounded Context、业务功能和模块隔离等要求保持一致,才能达到可伸缩性和弹性要求,从而易于部署和管理。因此,在设计微服务时应考虑大小是否“Lagom”,而非是否足够“Micro”。
Lagom框架大量使用了Jonas Bonér所著Reactive Microservices Architecture: Design Principles For Distributed Systems一书的设计理念和思想,所以推荐在使用Lagom之前先阅读此书。]
“船大好顶浪,船小好调头”——Jonas认为,将庞大的系统分割为若干独立的更小粒度的部分,同时将管理权限适当下放,可以使这些独立的部分更快地做出决断,以适应外部环境的不断变化。
Lagom开发环境要求:
Lagom提供了Giter8模板,方便利用sbt构造一个Hello World项目结构,确保在开发前验证生成工具和项目已正确配置。Hello World包括Hello与Stream两个微服务,每个微服务包括API与实现两个子项目。Lagom会自动配置诸如持久化、服务定位等基础设施,并且支持发现并加载微服务的热更新。
sbt new lagom/lagom-scala.g8
hello → Project root
└ hello-api → hello api project
└ hello-impl → hello implementation project
└ hello-stream-api → hello-stream api project
└ hello-stream-impl → hello-stream implementation project
└ project → sbt configuration files
└ build.properties → Marker for sbt project
└ plugins.sbt → sbt plugins including the declaration for Lagom itself
└ build.sbt → Your project build file
使用sbt里的runAll一次性启动所有服务后(包括Cassandra、Kafka,若是手动则需要自己一个个启动包括基础服务在内的所有服务),在http://localhost:9000/api/hello/World
处将得到显示了一行“Hello World”的页面。
Service
派生,声明服务的API以及服务的回复消息。Service.descriptor
方法,该方法返回一个Descriptor
,用于定义服务的名称、REST端点路径、Kafka消息主题,以及REST路径与服务API方法的映射关系,等等。HelloWorldBehavior
采用Ask模式进行通信,按entityRef(id).ask[Message](replyTo => msg(replyTo)).map(reply => ...)
的样式进行ask与map配对。其中,entityRef是该Actor在集群里的引用:clusterSharding.entityRefFor(HelloWorldState.typeKey, id)
。hello(id)
实际是委托useGreeting(id)
方法完成的,所以将World换作其他内容亦可。使用sbt将服务绑定到特定网络地址(默认是localhost):
lazy val biddingImpl = (project in file("biddingImpl"))
.enablePlugins(LagomScala)
.settings(lagomServiceAddress := "0.0.0.0")
即便是将服务部署到不同的物理主机上,服务的端口号也将保持前后一致(总是使用特定的某个端口号),该端口号基于以下算法:
通常情况下,无需关心上述细节,因为很少会发生这样的冲突。当然,也可以按下列方式手动指定端口号:
lazy val usersImpl = (project in file("usersImpl"))
.enablePlugins(LagomScala)
.settings(lagomServiceHttpPort := 11000)
如果嫌默认的端口范围[49152, 65535]不合适,可以指定端口范围,不过范围越窄,发生冲突的机率就越大:
lagomServicesPortRange in ThisBuild := PortRange(40000, 45000)
开发模式,是指在sbt或者maven支持下,启动和调试各种服务。在这种环境下,对代码做出修改后,Lagom将负责后续的编译和重新加载工作,相应的服务会自动重启。
在sbt里进行如下配置,将会使用一个自签名的证书为服务启用HTTPS并指定其端口,确保服务之间能通过HTTPS进行调用,但同时服务网关Service Gateway将仍旧只能使用HTTP:
lagomServiceEnableSsl in ThisBuild := true
lagomServiceHttpsPort := 20443
在客户端,则建议使用诸如Play-WS或者Akka-HTTP Client API这样的HTTPS Client框架。
服务定位子Service Locator,是确保用于发现其他服务并与之联系的组件。Lagom内置的缺省Locator有以下特性:
localhost
,可使用lagomServiceLocatorAddress in ThisBuild := "0.0.0.0"
修改之。lagomServiceLocatorPort in ThisBuild := 10000
修改之。lagomUnmanagedServices in ThisBuild := Map("weather" -> "http://localhost:3333")
,然后再使用。网关Gateway,相当于Service Locator的代理,用于防止对Locator的不当访问。Lagom内置的缺省Gateway有以下特性:
localhost
,可使用lagomServiceGatewayAddress in ThisBuild := "0.0.0.0"
修改之。lagomServiceGatewayPort in ThisBuild := 9010
修改之。lagomServiceGatewayImpl in ThisBuild := "netty"
指定之。在开发环境下,使用runAll时默认会启动Locator与Gateway。需要手动启停时,分别执行lagomServiceLocatorStart
和lagomServiceLocatorStop
任务即可。
如要需要禁用内置的Locater与Gateway,则使用lagomServiceLocatorEnabled in ThisBuild := false
禁用之。之后便需要自己提供一个Locator的实现,并需要牢记每个服务的端口号以建立相互联系。
Cassandra是Apache提供的一个分布式、可扩展的NoSQL数据库。它以KeySpace为单位,其中包含若干个表Table或者列族Column Family,每个列族可以有不同的列(相当于RDBMS中的字段)并可自由添加列,每个行可以拥有不同的列,并支持索引。Cassandra支持的数据类型除常见的原生类型外,为List、Map和Set提供了直接支持,提供了TTL数据到期自动删除功能,并且可以自定义数据类型。
Lagom内置了一个Cassandra服务,作为Event Sourcing的事件持久化平台。
lagomCassandraPort in ThisBuild := 9042
指定之。lagomCassandraCleanOnStart in ThisBuild := true
使之在每次服务启动时清空数据库。dev-embedded-cassandra.yaml
对Cassandra进行配置,可以使用lagomCassandraYamlFile in ThisBuild := Some((baseDirectory in ThisBuild).value / "project" / "cassandra.yaml")
另行指定特定YAML配置文件。lagomCassandraJvmOptions in ThisBuild := Seq("-Xms256m", "-Xmx1024m", "-Dcassandra.jmx.local.port=4099")
指定相应的JVM参数。lagomCassandraMaxBootWaitingTime in ThisBuild := 0.seconds
修改之。lagomCassandraStart
和lagomCassandraStop
手动启停。lagomCassandraEnabled in ThisBuild := false
禁用之,再使用lagomUnmanagedServices in ThisBuild := Map("cas_native" -> "tcp://localhost:9042")
注册本地Cassandra实例即可使用之。Kafka是Apache提供的一个分布式的流处理平台,简单讲可以理解为一个生产者-消费者结构的、集群条件下的消息队列Message Queue。每条消息流Stream以主题Topic作为唯一区别,每个Topic下可以有多个相同的分区Partition,分区里的消息都有一个Offset作为序号以确保按顺序被消费。Kafka依赖ZooKeeper提供的集群功能部署其节点。
Lagom内置了一个Kafka服务:
lagomKafkaPort in ThisBuild := 10000
修改之lagomKafkaZookeeperPort in ThisBuild := 9999
修改之kafka-server.properties
配置Kafka运行参数,可以使用lagomKafkaPropertiesFile in ThisBuild := Some((baseDirectory in ThisBuild).value / "project" / "kafka-server.properties")
另行指定配置文件。lagomKafkaJvmOptions in ThisBuild := Seq("-Xms256m", "-Xmx1024m")
指定相应的JVM参数。<your-project-root>/target/lagom-dynamic-projects/lagom-internal-meta-project-kafka/target/log4j_output
,而Kafka的提交日志将保存在<your-project-root>/target/lagom-dynamic-projects/lagom-internal-meta-project-kafka/target/logs
。lagomKafkaStart
和lagomKafkaStop
手动启停。lagomKafkaEnabled in ThisBuild := false
禁用之,再使用lagomKafkaAddress in ThisBuild := "localhost:10000"
指定Kafka实例即可使用之。如果是本地实例,用lagomKafkaPort in ThisBuild := 10000
指定端口即可。每一个Lagom服务都由一个接口进行描述。该接口的内容不仅包括接口方法的声明和实现,同时还定义了接口的元数据如何被映射到底层的传输协议。
服务的每个接口方法都要求返回一个ServiceCall[Request, Response]
,其中Request或Response可以是Akka的NotUsed
:
trait ServiceCall[Request, Response] {
def invoke(request: Request): Future[Response]
}
每一个Lagom服务在覆写的descriptor中都将返回一个服务描述子Service Descriptor。以下便是声明了一个叫作hello的服务,该服务提供了sayHello的API:
trait HelloService extends Service {
def sayHello: ServiceCall[String, String]
override def descriptor = {
import Service._
named("hello").withCalls(call(sayHello))
}
}
每个服务调用都必须有一个唯一的标识符,以保证调用能最终映射到正确的API方法上。这个标识符可以是静态的一个字符串名称,也可以在运行时动态生成。默认情况下,API方法的名称即该调用的标识符。
使用namedCall指定强命名的标识符,服务hello里的API方法sayHello的调用名为hello,在REST架构下的相应路径为/hello
named("hello").withCalls(namedCall("hello", sayHello))
使用pathCall指定基于路径的标识符,类似于字符串中用$
引导的内插值,此处用:
引导内插变量。Lagom为此提供了一个隐式的PathParamSerializer
,用于从路径中提取String
、Int
、Boolean
或者UUID
类型的内插变量。比如以下便是提取了路径中类型为long
的orderId和类型为String
的itemId值,作为参数传递给API方法。在作参数映射时,默认将按从路径中从左至右提取的顺序进行映射。
?? 这与ASP.NET MVC等一些HTTP框架采用的方法是类似的,所以要注意以构建RESTful应用的思路贯彻学习始终。
def getItem(orderId: Long, itemId: String): ServiceCall[NotUsed, Item]
override def descriptor = {
import Service._
named("orders").withCalls(pathCall("/order/:orderId/item/:itemId", getItem _))
}
提取查询串中的参数时,则使用的?
起始、以&
分隔的形式:
def getItems(orderId: Long, pageNo: Int, pageSize: Int): ServiceCall[NotUsed, Seq[Item]]
override def descriptor = {
import Service._
named("orders").withCalls(pathCall("/order/:orderId/items?pageNo&pageSize", getItems _))
}
在REST架构下,Lagom会努力正确实现上述映射,并且在有Request消息时使用POST方法,否则使用GET方法。
REST标识符用于完全REST形式的调用,和pathCall非常类似,区别只是REST标识符可指定HTTP调用方法:
def addItem(orderId: Long): ServiceCall[Item, NotUsed]
def getItem(orderId: Long, itemId: String): ServiceCall[NotUsed, Item]
def deleteItem(orderId: Long, itemId: String): ServiceCall[NotUsed, NotUsed]
def descriptor = {
import Service._
import com.lightbend.lagom.scaladsl.api.transport.Method
named("orders").withCalls(
restCall(Method.POST, "/order/:orderId/item", addItem _),
restCall(Method.GET, "/order/:orderId/item/:itemId", getItem _),
restCall(Method.DELETE, "/order/:orderId/item/:itemId", deleteItem _)
)
}
每个服务API都需要指定Request和Response的消息类型,可以用akka.NotUsed
作为占位符,分为两种形式:
object
或者case class
表达的常见消息,它们将在内存的缓冲区中被序列化后进行传输。如果Request与Response均是这类严格消息,则调用将会是同步的,严格按请求-回复的顺序进行。Source
。Lagom将使用WebSocket协议进行流的传输。如果Request与Response均是流消息时,任何一方关闭时,WebSocket将完全关闭。如果只有一方是流消息,则严格消息仍按序列化方式进行传输,而WebSocket将始终保持打开状态,直到另一个方向关闭为止。以下分别是单向流和双向流消息的示例:
def tick(interval: Int): ServiceCall[String, Source[String, NotUsed]]
def descriptor = {
import Service._
named("clock").withCalls(pathCall("/tick/:interval", tick _))
}
def sayHello: ServiceCall[Source[String, NotUsed], Source[String, NotUsed]]
def descriptor = {
import Service._
named("hello").withCalls(call(this.sayHello))
}
Lagom通过定义隐式的MessageSerializer
,为call、namedCall、pathCall和restCall提供了消息的序列化支持。对String类型的消息和Play框架JSON格式的消息,Lagom提供了内置的序列化器。除此以外,可以自定义序列化器。(参考:消息序列化器)
使用Play-JSON时,通常是用case class和companion object配合,case class定义消息的结构,companion object定义消息的格式:
case class User(
id: Long,
name: String,
email: Option[String]
)
object User {
import play.api.libs.json._
implicit val format: Format[User] = Json.format[User]
}
对应的JSON结果为:
{
"id": 12345,
"name": "John Smith",
"email": "john.smith@example.org"
}
属性可以是Option,这样如果为None,则Play-JSON在序列化时不会解析它、反序列化时不会生成该属性。如果case class还内嵌了其他case class,则被嵌入的case class也需要定义它的format。
服务的实现,即实现之前声明的服务描述子trait。对服务API中声明的每个方法,使用ServiceCall的工厂方法apply,传入一个Request => Future[Response]
,返回一个ServiceCall
。
(?? 注意:Lagom大量使用函数作为返回值,从而充分发挥了FP组合高阶函数的优势。)
class HelloServiceImpl extends HelloService {
override def sayHello = ServiceCall { name => Future.successful(s"Hello $name!") }
}
当消息不是普通的严格消息而是流消息时,需要使用Akka Stream来处理它。对应前面单向流的例子,它的实现如下:
override def tick(intervalMs: Int) = ServiceCall { tickMessage =>
Future.successful(
Source
.tick(
// 消息被发送前的延迟
intervalMs.milliseconds,
// 消息发送的间隔
intervalMs.milliseconds,
// 将被发送的消息
tickMessage
)
.mapMaterializedValue(_ => NotUsed)
)
}
如果某些时候需要处理消息的头部信息(通常是HTTP),那么Lagom提供了ServiceCall的派生类ServerServiceCall作为支持。ServerServiceCall将ServiceCall里的Header单独取出,提供了invokeWithHeaders方法,该方法第一个参数是RequestHeader,另一个参数才是Request本身,这样直接将invoke委托给invokeWithHeaders,从而方便在函数体中使用handleRequestHeader
和handleResponseHeader
对头部信息进行处理(尽管ServiceCall本身也支持这2个处理函数)。
override def sayHello = ServerServiceCall { (requestHeader, name) =>
val user = requestHeader.principal
.map(_.getName)
.getOrElse("No one")
val response = s"$user wants to say hello to $name"
val responseHeader = ResponseHeader.Ok.withHeader("Server", "Hello service")
Future.successful((responseHeader, response))
}
ServerServiceCall工厂方法有一个版本可以同时处理Request与Response的头部信息,但也有不带处理头部信息的版本。后者虽然看起来与ServiceCall没什么区别,从而显得多此一举,但实际这是为满足组合服务调用的需求而存在的。
组合服务调用,类似于把ServerServiceCall通过依赖注入传递给需要包裹在外层的日志、权限、过滤等切面服务,从而实现AOP切入到核心的服务API调用上。
在AOP切入的实现上,Lagom采取了组合高阶函数的方法。这就象是抹了一层又一层奶油的生日蛋糕,只有切开了才能看到最里层真正想要吃到的蛋糕。相比使用共享的线程变量等方法,不仅通过类型系统发挥了编译检查的优势而更加安全,并且还通过构造表达式树而提供了延迟计算的功能。
// AOP: Log
def logged[Request, Response](serviceCall: ServerServiceCall[Request, Response]) =
ServerServiceCall.compose { requestHeader =>
println(s"Received ${requestHeader.method} ${requestHeader.uri}")
serviceCall
}
override def sayHello = logged(ServerServiceCall { name =>
Future.successful(s"Hello $name!")
})
// AOP: Authentication
trait UserStorage {
def lookupUser(username: String): Future[Option[User]]
}
def authenticated[Request, Response](serviceCall: User => ServerServiceCall[Request, Response]) = {
// composeAsync允许异步地返回要调用的服务API
ServerServiceCall.composeAsync { requestHeader =>
// First lookup user
val userLookup = requestHeader.principal
.map(principal => userStorage.lookupUser(principal.getName))
.getOrElse(Future.successful(None))
// Then, if it exists, apply it to the service call
userLookup.map {
case Some(user) => serviceCall(user)
case None => throw Forbidden("User must be authenticated to access this service call")
}
}
}
override def sayHello = authenticated { user =>
ServerServiceCall { name =>
// 注意:因为闭包,此处可访问经AOP切入带来的user
Future.successful(s"$user is saying hello to $name")
}
}
依赖注入,Dependency Injection,是把本应由服务承担的创建其自身依赖的责任移交给外部的框架,改由DI框架负责生产对象并维护彼此的关联,最终形成完整的对象图(Object Graph),从而达到公示服务所需依赖、消灭代码中硬编码的new操作、降低耦合度的目的。
在Scala语言里最常见的一个DI模式,是用trait的Self Type特性实现的蛋糕模式(Thin Cake Pattern)。
更复杂的示例,请参见 ?? Real-World Scala: Dependency Injection
trait Stuffing {
val stuffing: String
}
// 使用Self Type特性,限定继承了Cake的子类,必须同时也继承了Stuffing
// 此处Stuffing还可以with其他trait,从而实现多重注入
trait Cake { this: Stuffing =>
def flavour: String = this.stuffing
}
object LemonCake extends Cake with Stuffing { ... }
在FP的世界,实现DI的另一个选择是Reader Monad。
case class Reader[R, A](run: R => A) {
def map[B](f: A => B): Reader[R, B] =
Reader(r => f(run(r)))
def flatMap[B](f: A => Reader[R, B]): Reader[R, B] =
Reader(r => f(run(r)).run(r))
}
def balance(accountNo: String) = Reader((repo: AccountRepository) => repo.balance(accountNo))
?? Dependency Injection in Scala using MacWire
?? MacWire使用指南中涉及的参考知识:轨道交通编组站
Lagom使用了MacWire作为默认的DI框架(当然也可以换Spring之类的其他DI框架)。
MacWire主要使用宏wire[T]
进行DI,它会从标注了@Inject
的构造子、非私有的主要构造子、Companion Object里的apply()
方法里查找依赖关系。然后再根据依赖项的类型,按以下顺序查找匹配的参数项:
使用MacWire相关事项:
如果是隐式参数,MacWire将会跳过它,使它仍按Scala语法关于隐式参数的方法进行处理。
只要类型匹配,此处的值可以是val
、lazy val
或者用def
定义的无参函数。
需要根据条件使用不同的依赖项实现时,用类似lazy val component = if (condition) then wire[implementationA] else wire[implementationB]
的方法,定义好component
的值即可。
需要在依赖项上切入拦截器Interceptor时,先在依赖项定义处声明一个拦截器def logInterceptor : Interceptor
,再用lazy val component = logInterceptor(wire[Component])
绑定拦截声明,最后在使用依赖项进行实现的地方用lazy val logInterceptor = { ... }
给出具体实现即可。
trait ShuntingModule {
lazy val pointSwitcher: PointSwitcher =
logEvents(wire[PointSwitcher])
lazy val trainCarCoupler: TrainCarCoupler =
logEvents(wire[TrainCarCoupler])
lazy val trainShunter = wire[TrainShunter]
def logEvents: Interceptor
}
object TrainStation extends App {
val modules = new ShuntingModule
with LoadingModule
with StationModule {
lazy val logEvents = ProxyingInterceptor { ctx =>
println("Calling method: " + ctx.method.getName())
ctx.proceed()
}
}
modules.trainStation.prepareAndDispatchNextTrain()
}
默认情况下,使用lazy val component = wire[Component]
声明在当前范围内唯一的依赖项(Singleton)。如果需要在每次调用时都创建新的依赖项(Dependent),则换作def
即可。
除Singleton和Dependent两种生命周期外,MacWire还支持其他形式的生命周期,方法类似使用拦截器。
trait StationModule extends ShuntingModule with LoadingModule {
lazy val trainDispatch: TrainDispatch =
session(wire[TrainDispatch])
lazy val trainStation: TrainStation =
wire[TrainStation]
def session: Scope
}
object TrainStation extends App {
val modules = new ShuntingModule
with LoadingModule
with StationModule {
lazy val session = new ThreadLocalScope
}
// implement a filter which attaches the session to the scope
// use the filter in the server
modules.trainStation.prepareAndDispatchNextTrain()
}
对需要参数进行构造的依赖项,使用lazy val component = (parameters) => wire[Component]
这样的工厂方法进行声明,展开后将变成new Component(parameters)
。对在trait里声明的依赖项,则改用def component(parameters: Parameter) = wire[Component]
的方式定义工厂方法。
在具体使用参数化的依赖项时,使用wireWith(real_parameter)代入实际参数。
需要区别同一类依赖项的不同实例时,可以用trait作为标记tag。一种方法是用new Component(...) with tag
细化其子类型,另一种是在声明依赖值的类型后面用@@ tag
作标记。
trait Regular
trait Liquid
class TrainStation(
trainShunter: TrainShunter,
regularTrainLoader: TrainLoader with Regular,
liquidTrainLoader: TrainLoader with Liquid,
trainDispatch: TrainDispatch) { ... }
lazy val regularTrainLoader = new TrainLoader(...) with Regular
lazy val liquidTrainLoader = new TrainLoader(...) with Liquid
lazy val trainStation = wire[TrainStation]
///////////////////////////////////////
class TrainStation(
trainShunter: TrainShunter,
regularTrainLoader: TrainLoader @@ Regular,
liquidTrainLoader: TrainLoader @@ Liquid,
trainDispatch: TrainDispatch) { ... }
lazy val regularTrainLoader = wire[TrainLoader].taggedWith[Regular]
lazy val liquidTrainLoader = wire[TrainLoader].taggedWith[Liquid]
lazy val trainStation = wire[TrainStation]
在Lagom中使用MacWire进行DI是可行的,但前述的服务定位子基本上只能满足开发环境的需求,生产环境还是要换用Akka Discovery Service Locator之类更强大的定位子框架。
在完成组件的拼装后,还需要一个启动子启动应用程序。Lagom为此提供了Play框架ApplicationLoader
的简化版本LagomApplicationLoader
。其中的loadDevMode
与load
是必须的,前者用with LagomDevModeComponents
加载Lagom内嵌的服务定位子,用于开发环境;后者可以指定生产环境下的服务定位子,如果返回NoServiceLoader
将意味着不提供服务定位。而describeService
是可选的,它主要用于声明服务的API,为外部管理框架或者脚本语言提供服务的Meta信息,以方便进行动态配置。服务无数据Metadata,又称为ServiceInfo,主要包括服务的名称以及一个访问控制列表ACL。通常这些Metadata会由框架自动生成,前提是在使用withCalls
声明描述子时,用withAutoAcl(true)
激活即可。
import com.lightbend.lagom.scaladsl.server._
import com.lightbend.lagom.scaladsl.api.ServiceLocator
import com.lightbend.lagom.scaladsl.devmode.LagomDevModeComponents
class HelloApplicationLoader extends LagomApplicationLoader {
override def loadDevMode(context: LagomApplicationContext) =
new HelloApplication(context) with LagomDevModeComponents
override def load(context: LagomApplicationContext) =
new HelloApplication(context) {
override def serviceLocator = ServiceLocator.NoServiceLocator
}
override def describeService = Some(readDescriptor[HelloService])
}
最后,只需在配置application.conf里用play.application.loader = com.example.HelloApplicationLoader
指定启动子,即完成了应用程序的装配。
Lagom根据用途不同,通常将组件分为若干类型:
?? 详细列表:https://www.lagomframework.com/documentation/1.6.x/scala/ScalaComponents.html
定义并实现服务后,该服务即可被其他服务或其他类型的客户消费使用。Lagom将根据服务描述子的内容,通过使用ServiceClient
的implement
宏进行绑定,自动生成一个调用该服务的框架,然后就可以象调用本地对象的方法一样invoke
服务提供的API了。
// 先绑定HelloService
abstract class MyApplication(context: LagomApplicationContext)
extends LagomApplication(context)
with AhcWSComponents {
lazy val helloService = serviceClient.implement[HelloService]
}
// 然后在另一个服务MyService里消费HelloService
class MyServiceImpl(helloService: HelloService)(implicit ec: ExecutionContext) extends MyService {
override def sayHelloLagom = ServiceCall { _ =>
val result: Future[String] = helloService.sayHello.invoke("Lagom")
result.map { response => s"Hello service said: $response" }
}
}
当服务使用了流消息时,Lagom将在消费端使用带有最大帧长度参数的WebSocket进行通信。该参数定义了可以发送的消息的最大尺寸,可以在application.conf中配置。
#This configures the websocket clients used by this service.
#This is a global configuration and it is currently not possible to provide different configurations if multiple websocket services are consumed.
lagom.client.websocket {
#This parameter limits the allowed maximum size for the messages flowing through the WebSocket. A similar limit exists on the server side, see:
#https://www.playframework.com/documentation/2.6.x/ScalaWebSockets#Configuring-WebSocket-Frame-Length
frame.maxLength = 65536
}
断路器Circuit Breaker,如同电路保险丝,可以在服务崩溃时迅速熔断,从而避免产生更大面积的连锁反应。
断路器有以下3种状态:
call-timeout
,导致失败计数值增长,在超过设定的max-failures
,断路器跳到断开状态。CircuitBreakerOpenException
异常而快速失败。reset-timeout
后,断路器跳入半开状态。reset-timeout
周期后再重新进行试探。Lagom为所有消费端对服务的调用都默认启用了断路器。尽管断路器在消费端配置并使用,但用于绑定到服务的标识符则要由服务提供者定义和确定相应粒度。默认情况下,一个断路器实例将覆盖对一个服务所有API的调用。但通过设置断路器标识符,可以为每个API方法设置唯一的断路器标识符,以便为每个API方法使用单独的断路器实例。或者通过在某几个API方法上设置使用相同的标识符,实现对API调用的断路保护分组。
下例中,sayHi
将使用默认的断路器,而hiAgain
将使用断路器hello2
:
def descriptor: Descriptor = {
import Service._
named("hello").withCalls(
namedCall("hi", this.sayHi),
namedCall("hiAgain", this.hiAgain).withCircuitBreaker(CircuitBreaker.identifiedBy("hello2"))
)
}
对应的application.conf中配置如下:
lagom.circuit-breaker {
# will be used by sayHi method
hello.max-failures = 5
# will be used by hiAgain method
hello2 {
max-failures = 7
reset-timeout = 30s
}
# Change the default call-timeout will be used for both sayHi and hiAgain methods
default.call-timeout = 5s
}
默认情况下,Lagom的客户端会将所有4字头和5字头的HTTP响应都映射为相应的异常,而断路器会将所有的异常都视作失败,从而触发熔断。通过设置白名单,可以忽略某些类型的异常。断路器完整的断路器配置选项如下:
# Circuit breakers for calls to other services are configured
# in this section. A child configuration section with the same
# name as the circuit breaker identifier will be used, with fallback
# to the `lagom.circuit-breaker.default` section.
lagom.circuit-breaker {
# Default configuration that is used if a configuration section
# with the circuit breaker identifier is not defined.
default {
# Possibility to disable a given circuit breaker.
enabled = on
# Number of failures before opening the circuit.
max-failures = 10
# Duration of time after which to consider a call a failure.
call-timeout = 10s
# Duration of time in open state after which to attempt to close
# the circuit, by first entering the half-open state.
reset-timeout = 15s
# A whitelist of fqcn of Exceptions that the CircuitBreaker
# should not consider failures. By default all exceptions are
# considered failures.
exception-whitelist = []
}
}
Lightbend推荐使用ScalaTest和Spec2作为Lagom的测试框架。类似Akka ActorTestKit,可以使用Lagom提供的ServiceTest
工具包对服务进行测试。
为每个测试创建一个单独的服务实例,其关键步骤包括:
ServiceTest.withServer(config)(lagomApplication)(testBlock)
的结构进行测试。LocalServiceLocator
,以启用默认的定位子服务。import com.lightbend.lagom.scaladsl.server.LocalServiceLocator
import com.lightbend.lagom.scaladsl.testkit.ServiceTest
import org.scalatest.AsyncWordSpec
import org.scalatest.Matchers
// 1. 启用AsyncWordSpec
class HelloServiceSpec extends AsyncWordSpec with Matchers {
"The HelloService" should {
// 2. 使用ServiceTest.withServer进行测试
"say hello" in ServiceTest.withServer(ServiceTest.defaultSetup) { ctx =>
// 3. 启用默认的服务定位子
new HelloApplication(ctx) with LocalServiceLocator
} { server =>
// 4. 创建一个客户端进行服务调用
val client = server.serviceClient.implement[HelloService]
client.sayHello.invoke("Alice").map { response =>
response should ===("Hello Alice!")
}
}
}
}
若要由多个测试共享一个服务实例,则需要使用ServiceTest.startServer()代替withServer(),然后在beforeAll与afterAll中启动和停止服务。
import com.lightbend.lagom.scaladsl.server.LocalServiceLocator
import com.lightbend.lagom.scaladsl.testkit.ServiceTest
import org.scalatest.AsyncWordSpec
import org.scalatest.Matchers
import org.scalatest.BeforeAndAfterAll
class HelloServiceSpec extends AsyncWordSpec with Matchers with BeforeAndAfterAll {
lazy val server = ServiceTest.startServer(ServiceTest.defaultSetup) { ctx =>
new HelloApplication(ctx) with LocalServiceLocator
}
lazy val client = server.serviceClient.implement[HelloService]
"The HelloService" should {
"say hello" in {
client.sayHello.invoke("Alice").map { response =>
response should ===("Hello Alice!")
}
}
}
protected override def beforeAll() = server
protected override def afterAll() = server.stop()
}
若要启用Cluster、PubSub或者Persistence支持,则需要在withServer
第1个参数中启用。若需要调用其他服务,可以在第2个参数中构造要调用服务的Stub或者Mock。注意事项包括:
withCassandra
或者withJdbc
启用Persistence后。会自动启用Clusterlazy val server = ServiceTest.startServer(ServiceTest.defaultSetup.withCluster) { ctx =>
new HelloApplication(ctx) with LocalServiceLocator {
override lazy val greetingService = new GreetingService {
override def greeting = ServiceCall { _ =>
Future.successful("Hello")
}
}
}
}
Lagom没有为HTTPS提供客户端框架,因此只有借用Play-WS、Akka HTTP或者Akka gRPC等框架创建使用SSL连接的客户端。而在服务端,可以使用withSsl激活SSL支持,随后框架将会为测试端自动打开一个随机的端口并提供一个javax.net.ssl.SSLContext
类型的上下文环境。接下来,客户端就可以使用testServer提供的httpsPort
和sslContext
连接到服务端并发送Request。Lagom测试工具提供的证书仅限于CN=localhost,所以该上下文SSLContext也只会信任本地的testServer,这就要求在发送请求时也设置好相应的权限,否则服务器将会拒绝该请求。目前,Lagom还无法为测试服务器设置不同的SSL证书。
"complete a WS call over HTTPS" in {
val setup = defaultSetup.withSsl()
ServiceTest.withServer(setup)(new TestTlsApplication(_)) { server =>
implicit val actorSystem = server.application.actorSystem
implicit val ctx = server.application.executionContext
// To explicitly use HTTPS on a test you must create a client of your own
// and make sure it uses the provided SSLContext
val wsClient = buildCustomWS(server.clientSslContext.get)
// use `localhost` as authority
val url = s"https://localhost:${server.playServer.httpsPort.get}/api/sample"
val response = wsClient.url(url).get().map { _.body[String] }
whenReady(response, timeout) { r => r should be("sample response") }
}
}
在测试支持流消息的服务时,需要搭配Akka Streams TestKit进行测试。
"The EchoService" should {
"echo" in {
// Use a source that never terminates (concat Source.maybe)
// so we don‘t close the upstream, which would close the downstream
val input = Source(List("msg1", "msg2", "msg3")).concat(Source.maybe)
client.echo.invoke(input).map { output =>
val probe = output.runWith(TestSink.probe(server.actorSystem))
probe.request(10)
probe.expectNext("msg1")
probe.expectNext("msg2")
probe.expectNext("msg3")
probe.cancel
succeed
}
}
}
在服务测试中,可以通过额外编写PersistEntityTestDriver,使用持久化实体Persistent Entity进行与数据库无关的功能测试。
Play、Akka和Lagom均出自Lightbend,因此Lagom使用Play JSON作为消息的序列化框架,算是开箱即用了。
Lagom的序列化器,通常是与服务描述子放在一起的一个MessageSerializer类型的隐式变量。也可以在withCalls()
声明服务API时,在call、namedCall、pathCall、restCall或者topic方法里显式地指定分别用于Request和Response的序列化器。
Lagom通过借用Play的JSON序列化器,对case class
进行JSON格式的序列化。该JSON序列化器有一个主要方法是jsValueFormatMessageSerializer
,可以使用它在case classs
的companion object
里指定其他格式的JSON模板。同时,Lagom的MessageSerializer也为NotUsed
、Done
、String
等类型提供了缺省的非JSON格式的序列化支持,比如MessageSerializer.StringMessageSerializer
。
trait HelloService extends Service {
def sayHello: ServiceCall[String, String]
override def descriptor = {
import Service._
named("hello").withCalls(
call(sayHello)(
MessageSerializer.StringMessageSerializer,
MessageSerializer.StringMessageSerializer
)
)
}
}
在companion object里定义不同的JSON格式,随后在服务描述子中显式地选择使用:
import play.api.libs.json._
import play.api.libs.functional.syntax._
case class MyMessage(id: String)
object MyMessage {
implicit val format: Format[MyMessage] = Json.format
val alternateFormat: Format[MyMessage] = {
// 将id映射为JSON串里的identifier
(__ \ "identifier")
.format[String]
.inmap(MyMessage.apply, _.id)
}
}
trait MyService extends Service {
def getMessage: ServiceCall[NotUsed, MyMessage]
def getMessageAlternate: ServiceCall[NotUsed, MyMessage]
override def descriptor = {
import Service._
named("my-service").withCalls(
call(getMessage),
call(getMessageAlternate)(
// Request的序列化器
implicitly[MessageSerializer[NotUsed, ByteString]],
// Response的序列化器
MessageSerializer.jsValueFormatMessageSerializer(
implicitly[MessageSerializer[JsValue, ByteString]],
// 指定JSON格式
MyMessage.alternateFormat
)
)
)
}
}
Lagom的trait MessageSerializer也可以用来实现自定义的序列化器,派生的StrictMessageSerializer和StreamedMessageSerializer分别用于严格消息与流消息。其中,严格消息的序列化类型为二进制串ByteString
,而流消息的则是Source[ByteString, _]
。
在实现自定义的序列化器之前,有几个关键性概念:
Content-Type
和Accept
,以及MIME Type Scheme的版本号,或者根据服务的配置方式直接从URL中提取。NegotiatedSerializer
和NegotiatedDeserializer
负责。内容协商在多数情况下并不是必要的,所以并不是一定需要实现的。
/* ------------ String Protocol ------------ */
import akka.util.ByteString
import com.lightbend.lagom.scaladsl.api.deser.MessageSerializer.NegotiatedSerializer
import com.lightbend.lagom.scaladsl.api.transport.DeserializationException
import com.lightbend.lagom.scaladsl.api.transport.MessageProtocol
import com.lightbend.lagom.scaladsl.api.transport.NotAcceptable
import com.lightbend.lagom.scaladsl.api.transport.UnsupportedMediaType
// 注意:`charset`由构造子传入,传递给MessageProtocol用于定义协议使用的字符集。
class PlainTextSerializer(val charset: String) extends NegotiatedSerializer[String, ByteString] {
override val protocol = MessageProtocol(Some("text/plain"), Some(charset))
def serialize(s: String) = ByteString.fromString(s, charset)
}
import com.lightbend.lagom.scaladsl.api.deser.MessageSerializer.NegotiatedDeserializer
class PlainTextDeserializer(val charset: String) extends NegotiatedDeserializer[String, ByteString] {
def deserialize(bytes: ByteString) =
bytes.decodeString(charset)
}
/* ------------ JSON Protocol ------------ */
import play.api.libs.json.Json
import play.api.libs.json.JsString
class JsonTextSerializer extends NegotiatedSerializer[String, ByteString] {
override val protocol = MessageProtocol(Some("application/json"))
def serialize(s: String) =
ByteString.fromString(Json.stringify(JsString(s)))
}
import scala.util.control.NonFatal
class JsonTextDeserializer extends NegotiatedDeserializer[String, ByteString] {
def deserialize(bytes: ByteString) = {
try {
Json.parse(bytes.iterator.asInputStream).as[String]
} catch {
case NonFatal(e) => throw DeserializationException(e)
}
}
}
import com.lightbend.lagom.scaladsl.api.deser.StrictMessageSerializer
import scala.collection.immutable
class TextMessageSerializer extends StrictMessageSerializer[String] {
// 支持的协议
override def acceptResponseProtocols = List(
MessageProtocol(Some("text/plain")),
MessageProtocol(Some("application/json"))
)
/* ------ Serializer -----*/
// Client发出Request时还不需要协商协议,故使用最简单的文本协议
def serializerForRequest = new PlainTextSerializer("utf-8")
def serializerForResponse(accepted: immutable.Seq[MessageProtocol]) = accepted match {
case Nil => new PlainTextSerializer("utf-8")
case protocols =>
protocols
.collectFirst {
case MessageProtocol(Some("text/plain" | "text/*" | "*/*" | "*"), charset, _) =>
new PlainTextSerializer(charset.getOrElse("utf-8"))
case MessageProtocol(Some("application/json"), _, _) =>
new JsonTextSerializer
}
.getOrElse {
throw NotAcceptable(accepted, MessageProtocol(Some("text/plain")))
}
}
/* ------ Deserializer for Client and Service -----*/
def deserializer(protocol: MessageProtocol) = protocol.contentType match {
case Some("text/plain") | None =>
new PlainTextDeserializer(protocol.charset.getOrElse("utf-8"))
case Some("application/json") =>
new JsonTextDeserializer
case _ =>
// 抛出异常在生产环境并不可取,因为对于诸如WebSocket之类的应用,浏览器不允许设置ContentType
// 此时应返回一个缺省的反序列化器更为妥当。
throw UnsupportedMediaType(protocol, MessageProtocol(Some("text/plain")))
}
}
另一个用Protocol buffers实现的例子:
import akka.util.ByteString
import com.lightbend.lagom.scaladsl.api.deser.MessageSerializer.NegotiatedDeserializer
import com.lightbend.lagom.scaladsl.api.deser.MessageSerializer.NegotiatedSerializer
import com.lightbend.lagom.scaladsl.api.deser.StrictMessageSerializer
import com.lightbend.lagom.scaladsl.api.transport.MessageProtocol
import scala.collection.immutable
class ProtobufSerializer extends StrictMessageSerializer[Order] {
private final val serializer = {
new NegotiatedSerializer[Order, ByteString]() {
override def protocol: MessageProtocol =
MessageProtocol(Some("application/octet-stream"))
def serialize(order: Order) = {
val builder = ByteString.createBuilder
order.writeTo(builder.asOutputStream)
builder.result
}
}
}
private final val deserializer = {
new NegotiatedDeserializer[Order, ByteString] {
override def deserialize(bytes: ByteString) =
Order.parseFrom(bytes.iterator.asInputStream)
}
}
override def serializerForRequest = serializer
override def deserializer(protocol: MessageProtocol) = deserializer
override def serializerForResponse(acceptedMessageProtocols: immutable.Seq[MessageProtocol]) = serializer
}
在服务描述子中可以加入头部过滤器Header Filter,用于实现协商协议、身份验证或访问授权的沟通。过滤器会根据预设的条件,对服务与客户端双方通信的消息进行转换或修改。
下例是一个典型的过滤器实现。如果没有特意进行绑定,那么所有的服务默认都将会使用它,并使用ServicePrincipal来标识带有服务名称的客户端。在客户端,当Client发出Request时,过滤器将会在头部附加User-Agent,Lagom默认会自动将服务名称作为ServicePrinciple。而在服务端,则会读取Request中的User-Agent,并将其值设置为Request的Principle。
?? 切记:头部过滤器仅用于通信双方的协商,以确定双方采取何种方式进行后续的通信,而不应用来执行实际的验证逻辑。验证逻辑属于业务逻辑的组成部分,应当放在服务API及其组合当中。
object UserAgentHeaderFilter extends HeaderFilter {
override def transformClientRequest(request: RequestHeader): RequestHeader = {
request.principal match {
case Some(principal: ServicePrincipal) =>
request.withHeader(HeaderNames.USER_AGENT, principal.serviceName)
case _ => request
}
}
override def transformServerRequest(request: RequestHeader): RequestHeader = {
request.getHeader(HeaderNames.USER_AGENT) match {
case Some(userAgent) =>
request.withPrincipal(ServicePrincipal.forServiceNamed(userAgent))
case _ =>
request
}
}
override def transformServerResponse(response: ResponseHeader,request: RequestHeader): ResponseHeader = response
override def transformClientResponse(response: ResponseHeader, request: RequestHeader): ResponseHeader = response
}
类似服务使用compose进行组合,头部过滤器也可以使用HeaderFilter.composite
方法进行组合。
?? 注意:过滤器在发送消息与接收消息时适用的顺序是刚好相反的。对于Request,越后加入的过滤器越新鲜就越早被运用。
class VerboseFilter(name: String) extends HeaderFilter {
private val log = LoggerFactory.getLogger(getClass)
def transformClientRequest(request: RequestHeader) = {
log.debug(name + " - transforming Client Request")
request
}
def transformServerRequest(request: RequestHeader) = {
log.debug(name + " - transforming Server Request")
request
}
def transformServerResponse(response: ResponseHeader, request: RequestHeader) = {
log.debug(name + " - transforming Server Response")
response
}
def transformClientResponse(response: ResponseHeader, request: RequestHeader) = {
log.debug(name + " - transforming Client Response")
response
}
}
/* ----- 按下列顺序组合过滤器 ----- */
def descriptor = {
import Service._
named("hello")
.withCalls(
call(sayHello)
)
.withHeaderFilter(
HeaderFilter.composite(
new VerboseFilter("Foo"),
new VerboseFilter("Bar")
)
)
}
在服务端,控制台得到的输出将是如下的顺序:
[debug] Bar - transforming Server Request
[debug] Foo - transforming Server Request
[debug] Foo - transforming Server Response
[debug] Bar - transforming Server Response
Lagom在设计错误处理机制时,遵循了以下一些原则:
Lagom为异常提供了trait ExceptionSerializer
,用于将异常信息序列化为JSON等序列化格式,或者是特定的错误编码或响应代码。ExceptionSerializer会将异常转换为RawExceptionMessage
,其中包括对应HTTP响应代码或WebSocket关闭代码的状态码、消息主体,以及一个关于协议的描述子(在HTTP,此处对应响应头部信息中的Content Type)。
默认的ExceptionSerializer使用Play JSON将异常信息序列化为JSON格式。除非是在开发模式下,否则它只会返回TransportException
派生异常类的详细信息,最常见的派生类包括NotFound
和PolicyViolation
。Lagom通常也允许Client抛出这一类的异常,也允许自己创建或实现一个TransportException的派生类实例,不过前提是Client得认识这个异常并且知道如何进行反序列化。
从Lagom 1.5.0版本开始,允许使用Play Router对Lagom的服务进行扩展。该功能在需要将Lagom服务与既存的Play Router进行集成时显得更为实用。
在Lagom做DI时,可以注入额外的Router:
override lazy val lagomServer = serverFor[HelloService](wire[HelloServiceImpl])
.additionalRouter(wire[SomePlayRouter])
此例基于ScalaSirdRouter
实现,它将为服务添加一个/api/files
的路径,用于接收支持断点续传数据的POST请求。
import play.api.mvc.DefaultActionBuilder
import play.api.mvc.PlayBodyParsers
import play.api.mvc.Results
import play.api.routing.Router
import play.api.routing.sird._
class FileUploadRouter(action: DefaultActionBuilder, parser: PlayBodyParsers) {
val router = Router.from {
case POST(p"/api/files") =>
action(parser.multipartFormData) { request =>
val filePaths = request.body.files.map(_.ref.getAbsolutePath)
Results.Ok(filePaths.mkString("Uploaded[", ", ", "]"))
}
}
}
override lazy val lagomServer =
serverFor[HelloService](wire[HelloServiceImpl])
.additionalRouter(wire[FileUploadRouter].router)
在控制台使用命令curl -X POST -F "data=@somefile.txt" -v http://localhost:65499/api/files
,即可发出上传请求。
由于额外的路由并没有在服务描述子中定义,因此当服务使用了服务网关时,这些外部的路由将不会自动被服务网关暴露,因此需要显式地将它的路径加入访问控制列表ACL(Access Control List)里,然后通过服务网关访问它们。
trait HelloService extends Service {
def hello(id: String): ServiceCall[NotUsed, String]
final override def descriptor = {
import Service._
named("hello")
.withCalls(
pathCall("/api/hello/:id", hello _).withAutoAcl(true)
)
.withAcls(
// extra ACL to expose additional router endpoint on ServiceGateway
ServiceAcl(pathRegex = Some("/api/files"))
)
}
}
然后在控制台使用命令curl -X POST -F "data=@somefile.txt" -v http://localhost:9000/api/files
访问它们。
由于额外的路由不是服务API的一部分,所以无法从Lagom生成的客户端进行直接的访问,而只能改用Play-WS之类的客户端去访问其暴露的HTTP端点。
这部分将使用Akka Typed按照DDD的方法实现一个CQRS架构的Lagom服务。
在这个ShoppingCart的示例里(?? 完整代码),使用了Dock作为容器,包装了Zookeeper、Kafka和PostGres服务,所以在演示前需要用docker-compose up -d
进行初始化。同时,因为Lagom将使用Read-Side Processor
和Topic Producer
对AggregateEventTag标记的Event进行消费,所以需要用AkkaTaggerAdapter.fromLagom把AggregateEventTag转换为Akka能理解的Tag类型。而在读端,Lagom提供的ReadSideProcessor
,在Cassandra和Relational数据库插件支持下,可以为实现CQRS的读端提供完整的支持。
?? 使用JDBC驱动数据库存储Journal时,分片标记数不能超过10。这是该插件已知的一个Bug,如果超过了10,将会导致某些事件被多次传递。
/* ----- State & Handlers ----- */
final case class ShoppingCart(
items: Map[String, Int],
// checkedOutTime defines if cart was checked-out or not:
// case None, cart is open
// case Some, cart is checked-out
checkedOutTime: Option[Instant] = None){
/* ----- Command Handlers ----- */
def applyCommand(cmd: ShoppingCartCommand): ReplyEffect[ShoppingCartEvent, ShoppingCart] =
if (isOpen) {
cmd match {
case AddItem(itemId, quantity, replyTo) => onAddItem(itemId, quantity, replyTo)
case Checkout(replyTo) => onCheckout(replyTo)
case Get(replyTo) => onGet(replyTo)
}
} else {
cmd match {
case AddItem(_, _, replyTo) => Effect.reply(replyTo)(Rejected("Cannot add an item to a checked-out cart"))
case Checkout(replyTo) => Effect.reply(replyTo)(Rejected("Cannot checkout a checked-out cart"))
case Get(replyTo) => onGet(replyTo)
}
}
private def onAddItem(itemId: String, quantity: Int, replyTo: ActorRef[Confirmation]): ReplyEffect[ShoppingCartEvent, ShoppingCart] = {
if (items.contains(itemId))
Effect.reply(replyTo)(Rejected(s"Item ‘$itemId‘ was already added to this shopping cart"))
else if (quantity <= 0)
Effect.reply(replyTo)(Rejected("Quantity must be greater than zero"))
else
Effect
.persist(ItemAdded(itemId, quantity))
.thenReply(replyTo)(updatedCart => Accepted(toSummary(updatedCart)))
}
private def onCheckout(replyTo: ActorRef[Confirmation]): ReplyEffect[ShoppingCartEvent, ShoppingCart] = {
if (items.isEmpty)
Effect.reply(replyTo)(Rejected("Cannot checkout an empty shopping cart"))
else
Effect
.persist(CartCheckedOut(Instant.now()))
.thenReply(replyTo)(updatedCart => Accepted(toSummary(updatedCart)))
}
private def onGet(replyTo: ActorRef[Summary]): ReplyEffect[ShoppingCartEvent, ShoppingCart] = {
Effect.reply(replyTo)(toSummary(shoppingCart = this))
}
private def toSummary(shoppingCart: ShoppingCart): Summary = {
Summary(shoppingCart.items, shoppingCart.checkedOut)
}
/* ----- Event Handlers ----- */
def applyEvent(evt: ShoppingCartEvent): ShoppingCart =
evt match {
case ItemAdded(itemId, quantity) => onItemAdded(itemId, quantity)
case CartCheckedOut(checkedOutTime) => onCartCheckedOut(checkedOutTime)
}
private def onItemAdded(itemId: String, quantity: Int): ShoppingCart =
copy(items = items + (itemId -> quantity))
private def onCartCheckedOut(checkedOutTime: Instant): ShoppingCart = {
copy(checkedOutTime = Option(checkedOutTime))
}
}
Lagom借用Akka Cluster Sharding实现了服务的集群部署,确保在任意时刻,有且只有一个聚合的实例在集群内活动,相同类型的多个聚合实例则均匀分布在各个节点之上。为此,需要给ShoppingCart设定一个EntityKey,并向其工厂方法传入EntityContext。
?? 在Command-Reply-Event的设计上,要注意区别Reply是回复调用者的副作用,它可以是确认Command是否成功执行,或者是返回聚合的内部状态(要注意区别查询命令与Read-Side)。而Event才是因Command而导致聚合发生变化的正作用,是要持久化的。相应的,Effect.Reply()
用于聚合未发生变化的场合,而Effect.persist().thenReply()
则用于聚合发生变化之后。
/* ----- Factory & Protocols ----- */
object ShoppingCart {
val empty = ShoppingCart(items = Map.empty)
val typeKey: EntityTypeKey[ShoppingCartCommand] = EntityTypeKey[ShoppingCartCommand]("ShoppingCart")
/* ----- Commands ----- */
trait CommandSerializable
sealed trait ShoppingCartCommand extends CommandSerializable
final case class AddItem(itemId: String, quantity: Int, replyTo: ActorRef[Confirmation])
extends ShoppingCartCommand
final case class Checkout(replyTo: ActorRef[Confirmation]) extends ShoppingCartCommand
final case class Get(replyTo: ActorRef[Summary]) extends ShoppingCartCommand
/* ----- Replies (will not be persisted) ----- */
sealed trait Confirmation
final case class Accepted(summary: Summary) extends Confirmation
final case class Rejected(reason: String) extends Confirmation
final case class Summary(items: Map[String, Int], checkedOut: Boolean)
/* ----- Events (will be persisted) ----- */
sealed trait ShoppingCartEvent extends AggregateEvent[ShoppingCartEvent] {
override def aggregateTag: AggregateEventTagger[ShoppingCartEvent] = ShoppingCartEvent.Tag
}
final case class ItemAdded(itemId: String, quantity: Int) extends ShoppingCartEvent
final case class CartCheckedOut(eventTime: Instant) extends ShoppingCartEvent
/* ----- Tag for read-side consuming ----- */
object ShoppingCartEvent {
// will produce tags with shard numbers from 0 to 9
val Tag: AggregateEventShards[ShoppingCartEvent] =
AggregateEventTag.sharded[ShoppingCartEvent](numShards = 10)
}
def apply(entityContext: EntityContext[ShoppingCartCommand]): Behavior[ShoppingCartCommand] = {
EventSourcedBehavior
.withEnforcedReplies[ShoppingCartCommand, ShoppingCartEvent, ShoppingCart](
persistenceId = PersistenceId(entityContext.entityTypeKey.name, entityContext.entityId),
emptyState = ShoppingCart.empty,
commandHandler = (cart, cmd) => cart.applyCommand(cmd),
eventHandler = (cart, evt) => cart.applyEvent(evt)
)
// convert tag of Lagom to tag of Akka
.withTagger(AkkaTaggerAdapter.fromLagom(entityContext, ShoppingCartEvent.Tag))
// snapshot every 100 events and keep at most 2 snapshots on db
.withRetention(RetentionCriteria.snapshotEvery(numberOfEvents = 100, keepNSnapshots = 2))
}
}
class ShoppingCartLoader extends LagomApplicationLoader {
override def load(context: LagomApplicationContext): LagomApplication =
new ShoppingCartApplication(context) with AkkaDiscoveryComponents
override def loadDevMode(context: LagomApplicationContext): LagomApplication =
new ShoppingCartApplication(context) with LagomDevModeComponents
override def describeService = Some(readDescriptor[ShoppingCartService])
}
trait ShoppingCartComponents
extends LagomServerComponents
with SlickPersistenceComponents
with HikariCPComponents
with AhcWSComponents {
implicit def executionContext: ExecutionContext
override lazy val lagomServer: LagomServer =
serverFor[ShoppingCartService](wire[ShoppingCartServiceImpl])
override lazy val jsonSerializerRegistry: JsonSerializerRegistry =
ShoppingCartSerializerRegistry
// Initialize the sharding for the ShoppingCart aggregate.
// See https://doc.akka.io/docs/akka/2.6/typed/cluster-sharding.html
clusterSharding.init(
Entity(ShoppingCart.typeKey) { entityContext =>
ShoppingCart(entityContext)
}
)
}
abstract class ShoppingCartApplication(context: LagomApplicationContext)
extends LagomApplication(context)
with ShoppingCartComponents
with LagomKafkaComponents {}
服务API的相关操作实际将由幕后的Actor负责实现,因此在服务的实现里需要访问Actor的实例,为此需要通过ClusterSharding.entityRefFor获取其EntityRef。
class ShoppingCartServiceImpl(
clusterSharding: ClusterSharding,
persistentEntityRegistry: PersistentEntityRegistry
)(implicit ec: ExecutionContext) extends ShoppingCartService {
def entityRef(id: String): EntityRef[ShoppingCartCommand] =
clusterSharding.entityRefFor(ShoppingCart.typeKey, id)
}
在获得Reference后,便可使用Ask模式与Actor进行交互。Ask返回的是Future[Response],因此示例将Future[Summary]
投射为ShoppingCartView
,方便Read-Side使用。
implicit val timeout = Timeout(5.seconds)
override def get(id: String): ServiceCall[NotUsed, ShoppingCartView] = ServiceCall { _ =>
entityRef(id)
.ask(reply => Get(reply))
.map(cartSummary => asShoppingCartView(id, cartSummary))
}
final case class ShoppingCartItem(itemId: String, quantity: Int)
final case class ShoppingCartView(id: String, items: Seq[ShoppingCartItem], checkedOut: Boolean)
private def asShoppingCartView(id: String, cartSummary: Summary): ShoppingCartView = {
ShoppingCartView(
id,
cartSummary.items.map((ShoppingCartItem.apply _).tupled).toSeq,
cartSummary.checkedOut
)
}
这部分内容是对比上一节直接使用Akka Persistence Typed建立领域模型的方式,改从传统的Lagom Persistence迁移到Akka Persistence Typed的角度进行了详细的分步讲解。所以,如果是全新开始设计的Lagom的服务,建议直接使用Akka Persistence Typed进行实现,只有此前用Lagom Persistence实现的服务才需要考虑迁移。
由于内容主要涉及Akka Typed,可参考我的博客内容:
Lagom与下列数据库平台兼容:
参考链接:
Cassandra需要至少3个KeySpace:
cassandra-journal.keyspace = my_service_journal
cassandra-snapshot-store.keyspace = my_service_snapshot
lagom.persistence.read-side.cassandra.keyspace = my_service_read_side
这只是Lagom官方文档的一小部分内容,算是对如何使用该框架实现服务的初窥,有兴趣的请移步官方网站寻找更多的内容。
标签:sem 地方 latest discovery root 代码 ber catch rod
原文地址:https://www.cnblogs.com/Abbey/p/13380265.html