码迷,mamicode.com
首页 > 其他好文 > 详细

akka入门-远程调用

时间:2015-05-26 00:19:35      阅读:225      评论:0      收藏:0      [点我收藏+]

标签:

akka远程调用有两种形式:
一种是查找远程Actors,一种是创建远程Actors。

公用的类:

import java.io.Serializable;

/**
 * Container for the immutable, serializable messages exchanged with the calculator actor.
 *
 * <p>Each arithmetic request implements {@link MathOp}; each reply implements
 * {@link MathResult} and carries both operands together with the computed value.
 */
public class Op {

  /** Marker interface for arithmetic request messages. */
  public interface MathOp extends Serializable {
  }

  /** Marker interface for arithmetic reply messages. */
  public interface MathResult extends Serializable {
  }

  /** Request to add two integers. */
  static class Add implements MathOp {
    private static final long serialVersionUID = 1L;
    private final int n1;
    private final int n2;

    /**
     * @param n1 first operand
     * @param n2 second operand
     */
    public Add(int n1, int n2) {
      this.n1 = n1;
      this.n2 = n2;
    }

    /** @return the first operand */
    public int getN1() {
      return n1;
    }

    /** @return the second operand */
    public int getN2() {
      return n2;
    }
  }

  /** Reply to an {@link Add} request. */
  static class AddResult implements MathResult {
    private static final long serialVersionUID = 1L;
    private final int n1;
    private final int n2;
    private final int result;

    /**
     * @param n1 first operand
     * @param n2 second operand
     * @param result value to report for the addition
     */
    public AddResult(int n1, int n2, int result) {
      this.n1 = n1;
      this.n2 = n2;
      this.result = result;
    }

    /** @return the first operand */
    public int getN1() {
      return n1;
    }

    /** @return the second operand */
    public int getN2() {
      return n2;
    }

    /** @return the stored result */
    public int getResult() {
      return result;
    }
  }

  /** Request to subtract the second integer from the first. */
  static class Subtract implements MathOp {
    private static final long serialVersionUID = 1L;
    private final int n1;
    private final int n2;

    /**
     * @param n1 first operand
     * @param n2 second operand
     */
    public Subtract(int n1, int n2) {
      this.n1 = n1;
      this.n2 = n2;
    }

    /** @return the first operand */
    public int getN1() {
      return n1;
    }

    /** @return the second operand */
    public int getN2() {
      return n2;
    }
  }

  /** Reply to a {@link Subtract} request. */
  static class SubtractResult implements MathResult {
    private static final long serialVersionUID = 1L;
    private final int n1;
    private final int n2;
    private final int result;

    /**
     * @param n1 first operand
     * @param n2 second operand
     * @param result value to report for the subtraction
     */
    public SubtractResult(int n1, int n2, int result) {
      this.n1 = n1;
      this.n2 = n2;
      this.result = result;
    }

    /** @return the first operand */
    public int getN1() {
      return n1;
    }

    /** @return the second operand */
    public int getN2() {
      return n2;
    }

    /** @return the stored result */
    public int getResult() {
      return result;
    }
  }

  /** Request to multiply two integers. */
  static class Multiply implements MathOp {
    private static final long serialVersionUID = 1L;
    private final int n1;
    private final int n2;

    /**
     * @param n1 first operand
     * @param n2 second operand
     */
    public Multiply(int n1, int n2) {
      this.n1 = n1;
      this.n2 = n2;
    }

    /** @return the first operand */
    public int getN1() {
      return n1;
    }

    /** @return the second operand */
    public int getN2() {
      return n2;
    }
  }

  /** Reply to a {@link Multiply} request. */
  static class MultiplicationResult implements MathResult {
    private static final long serialVersionUID = 1L;
    private final int n1;
    private final int n2;
    private final int result;

    /**
     * @param n1 first operand
     * @param n2 second operand
     * @param result value to report for the multiplication
     */
    public MultiplicationResult(int n1, int n2, int result) {
      this.n1 = n1;
      this.n2 = n2;
      this.result = result;
    }

    /** @return the first operand */
    public int getN1() {
      return n1;
    }

    /** @return the second operand */
    public int getN2() {
      return n2;
    }

    /** @return the stored result */
    public int getResult() {
      return result;
    }
  }

  /** Request to divide a {@code double} dividend by an {@code int} divisor. */
  static class Divide implements MathOp {
    private static final long serialVersionUID = 1L;
    private final double n1;
    private final int n2;

