标签:
该话题涉及几个概念:
1. 创建处理器状态对象
import java.io.Serializable;
import java.util.ArrayList;

import com.center.akka.simple.event.Event;

/**
 * Serializable container for the string form of every event applied to a processor.
 *
 * <p>{@link #update(Event)} appends {@code event.toString()} to an internal list, and
 * {@link #copy()} returns an independent instance backed by its own list.
 *
 * <p>NOTE(review): no {@code serialVersionUID} is declared. Pinning one now could make
 * previously serialized instances unreadable, so it is deliberately left as-is here —
 * confirm whether stored instances must stay readable before adding it.
 */
public class ProcessorState implements Serializable {

    /** String representations of the applied events, in the order they were applied. */
    private final ArrayList<String> events;

    /** Creates a state with no recorded events. */
    public ProcessorState() {
        this.events = new ArrayList<String>();
    }

    /**
     * Creates a state pre-populated with the given entries.
     *
     * <p>The list is copied, so later changes the caller makes to {@code events} do not
     * leak into this state (and vice versa).
     *
     * @param events initial entries; must not be {@code null}
     * @throws NullPointerException if {@code events} is {@code null}
     */
    public ProcessorState(ArrayList<String> events) {
        if (events == null) {
            // Fail at construction time instead of on the first later use of the field.
            throw new NullPointerException("events");
        }
        this.events = new ArrayList<String>(events);
    }

    /**
     * Returns a new state holding the same entries in its own list.
     *
     * @return an independent copy of this state
     */
    public ProcessorState copy() {
        // The constructor performs the defensive copy.
        return new ProcessorState(events);
    }

    /**
     * Records the given event by appending its {@code toString()} value.
     *
     * @param event the event to record; must not be {@code null}
     * @throws NullPointerException if {@code event} is {@code null}
     */
    public void update(Event event) {
        events.add(event.toString());
    }

    /**
     * Returns the string form of the recorded entries list.
     */
    @Override
    public String toString() {
        return events.toString();
    }
}
2.事件处理器
import akka.actor.UntypedActor;
import akka.event.Logging;
import akka.event.LoggingAdapter;

/**
 * Actor that writes every message it receives to the log at INFO level.
 */
public class EventHandler extends UntypedActor {

    /** Logging adapter obtained from this actor's system, with this actor as log source. */
    LoggingAdapter log = Logging.getLogger(getContext().system(), this);

