码迷,mamicode.com
首页 > 其他好文 > 详细

akka入门-基于信道进行消息可靠传输

时间:2015-05-17 21:56:12      阅读:126      评论:0      收藏:0      [点我收藏+]

标签:

程序的演示场景是:处理器发送命令,接收者接收到消息后进行处理并且对发送方发送消息确认表明已经成功收到消息。如果没有发送确认则表明该消息没有被接收并正确处理。失败消息会到达死信箱,系统下次启动时后继续发送死信箱中的发送失败的消息。

1.创建信道回复命令对象

import com.center.akka.simple.command.Command;

public class ChannelReply {


  private Command command;
  private long sequenceNbr ;

  public ChannelReply(Command command, long sequenceNbr ) {
    this. command = command;
    this. sequenceNbr = sequenceNbr ;
  }

  public Command getCommand() {
    return command;
  }

  public long getSequenceNbr() {
    return sequenceNbr;
  }

  @Override
  public String toString() {
    return "ChannelReply{" + "command=" + command + ", sequenceNbr=" + sequenceNbr + '}';
  }
}
2.编写接收者对象

import akka.actor.UntypedActor;
import akka.event.Logging;
import akka.event.LoggingAdapter;
import akka.persistence.ConfirmablePersistent;

import com.center.akka.persistent_channel.command.ChannelReply;
import com.center.akka.simple.command.Command;

public class Receiver extends UntypedActor {

  LoggingAdapter log = Logging.getLogger(getContext().system(), this);

  @Override
  public void onReceive(Object msg ) throws Exception {

    if (msg instanceof ConfirmablePersistent) {

      ConfirmablePersistent confirmablePersistent = (ConfirmablePersistent) msg;

      log.info("Incoming Paylod: " + confirmablePersistent.payload() + " #: " + confirmablePersistent.sequenceNr());

      getSender().tell( new ChannelReply((Command) confirmablePersistent.payload(), confirmablePersistent.sequenceNr()), null);
      confirmablePersistent. confirm();

    }
  }
}
3.编写处理器

import java.util.concurrent.TimeUnit;

import scala.concurrent.duration.Duration;
import akka.actor.ActorRef;
import akka.actor.UntypedActor;
import akka.event.Logging;
import akka.event.LoggingAdapter;
import akka.persistence.Deliver;
import akka.persistence.Persistent;
import akka.persistence.PersistentChannel;
import akka.persistence.PersistentChannelSettings;

import com.center.akka.persistent_channel.command.ChannelReply;

public class BaseProcessor extends UntypedActor {

  LoggingAdapter log = Logging.getLogger(getContext().system(), this);

  private ActorRef receiver;
  private ActorRef channel;

  public BaseProcessor(ActorRef receiver) {
    this. receiver = receiver;
    this. channel =
        getContext().actorOf(
            PersistentChannel.props(PersistentChannelSettings .create().withRedeliverInterval(Duration.create(30, TimeUnit.SECONDS ))
                .withPendingConfirmationsMax(10000) // max # of pending confirmations. suspend
                                                    // delivery until <
                .withPendingConfirmationsMin(2000) // min # of pending confirmation. suspend
                                                   // delivery until >
                .withReplyPersistent( true) // ack
                .withRedeliverMax(15)) , "channel" );
  }

  @Override
  public void onReceive(Object msg ) {

    if (msg instanceof Persistent) {

      log.info("Send to Channel: " + (( Persistent) msg).payload());

      channel.tell(Deliver.create (((Persistent) msg).withPayload(((Persistent ) msg ).payload()), receiver.path()), getSelf());
    } else if (msg instanceof ChannelReply) {
      log.info(msg.toString());
    }
  }


}

4.测试类

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.persistence.Persistent;

import com.center.akka.persistent_channel.actor.BaseProcessor;
import com.center.akka.persistent_channel.actor.Receiver;
import com.center.akka.simple.command.Command;

public class System {

  public static final Logger log = LoggerFactory.getLogger(System.class);

  public static void main(String... args) throws Exception {

    final ActorSystem actorSystem = ActorSystem.create("channel-system");

    Thread.sleep(2000);

    final ActorRef receiver = actorSystem.actorOf(Props.create(Receiver. class));
    final ActorRef processor = actorSystem.actorOf(Props.create(BaseProcessor. class, receiver), "channel-processor" );


    for ( int i = 0; i < 10; i++) {
      processor.tell(Persistent.create (new Command("CMD " + i )), null);
    }

    Thread.sleep(2000);

    log.debug( "Actor System Shutdown Starting..." );

    actorSystem.shutdown();
  }
}

5.输出结果

