标签:blog tco array 解决 恢复 extends size 控制 sim
对于java web而言,一个用户的HTTP请求最终会转换为一条java线程处理。HTTP本身是无状态的,具体的请求逻辑一般也是无状态的。如果进程奔溃或者系统宕机,用户会发觉当前网页不可用之类的错误。虽然会影响一些用户体验,但是只要服务重启了,用户依然可以完成他的请求并满足其需要。但是有些情况下则势必会造成混乱甚至恐慌,例如跨行转账。用户从自己A银行的账户转账1万元至自己在B银行的账户,如果转出的动作成功了,但是转入却失败了,用户的心情是可想而知的,自己的财产不翼而飞了!一种解决的方式是引入事务,在此场景下还必须是分布式事务。如果只是银行内部实现分布式事务多少还是可行的,但是不同银行之间要实现的成本是可想而知的,甚至不可行的。如果A银行转出时对用户的状态作持久化,B银行对收到的转入请求也进行持久化,那么恢复用户的损失才有可能。
以上啰里啰嗦说了这么多,无非就是抛出个引子,进而介绍Akka提供的持久化功能。
有关Akka的日志持久化和快照持久化的配置如下:
persistence { journal { plugin = "akka.persistence.journal.leveldb" leveldb.dir = "target/example/journal" leveldb.native = false } snapshot-store { plugin = "akka.persistence.snapshot-store.local" local.dir = "target/example/snapshots" } }根据配置,我们知道日志插件使用了leveldb,leveldb的存储目录为当前项目编译路径下的example/journal路径下。快照插件使用了local,存储路径与前者相同。
本例子中需要用到Cmd和Evt两种消息,Cmd代表命令,Evt代表事件。ExampleState代表我们例子中的状态。以上三个类的定义如下:
public interface Persistence { public static class Cmd implements Serializable { private static final long serialVersionUID = 1L; private final String data; public Cmd(String data) { this.data = data; } public String getData() { return data; } } public static class Evt implements Serializable { private static final long serialVersionUID = 1L; private final String data; public Evt(String data) { this.data = data; } public String getData() { return data; } } public static class ExampleState implements Serializable { private static final long serialVersionUID = 1L; private final ArrayList<String> events; public ExampleState() { this(new ArrayList<String>()); } public ExampleState(ArrayList<String> events) { this.events = events; } public ExampleState copy() { return new ExampleState(new ArrayList<String>(events)); } public void update(Evt evt) { events.add(evt.getData()); } public int size() { return events.size(); } @Override public String toString() { return events.toString(); } } }上面代码展示的Cmd和Evt都很简单,它们有一样的data字段作为内容。ExampleState中维护了一个列表,次列表用于缓存所有的事件内容。
在具体介绍本例中持久化Actor之前,先看看其实现,其代码清单如下:
@Named("ExamplePersistentActor") @Scope("prototype") public class ExamplePersistentActor extends UntypedPersistentActor { LoggingAdapter log = Logging.getLogger(getContext().system(), this); @Override public String persistenceId() { return "sample-id-1"; } private ExampleState state = new ExampleState(); public int getNumEvents() { return state.size(); } @Override public void onReceiveRecover(Object msg) { if (msg instanceof Evt) { state.update((Evt) msg); } else if (msg instanceof SnapshotOffer) { state = (ExampleState) ((SnapshotOffer) msg).snapshot(); } else { unhandled(msg); } } @Override public void onReceiveCommand(Object msg) { if (msg instanceof Cmd) { final String data = ((Cmd) msg).getData(); final Evt evt1 = new Evt(data + "-" + getNumEvents()); final Evt evt2 = new Evt(data + "-" + (getNumEvents() + 1)); persistAll(Arrays.asList(evt1, evt2), new Procedure<Evt>() { public void apply(Evt evt) throws Exception { state.update(evt); if (evt.equals(evt2)) { getContext().system().eventStream().publish(evt); } } }); } else if (msg.equals("snap")) { // IMPORTANT: create a copy of snapshot // because ExampleState is mutable !!! saveSnapshot(state.copy()); } else if (msg.equals("print")) { log.info(state.toString()); } else { unhandled(msg); } } }
ExamplePersistentActor继承了UntypedPersistentActor,并覆盖实现了三个方法:
我们先使用一段代码向持久化Actor连续发送三个Cmd,内容分别是foo、baz、bar,最后再发送一条print消息,代码如下:
final ActorRef persistentActor = actorSystem.actorOf(springExt.props("ExamplePersistentActor"), "examplePersistentActor"); persistentActor.tell(new Cmd("foo"), null); persistentActor.tell(new Cmd("baz"), null); persistentActor.tell(new Cmd("bar"), null); persistentActor.tell("print", null);运行以上代码的结果如图1所示:
这说明当前的程序逻辑准确无误。下面我们开始验证消息持久化的功效了。
我们将上述代码修改为代码清单1所示。
代码清单1
final ActorRef persistentActor = actorSystem.actorOf(springExt.props("ExamplePersistentActor"), "examplePersistentActor"); persistentActor.tell("print", null);也就是只打印状态,再次运行,输出与图1一模一样。这是因为默认情况下,持久化Actor在启动或者重启的时候会“重播”日志化的消息。这些“重播”的消息被onReceiveRecover处理后,重新更新到ExampleState的缓存中了。
最后,将上述代码修改为代码清单2所示。
代码清单2
final ActorRef persistentActor = actorSystem.actorOf(springExt.props("ExamplePersistentActor"), "examplePersistentActor"); persistentActor.tell(new Cmd("buzz"), null); persistentActor.tell("print", null);再次运行,输出如图2所示。
可以看到持久化的消息依然被“重播”,并且新打印出了我们最新发送的内容为buzz的Cmd。这些消息依然被有序的放入ExampleState的缓存。这是由于在恢复期间,新发送给持久化Actor的不会干扰到“重播”的消息,新的消息将被缓存直到恢复阶段完成之后再由持久化Actor接收。
上面的例子我们只发送了4个Cmd消息,并对其恢复。一般而言这不会有什么问题,但是当系统中的消息频率很高时,那么通过一条一条消息“重播”的方式显然是低效的,假如应用本身能够每隔一段时间利用快照存储,会极大地缩短恢复过程所需要的时间。
我们对上面的例子先进行一些修改,加入快照的生成:
final ActorRef persistentActor = actorSystem.actorOf(springExt.props("ExamplePersistentActor"), "examplePersistentActor"); persistentActor.tell(new Cmd("foo"), null); persistentActor.tell(new Cmd("baz"), null); persistentActor.tell(new Cmd("bar"), null); persistentActor.tell("snap", null); persistentActor.tell("print", null);输出与图1一致。我们此时如果再次执行代码清单1,效果依然与图1一致。此时如果你通过debug方式,你会发现其中的不同:之前的恢复过程会“重播”6条消息,这次只会收到一条SnapshotOffer消息,并直接从快照恢复。
我们再次执行代码清单2,其输出也仍然与图2一致。
通过本文的介绍,发现使用Akka的持久化功能,类似于使用java多线程中的wait/notify,lock/unlock,将功能从语法层面解决,对程序员而言能更多地关注于自身业务。
其它Akka应用的博文如下:
后记:个人总结整理的《深入理解Spark:核心思想与源码分析》一书现在已经正式出版上市,目前京东、当当、天猫等网站均有销售,欢迎感兴趣的同学购买。
京东:http://item.jd.com/11846120.html
当当:http://product.dangdang.com/23838168.html
标签:blog tco array 解决 恢复 extends size 控制 sim
原文地址:http://blog.csdn.net/beliefer/article/details/53925622