    /**
     * @param n1 dividend
     * @param n2 divisor
     */
    public Divide(double n1, int n2) {
      this.n1 = n1;
      this.n2 = n2;
    }

    /** @return the dividend */
    public double getN1() {
      return n1;
    }

    /** @return the divisor */
    public int getN2() {
      return n2;
    }
  }

  /** Reply to a {@link Divide} request. */
  static class DivisionResult implements MathResult {
    private static final long serialVersionUID = 1L;
    private final double n1;
    private final int n2;
    private final double result;

    /**
     * @param n1 dividend
     * @param n2 divisor
     * @param result value to report for the division
     */
    public DivisionResult(double n1, int n2, double result) {
      this.n1 = n1;
      this.n2 = n2;
      this.result = result;
    }

    /** @return the dividend */
    public double getN1() {
      return n1;
    }

    /** @return the divisor */
    public int getN2() {
      return n2;
    }

    /** @return the stored result */
    public double getResult() {
      return result;
    }
  }
}

import akka.actor.ActorRef;
import akka.actor.Props;
import akka.actor.UntypedActor;

/**
 * Actor that creates a new {@link CalculatorActor} child for every {@link Op.MathOp} it
 * receives, forwards the operation to it, prints the reply and then stops the replying actor.
 *
 * <p>All four result types are handled. Previously only multiplication and division results
 * were recognised, so the calculator created for an {@link Op.Add} or {@link Op.Subtract}
 * request was never stopped and its reply ended up in {@code unhandled}.
 */
public class CreationActor extends UntypedActor {

  /**
   * Dispatches on the message type.
   *
   * @param message an {@link Op.MathOp} request or one of the {@link Op.MathResult} replies;
   *     anything else is passed to {@code unhandled}
   */
  @Override
  public void onReceive(Object message) throws Exception {

    if (message instanceof Op.MathOp) {
      // One calculator per request; it replies to this actor because getSelf() is the sender.
      ActorRef calculator = getContext().actorOf(Props.create(CalculatorActor.class));
      calculator.tell(message, getSelf());

    } else if (message instanceof Op.AddResult) {
      Op.AddResult result = (Op.AddResult) message;
      System.out.printf("Add result: %d + %d = %d\n", result.getN1(), result.getN2(), result.getResult());
      // The sender is expected to be the calculator created for this request.
      getContext().stop(getSender());

    } else if (message instanceof Op.SubtractResult) {
      Op.SubtractResult result = (Op.SubtractResult) message;
      System.out.printf("Sub result: %d - %d = %d\n", result.getN1(), result.getN2(), result.getResult());
      getContext().stop(getSender());

    } else if (message instanceof Op.MultiplicationResult) {
      Op.MultiplicationResult result = (Op.MultiplicationResult) message;
      System.out.printf("Mul result: %d * %d = %d\n", result.getN1(), result.getN2(), result.getResult());
      getContext().stop(getSender());

    } else if (message instanceof Op.DivisionResult) {
      Op.DivisionResult result = (Op.DivisionResult) message;
      System.out.printf("Div result: %.0f / %d = %.2f\n", result.getN1(), result.getN2(), result.getResult());
      getContext().stop(getSender());

    } else {
      unhandled(message);
    }
  }
}

import akka.actor.UntypedActor;

/**
 * Actor that performs addition, subtraction, multiplication and division.
 *
 * <p>Each supported request is answered by sending the matching result message to
 * {@code getSender()}, with {@code getSelf()} as the sender of the reply.
 */
public class CalculatorActor extends UntypedActor {
  /**
   * Computes the requested operation, logs it to standard output and replies to the sender.
   *
   * @param message one of the {@code Op} request messages; any other message is passed to
   *     {@code unhandled}
   */
  @Override
  public void onReceive(Object message) {
    if (message instanceof Op.Add) {
      final Op.Add request = (Op.Add) message;
      final int left = request.getN1();
      final int right = request.getN2();
      System.out.println("Calculating " + left + " + " + right);
      getSender().tell(new Op.AddResult(left, right, left + right), getSelf());
      return;
    }

    if (message instanceof Op.Subtract) {
      final Op.Subtract request = (Op.Subtract) message;
      final int left = request.getN1();
      final int right = request.getN2();
      System.out.println("Calculating " + left + " - " + right);
      getSender().tell(new Op.SubtractResult(left, right, left - right), getSelf());
      return;
    }

    if (message instanceof Op.Multiply) {
      final Op.Multiply request = (Op.Multiply) message;
      final int left = request.getN1();
      final int right = request.getN2();
      System.out.println("Calculating " + left + " * " + right);
      getSender().tell(new Op.MultiplicationResult(left, right, left * right), getSelf());
      return;
    }

    if (message instanceof Op.Divide) {
      final Op.Divide request = (Op.Divide) message;
      // The dividend is a double, so this is floating-point division.
      final double dividend = request.getN1();
      final int divisor = request.getN2();
      System.out.println("Calculating " + dividend + " / " + divisor);
      getSender().tell(new Op.DivisionResult(dividend, divisor, dividend / divisor), getSelf());
      return;
    }

    unhandled(message);
  }
}
基础的配置文件:
common.conf文件