[INFO] [05/17/2015 18:29:46.699] [channel-system-akka.actor.default-dispatcher-3] [akka://channel-system/user/channel-processor] Send to Channel: Command{data='CMD 0'}
[INFO] [05/17/2015 18:29:46.702] [channel-system-akka.actor.default-dispatcher-3] [akka://channel-system/user/channel-processor] Send to Channel: Command{data='CMD 1'}
[INFO] [05/17/2015 18:29:46.702] [channel-system-akka.actor.default-dispatcher-3] [akka://channel-system/user/channel-processor] Send to Channel: Command{data='CMD 2'}
[INFO] [05/17/2015 18:29:46.702] [channel-system-akka.actor.default-dispatcher-3] [akka://channel-system/user/channel-processor] Send to Channel: Command{data='CMD 3'}
[INFO] [05/17/2015 18:29:46.702] [channel-system-akka.actor.default-dispatcher-3] [akka://channel-system/user/channel-processor] Send to Channel: Command{data='CMD 4'}
[INFO] [05/17/2015 18:29:46.703] [channel-system-akka.actor.default-dispatcher-3] [akka://channel-system/user/channel-processor] Send to Channel: Command{data='CMD 5'}
[INFO] [05/17/2015 18:29:46.703] [channel-system-akka.actor.default-dispatcher-3] [akka://channel-system/user/channel-processor] Send to Channel: Command{data='CMD 6'}
[INFO] [05/17/2015 18:29:46.703] [channel-system-akka.actor.default-dispatcher-3] [akka://channel-system/user/channel-processor] Send to Channel: Command{data='CMD 7'}
[INFO] [05/17/2015 18:29:46.703] [channel-system-akka.actor.default-dispatcher-3] [akka://channel-system/user/channel-processor] Send to Channel: Command{data='CMD 8'}
[INFO] [05/17/2015 18:29:46.703] [channel-system-akka.actor.default-dispatcher-3] [akka://channel-system/user/channel-processor] Send to Channel: Command{data='CMD 9'}
[INFO] [05/17/2015 18:29:46.965] [channel-system-akka.actor.default-dispatcher-2] [akka://channel-system/user/$a] Incoming Paylod: Command{data='CMD 0'} #: 1
[INFO] [05/17/2015 18:29:46.986] [channel-system-akka.actor.default-dispatcher-5] [akka://channel-system/user/channel-processor] ChannelReply{command=Command{data='CMD 0'}, sequenceNbr=1}
[INFO] [05/17/2015 18:29:46.986] [channel-system-akka.actor.default-dispatcher-2] [akka://channel-system/user/$a] Incoming Paylod: Command{data='CMD 1'} #: 2
[INFO] [05/17/2015 18:29:46.986] [channel-system-akka.actor.default-dispatcher-5] [akka://channel-system/user/channel-processor] ChannelReply{command=Command{data='CMD 1'}, sequenceNbr=2}
[INFO] [05/17/2015 18:29:46.986] [channel-system-akka.actor.default-dispatcher-5] [akka://channel-system/user/$a] Incoming Paylod: Command{data='CMD 2'} #: 3
[INFO] [05/17/2015 18:29:46.987] [channel-system-akka.actor.default-dispatcher-3] [akka://channel-system/user/channel-processor] ChannelReply{command=Command{data='CMD 2'}, sequenceNbr=3}
[INFO] [05/17/2015 18:29:46.988] [channel-system-akka.actor.default-dispatcher-5] [akka://channel-system/user/$a] Incoming Paylod: Command{data='CMD 3'} #: 4
[INFO] [05/17/2015 18:29:46.988] [channel-system-akka.actor.default-dispatcher-3] [akka://channel-system/user/channel-processor] ChannelReply{command=Command{data='CMD 3'}, sequenceNbr=4}
[INFO] [05/17/2015 18:29:46.988] [channel-system-akka.actor.default-dispatcher-5] [akka://channel-system/user/$a] Incoming Paylod: Command{data='CMD 4'} #: 5
[INFO] [05/17/2015 18:29:46.988] [channel-system-akka.actor.default-dispatcher-5] [akka://channel-system/user/$a] Incoming Paylod: Command{data='CMD 5'} #: 6
[INFO] [05/17/2015 18:29:46.988] [channel-system-akka.actor.default-dispatcher-11] [akka://channel-system/user/channel-processor] ChannelReply{command=Command{data='CMD 4'}, sequenceNbr=5}
[INFO] [05/17/2015 18:29:46.988] [channel-system-akka.actor.default-dispatcher-5] [akka://channel-system/user/$a] Incoming Paylod: Command{data='CMD 6'} #: 7
[INFO] [05/17/2015 18:29:46.988] [channel-system-akka.actor.default-dispatcher-11] [akka://channel-system/user/channel-processor] ChannelReply{command=Command{data='CMD 5'}, sequenceNbr=6}
[INFO] [05/17/2015 18:29:46.988] [channel-system-akka.actor.default-dispatcher-11] [akka://channel-system/user/channel-processor] ChannelReply{command=Command{data='CMD 6'}, sequenceNbr=7}
[INFO] [05/17/2015 18:29:46.988] [channel-system-akka.actor.default-dispatcher-5] [akka://channel-system/user/$a] Incoming Paylod: Command{data='CMD 7'} #: 8
[INFO] [05/17/2015 18:29:46.988] [channel-system-akka.actor.default-dispatcher-4] [akka://channel-system/user/channel-processor] ChannelReply{command=Command{data='CMD 7'}, sequenceNbr=8}
[INFO] [05/17/2015 18:29:46.988] [channel-system-akka.actor.default-dispatcher-5] [akka://channel-system/user/$a] Incoming Paylod: Command{data='CMD 8'} #: 9
[INFO] [05/17/2015 18:29:46.988] [channel-system-akka.actor.default-dispatcher-4] [akka://channel-system/user/channel-processor] ChannelReply{command=Command{data='CMD 8'}, sequenceNbr=9}
[INFO] [05/17/2015 18:29:46.988] [channel-system-akka.actor.default-dispatcher-5] [akka://channel-system/user/$a] Incoming Paylod: Command{data='CMD 9'} #: 10
[INFO] [05/17/2015 18:29:46.988] [channel-system-akka.actor.default-dispatcher-11] [akka://channel-system/user/channel-processor] ChannelReply{command=Command{data='CMD 9'}, sequenceNbr=10}
18:29:48.699 [main] DEBUG c.c.a.persistent_channel.app.System - Actor System Shutdown Starting...




akka入门-基于信道进行消息可靠传输

标签:

原文地址:http://blog.csdn.net/liuchangqing123/article/details/45796991

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!