    /**
     * Logs the incoming message prefixed with {@code "Handled Event: "}.
     *
     * @param msg the received message
     * @throws Exception declared by the overridden method; nothing is thrown explicitly here
     */
    @Override
    public void onReceive(Object msg) throws Exception {
        final String line = "Handled Event: " + msg;
        log.info(line);
    }
}
import java.util.UUID; import akka.event.Logging; import akka.event.LoggingAdapter; import akka.japi.Procedure; import akka.persistence.SnapshotOffer; import akka.persistence.UntypedEventsourcedProcessor; import com.center.akka.simple.command.Command; import com.center.akka.simple.event.Event; public class BaseProcessor extends UntypedEventsourcedProcessor { LoggingAdapter log = Logging.getLogger(getContext().system (), this ); /** * The state of the processor */ private ProcessorState processorState = new ProcessorState(); /** * Called on restart. Loads from Snapshot first, and then replays Journal Events to update state. * * @param msg */ public void onReceiveRecover(Object msg ) { log.info("Received Recover: " + msg); if (msg instanceof Event) { System. out.println("onReceiveRecover -- msg instanceof Event"); System. out.println("event --- " + ((Event) msg).getData()); processorState.update((Event) msg); } else if (msg instanceof SnapshotOffer) { System. out.println("onReceiveRecover -- msg instanceof SnapshotOffer"); processorState = (ProcessorState) ((SnapshotOffer) msg).snapshot(); } } /** * Called on Command dispatch * * @param msg */ public void onReceiveCommand(Object msg ) { log.info("Received Command: " + msg); if (msg instanceof Command) { final String data = ((Command) msg).getData(); // generate an event we will persist after being enriched with a uuid final Event event = new Event(data , UUID.randomUUID().toString()); // persist event and THEN update the state of the processor persist( event, new Procedure<Event>() { public void apply(Event evt) throws Exception { processorState.update(evt ); // broadcast event on eventstream 发布该事件 getContext().system().eventStream().publish( evt); } }); } else if (msg .equals("snapshot" )) { saveSnapshot( processorState.copy()); } else if (msg .equals("printstate" )) { log.info(processorState.toString()); } } }
import org.slf4j.Logger; import org.slf4j.LoggerFactory; import akka.actor.ActorRef; import akka.actor.ActorSystem; import akka.actor.Props; import com.center.akka.eventsourcing_persistence.event.EventHandler; import com.center.akka.eventsourcing_persistence.persistence.BaseProcessor; import com.center.akka.simple.command.Command; import com.center.akka.simple.event.Event; public class System { public static final Logger log = LoggerFactory.getLogger(System.class); public static void main(String... args) throws Exception { final ActorSystem actorSystem = ActorSystem.create("actor-server"); final ActorRef handler = actorSystem.actorOf(Props.create(EventHandler. class)); // 订阅 actorSystem.eventStream ().subscribe(handler , Event.class); Thread.sleep(5000); final ActorRef actorRef = actorSystem.actorOf(Props.create(BaseProcessor. class), "eventsourcing-processor" ); actorRef.tell( new Command("CMD 1" ), null); actorRef.tell( new Command("CMD 2" ), null); actorRef.tell( new Command("CMD 3" ), null); actorRef.tell( "snapshot", null );//发送保存快照命令 actorRef.tell( new Command("CMD 4" ), null); actorRef.tell( new Command("CMD 5" ), null); actorRef.tell( "printstate", null ); Thread.sleep(5000); log.debug( "Actor System Shutdown Starting..." ); actorSystem.shutdown(); } }先发送三条命令进行执行,接着执行生成快照命令,此时前三条命令会保存在快照中,然后发送4、5两条命令进行执行。
[INFO] [05/17/2015 18:09:56.037] [actor-server-akka.actor.default-dispatcher-2] [akka://actor-server/user/eventsourcing-processor] Received Recover: RecoveryCompleted [INFO] [05/17/2015 18:09:56.044] [actor-server-akka.actor.default-dispatcher-2] [akka://actor-server/user/eventsourcing-processor] Received Command: Command{data='CMD 1'} [INFO] [05/17/2015 18:09:56.272] [actor-server-akka.actor.default-dispatcher-2] [akka://actor-server/user/eventsourcing-processor] Received Command: Command{data='CMD 2'} [INFO] [05/17/2015 18:09:56.272] [actor-server-akka.actor.default-dispatcher-3] [akka://actor-server/user/$a] Handled Event: Event{data='CMD 1', uuid='21056184-f01d-47d5-aa37-fc08a521c8bd'} [INFO] [05/17/2015 18:09:56.273] [actor-server-akka.actor.default-dispatcher-2] [akka://actor-server/user/$a] Handled Event: Event{data='CMD 2', uuid='5dfad32f-a08c-43ac-99a5-ff644c600394'} [INFO] [05/17/2015 18:09:56.273] [actor-server-akka.actor.default-dispatcher-3] [akka://actor-server/user/eventsourcing-processor] Received Command: Command{data='CMD 3'} [INFO] [05/17/2015 18:09:56.275] [actor-server-akka.actor.default-dispatcher-4] [akka://actor-server/user/eventsourcing-processor] Received Command: snapshot [INFO] [05/17/2015 18:09:56.275] [actor-server-akka.actor.default-dispatcher-3] [akka://actor-server/user/$a] Handled Event: Event{data='CMD 3', uuid='fc185f7d-898d-4cc4-af20-fcff497f7f7c'} [INFO] [05/17/2015 18:09:56.276] [actor-server-akka.actor.default-dispatcher-4] [akka://actor-server/user/eventsourcing-processor] Received Command: Command{data='CMD 4'} [INFO] [05/17/2015 18:09:56.276] [actor-server-akka.actor.default-dispatcher-2] [akka://actor-server/user/eventsourcing-processor] Received Command: Command{data='CMD 5'} [INFO] [05/17/2015 18:09:56.276] [actor-server-akka.actor.default-dispatcher-4] [akka://actor-server/user/$a] Handled Event: Event{data='CMD 4', uuid='8b5c5202-7e23-480a-97af-c0841668118d'} [INFO] [05/17/2015 18:09:56.277] 
[actor-server-akka.actor.default-dispatcher-4] [akka://actor-server/user/eventsourcing-processor] Received Command: printstate [INFO] [05/17/2015 18:09:56.277] [actor-server-akka.actor.default-dispatcher-2] [akka://actor-server/user/$a] Handled Event: Event{data='CMD 5', uuid='e4a3f258-5d0b-431a-93cc-8f857a19a400'} [INFO] [05/17/2015 18:09:56.277] [actor-server-akka.actor.default-dispatcher-4] [akka://actor-server/user/eventsourcing-processor] [Event{data='CMD 1', uuid='21056184-f01d-47d5-aa37-fc08a521c8bd'}, Event{data='CMD 2', uuid='5dfad32f-a08c-43ac-99a5-ff644c600394'}, Event{data='CMD 3', uuid='fc185f7d-898d-4cc4-af20-fcff497f7f7c'}, Event{data='CMD 4', uuid='8b5c5202-7e23-480a-97af-c0841668118d'}, Event{data='CMD 5', uuid='e4a3f258-5d0b-431a-93cc-8f857a19a400'}] [INFO] [05/17/2015 18:09:56.283] [actor-server-akka.actor.default-dispatcher-3] [akka://actor-server/user/eventsourcing-processor] Received Command: SaveSnapshotSuccess(SnapshotMetadata(/user/eventsourcing-processor,3,1431857396276)) 18:10:00.417 [main] DEBUG c.c.a.e.app.System - Actor System Shutdown Starting...
标签:
原文地址:http://blog.csdn.net/liuchangqing123/article/details/45796871