标签:
角色模型对编写并发、分布式系统进行了高度抽象。它减轻了开发者必须对互斥锁与线程管理的负担,更容易编写出正确的并发与并行系统。早在1973 年 Carl Hewitt 发表的论文中定义了角色,但一直流行于Erlang 语言中,随后被爱立信公司应用于建立高并发、可靠通信系统,取得了巨大成功。
Akka 框架里面角色的API 跟Scala 框架里面角色相似,后者一些语法曾经模仿Erlang语言。
注意:由于Akka强迫父级监管者监督每一个角色和(潜在的子级)监管者,建议你熟悉角色系统、监管、监控,这将可能帮助你阅读角色参考、路径和地址。
在Java里面,角色是通过继承UntypedActor 类及实现onReceive方法来实现的.这个方法将message作为参数。
这里有个例子:
01 | import akka.actor.UntypedActor; |
02 | import akka.event.Logging; |
03 | import akka.event.LoggingAdapter; |
04 |
05 | public class MyUntypedActor extends UntypedActor { |
06 | LoggingAdapter log = Logging.getLogger(getContext().system(), this); |
07 |
08 | public void onReceive(Object message) throws Exception { |
09 | if (message instanceof String) { |
10 | log.info("Received String message: {}", message); |
11 | getSender().tell(message, getSelf()); |
12 | } else |
13 | unhandled(message); |
14 | } |
15 | } |
Props 是一个配置类,它的作用是对创建角色确认选项。把它作为不可变的、因此可自由共享规则对创建一个角色包括相关部署信息(例如:使用调度,详见下文)。下面是如何创建一个Props 实例的一些例子:
1 | import akka.actor.Props; |
2 | import akka.japi.Creator; |
1 | static class MyActorC implements Creator<MyActor> { |
2 | @Override public MyActor create() { |
3 | return new MyActor("..."); |
4 | } |
5 | } |
6 |
7 | Props props1 = Props.create(MyUntypedActor.class); |
8 | Props props2 = Props.create(MyActor.class, "..."); |
9 | Props props3 = Props.create(new MyActorC()); |
第二行显示如何传递构造参数给Actor去创建。在构建Props对象时,存在匹配的构造是被验证的,如果发现不存在或者存在多个匹配构造,会导致一个IllegalArgumentEception。
第三行验证Creator使用。用来验证Props构造的Creator必须是静态。类型参数是用来判断生成角色类的,如果充分擦除,将落回到Actor类,一个参数化工厂例子,可以是:
1 | static class ParametricCreator<T extends MyActor> implements Creator<T> { |
2 | @Override public T create() { |
3 | // ... fabricate actor here |
4 | } |
5 | } |
注意:
由于邮箱要求——如使用双端队列为基础的邮箱使用的隐藏角色——被拾起,在创建之前,角色类型需要已知的,这是Creator类型参数允许的。因此对你用到角色一定尽可能使用特定类型。
这是个好的主意在UntypedActor类里面提供静态工厂方法,该方法帮助创建尽可能接近角色定义的合适Props 类。这也允许使用基于Creator方法,该方法静态验证所使用的构造函数确实存在,而不是只在运行时检查依赖。
01 | public class DemoActor extends UntypedActor { |
02 |
03 | /** |
04 | * Create Props for an actor of this type. |
05 | * @param magicNumber The magic number to be passed to this actor’s constructor. |
06 | * @return a Props for creating this actor, which can then be further configured |
07 | * (e.g. calling `.withDispatcher()` on it) |
08 | */ |
09 | public static Props props(final int magicNumber) { |
10 | return Props.create(new Creator<DemoActor>() { |
11 | private static final long serialVersionUID = 1L; |
12 |
13 | @Override |
14 | public DemoActor create() throws Exception { |
15 | return new DemoActor(magicNumber); |
16 | } |
17 | }); |
18 | } |
19 |
20 | final int magicNumber; |
21 |
22 | public DemoActor(int magicNumber) { |
23 | this.magicNumber = magicNumber; |
24 | } |
25 |
26 | @Override |
27 | public void onReceive(Object msg) { |
28 | // some behavior here |
29 | } |
30 |
31 | } |
32 |
33 | system.actorOf(DemoActor.props(42), "demo"); |
角色通过传入Props实例进入actorOf 工厂方法,该工厂方法在ActorSystem 和ActorContext类中提供使用。
1 | import akka.actor.ActorRef; |
2 | import akka.actor.ActorSystem; |
1 | // ActorSystem is a heavy object: create only one per application |
2 | final ActorSystem system = ActorSystem.create("MySystem"); |
3 | final ActorRef myActor = system.actorOf(Props.create(MyUntypedActor.class), |
4 | "myactor"); |
使用ActorSystem 将创建顶级角色,由角色系统提供守护的角色监管,同时使用一个角色的上下文将创建一个子角色。
1 | class A extends UntypedActor { |
2 | final ActorRef child = |
3 | getContext().actorOf(Props.create(MyUntypedActor.class), "myChild"); |
4 | // plus some behavior ... |
5 | } |
建议创建一个包含子类、超子类等等的层次结构,使得它适合具有逻辑性故障处理应用程序结构,详见Actor Systems。
actorOf 方法调用返回ActorRef实例。这是对角色实例处理,并与它进行交互的唯一途径。该ActorRef是不可变的,并有一个与它代表的一对一关系角色。该ActorRef是可序列化的和具备网络意识的。这意味着,你可以把它进行序列化,将它通过网络发送,在远程主机上使用它,它仍然代表着在原始的节点上相同的角色,横跨网络。
名称参数是可选的,但是你应该给你的角色起个更好名称,因为这是用在日志消息里面,并确定角色。该名称不能为空或以$开头,但它可能包含URL编码的字符(例如,%20代表空格)。如果给定的名称已被相同父类中的其他子类使用,那将抛出InvalidActorNameException异常。
角色是自动异步启动当被创建时候。
如果你的未类型化的角色有一个携带参数的构造函数,然后那些需要Prosp的一部分,以及,如上所述。但在某些情况下,必须使用一个工厂方法,例如当实际构造函数参数由一个依赖注入框架决定时。
1 | import akka.actor.Actor; |
2 | import akka.actor.IndirectActorProducer; |
01 | class DependencyInjector implements IndirectActorProducer { |
02 | final Object applicationContext; |
03 | final String beanName; |
04 |
05 | public DependencyInjector(Object applicationContext, String beanName) { |
06 | this.applicationContext = applicationContext; |
07 | this.beanName = beanName; |
08 | } |
09 |
10 | @Override |
11 | public Class<? extends Actor> actorClass() { |
12 | return MyActor.class; |
13 | } |
14 |
15 | @Override |
16 | public MyActor produce() { |
17 | MyActor result; |
18 | // obtain fresh Actor instance from DI framework ... |
19 | return result; |
20 | } |
21 | } |
22 |
23 | final ActorRef myActor = getContext().actorOf( |
24 | Props.create(DependencyInjector.class, applicationContext, "MyActor"), |
25 | "myactor3"); |
警告:
你可能有时会倾向于提供一个IndirectActorProducer它总是返回相同的实例,例如:通过使用一个静态字段。这是不支持的,因为它违背了一个角色重启含义,这是这里所描述的含义:什么重新启动方式。当使用一个依赖注入框架时,角色Beans 一定不能是单例模式范围。
依赖注入和依赖注入框架集成技术更深入地介绍了使用Akka与依赖注入指导方针和在类型安全的活化剂方面的Akka Java Spring 指导。
当写在角色外面的代码,应与角色进行沟通,在ask模式可以是一个解决方案(见下文),但有两个事情不能做:接收多个回复(例如:通过订阅的ActorRef到通知服务)和监控其他角色的生命周期。为了这些目的这里有个Inbox 类:
1 | final Inbox inbox = Inbox.create(system); |
2 | inbox.send(target, "hello"); |
3 | assert inbox.receive(Duration.create(1, TimeUnit.SECONDS)).equals("world"); |
send方法包装一个标准的tell和提供一个内部的角色引用作为发送者。在最后一行将允许该回复被接收。监控一个角色同时也十分简单。
1 | final Inbox inbox = Inbox.create(system); |
2 | inbox.watch(target); |
3 | target.tell(PoisonPill.getInstance(), ActorRef.noSender()); |
4 | assert inbox.receive(Duration.create(1, TimeUnit.SECONDS)) instanceof Terminated; |
UntypedActor 类仅仅定义一个抽象方法,就是上面提到onReceive(Object message)方法,该方法实现了角色行为。如果当前角色行为不匹配一个接收信息,建议调用unhandled 方法,该方法默认将发出一个new akka.actor.UnhandledMessage(message, sender, recipient)在系统角色事件流中(设置配置项akka.actor.debug.unhandled 到on 让它们转化为实际调试信息)。另外,它提供:
剩余的可见的方法是用户可重写的生命周期钩子,将在以下描述:
01 | public void preStart() { |
02 | } |
03 |
04 | public void preRestart(Throwable reason, scala.Option<Object> message) { |
05 | for (ActorRef each : getContext().getChildren()) { |
06 | getContext().unwatch(each); |
07 | getContext().stop(each); |
08 | } |
09 | postStop(); |
10 | } |
11 |
12 | public void postRestart(Throwable reason) { |
13 | preStart(); |
14 | } |
15 |
16 | public void postStop() { |
17 | } |
上面显示实现是默认由UntypedActor 类提供。
在角色系统中的路径代表一个“地方”,这可能被一个存活着的的角色占用着。最初,(除了系统初始化角色)的路径是空的。当actorOf()被调用时,指定一个由通过Props 描述给定的路径角色的化身。一个角色化身由路径和一个UID确定。重新启动仅仅交换Props 定义的Actor 实例,但化身与UID依然是相同的。
当该角色停止时,化身的生命周期也相应结束了。在这一刻时间上相对应的生命周期事件也将被调用和监管角色也被通知终止结束。化身被停止之后,路径也可以重复被通过actorOf() 方法创建的角色使用。在这种情况下,新的化身的名称跟与前一个将是相同的而是UIDs将会有所不同。
一个ActorRef 总是代表一个化身(路径和UID)而不只是一个给定的路径。因此,如果一个角色停止,一个新的具有相同名称创建的旧化身的ActorRef不会指向新的。
在另一方面ActorSelection 指向该路径(或多个路径在使用通配符时),并且是完全不知道其化身当前占用着它。 由于这个原因导致ActorSelection 不能被监视到。通过发送识别信息到将被回复包含正确地引用(见通过角色选择集识别角色)的ActorIdentity 的ActorSelection 来解决当前化身ActorRef 存在该路径之下。这也可以用ActorSelection 类的resolveOne方法来解决,这将返回一个匹配ActorRef 的Future 。
当另一个角色终止时,为了通知被通知(即永久性地停止,而不是暂时的失败和重新启动),一个角色可以自己注册为接收在终止上层的其他角色发送的终止消息,其他演员出动(请参阅停止演员)。这项服务是由角色系统的临终看护组件提供。
注册一个监视器是很容易的(见第四行,剩下的就是用于展示整个功能):
1 | import akka.actor.Terminated; |
01 | public class WatchActor extends UntypedActor { |
02 | final ActorRef child = this.getContext().actorOf(Props.empty(), "child"); |
03 | { |
04 | this.getContext().watch(child); // <-- the only call needed for registration |
05 | } |
06 | ActorRef lastSender = getContext().system().deadLetters(); |
07 |
08 | @Override |
09 | public void onReceive(Object message) { |
10 | if (message.equals("kill")) { |
11 | getContext().stop(child); |
12 | lastSender = getSender(); |
13 | } else if (message instanceof Terminated) { |
14 | final Terminated t = (Terminated) message; |
15 | if (t.getActor() == child) { |
16 | lastSender.tell("finished", getSelf()); |
17 | } |
18 | } else { |
19 | unhandled(message); |
20 | } |
21 | } |
22 | } |
但是应当注意的是,产生的终止消息独立于注册和终止发生的顺序。特别是,监控角色将接收一个终止信息即使被监控角色已经被终止在注册时候。
注册多次并不必然导致对多个消息被产生,但不保证只有一个对应这样的消息被接收:如果被监控角色终止已经发生和发送的消息排队等候着,在另一个注册完成之前,该消息已经处理完,然后第二消息将会排队,是因为已经结束角色的监控的注册导致终止信息立刻产生。
使用getContext().unwatch(target)方法从监控另一个角色生命活力撤销下来也是有可能的。这个工作即使已终止消息已经排队于邮箱中,在调用unwatch方法后对于那个角色将没有终止消息被处理。
在正确启动角色之后,preStart方法被调用。
1 | @Override |
2 | public void preStart() { |
3 | child = getContext().actorOf(Props.empty()); |
4 | } |
第一次创建角色时,该方法被调用。在重新启动期间,它被postRestart的默认实现调用,这意味着通过重写该方法,你可以选择此方法中初始化代码是否被调用,对这个角色或每次重启仅只调用一次。在一个角色类的实例创建时,角色的构造函数的一部分的初始化代码将每次都被调用,这发生在每次重启时。
所有角色被监督着,即用故障处理策略链接到另一个角色。当处理一个消息是,抛出一个异常的情况下,演员可能重新启动(见监管与监控)抛出一个异常。这重启涉及上述提到钩子:
1. 旧角色是通过调用preRestart方法进行通知的,这伴随着造成重启的异常与绑定该异常的消息;处理一个消息没有造成这个重启发生,则后者可能也没有发生,例如,当一个监管者不捕获该异常,则由其监管者重启又或者如果由于一个同类的失败,一个角色将被重新启动。如果消息是可用的,那么该消息的发件人也可以通过正常方式访问的(即通过调用getSender())。
这个方法用在这些地方时最好的,例如:清除,准备交到新的角色实例等等。默认它停止所有子实例和调用postStop方法。
2. 来自actorOf方法调用的初始化工厂用来产生新的实例。
3. 新角色的postRestart方法被调用时这引起了重启异常。默认情况下,preStart 是被调用,就如同在正常启动的情况下。
一个角色重启仅替换实际角色的对象;邮箱中的内容是不受重启影响,所以消息的处理将在postRestart钩子返回后恢复。引发异常的消息将不会再接收。当重启时候,发送到角色的任何消息将像平常一样排队到它的邮箱。
注意:要知道,相关用户失败消息的顺序是不确定的。特别是,一个父类可能会重新启动其子类之前它已经处理了在失败之前子类发送故障的的最后消息。见讨论:消息顺序的详细信息。
终止一个角色之后,其postStop钩子被调用时,其可能用于例如从其他服务注销这个角色。在这个角色的消息队列已禁用之后,这个钩子仍保证运行,即送到已终止角色的信息将被重定向到ActorSystem的deadLetters。
作为角色的引用,路径和地址描述,每个角色都有一个唯一的逻辑路径,这是由以下的子类到父类直到达到角色系统的根的角色的链得到的,它有一个物理路径,如果监管链包括任何远程监管者,这可能会有所不同。这些路径是由系统使用来查找角色,如当接收到一个远程的消息和收件人进行搜索,但他们也有更直接用法:角色可以查找其他角色通过指定绝对或相对路径,逻辑或物理,并接收返回的结果的ActorSelection:
1 | // will look up this absolute path |
2 | getContext().actorSelection("/user/serviceA/actor"); |
3 | // will look up sibling beneath same supervisor |
4 | getContext().actorSelection("../joe"); |
其中指定的路径被解释为一个java.net.URI, 它以 / 分隔成路径段. 如果路径以 /开始, 表示一个绝对路径,从根监管者 ( “/user”的父亲)开始查找; 否则是从当前角色开始。如果某一个路径段为 .., 会找到当前所遍历到的角色的上一级, 否则则会向下一级寻找具有该名字的子角色。 必须注意的是角色路径中的.. 总是表示逻辑结构,也就是其监管者。
一个角色选择集的路径元素可以包含通配符,允许消息额广播到该选择集:
1 | // will look all children to serviceB with names starting with worker |
2 | getContext().actorSelection("/user/serviceB/worker*"); |
3 | // will look up all siblings beneath same supervisor |
4 | getContext().actorSelection("../*"); |
信息可以通过ActorSelection发送和当传送的每个消息时,查找ActorSelection的路径。如果选择集不匹配任何角色的消息将被丢弃。
为了获得一个ActorSelection的ActorRef,你需要发送一个消息到选择集和使用来自橘色的回复的getSender引用。有一个内置的识别信息,即所有角色都理解并自动回复一个包含ActorRef的ActorIdentity消息。此消息由该角色特殊处理,在这个意义上说是穿越的,如果一个具体的名称查找失败(即非通配符路径元素不符合一个存在的角色)然后产生一个消极结果。请注意,这并不意味着传递的答复是有保障的,但它仍然是一个正常的消息。
1 | import akka.actor.ActorIdentity; |
2 | import akka.actor.ActorSelection; |
3 | import akka.actor.Identify; |
01 | public class Follower extends UntypedActor { |
02 | final String identifyId = "1"; |
03 | { |
04 | ActorSelection selection = |
05 | getContext().actorSelection("/user/another"); |
06 | selection.tell(new Identify(identifyId), getSelf()); |
07 | } |
08 | ActorRef another; |
09 |
10 | final ActorRef probe; |
11 | public Follower(ActorRef probe) { |
12 | this.probe = probe; |
13 | } |
14 |
15 | @Override |
16 | public void onReceive(Object message) { |
17 | if (message instanceof ActorIdentity) { |
18 | ActorIdentity identity = (ActorIdentity) message; |
19 | if (identity.correlationId().equals(identifyId)) { |
20 | ActorRef ref = identity.getRef(); |
21 | if (ref == null) |
22 | getContext().stop(getSelf()); |
23 | else { |
24 | another = ref; |
25 | getContext().watch(another); |
26 | probe.tell(ref, getSelf()); |
27 | } |
28 | } |
29 | } else if (message instanceof Terminated) { |
30 | final Terminated t = (Terminated) message; |
31 | if (t.getActor().equals(another)) { |
32 | getContext().stop(getSelf()); |
33 | } |
34 | } else { |
35 | unhandled(message); |
36 | } |
37 | } |
38 | } |
您也可以取得一个ActorSelection的ActorRef通过ActorSelection的resolveOne方法。它返回匹配ActorRef的Future,如果这样一个角色存在。如果没有这样的角色存在或鉴定所提供的超时时间内没有完成,它将已失败告终akka.actor.ActorNotFound。
远程角色地址也可以查找,如果远程被启用:
1 | getContext().actorSelection("akka.tcp://app@otherhost:1234/user/serviceB"); |
一个关于actor查找的示例见 远程查找.
注意:在支持actorSelection,actorFor是被废弃,因为用actorFor获得的角色引用对本地与远程角色表现不同。在一个本地角色引用的情况下,查找之前命名的演员需要存在,否则所获取的引用将是一个EmptyLocalActorRef。即使在获取角色引用之后,一个真实路径的角色才被创建,这时也是可以获取的。对于远程角色引用通过actorFor来获取的行为不同的,发送信息到该引用上将在覆盖下通过在远程系统给每一个消息发送的路径查找角色。
重要信息:消息可以是任何类型的对象,但必须是不可变的。阿Aka不能强制不变性,所以这必须按照约定。 这里是一个不变的消息的示例:
01 | public class ImmutableMessage { |
02 | private final int sequenceNumber; |
03 | private final List<String> values; |
04 |
05 | public ImmutableMessage(int sequenceNumber, List<String> values) { |
06 | this.sequenceNumber = sequenceNumber; |
07 | this.values = Collections.unmodifiableList(new ArrayList<String>(values)); |
08 | } |
09 |
10 | public int getSequenceNumber() { |
11 | return sequenceNumber; |
12 | } |
13 |
14 | public List<String> getValues() { |
15 | return values; |
16 | } |
17 | } |
向actor发送消息是使用下列方法之一。
每一个消息发送者分别保证自己的消息的次序.
注意:使用ask会造成性能影响,因为当超时是,一些事情需要保持追踪。这需要一些东西来将一个Promise连接进入ActorRef,并且需要通过远程连接可到达的。所以总是使用tell更偏向性能,除非必须才用ask.
在所有这些方法你可以传递自己的ActorRef。让它这样做,因为这将允许接收的角色才能够回复您的邮件,因为发件人引用随该信息一起发送的。
这是发送消息的推荐方式。 不会阻塞地等待消息。它拥有最好的并发性和可扩展性。
1 | // don’t forget to think about who is the sender (2nd argument) |
2 | target.tell(message, getSelf()); |
发送者引用是伴随着消息传递的,在接收角色可用范围内,当处理该消息时,通过getSender方法。在一个角色内部通常是getSelf,这应该为发送者,但也可能是这种情况,回复被路由到一些其他角色即该父类的第二个参数tell将是不同的一个。在角色外部,如果没有回复,第二个参数可以为null;如果在角色外部需要一个回复,你可以使用问答模式描,下面描述..
ask 模式既包含actor也包含future, 所以它是作为一种使用模式,而不是ActorRef的方法:
1 | import static akka.pattern.Patterns.ask; |
2 | import static akka.pattern.Patterns.pipe; |
3 | import scala.concurrent.Future; |
4 | import scala.concurrent.duration.Duration; |
5 | import akka.dispatch.Futures; |
6 | import akka.dispatch.Mapper; |
7 | import akka.util.Timeout; |
01 | final Timeout t = new Timeout(Duration.create(5, TimeUnit.SECONDS)); |
02 |
03 | final ArrayList<Future<Object>> futures = new ArrayList<Future<Object>>(); |
04 | futures.add(ask(actorA, "request", 1000)); // using 1000ms timeout |
05 | futures.add(ask(actorB, "another request", t)); // using timeout from |
06 | // above |
07 |
08 | final Future<Iterable<Object>> aggregate = Futures.sequence(futures, |
09 | system.dispatcher()); |
10 |
11 | final Future<Result> transformed = aggregate.map( |
12 | new Mapper<Iterable<Object>, Result>() { |
13 | public Result apply(Iterable<Object> coll) { |
14 | final Iterator<Object> it = coll.iterator(); |
15 | final String x = (String) it.next(); |
16 | final String s = (String) it.next(); |
17 | return new Result(x, s); |
18 | } |
19 | }, system.dispatcher()); |
20 |
21 | pipe(transformed, system.dispatcher()).to(actorC); |
上面的例子展示了将 ask与 future上的 pipe 模式一起使用,因为这是一种非常常用的组合。 请注意上面所有的调用都是完全非阻塞和异步的: ask 产生 Future, 两个通过Futures.sequence和map方法组合成一个新的Future,然后用 pipe 在future上安装一个 onComplete-处理器来完成将收集到的 Result 发送到其它actor的动作。
使用 ask 将会像tell 一样发送消息给接收方, 接收方必须通过getSender().tell(reply, getSelf()) 发送回应来为返回的 Future填充数据。ask 操作包括创建一个内部actor来处理回应,必须为这个内部actor指定一个超时期限,过了超时期限内部actor将被销毁以防止内存泄露。详见下面:
注意:如果要以异常来填充future你需要发送一个 Failure 消息给发送方。这个操作不会在actor处理消息发生异常时自动完成。
1 | try { |
2 | String result = operation(); |
3 | getSender().tell(result, getSelf()); |
4 | } catch (Exception e) { |
5 | getSender().tell(new akka.actor.Status.Failure(e), getSelf()); |
6 | throw e; |
7 | } |
如果一个actor 没有完成future , 它会在超时时限到来时过期, 明确作为一个参数传给ask方法,以 AskTimeoutException来完成Future。
关于如何等待或查询一个future,更多信息请见Futures 。
Future的onComplete, onResult, 或 onTimeout 方法可以用来注册一个回调,以便在Future完成时得到通知。从而提供一种避免阻塞的方法。
在使用future回调时,在角色内部你要小心避免关闭该角色的引用, 即不要在回调中调用该角色的方法或访问其可变状态。这会破坏角色的封装,会引用同步bugbug和race condition, 因为回调会与此角色一同被并发调度。 不幸的是目前还没有一种编译时的方法能够探测到这种非法访问。 参阅: 角色与共享可变状态
你可以将消息从一个角色转发给另一个。虽然经过了一个‘中转’,但最初的发送者地址/引用将保持不变。当实现功能类似路由器、负载均衡器、备份等的角色时会很有用。同时你需要传递你的上下文变量。
1 | target.forward(result, getContext()); |
当一个角色收到被传递到onReceive方法的消息,这是在需要被定义的UntypedActor基类的抽象方法。
下面是个例子:
01 | import akka.actor.UntypedActor; |
02 | import akka.event.Logging; |
03 | import akka.event.LoggingAdapter; |
04 |
05 | public class MyUntypedActor extends UntypedActor { |
06 | LoggingAdapter log = Logging.getLogger(getContext().system(), this); |
07 |
08 | public void onReceive(Object message) throws Exception { |
09 | if (message instanceof String) { |
10 | log.info("Received String message: {}", message); |
11 | getSender().tell(message, getSelf()); |
12 | } else |
13 | unhandled(message); |
14 | } |
15 | } |
除了使用IF-instanceof检查,还有一种方法是使用Apache Commons MethodUtils调用指定的参数类型相匹配的消息类型方法。
如果你需要一个用来发送回应消息的目标,可以使用 getSender, 它是一个角色引用。 你可以用 getSender().tell(replyMsg, getSelf())向这个引用发送回应消息。 你也可以将这个ActorRef保存起来将来再作回应。如果没有sender (不是从actor发送的消息或者没有future上下文) 那么 sender 缺省为 ‘死信’ 角色的引用。
1 | @Override |
2 | public void onReceive(Object msg) { |
3 | Object result = |
4 | // calculate result ... |
5 |
6 | // do not forget the second argument! |
7 | getSender().tell(result, getSelf()); |
8 | } |
在一个ReceiveTimeout消息发送触发之后,该UntypedActorContext setReceiveTimeout定义不活动超时时间。当指定时,接收功能应该能够处理一个akka.actor.ReceiveTimeout消息。 1毫秒为最小支持超时。
请注意,接受超时可能会触发和在另一条消息是入队后,该ReceiveTimeout消息将重排队;因此,它不能保证在接收到接收超时的一定有预先通过该方法所配置的空闲时段。
一旦设置,接收超时保持有效(即持续重复触发超过不活动时间后)。传递Duration.Undefined关掉此功能。
01 | import akka.actor.ActorRef; |
02 | import akka.actor.ReceiveTimeout; |
03 | import akka.actor.UntypedActor; |
04 | import scala.concurrent.duration.Duration; |
05 |
06 | public class MyReceiveTimeoutUntypedActor extends UntypedActor { |
07 |
08 | public MyReceiveTimeoutUntypedActor() { |
09 | // To set an initial delay |
10 | getContext().setReceiveTimeout(Duration.create("30 seconds")); |
11 | } |
12 |
13 | public void onReceive(Object message) { |
14 | if (message.equals("Hello")) { |
15 | // To set in a response to a message |
16 | getContext().setReceiveTimeout(Duration.create("1 second")); |
17 | } else if (message instanceof ReceiveTimeout) { |
18 | // To turn it off |
19 | getContext().setReceiveTimeout(Duration.Undefined()); |
20 | } else { |
21 | unhandled(message); |
22 | } |
23 | } |
24 | } |
通过调用ActorRefFactory 即 ActorContext 或 ActorSystem 的 stop 方法来终止一个角色, 通常 context 用来终止子角色,而system 用来终止顶级角色. 实际的终止操作是异步执行的, 即stop 可能在角色被终止之前返回。
如果当前有正在处理的消息,对该消息的处理将在actor被终止之前完成,但是邮箱中的后续消息将不会被处理。缺省情况下这些消息会被送到 ActorSystem 的 死信, 但是这取决于邮箱的实现。
角色的终止分两步: 第一步角色将停止对邮箱的处理,向所有子角色发送终止命令,然后处理来自子角色的终止消息直到所有的子角色都完成终止, 最后终止自己 (调用 postStop, 销毁邮箱, 向 DeathWatch 发布 Terminated , 通知其监管者). 这个过程保证角色系统中的子树以一种有序的方式终止, 将终止命令传播到叶子结点并收集它们回送的确认消息给被终止的监管者。如果其中某个角色没有响应 (即由于处理消息用了太长时间以至于没有收到终止命令), 整个过程将会被阻塞。
在 ActorSystem.shutdown被调用时, 系统根监管角色会被终止,以上的过程将保证整个系统的正确终止。
postStop hook 是在角色被完全终止以后调用的。这是为了清理资源:
1 | @Override |
2 | public void postStop() { |
3 | // clean up resources here ... |
4 | } |
注意:由于角色的终止是异步的, 你不能马上使用你刚刚终止的子角色的名字;这会导致 InvalidActorNameException. 你应该 watch 正在终止的 介绍而在最终到达的 Terminated消息的处理中创建它的替代者。
你也可以向角色发送 akka.actor.PoisonPill 消息, 这个消息处理完成后角色会被终止。 PoisonPill 与普通消息一样被放进队列,因此会在已经入队列的其它消息之后被执行。
像下面使用:
1 | myActor.tell(akka.actor.PoisonPill.getInstance(), sender); |
如果你想等待终止过程的结束,或者组合若干actor的终止次序,可以使用gracefulStop:
1 | import static akka.pattern.Patterns.gracefulStop; |
2 | import scala.concurrent.Await; |
3 | import scala.concurrent.Future; |
4 | import scala.concurrent.duration.Duration; |
5 | import akka.pattern.AskTimeoutException; |
1 | try { |
2 | Future<Boolean> stopped = |
3 | gracefulStop(actorRef, Duration.create(5, TimeUnit.SECONDS), Manager.SHUTDOWN); |
4 | Await.result(stopped, Duration.create(6, TimeUnit.SECONDS)); |
5 | // the actor has been stopped |
6 | } catch (AskTimeoutException e) { |
7 | // the actor wasn‘t stopped within 5 seconds |
8 | } |
01 | public class Manager extends UntypedActor { |
02 |
03 | public static final String SHUTDOWN = "shutdown"; |
04 |
05 | ActorRef worker = getContext().watch(getContext().actorOf( |
06 | Props.create(Cruncher.class), "worker")); |
07 |
08 | public void onReceive(Object message) { |
09 | if (message.equals("job")) { |
10 | worker.tell("crunch", getSelf()); |
11 | } else if (message.equals(SHUTDOWN)) { |
12 | worker.tell(PoisonPill.getInstance(), getSelf()); |
13 | getContext().become(shuttingDown); |
14 | } |
15 | } |
16 |
17 | Procedure<Object> shuttingDown = new Procedure<Object>() { |
18 | @Override |
19 | public void apply(Object message) { |
20 | if (message.equals("job")) { |
21 | getSender().tell("service unavailable, shutting down", getSelf()); |
22 | } else if (message instanceof Terminated) { |
23 | getContext().stop(getSelf()); |
24 | } |
25 | } |
26 | }; |
27 | } |
当gracefulStop()成功返回时,角色的postStop()钩子将被执行:存在一个情况,happens-before 边缘在postStop()结尾和gracefulStop()返回之间。
在上面的例子中一个自定义的Manager.SHUTDOWN消息被发送到目标角色为了初始化正在终止角色的处理。您可以使用PoisonPill为这一点,但在阻止目标的角色之前,你拥有很少机会与其他角色进行交互。简单的清除任务可以在postStop中处理。
注意:请记住,一个角色终止和它的名字被注销是互相异步发生的独立事件。因此,在gracefulStop()后返回,它可能是你会发现名称仍然在使用。为了保证正确的注销,只能重复使用来自你控制监管者内与一个终止的消息的回应的名称,即不属于顶级的角色。
Akka支持在运行时对角色消息循环 (例如它的的实现)进行实时替换: 在角色中调用getContext.become 方法。 热替换的代码被存在一个栈中,可以被pushed(replacing 或 adding 在顶部)和popped。
注意:请注意角色被其监管者重启后将恢复其最初的行为。
热替换角色使用getContext().become:
1 | import akka.japi.Procedure; |
01 | public class HotSwapActor extends UntypedActor { |
02 |
03 | Procedure<Object> angry = new Procedure<Object>() { |
04 | @Override |
05 | public void apply(Object message) { |
06 | if (message.equals("bar")) { |
07 | getSender().tell("I am already angry?", getSelf()); |
08 | } else if (message.equals("foo")) { |
09 | getContext().become(happy); |
10 | } |
11 | } |
12 | }; |
13 |
14 | Procedure<Object> happy = new Procedure<Object>() { |
15 | @Override |
16 | public void apply(Object message) { |
17 | if (message.equals("bar")) { |
18 | getSender().tell("I am already happy :-)", getSelf()); |
19 | } else if (message.equals("foo")) { |
20 | getContext().become(angry); |
21 | } |
22 | } |
23 | }; |
24 |
25 | public void onReceive(Object message) { |
26 | if (message.equals("bar")) { |
27 | getContext().become(angry); |
28 | } else if (message.equals("foo")) { |
29 | getContext().become(happy); |
30 | } else { |
31 | unhandled(message); |
32 | } |
33 | } |
34 | } |
become 方法还有很多其它的用处,一个特别好的例子是用它来实现一个有限状态机(FSM)。这将代替当前行为(即行为栈顶部),这意味着你不用使用unbecome,而是下一个行为将明确被安装。
使用become另一个方式:不代替而是添加到行为栈顶部。这种情况是必须要保证在长期运行中“pop”操作(即unbecome)数目匹配“push”数目,否则这个数目将导致内存泄露(这就是该行为不是默认原因)。
01 | public class UntypedActorSwapper { |
02 |
03 | public static class Swap { |
04 | public static Swap SWAP = new Swap(); |
05 |
06 | private Swap() { |
07 | } |
08 | } |
09 |
10 | public static class Swapper extends UntypedActor { |
11 | LoggingAdapter log = Logging.getLogger(getContext().system(), this); |
12 |
13 | public void onReceive(Object message) { |
14 | if (message == SWAP) { |
15 | log.info("Hi"); |
16 | getContext().become(new Procedure<Object>() { |
17 | @Override |
18 | public void apply(Object message) { |
19 | log.info("Ho"); |
20 | getContext().unbecome(); // resets the latest ‘become‘ |
21 | } |
22 | }, false); // this signals stacking of the new behavior |
23 | } else { |
24 | unhandled(message); |
25 | } |
26 | } |
27 | } |
28 |
29 | public static void main(String... args) { |
30 | ActorSystem system = ActorSystem.create("MySystem"); |
31 | ActorRef swap = system.actorOf(Props.create(Swapper.class)); |
32 | swap.tell(SWAP, ActorRef.noSender()); // logs Hi |
33 | swap.tell(SWAP, ActorRef.noSender()); // logs Ho |
34 | swap.tell(SWAP, ActorRef.noSender()); // logs Hi |
35 | swap.tell(SWAP, ActorRef.noSender()); // logs Ho |
36 | swap.tell(SWAP, ActorRef.noSender()); // logs Hi |
37 | swap.tell(SWAP, ActorRef.noSender()); // logs Ho |
38 | } |
39 |
40 | } |
该UntypedActorWithStash类使一个角色临时藏匿不能或不应该使用角色的当前行为处理的消息。在改变角色的消息处理函数,即调用getContext().become()或getContext().unbecome(),所有藏匿的消息可以“unstashed”,因此前面加上他们角色的邮箱。这样一来,藏消息可以在他们已经收到原先相同的顺序进行处理。扩展UntypedActorWithStash角色将自动获得一个双端队列为基础的邮箱。
注意:抽象类UntypedActorWithStash实现标记接口RequiresMessageQueue这要求系统能够为该角色自动选择基于双端队列的邮箱实现。如果你想更多的控制权邮箱,请见邮箱文档:邮箱
。
这里是UntypedActorWithStash类中操作的示例:
1 | import akka.actor.UntypedActorWithStash; |
01 | public class ActorWithProtocol extends UntypedActorWithStash { |
02 | public void onReceive(Object msg) { |
03 | if (msg.equals("open")) { |
04 | unstashAll(); |
05 | getContext().become(new Procedure<Object>() { |
06 | public void apply(Object msg) throws Exception { |
07 | if (msg.equals("write")) { |
08 | // do writing... |
09 | } else if (msg.equals("close")) { |
10 | unstashAll(); |
11 | getContext().unbecome(); |
12 | } else { |
13 | stash(); |
14 | } |
15 | } |
16 | }, false); // add behavior on top instead of replacing |
17 | } else { |
18 | stash(); |
19 | } |
20 | } |
21 | } |
调用stash()将当前的消息(即角色最后收到的消息)到角色的藏匿处。当在处理默认情况下在角色的消息处理函数来隐藏那些没有被其他案件处理的情况时,这是典型调用。同一消息的两次是非法藏匿;这样做会导致一个IllegalStateException被抛出。藏匿也可以此情况下,调用stath()能会导致容量违规,这导致StashOverflowException。藏匿的容量可以使用邮箱的配置的藏匿容量设置(一个Int类型)进行配置。
调用unstashAll()从藏匿到角色的邮箱进入队列消息,直到信箱(如果有的话)已经达到的能力(请注意,从藏匿处的消息前加上邮箱)。如果一个有界的邮箱溢出,一个MessageQueueAppendFailedException被抛出。在调用unstashAll()后,藏匿保证为空。
藏匿由scala.collection.immutable.Vector支持。这样一来,即使是非常大量的消息在不会对性能产生重大影响下被藏匿。
注意,藏匿是短暂的角色状态的一部分,该邮箱不像。因此,应该像具有相同属性的角色状态的其他部分进行管理。该preRestart的UntypedActorWithStash的实现将调用unstashAll(),它通常是所期望的行为。
注意:如果要强制执行,你的角色只能用一个无上限stash进行工作,那么你应该使用UntypedActorWithUnboundedStash类代替。
您可以通过发送一个
kill消息杀一个角色。这将导致角色抛出一个
ActorKilledException,引发故障。角色将暂停运作,其监管这将被要求如何处理失败,这可能意味着恢复的角色,重新启动,或完全终止它。请见 监管手段以获取更多信息。
使用Kill像下面:
1 | victim.tell(akka.actor.Kill.getInstance(), ActorRef.noSender()); |
在消息被actor处理的过程中可能会抛出异常,例如数据库异常。
如果消息处理过程中(即从邮箱中取出并交给receive后)发生了异常,这个消息将被丢失。必须明白它不会被放回到邮箱中。所以如果你希望重试对消息的处理,你需要自己抓住异常然后在异常处理流程中重试. 请确保你限制重试的次数,因为你不会希望系统产生活锁 (从而消耗大量CPU而于事无补)。另一种可能性请查看 PeekMailbox pattern
如果消息处理过程中发生异常,邮箱没有任何变化。如果actor被重启,邮箱会被保留。邮箱中的所有消息不会丢失。
如果角色内代码抛出了异常,那么角色将被暂停,接着监管者处理开始(见监管与监控)。依赖监管者决策角色将被恢复(就像什么事情没发生),重启(擦除内部状态重新开始)或终止。
角色钩子的丰富的生命周期提供了实现各种初始化模式的有用工具。在一个ActorRef的生命周期,一个角色可能会经历多次重新启动后,当旧的实例替换为新的,对外面观察这是不可见的,仅仅看见ActorRef。
有人可能会想到“化身”的新实例。初始化可能需要一个角色的每一个化身,但有时人们需要初始化仅发生在第一个实例诞生时,当ActorRef被创建。以下各节提供的模式为不同的初始化需求。
使用构造函数初始化有着各种好处。首先,它使得有可能使用的val字段来存储任何状态,这并不在角色实例的生命周期中变化,使得角色实现更加健壮。该构造函数被角色的每一个化身调用,所以角色的内部总是可以认为正确的初始化发生。这也是这种方法的缺点,因为有当一个人想避免在重新启动时重新初始化的内部情况。例如,在重启过程,保持整个子角色通常是有用。以下部分提供了这种情况下的模式。
在的第一个实例的初始化过程中,一个角色的preStart()方法仅仅被直接调用一次,那就是,在ActorRef的创建。在重新启动的情况下,preStart()从postRestart()被调用,因此,如果不重写,preStart()被每一个化身调用。然而,覆盖postRestart(),可以禁用此行为,并确保只调用一次preStart()。
这种模式的一个有用的用法是在重新启动时禁止创建子类新的ActorRef。这可以通过覆盖preRestart()来实现:
01 | @Override |
02 | public void preStart() { |
03 | // Initialize children here |
04 | } |
05 |
06 | // Overriding postRestart to disable the call to preStart() |
07 | // after restarts |
08 | @Override |
09 | public void postRestart(Throwable reason) { |
10 | } |
11 |
12 | // The default implementation of preRestart() stops all the children |
13 | // of the actor. To opt-out from stopping the children, we |
14 | // have to override preRestart() |
15 | @Override |
16 | public void preRestart(Throwable reason, Option<Object> message) |
17 | throws Exception { |
18 | // Keep the call to postStop(), but no stopping of children |
19 | postStop(); |
20 | } |
请注意,该子角色还在重新启动,但不会创建新的ActorRef。对子类可以递归应用相同的原则,确保他们的preStart()方法被只在创建自己的引用时调用。
了解更多信息,请参阅What Restarting Means:
有这样的情况,在构造函数中,当它是不可能传递所需的所有角色初始化的信息,例如,在存在循环的依赖关系。在这种情况下,角色应该听一个初始化消息,并利用become()或有限状态机的状态对角色的初始化和未初始化的状态进行编码。
01 | private String initializeMe = null; |
02 |
03 | @Override |
04 | public void onReceive(Object message) throws Exception { |
05 | if (message.equals("init")) { |
06 | initializeMe = "Up and running"; |
07 | getContext().become(new Procedure<Object>() { |
08 | @Override |
09 | public void apply(Object message) throws Exception { |
10 | if (message.equals("U OK?")) |
11 | getSender().tell(initializeMe, getSelf()); |
12 | } |
13 | }); |
14 | } |
15 | } |
如果在初始化之前,角色可以接收消息,一个有用的工具可能是Stash,可以保存消息直到初始化完成,在角色初始化之后重新放回。
注意:这个模式应小心使用,并且当上述的模式都不适用才使用。其中一个潜在的问题是,消息可能会在发送给远程角色丢失。另外,在未初始化状态发布一个ActorRef可能导致在其接收用户信息的初始化之前已经完
标签:
原文地址:http://my.oschina.net/u/2273085/blog/510182