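As mentioned earlier when introducing the actor system, every actor is the supervisor of its children, and each actor defines a supervisor strategy for handling failures. Once defined, the strategy cannot be changed afterwards; it is an integral part of the actor system's structure.
Fault Handling in Practice
First, let us look at a sample that illustrates one way to handle data store errors, a typical source of failure in real-world applications. How best to react when the data store is unavailable depends on the application; this sample takes a reconnect approach.
The source code of the sample follows. It is fairly long, so read it carefully; better still, run it and use the log output to understand what happens: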
    import akka.actor._
    import akka.actor.SupervisorStrategy._
    import scala.concurrent.duration._
    import scala.language.postfixOps
    import akka.util.Timeout
    import akka.event.LoggingReceive
    import akka.pattern.{ ask, pipe }
    import com.typesafe.config.ConfigFactory

    // Runs the sample
    object FaultHandlingDocSample extends App {
      import Worker._
      val config = ConfigFactory.parseString("""
        akka.loglevel = "DEBUG"
        akka.actor.debug {
          receive = on
          lifecycle = on
        }
        """)
      val system = ActorSystem("FaultToleranceSample", config)
      val worker = system.actorOf(Props[Worker], name = "worker")
      val listener = system.actorOf(Props[Listener], name = "listener")
      // The listener is passed as sender, so it receives the worker's replies
      worker.tell(Start, sender = listener)
    }

    // Listens for progress from the worker and shuts down the system when the work is done
    class Listener extends Actor with ActorLogging {
      import Worker._
      // No progress within 15 seconds means the service is unavailable
      context.setReceiveTimeout(15 seconds)
      def receive = {
        case Progress(percent) =>
          log.info("Current progress: {} %", percent)
          if (percent >= 100.0) {
            log.info("That's all, shutting down")
            context.system.shutdown()
          }
        case ReceiveTimeout =>
          log.error("Shutting down due to unavailable service")
          context.system.shutdown()
      }
    }

    object Worker {
      case object Start
      case object Do
      final case class Progress(percent: Double)
    }

    // Does the work after receiving Start and reports Progress to the sender of Start
    class Worker extends Actor with ActorLogging {
      import Worker._
      import CounterService._
      implicit val askTimeout = Timeout(5 seconds)
      // Stop the CounterService child if it throws ServiceUnavailable
      override val supervisorStrategy = OneForOneStrategy() {
        case _: CounterService.ServiceUnavailable => Stop
      }
      // The sender of the initial Start message is notified about progress
      var progressListener: Option[ActorRef] = None
      val counterService = context.actorOf(Props[CounterService], name = "counter")
      val totalCount = 51
      import context.dispatcher // use this actor's dispatcher as ExecutionContext
      def receive = LoggingReceive {
        case Start if progressListener.isEmpty =>
          progressListener = Some(sender())
          context.system.scheduler.schedule(Duration.Zero, 1 second, self, Do)
        case Do =>
          counterService ! Increment(1)
          counterService ! Increment(1)
          counterService ! Increment(1)
          // Send the current progress to the initial sender
          counterService ? GetCurrentCount map {
            case CurrentCount(_, count) => Progress(100.0 * count / totalCount)
          } pipeTo progressListener.get
      }
    }

    object CounterService {
      final case class Increment(n: Int)
      case object GetCurrentCount
      final case class CurrentCount(key: String, count: Long)
      class ServiceUnavailable(msg: String) extends RuntimeException(msg)
      private case object Reconnect
    }

    // Adds Increment values to a persistent counter; supervises Storage and Counter
    class CounterService extends Actor {
      import CounterService._
      import Counter._
      import Storage._

      // Restart Storage on StorageException; after 3 restarts within 5 seconds it is stopped
      override val supervisorStrategy = OneForOneStrategy(maxNrOfRetries = 3,
        withinTimeRange = 5 seconds) {
          case _: Storage.StorageException => Restart
        }

      val key = self.path.name
      var storage: Option[ActorRef] = None
      var counter: Option[ActorRef] = None
      var backlog = IndexedSeq.empty[(ActorRef, Any)]
      val MaxBacklog = 10000
      import context.dispatcher // use this actor's dispatcher as ExecutionContext

      override def preStart(): Unit = initStorage()

      // Create the Storage child and watch it so that Terminated arrives when it stops
      def initStorage(): Unit = {
        storage = Some(context.watch(context.actorOf(Props[Storage], name = "storage")))
        counter foreach { _ ! UseStorage(storage) }
        // The initial value is needed before the service can operate
        storage.get ! Get(key)
      }

      def receive = LoggingReceive {
        case Entry(k, v) if k == key && counter == None =>
          // Initial value received from Storage: create the Counter and flush the backlog
          val c = context.actorOf(Props(classOf[Counter], key, v))
          counter = Some(c)
          c ! UseStorage(storage)
          for ((replyTo, msg) <- backlog) c.tell(msg, sender = replyTo)
          backlog = IndexedSeq.empty
        case msg @ Increment(n)    => forwardOrPlaceInBacklog(msg)
        case msg @ GetCurrentCount => forwardOrPlaceInBacklog(msg)
        case Terminated(actorRef) if Some(actorRef) == storage =>
          // Storage was stopped after too many restarts; retry after a delay
          storage = None
          counter foreach { _ ! UseStorage(None) }
          context.system.scheduler.scheduleOnce(10 seconds, self, Reconnect)
        case Reconnect =>
          initStorage()
      }

      // Until the Counter exists, messages are buffered and replayed later
      def forwardOrPlaceInBacklog(msg: Any): Unit = {
        counter match {
          case Some(c) => c forward msg
          case None =>
            if (backlog.size >= MaxBacklog)
              throw new ServiceUnavailable(
                "CounterService not available, lack of initial value")
            backlog :+= (sender() -> msg)
        }
      }
    }

    object Counter {
      final case class UseStorage(storage: Option[ActorRef])
    }

    // In-memory count that is sent to Storage whenever storage is available
    class Counter(key: String, initialValue: Long) extends Actor {
      import Counter._
      import CounterService._
      import Storage._
      var count = initialValue
      var storage: Option[ActorRef] = None
      def receive = LoggingReceive {
        case UseStorage(s) =>
          storage = s
          storeCount()
        case Increment(n) =>
          count += n
          storeCount()
        case GetCurrentCount =>
          sender() ! CurrentCount(key, count)
      }

      // Delegate the risky work to Storage; the counter keeps working without it
      def storeCount(): Unit = storage foreach { _ ! Store(Entry(key, count)) }
    }

    // Stand-in for a real database; fails on purpose for the values 11 to 14
    object DummyDB {
      import Storage.StorageException
      private var db = Map[String, Long]()
      @throws(classOf[StorageException])
      def save(key: String, value: Long): Unit = synchronized {
        if (11 <= value && value <= 14)
          throw new StorageException("Simulated store failure " + value)
        db += (key -> value)
      }

      @throws(classOf[StorageException])
      def load(key: String): Option[Long] = synchronized { db.get(key) }
    }

    object Storage {
      final case class Store(entry: Entry)
      final case class Get(key: String)
      final case class Entry(key: String, value: Long)
      class StorageException(msg: String) extends RuntimeException(msg)
    }

    // Saves key/value pairs on Store and replies with the current value on Get
    class Storage extends Actor {
      import Storage._
      val db = DummyDB
      def receive = LoggingReceive {
        case Store(Entry(key, count)) => db.save(key, count)
        case Get(key)                 => sender() ! Entry(key, db.load(key).getOrElse(0L))
      }
    }
The sample defines five actors: Worker, Listener, CounterService, Counter and Storage. The diagram below shows the normal flow, when no failure occurs:

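Worker is the parent (supervisor) of CounterService, and CounterService is the parent (supervisor) of Counter and Storage. In the diagram these supervision relationships are drawn in light red, while white denotes plain references. Worker holds a reference to Listener and Listener holds one to Worker, but there is no parent-child relationship between them. Likewise, Counter holds a reference to Storage but is not its supervisor.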
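The parent-child relationships described above also show up in actor paths, because a child's path extends its parent's path. The sketch below is plain Scala with no Akka dependency. The path strings are what one would expect for the actor names used in the sample (an assumption, since the sample never prints them), and `parentOf` and `supervises` are hypothetical helpers introduced only for illustration.

```scala
object SupervisionPaths {
  // Assumed paths for the names used in the sample; "/user" is the guardian
  // under which actors created with system.actorOf are placed.
  val listener       = "/user/listener"
  val worker         = "/user/worker"
  val counterService = "/user/worker/counter"
  val storage        = "/user/worker/counter/storage"

  /** Path of the parent (supervisor): everything before the last '/'. */
  def parentOf(path: String): String = path.substring(0, path.lastIndexOf('/'))

  /** True when `candidate` is the direct supervisor of `child`. */
  def supervises(candidate: String, child: String): Boolean =
    parentOf(child) == candidate
}
```

With these paths, `supervises(worker, counterService)` and `supervises(counterService, storage)` hold, while `supervises(listener, worker)` does not: Listener and Worker only reference each other.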
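The normal flow is as follows:
| Step | Description |
| --- | --- |
| 1 | The progress Listener tells the Worker to start working. |
| 2 | The Worker does its work by sending itself a Do message at regular intervals. |
| 3, 4, 5 | On each Do message the Worker tells its child CounterService to increment the counter three times. CounterService forwards each Increment message to Counter, which updates its counter variable and sends the current value to Storage to be saved. |
| 6, 7 | The Worker asks CounterService for the current counter value and pipes the result to the Listener. |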
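The progress arithmetic in steps 6 and 7 can be checked without running any actors. The sketch below is plain Scala that mirrors the Worker's `100.0 * count / totalCount` mapping. The value `totalCount = 51` is assumed from the Akka documentation sample this article follows, and `countAfter` and `ticksToFinish` are hypothetical helpers, not part of the sample.

```scala
object ProgressSketch {
  final case class Progress(percent: Double)

  // Assumptions: three Increment(1) messages per Do tick, and a total of 51
  // as in the Akka documentation sample.
  val incrementsPerTick = 3
  val totalCount = 51

  /** Count held by the Counter after `ticks` Do messages, starting from zero. */
  def countAfter(ticks: Int): Long = incrementsPerTick.toLong * ticks

  /** Same arithmetic as the Worker's mapping from CurrentCount to Progress. */
  def progress(count: Long): Progress = Progress(100.0 * count / totalCount)

  /** Smallest number of ticks after which the reported progress reaches 100 %. */
  def ticksToFinish: Int =
    Iterator.from(1).find(t => progress(countAfter(t)).percent >= 100.0).get
}
```

Under these assumptions `ticksToFinish` is 17, which is when the Listener would log its shutdown message if the count starts at zero.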
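The diagram below shows what happens when failures occur. In the sample, Worker and CounterService each act as a supervisor and define their own strategy: Worker stops CounterService when it throws ServiceUnavailable, and CounterService restarts Storage when it throws StorageException.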
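The two strategies can be modelled as partial functions from an exception to a directive, which is the shape of an Akka decider. The sketch below is a plain-Scala model and not Akka's own code: the `Directive` objects and exception classes are local stand-ins named after their Akka and sample counterparts, and `decide` is a hypothetical helper. It assumes that an exception the decider does not handle is escalated to the next supervisor.

```scala
object DeciderSketch {
  // Local stand-ins for Akka's supervision directives
  sealed trait Directive
  case object Restart extends Directive
  case object Stop extends Directive
  case object Escalate extends Directive

  // Local stand-ins for the sample's exception types
  class ServiceUnavailable(msg: String) extends RuntimeException(msg)
  class StorageException(msg: String) extends RuntimeException(msg)

  type Decider = PartialFunction[Throwable, Directive]

  // Worker: stop the CounterService child on ServiceUnavailable
  val workerDecider: Decider = { case _: ServiceUnavailable => Stop }

  // CounterService: restart the Storage child on StorageException
  val counterServiceDecider: Decider = { case _: StorageException => Restart }

  // Assumed fallback: anything the decider does not match is escalated
  private val escalateByDefault: Throwable => Directive = _ => Escalate

  def decide(decider: Decider, cause: Throwable): Directive =
    decider.applyOrElse(cause, escalateByDefault)
}
```

For example, `decide(counterServiceDecider, new StorageException("x"))` yields `Restart`, while the same decider applied to an `IllegalStateException` falls through to `Escalate`.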

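The failure flow:
| Step | Description |
| --- | --- |
| 1 | Storage throws a StorageException. |
| 2 | CounterService, the supervisor of Storage, restarts Storage according to its strategy when it receives the StorageException. |
| 3, 4, 5, 6 | Storage keeps failing and is restarted. |
| 7 | After three failures and restarts within 5 seconds, Storage is stopped by its supervisor (CounterService). |
| 8 | CounterService also watches Storage, so it receives a Terminated message once Storage has stopped. |
| 9, 10, 11 | It tells Counter that there is no Storage for the time being. |
| 12 | CounterService schedules a Reconnect message to itself after a delay. |
| 13, 14 | When it receives the Reconnect message, it creates a new Storage. |
| 15, 16 | It then tells Counter to use the new Storage. |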
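Step 7 depends on how the restart limit is counted. The sketch below is a simplified plain-Scala model of a "at most N restarts within a time window" rule; it is not Akka's implementation. It assumes the window opens at the first failure and that a failure arriving after the window has elapsed starts a fresh window. `RestartBudget` and `permitRestart` are hypothetical names.

```scala
/** Simplified model of a restart limit such as maxNrOfRetries = 3 within 5 seconds. */
final class RestartBudget(maxNrOfRetries: Int, withinMillis: Long) {
  private var windowStart: Option[Long] = None
  private var restartsInWindow = 0

  /** Returns true if the child may be restarted, false if it should be stopped. */
  def permitRestart(nowMillis: Long): Boolean = {
    val insideWindow = windowStart.exists(start => nowMillis - start <= withinMillis)
    if (insideWindow) {
      // Another failure in the same window: count it against the limit
      restartsInWindow += 1
      restartsInWindow <= maxNrOfRetries
    } else {
      // First failure, or the previous window has elapsed: open a new window
      windowStart = Some(nowMillis)
      restartsInWindow = 1
      true
    }
  }
}
```

In this model, with a limit of 3 within 5000 ms, failures at 0, 1000 and 2000 ms are each answered with a restart and a fourth failure at 3000 ms is refused. That matches the sample's simulated failures for the four values 11 to 14: three restarts, then Storage is stopped.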
A log from one run of the sample is provided here for reference.
Akka Programming (20): Fault Tolerance (Part 1)
Original article: http://blog.csdn.net/mapdigit/article/details/38977153