# Settings shared by the configuration files that `include "common"`.
akka {
  actor {
    # Actor-ref provider with remoting support.
    provider = "akka.remote.RemoteActorRefProvider"
  }

  remote {
     # Remoting uses the Netty TCP transport.
     enabled-transports = ["akka.remote.netty.tcp"]
    netty.tcp {
      # Host name used for remoting; the port is set by the including file.
      hostname = "127.0.0.1"
    }
  }
}
calculator.conf文件

include "common"

akka {
  # Listen on TCP port 8989
  remote.netty.tcp.port = 8989
}

1.创建远程Actors

import java.util.Arrays;
import java.util.concurrent.Callable;

import scala.concurrent.Await;
import scala.concurrent.Future;
import scala.concurrent.duration.Duration;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.dispatch.Futures;
import akka.dispatch.OnSuccess;
import akka.japi.Function;
import akka.pattern.Patterns;
import akka.util.Timeout;

import com.typesafe.config.ConfigFactory;

public class RemoteActorDemo {

  public static void main(String args[]) {

    // 不使用默认的配置,而是选择加载选定的remote actor配置
    final ActorSystem system = ActorSystem.create( "CalculatorWorkerSystem", ConfigFactory.load(( "calculator")));

    // remote actor的ref
    final ActorRef calculatorActor = system .actorOf(Props.create(CalculatorActor. class ), "CalculatorActor" );

    System. out.println( "Started CalculatorWorkerSystem" );

    final Timeout timeout = new Timeout(Duration.create(5, "seconds"));
    Future<Object> addFuture = Patterns.ask( calculatorActor, new Op.Add(1, 2), timeout );
    Future<Object> subtractFuture = Patterns.ask( calculatorActor, newOp.Subtract(1, 2), timeout );
    Future<Object> multiplyFuture = Patterns.ask( calculatorActor, newOp.Multiply(1, 2), timeout );
    Future<Object> divideFuture = Patterns.ask( calculatorActor, newOp.Divide(1, 2), timeout );

    Iterable<Future<Object>> futureArray = Arrays.asList (addFuture , subtractFuture , multiplyFuture, divideFuture );
    Future<Iterable<Op.MathResult>> futureResult = Futures.traverse( futureArray, new Function<Future<Object>, Future<Op.MathResult>>() {
      public Future<Op.MathResult> apply( final Future<Object> param ) throws Exception {
        return Futures.future( new Callable<Op.MathResult>() {
          public Op.MathResult call() throws Exception {
            return (Op.MathResult) Await.result( param, timeout .duration());
          }
        }, system.dispatcher());
      }
    }, system.dispatcher());

    futureResult.onSuccess( new OnSuccess<Iterable<Op.MathResult>>() {
      @Override
      public void onSuccess(Iterable<Op.MathResult> result ) throws Throwable {
        for (Op.MathResult r : result ) {
          if (r instanceof Op.AddResult) {
            System. out .println("add result=" + ((Op.AddResult) r ).getResult());
          } else if (r instanceof Op.SubtractResult) {
            System. out .println("subtract result=" + ((Op.SubtractResult) r ).getResult());
          } else if (r instanceof Op.MultiplicationResult) {
            System. out .println("multiply result=" + ((Op.MultiplicationResult) r ).getResult());
          } else if (r instanceof Op.DivisionResult) {
            System. out .println("divide result=" + ((Op.DivisionResult) r ).getResult());
          }
        }
      }
    }, system.dispatcher());
  }
}

输出结果:

