标签:style http color os io 使用 ar for 数据
我们在前面介绍Actor系统时说过每个Actor都是其子Actor的管理员,并且每个Actor定义了发生错误时的管理策略,策略一旦定义好,之后不能修改,就像是Actor系统不可分割的一部分。
实用错误处理
首先我们通过一个例子来演示如何处理数据存储错误,这是现实应用中可能出现的典型错误。当然,实际的应用在数据源不可用时可能有不同的处理方式,这里我们使用重新连接的处理方法。
下面是例子的源码,比较长,需要仔细阅读,最好是实际运行,参考日志来理解:
2 | import akka.actor.SupervisorStrategy. _ |
3 | import scala.concurrent.duration. _ |
4 | import akka.util.Timeout |
5 | import akka.event.LoggingReceive |
6 | import akka.pattern.{ask, pipe} |
7 | import com.typesafe.config.ConfigFactory |
12 | object FaultHandlingDocSample extends App { |
16 | val config = ConfigFactory.parseString( "" " |
17 | akka.loglevel = " DEBUG " |
24 | val system = ActorSystem( "FaultToleranceSample" , config) |
25 | val worker = system.actorOf(Props[Worker], name = "worker" ) |
26 | val listener = system.actorOf(Props[Listener], name = "listener" ) |
30 | worker.tell(Start, sender = listener) |
37 | class Listener extends Actor with ActorLogging { |
42 | context.setReceiveTimeout( 15 seconds) |
45 | case Progress(percent) = > |
46 | log.info( "Current progress: {} %" , percent) |
47 | if (percent > = 100.0 ) { |
48 | log.info( "That’s all, shutting down" ) |
49 | context.system.shutdown() |
51 | case ReceiveTimeout = > |
53 | log.error( "Shutting down due to unavailable service" ) |
54 | context.system.shutdown() |
64 | final case class Progress(percent : Double) |
73 | class Worker extends Actor with ActorLogging { |
76 | import CounterService. _ |
78 | implicit val askTimeout = Timeout( 5 seconds) |
80 | override val supervisorStrategy = OneForOneStrategy() { |
81 | case _: CounterService.ServiceUnavailable = > Stop |
85 | var progressListener : Option[ActorRef] = None |
86 | val counterService = context.actorOf(Props[CounterService], name = "counter" ) |
89 | import context.dispatcher |
92 | def receive = LoggingReceive { |
93 | case Start if progressListener.isEmpty = > |
94 | progressListener = Some(sender()) |
95 | context.system.scheduler.schedule(Duration.Zero, 1 second, self, Do) |
97 | counterService ! Increment( 1 ) |
98 | counterService ! Increment( 1 ) |
99 | counterService ! Increment( 1 ) |
101 | counterService ? GetCurrentCount map { |
102 | case CurrentCount( _ , count) = > Progress( 100.0 * count / totalCount) |
103 | } pipeTo progressListener.get |
107 | object CounterService { |
109 | final case class Increment(n : Int) |
111 | case object GetCurrentCount |
113 | final case class CurrentCount(key : String, count : Long) |
115 | class ServiceUnavailable(msg : String) extends RuntimeException(msg) |
117 | private case object Reconnect |
126 | class CounterService extends Actor { |
128 | import CounterService. _ |
134 | override val supervisorStrategy = OneForOneStrategy(maxNrOfRetries = 3 , |
135 | withinTimeRange = 5 seconds) { |
136 | case _: Storage.StorageException = > Restart |
138 | val key = self.path.name |
139 | var storage : Option[ActorRef] = None |
140 | var counter : Option[ActorRef] = None |
141 | var backlog = IndexedSeq.empty[(ActorRef, Any)] |
142 | val MaxBacklog = 10000 |
144 | import context.dispatcher |
147 | override def preStart() { |
158 | storage = Some(context.watch(context.actorOf(Props[Storage], name = "storage" ))) |
161 | _ ! UseStorage(storage) |
164 | storage.get ! Get(key) |
167 | def receive = LoggingReceive { |
168 | case Entry(k, v) if k == key && counter == None = > |
170 | val c = context.actorOf(Props(classOf[Counter], key, v)) |
173 | c ! UseStorage(storage) |
175 | for ((replyTo, msg) <- backlog) c.tell(msg, sender = replyTo) |
176 | backlog = IndexedSeq.empty |
177 | case msg @ Increment(n) = > forwardOrPlaceInBacklog(msg) |
179 | case msg @ GetCurrentCount = > forwardOrPlaceInBacklog(msg) |
180 | case Terminated(actorRef) if Some(actorRef) == storage = > |
189 | context.system.scheduler.scheduleOnce( 10 seconds, self, Reconnect) |
195 | def forwardOrPlaceInBacklog(msg : Any) { |
200 | case Some(c) = > c forward msg |
202 | if (backlog.size > = MaxBacklog) |
203 | throw new ServiceUnavailable( |
204 | "CounterService not available, lack of initial value" ) |
205 | backlog : + = (sender() -> msg) |
212 | final case class UseStorage(storage : Option[ActorRef]) |
221 | class Counter(key : String, initialValue : Long) extends Actor { |
224 | import CounterService. _ |
227 | var count = initialValue |
228 | var storage : Option[ActorRef] = None |
230 | def receive = LoggingReceive { |
231 | case UseStorage(s) = > |
237 | case GetCurrentCount = > |
238 | sender() ! CurrentCount(key, count) |
245 | _ ! Store(Entry(key, count)) |
252 | import Storage.StorageException |
254 | private var db = Map[String, Long]() |
256 | @ throws(classOf[StorageException]) |
257 | def save(key : String, value : Long) : Unit = synchronized { |
258 | if ( 11 < = value && value < = 14 ) |
259 | throw new StorageException( "Simulated store failure " + value) |
263 | @ throws(classOf[StorageException]) |
264 | def load(key : String) : Option[Long] = synchronized { |
271 | final case class Store(entry : Entry) |
273 | final case class Get(key : String) |
275 | final case class Entry(key : String, value : Long) |
277 | class StorageException(msg : String) extends RuntimeException(msg) |
286 | class Storage extends Actor { |
292 | def receive = LoggingReceive { |
293 | case Store(Entry(key, count)) = > db.save(key, count) |
294 | case Get(key) = > sender() ! Entry(key, db.load(key).getOrElse( 0 L)) |
这个例子定义了五个Actor,分别是Worker, Listener, CounterService ,Counter 和 Storage,下图给出了系统正常运行时的流程(无错误发生的情况):

其中Worker是CounterService的父Actor(管理员),CounterService是Counter和Storage的父Actor(管理员),这种管理关系在图中用浅红色表示,白色代表引用。其中Worker引用了Listener,Listener也引用了Worker,它们之间不存在父子关系;同样Counter也引用了Storage,但Counter不是Storage的管理员。
正常流程如下:
步骤 | 描述 |
1 | progress Listener 通知Worker开始工作. |
2 | Worker通过定时发送Do消息给自己来完成工作 |
3,4,5 | Worker接收到Do消息时,通知其子Actor CounterService 三次递增计数器, CounterService 将Increment消息转发给Counter,Counter递增计数器变量,然后把当前值发送给Storage保存 |
6,7 | Worker询问CounterService 当前计数器的值,然后通过管道把结果传给Listener |
下图给出系统出错的情况,例子中Worker和CounterService作为管理员分别定义了各自的管理策略:Worker在收到CounterService抛出的ServiceUnavailable异常时终止CounterService的运行,而CounterService在收到StorageException时重启Storage。

出错时的流程
步骤 | 描述 |
1 | Storage抛出StorageException异常 |
2 | Storage的管理员CounterService根据策略在接收到StorageException异常后重启Storage |
3,4,5,6 | Storage继续出错并重启 |
7 | 如果在5秒钟之内Storage出错三次并重启,其管理员(CounterService)就终止Storage运行 |
8 | CounterService 同时监听Storage的Terminated消息,它在Storage终止后接收到Terminated消息 |
9,10,11 | 并且通知Counter 暂时没有Storage |
12 | CounterService 延时一段时间后给自己发送Reconnect消息 |
13,14 | 当它收到Reconnect消息时,重新创建一个Storage |
15,16 | 然后通知Counter使用新的Storage |
这里给出运行的一个日志供参考。
Akka 编程(20):容错处理(一)
标签:style http color os io 使用 ar for 数据
原文地址:http://blog.csdn.net/mapdigit/article/details/38977153