标签:接收 隐式 反向 初学者 例子 数据库 场景 发送消息 ++
前言
本文原文标题:《The Neophyte’s Guide to Scala Part 15: Dealing With Failure in Actor Systems》,作者:Daniel Westheide, 原文链接:http://danielwestheide.com/blog/2013/03/20/the-neophytes-guide-to-scala-part-15-dealing-with-failure-in-actor-systems.html, 本文是作者撰写的《Scala初学者指南》系列文章里的第15篇,是对Scala的Actor编程模型和Akka的一个入门介绍,在征得作者授权之后,翻译为中文发表在我的技术博客上。本文原文链接: http://blog.csdn.net/bluishglc/article/details/53195602 转载请注明出处!
目录
正文
在前一篇文章里,我向大家介绍了Scala并发处理的第二块基石:Actor模型,它是Future-Promise
模型的一种补充。你已经学习了如何定义和创建Actor,如何给它们发送消息以及Actor如何处理消息,比如它可能因为一条消息而修改它的内部状态或是异步地发送一个应答消息给发送方。
为了能激发你对Actor模型的兴趣,我们前面忽略了一些重要的概念,在你准备开发一个复杂的基于Actor的应用之前,你必须要先了解它们。
Actor模型意在帮助你提升系统的容错性,在这篇文章里,我们将了解到基于Actor的系统如何进行错误处理,这将完全不同于传统的分层的服务器架构。
错误处理与Akka里的一些核心概念密切相关,因此,本文会先对相关的理念和组件进行介绍。
想要弄清Actor内部发生错误时会发生什么,需要先了解一下Actor并发方案依赖的一个重要思想,这个思想是Akka允许我们创建容错的并发应用的基础,它就是:Actor是按照一定的层次结构组织的。
什么意思呢?首先,每个Actor必定有一个父Actor(后文也会称之为父节点),同时它也可以创建自己的子Actor(后文也会称之为子节点)。你可以把Actor系统看作是一个由众多Actor组成的金字塔,父Actor会监管它的子Actor,就像现实生活里那样,当孩子还在蹒跚学步时父母要帮他们站稳脚跟,后面你会看到这是怎样进行的。
前一篇文章里,我只引入了两个Actor,一个Barista
,一个Customer
。我不会再重复它们的实现细节,但会聚焦在如何实例化他们:
import akka.actor.ActorSystem
val system = ActorSystem("Coffeehouse")
val barista = system.actorOf(Props[Barista], "Barista")
val customer = system.actorOf(Props(classOf[Customer], barista), "Customer")
如你所见,我们通过ActorSystem
的actorOf
方法创建了两个Actor实例,那这两个Actor的父Actor是谁呢?是这个Actor系统吗?不,但是很接近了。Actor系统自己本身并不是一个Actor,但它有一个所谓的“守护”Actor(Guardian Actor),它会作为所有由用户定义的顶层Actor的共同父Actor,比如这里我们通过actorOf
方法创建的barista
和customer
.
Guardian Actor不应该成为系统里所有Actor的直接父节点,我们应该让顶层的Actor创建子节点并委派任务给它们。
当我们查看Actor的路径时,Actor系统的层次结构会显得非常清晰。这些Actor的URL可以清晰地描述它们的位置。你可以通过调用ActorRef
的path
方法来获取一个Actor的路径。
barista.path // => akka.actor.ActorPath = akka://Coffeehouse/user/Barista
customer.path // => akka.actor.ActorPath = akka://Coffeehouse/user/Customer
Actor的路径使用Akka定义的格式:开头是akka
协议,然后是你的Actor系统的名称,再后面是Guardian
Actor的名称(即user),再然后是通过actorOf
方法创建Actor时给Actor起的名字。对于远程的Actor,因为它们运行在不同的机器上,你还需要在路径上指定主机地址和端口。
Actor路径可以用来查找Actor。比如,不同于给Customer
的构造函数传递Barista
的引用,一个Customer
Actor可以通过一个相对路径从ActorContext
的 actorSelection
方法获取Barista
的引用:
context.actorSelection("../Barista")
通过路径查找一个Actor总是有用的,这比传递一个依赖给一个构造函数要好。 在Actor系统里,依赖越密集越容易出现问题, 并且日后很难重构。
为了说明父Actor如何监管子Actor以及如何保证系统容错性,我们再回到咖啡馆的例子,让我们给Barista
一个子Actor以便分摊一些咖啡馆的运营工作给它。
如果我们真的要给咖啡师的工作进行建模,我们很有可能会设计更多的角色来进行更细的分工,但为了不至于失去焦点,我们必须尽可能地简化我们的例子。
假设咖啡师现在有了一个Register
(收银机),它负责处理交易,打印收据以及合计销售额,下面是它的第一个版本:
import akka.actor._
object Register {
sealed trait Article
case object Espresso extends Article
case object Cappuccino extends Article
case class Transaction(article: Article)
}
class Register extends Actor {
import Register._
import Barista._
var revenue = 0
val prices = Map[Article, Int](Espresso -> 150, Cappuccino -> 250)
def receive = {
case Transaction(article) =>
val price = prices(article)
sender ! createReceipt(price)
revenue += price
}
def createReceipt(price: Int): Receipt = Receipt(price)
}
它有一份价格清单,是一个Map,它还有一个整型变量代表销售额,无论什么时候,当它收到一条“交易”信息时,它会把咖啡售价累计到销售额里,同时打印的一张收据给消息的发送方,也就是顾客。
Register
是Barista
的子节点,这意味着我们不会通过Actor系统直接创建它,而是在Barista
内部创建它,代码如下:
object Barista {
case object EspressoRequest
case object ClosingTime
case class EspressoCup(state: EspressoCup.State)
object EspressoCup {
sealed trait State
case object Clean extends State
case object Filled extends State
case object Dirty extends State
}
case class Receipt(amount: Int)
}
class Barista extends Actor {
import Barista._
import Register._
import EspressoCup._
import context.dispatcher
import akka.util.Timeout
import akka.pattern.ask
import akka.pattern.pipe
import concurrent.duration._
implicit val timeout = Timeout(4.seconds)
val register = context.actorOf(Props[Register], "Register")
def receive = {
case EspressoRequest =>
val receipt = register ? Transaction(Espresso)
receipt.map((EspressoCup(Filled), _)).pipeTo(sender)
case ClosingTime => context.stop(self)
}
}
首先, 我们定义了Barista
能够处理的消息类型。我们还引入了咖啡杯(EspressoCup
),它只有三个有限的状态,因此它的State
是sealed
。
更为有趣的部分是Barista
类的实现,导入dispatcher
、ask
和pipe
以及声明隐式变量timeout
这些工作都是必须的,因为在偏函数Reveive
里,我们要用到Akka的ask机制:当咖啡师(Barista
)接收到一份意式浓咖啡订单(EspressoRequest
)时,它会生成一条交易(Transaction
),发送给收银机,后者则会产生一个收据(Receipt
),然后咖啡师把返回的收据和一杯制作好的咖啡一起交给顾客,顾客那里收到的就是(EspressoCup, Receipt)
这样的一个Tuple。像这样委派任务给子节点然后再基于它们的返回结果进行聚合的工作方式在基于Actor的应用程序里是非常典型的。
还有,不要忘了我们不是用ActorSystem
而是用ActorContext
的actorOf
方法创建的子Actor,只有这样做,被创建出来的Actor才是调用这个方法的Actor的子Actor,这和父节点是Guardian Actor的顶层Actor的创建方法是不同的。
最后,同样作为顶层的Actor,我们的Customer
也是Guardian Actor的子节点,它是这样的:
object Customer {
case object CaffeineWithdrawalWarning
}
class Customer(coffeeSource: ActorRef) extends Actor with ActorLogging {
import Customer._
import Barista._
import EspressoCup._
def receive = {
case CaffeineWithdrawalWarning => coffeeSource ! EspressoRequest
case (EspressoCup(Filled), Receipt(amount)) =>
log.info(s"yay, caffeine for ${self}!")
}
}
抛开Barista
的Actor层次结构,让我们来看看第一次出现的ActorLogging
特质,它给我们的程序添加了日志功能,不再只是向控制台打印消息了。现在,让我们用一个Barista
和两个Customer
来组建我们的Actor系统,它们在一起应该可以很好的工作了:
import Customer._
val system = ActorSystem("Coffeehouse")
val barista = system.actorOf(Props[Barista], "Barista")
val customerJohnny = system.actorOf(Props(classOf[Customer], barista), "Johnny")
val customerAlina = system.actorOf(Props(classOf[Customer], barista), "Alina")
customerJohnny ! CaffeineWithdrawalWarning
customerAlina ! CaffeineWithdrawalWarning
试试看,你会得到来自两位满意的顾客输出的日志信息。
当然,我们真正感兴趣的不是顾客是否满意,而是出了问题会怎样?
我们的收银机是一台脆弱的设备,它的打印功能并不怎么稳定,纸张经常会卡住而导致无法打印收据。我们在收银机的伴生对象里添加一个PaperJamException
异常来代表我们所说的这个问题:
class PaperJamException(msg: String) extends Exception(msg)
然后相应地改变一下Register
的createReceipt
方法:
def createReceipt(price: Int): Receipt = {
import util.Random
if (Random.nextBoolean())
throw new PaperJamException("OMG, not again!")
Receipt(price)
}
现在,当处理交易的时候,收银机会有一半的概率会抛出PaperJamException
异常。这对我们的Actor系统或应用程序有什么影响吗?幸运的是,Akka非常健壮,它不会受到抛出异常的任何影响,但是,出问题的节点会把这种异常行为通知给它的父节点。记住,父节点总是在监管它的子节点,此时就是需要父节点决定如何处理问题的时候了。
处理子Actor异常的工作并不是由父Actor的Receive
偏函数来负责的,因为那将扰乱父Actor自己的业务逻辑,这两部分职责要被清晰地隔离开。每个Actor会定义它自己的“监管策略”(supervisor strategy),这个策略告诉Akka当它的子Actor发生某些类型的错误时它打算如何应对。
有两种基本的监管策略:OneForOneStrategy
和AllForOneStrategy
,前者意味着你只会处理你子Actor中的错误,并只会影响到出错的那个子Actor,而后者会影响到所有的子Actor。哪种策略更好取决于你的应用程序面临的场景。
不管你为你的Actor选择何种SupervisorStrategy
,你都要指定一个Decider
,它也是一个偏函数:PartialFunction[Throwable, Directive]
,Decider
允许你匹配Throwable
的某些子类型,然后决定针对出问题的子Actor(或所有的子Actor,如果你选的是all-for-one策略的话)要采取什么应对措施。
以下是Akka提供的一些在错误发生时可用的指令:
sealed trait Directive
case object Resume extends Directive
case object Restart extends Directive
case object Stop extends Directive
case object Escalate extends Directive
Resume
,意味着你认为你的子Actor并没有大问题,它的异常情况不会造成大的影响,你决定让子Actor恢复消息处理。Restart
指令会让Akka创建你的子Actor的新实例。这样做的原因是你假定子Actor或子子Actor的状态因某种方式崩溃之后无法再处理消息了。你希望通过重启Actor让它回到初始的状态。Escalate
,可能是因为当前你不知道如何处理这个错误,你把如何处理错误的决策权交给了你的父Actor。如果Actor进行了escalate,那它自己应该也做好被父Actor重启的准备。你不需要为你的每一个Actor指定监管策略。实际上,我们也没这么做过,因为默认的监管策略总会生效。它看起来像这样:
final val defaultStrategy: SupervisorStrategy = {
def defaultDecider: Decider = {
case _: ActorInitializationException ? Stop
case _: ActorKilledException ? Stop
case _: Exception ? Restart
}
OneForOneStrategy()(defaultDecider)
}
这意味着除了ActorInitializationException
和ActorKilledException
异常之外,出异常的子Actor将被重启。因此,当Register
抛出了一个PaperJamException
异常之后,它的父ActorBarista
的监管策略会让Register
重启,因为我们并不没有复盖过默认的监管策略。如果你这样试了,你会在日志里得到异常的stacktrace,但是没有关于Register
重启的信息。让我们来验证一下Register
是否真的重启了,要这样做,我们得先学习一下Actor的声明周期。
为了理解监管策略的相关指令实际做了什么,我们需要知道一些Actor生命周期相关的知识。基本上可以归结为:通过actorOf
创建时,Actor处于started
状态,之后,如果出现什么问题,它还以被restart
任意多次,最后,一个Actor会被stopped
进而消亡。在Actor的生命周期上有多个回调方法可以来重写,但了解它们的默认实现也是很重要的。让我们追一过一下:
stop
被调用之后会回调这个方法以便做一些资源释放工作,默认实现也是空的。postStop
方法去释放资源。preStart
。让我们给Register
的postRestart
方法加些日志输出来看一下它是不是在失效之后真的重启了。为此,我们让Register
继承ActorLogging
特质,然后添加如下方法:
override def postRestart(reason: Throwable) {
super.postRestart(reason)
log.info(s"Restarted because of ${reason.getMessage}")
}
现在,如果你给这两个Customer
发送一堆CaffeineWithdrawalWarning
消息,你会看到一个或者另一个的日志里会证实我们的Register
确实重启过了。
有时候反复重启一个Actor是没有意义的,比如:一个Actor要通过网络和其他服务进行通信,而服务可能在一段时间内暂时不可用。在这种情况下,一个好的解决方法是告诉Akka应该在多长时间内重启Actor,如果超出了限定的时间,就停止这个Actor进而导致了Actor的消亡。这个限制时间可以在监控策略的构造函数里配置:
import scala.concurrent.duration._
import akka.actor.OneForOneStrategy
import akka.actor.SupervisorStrategy.Restart
OneForOneStrategy(10, 2.minutes) {
case _ => Restart
}
至此,我们的系统运转的平稳吗?如果纸张卡住的问题再次发生,它能自动恢复吗?让我们改一下日志输出:
override def postRestart(reason: Throwable) {
super.postRestart(reason)
log.info(s"Restarted, and revenue is $revenue cents")
}
同时也给Receive
偏函数加些日志打印,让它变成这样:
def receive = {
case Transaction(article) =>
val price = prices(article)
sender ! createReceipt(price)
revenue += price
log.info(s"Revenue incremented to $revenue cents")
}
有一个有问题的地方,在日志里,你可以看到销售额一直在增加,但是只要纸张卡住,收银机重启,销售额就会清零,因为重启意味着旧的实例会被抛弃,转而创建一个新的实例。当然,我们可以修改Barista
的监管策略,在发生PaperJamException
异常的时候让Register
只是去“恢复”Register
:
val decider: PartialFunction[Throwable, Directive] = {
case _: PaperJamException => Resume
}
override def supervisorStrategy: SupervisorStrategy =
OneForOneStrategy()(decider.orElse(SupervisorStrategy.defaultStrategy.decider))
现在,Register
发生发生PaperJamException
异常之后就不会重启了,所以它的状态也不会被重置了。
Error Kernel
模式有时候简单地恢复一个Actor可能是最好的做法,但是如果不重启就无法解决问题怎么办呢?比方说我们假设不重启收银机就没有办法彻底清除卡住的纸张,那我们要怎么做呢?为了模拟这种情况,我们需要先设置一个boolean的flag去标记收银机是否已经处于卡住的状态(译者注:下面代码引入的变量paperJam
是为了在收银机被纸卡住的时候,维持住收银机“被卡住”的状态,这个变量将会永久地被置为true
,没有再被置为false的机会了,除非重启Actor),我们把Register
的代码改一下:
class Register extends Actor with ActorLogging {
import Register._
import Barista._
var revenue = 0
val prices = Map[Article, Int](Espresso -> 150, Cappuccino -> 250)
var paperJam = false
override def postRestart(reason: Throwable) {
super.postRestart(reason)
log.info(s"Restarted, and revenue is $revenue cents")
}
def receive = {
case Transaction(article) =>
val price = prices(article)
sender ! createReceipt(price)
revenue += price
log.info(s"Revenue incremented to $revenue cents")
}
def createReceipt(price: Int): Receipt = {
import util.Random
if (Random.nextBoolean()) paperJam = true
if (paperJam) throw new PaperJamException("OMG, not again!")
Receipt(price)
}
}
同时,我们把赋给Barista
的监管策略去掉(使用默认策略)。现在,如果不重启Register
,纸张卡住的问题会一直存在,但是重启又会丢失重要的销售额数据。这时就是error kernel
模式派上用场的时候了。它基本上就是一条简单的指导原则,你最好总是尽量去遵守:如果一个Actor要维持重要的内部状态,它就应该尽量把危险的工作委派给它的子Actor,从而避免它的状态信息在崩溃中丢失。
有时候,为每一个这样的任务去创建一个新的子Actor是有必要的,但不是必须的。这个模式的精髓是把重要的状态信息尽量地维持在顶层或接近顶层的Actor里,把容易出错的任务尽量地分配给低层Actor。我们来把这个模式应用到Register
上,我们让Register
来维护销售额,但把收据打印的工作委派给一个新的子Actor,我们可以叫它ReceiptPrinter
,以下是它的具体代码:
object ReceiptPrinter {
case class PrintJob(amount: Int)
class PaperJamException(msg: String) extends Exception(msg)
}
class ReceiptPrinter extends Actor with ActorLogging {
var paperJam = false
override def postRestart(reason: Throwable) {
super.postRestart(reason)
log.info(s"Restarted, paper jam == $paperJam")
}
def receive = {
case PrintJob(amount) => sender ! createReceipt(amount)
}
def createReceipt(price: Int): Receipt = {
if (Random.nextBoolean()) paperJam = true
if (paperJam) throw new PaperJamException("OMG, not again!")
Receipt(price)
}
}
我们再次使用一个boolean flag模拟了卡纸的异常,我们还引入了一个新的消息类型PrintJob
,可以说整个ReceiptPrinter
就是从Register
中抽取出来的一个子Actor。这样做的一个好处是它把危险的容易失败的操作从有状态的Register
里剥离了出去,同时也让代码更加简洁和易读:ReceiptPrinter
负责一个单一的任务,这让Register
也变得更加简单,它只需负责管理账务并委派任务给子Actor就可以了:
class Register extends Actor with ActorLogging {
import akka.pattern.ask
import akka.pattern.pipe
import context.dispatcher
implicit val timeout = Timeout(4.seconds)
var revenue = 0
val prices = Map[Article, Int](Espresso -> 150, Cappuccino -> 250)
val printer = context.actorOf(Props[ReceiptPrinter], "Printer")
override def postRestart(reason: Throwable) {
super.postRestart(reason)
log.info(s"Restarted, and revenue is $revenue cents")
}
def receive = {
case Transaction(article) =>
val price = prices(article)
val requester = sender
(printer ? PrintJob(price)).map((requester, _)).pipeTo(self)
case (requester: ActorRef, receipt: Receipt) =>
revenue += receipt.amount
log.info(s"revenue is $revenue cents")
requester ! receipt
}
}
我们没有为每一个Transaction
消息都创建一个ReceiptPrinter
,而是使用默认的监管策略让Printer在遇到错误时重启。对于这种看上去有些怪异的销售额记账方式,合理的解释是:我们首先向Printer索要收据,然后把交易消息的发送方和收据分装成Tuple发送给它自己(译者注:Future
的map
方法接受一个函数参数把Future代表的返回结果进行某种处理或者说转换,返回一个新的Future。(printer ? PrintJob(price)).map((requester, _)).pipeTo(self)
做的工作是:首先(printer ? PrintJob(price))
返回一个携带着一个收据Receipt
的Future
,map函数会把这个Receipt
连同交易信息的发送方(发送方是Barista
)一起封装成一个Tuple然后再发给Register
他自己,第二个case语句会处理这个发给自己的消息)。在处理这个发送给自己的消息时, 我们才最终进行了消费额的累加并把收据发送给请求方。
我们分两步做的原因是我们希望只有当收据成功打印之后再去记账。一条重要的原则是:永远不要在一个Future
里去修改一个Actor
的内部状态,我们必须采用这种间接的方式,这会确保我们只在Acotr内部去修改销售额,同时也意味着不会在别的线程上被修改(译者注,这才是最重要的原因,在Actor之外修改Actor状态有可能会是在别的线程上进行的,这样就会因为并发因导致状态出现不一致的可能)。
给sender
赋值一个val
也是有原因的:在map
一个future的时候,我们不再处于Actor的上下文里,既然sender
是一个方法,那此时它返回的引用可能就已经是别的Actor了。
现在,我们的Register
可以安全地重启了!
当然,把收据打印和账务管理放到一起的想法本身就是有问题的,前面这样做只是为演示error kernel
模式。把两者分离开本身就是更好的选择,因为它们是两个不相干的关注点。
另一件我们想提升的事情就是超时。现在,当ReceiptPrinter
发生异常时,会导致一个AskTimeoutException
异常,因为我们使用的是ask语法,当Barista
没能成功处理完一个Future
时就会抛出这个异常。既然Barista
会把经过map转换后的携带处理结果的Future传递给客户,客户就会收到一个包含AskTimeoutException
异常的Failure
。Customer
并没要求过什么东西,所以它也没有料到会有这样一条消息,事实上,它也不会处理这些消息。让我们友好一点,给顾客发送一条ComebackLater
消息,告诉他们咖啡要晚一会才能好。这么做显然会好很多,否则他们根本不知道他可能拿不到咖啡了。为了实现这个目标,让我们从AskTimeoutException
错误中恢复回来,把它转换成一条ComebackLater
消息。Barista
的偏函数Reveive
看起来会是这样的:
def receive = {
case EspressoRequest =>
val receipt = register ? Transaction(Espresso)
receipt.map((EspressoCup(Filled), _)).recover {
case _: AskTimeoutException => ComebackLater
} pipeTo(sender)
case ClosingTime => context.system.shutdown()
}
现在,Customer
知道他们后面可以碰碰运气,如果尝试的次数足够多,他们最后是可以等到他们的想要的咖啡的。
保证系统容错性的另一个重要原则是:对反向依赖于子节点的依赖要保持密切关注。有时候,你有一些Actor,它们依赖其他一些Actor,那些Actor并不是它的子Actor,这意味着你没有监管他们,所以密切关注那些Actor的状态,当出现糟糕的事情时及时地被通知是非常重要的。想一下,一个Actor负责数据库的访问,你想要那些依赖这个Actor的Actor能够很好地了解到这个Actor的状态,如果它有问题了,你可能想把你的系统切换到维护模式。针对其他一些场合,简单地使用某种备份Actor作为死亡的Actor一个替换也是一个可行的方案。任何情况下, 我们都需要监控你依赖的Actor,以便在它失效的时候你能得到消息。这是通过定义在ActorContext
上的watch
方法实现的。为了展示这一点,我们让Customer
监视Barista
,我们的顾客都是咖啡成瘾的,所以让它们依赖咖啡师也是合理的。
class Customer(coffeeSource: ActorRef) extends Actor with ActorLogging {
import context.dispatcher
context.watch(coffeeSource)
def receive = {
case CaffeineWithdrawalWarning => coffeeSource ! EspressoRequest
case (EspressoCup(Filled), Receipt(amount)) =>
log.info(s"yay, caffeine for ${self}!")
case ComebackLater =>
log.info("grumble, grumble")
context.system.scheduler.scheduleOnce(300.millis) {
coffeeSource ! EspressoRequest
}
case Terminated(barista) =>
log.info("Oh well, let‘s find another coffeehouse...")
}
}
我们让Customer
监控它依赖的coffeeSource
,我们添加一个新的消息类型Terminated
,如果我们监视的一个Actor失效,Akka会发送这种类型的消息。现在,如果我们发送一个ClosingTime
消息给Barista
告诉它关闭它自己,Customer
会收到通知,试试吧,你应该能在日志里看到相应的输出。
在本系列的这一部分里,是我们第二次讨论Akka和Actor,你已经知道了Actor系统里一些重要的组件,所有这些都是利用Akka提供的工具和背后的思想去让我们的系统变得更加的容错。仍然有很多关于Acotr模型和Akka的知识要学习,但我们现在先到这里,因为这超出了本系列的范围。在下一部分,我们会对这个系列做一个总结,同事也告诉你很多Scala的资源让你去继续深入的学习。如果你对Actor和Akka非常感兴趣,也会有相关的东西给到你。
标签:接收 隐式 反向 初学者 例子 数据库 场景 发送消息 ++
原文地址:http://blog.csdn.net/bluishglc/article/details/53195602