[INFO] [05/25/2015 23:48:23.062] [main] [Remoting] Starting remoting
[INFO] [05/25/2015 23:48:23.225] [main] [Remoting] Remoting started; listening on addresses :[akka.tcp://CalculatorWorkerSystem@127.0.0.1:8989]
[INFO] [05/25/2015 23:48:23.227] [main] [Remoting] Remoting now listens on addresses: [akka.tcp://CalculatorWorkerSystem@127.0.0.1:8989]
Started CalculatorWorkerSystem
Calculating 1 + 2
Calculating 1 - 2
Calculating 1 * 2
Calculating 1.0 / 2
add result=3
subtract result=-1
multiply result=2
divide result=0.5

2.查找远程Actors
remotelookup.conf文件

include "common"

akka {
  # Remoting port for the actor system that loads this configuration.
  remote.netty.tcp.port = 2553
}
import akka.actor.ActorRef;
import akka.actor.ActorSelection;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.actor.UntypedActor;

import com.typesafe.config.ConfigFactory;

/**
 * Demo that looks up an existing remote actor by path instead of creating it.
 *
 * <p>{@link #main(String[])} starts two actor systems in the same JVM:
 * {@code CalculatorWorkerSystem}, which hosts the {@link CalculatorActor}, and
 * {@code localSystem}, which hosts {@link HandlerResult}.
 */
public class RemoteActorSelectionDemo {

  /**
   * Actor that sends an {@code Op.Add(1, 2)} to the remote calculator when it starts and
   * prints every result message it receives.
   */
  public static class HandlerResult extends UntypedActor {

    /**
     * Selects the calculator by its remote path and sends it an addition request, with this
     * actor as the sender so that the reply is delivered to {@link #onReceive(Object)}.
     */
    @Override
    public void preStart() throws Exception {
      ActorSelection selection = this.getContext().actorSelection("akka.tcp://CalculatorWorkerSystem@127.0.0.1:8989/user/CalculatorActor");
      System.out.println("selection : " + selection);
      selection.tell(new Op.Add(1, 2), this.getSelf());
    }

    /**
     * Prints the value carried by a result message.
     *
     * @param message one of the {@code Op} result messages; anything else is passed to
     *     {@code unhandled} instead of being silently dropped
     */
    @Override
    public void onReceive(Object message) throws Exception {
      if (message instanceof Op.AddResult) {
        System.out.println("add result=" + ((Op.AddResult) message).getResult());
      } else if (message instanceof Op.SubtractResult) {
        System.out.println("subtract result=" + ((Op.SubtractResult) message).getResult());
      } else if (message instanceof Op.MultiplicationResult) {
        System.out.println("multiply result=" + ((Op.MultiplicationResult) message).getResult());
      } else if (message instanceof Op.DivisionResult) {
        System.out.println("divide result=" + ((Op.DivisionResult) message).getResult());
      } else {
        unhandled(message);
      }
    }
  }

  /**
   * Entry point of the demo. Both actor systems are left running.
   *
   * @param args command-line arguments (unused)
   */
  public static void main(String[] args) {
    // Load the selected remote-actor configuration instead of the default one.
    final ActorSystem system = ActorSystem.create("CalculatorWorkerSystem", ConfigFactory.load("calculator"));
    // Create the actor that will be looked up remotely; only its registered name is needed.
    system.actorOf(Props.create(CalculatorActor.class), "CalculatorActor");
    System.out.println("Started CalculatorWorkerSystem");

    // Create the local actor; its preStart() sends the request to the remote calculator.
    final ActorSystem localSystem = ActorSystem.create("localSystem", ConfigFactory.load("remotelookup"));
    localSystem.actorOf(Props.create(HandlerResult.class), "handlerResult");

  }
}

输出结果:

[INFO] [05/25/2015 23:50:17.523] [main] [Remoting] Starting remoting
[INFO] [05/25/2015 23:50:17.689] [main] [Remoting] Remoting started; listening on addresses :[akka.tcp://CalculatorWorkerSystem@127.0.0.1:8989]
[INFO] [05/25/2015 23:50:17.691] [main] [Remoting] Remoting now listens on addresses: [akka.tcp://CalculatorWorkerSystem@127.0.0.1:8989]
Started CalculatorWorkerSystem
[INFO] [05/25/2015 23:50:17.714] [main] [Remoting] Starting remoting
[INFO] [05/25/2015 23:50:17.724] [main] [Remoting] Remoting started; listening on addresses :[akka.tcp://localSystem@127.0.0.1:2553]
[INFO] [05/25/2015 23:50:17.725] [main] [Remoting] Remoting now listens on addresses: [akka.tcp://localSystem@127.0.0.1:2553]
selection : ActorSelection[Anchor(akka.tcp://CalculatorWorkerSystem@127.0.0.1:8989/), Path(/user/CalculatorActor)]
Calculating 1 + 2
add result=3





akka入门-远程调用

标签:

原文地址:http://blog.csdn.net/liuchangqing123/article/details/45999067

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!