标签:
Akka 的 MessageDispatcher 是维持 Akka Actor“运作”的部分,可以说它是整个机器的引擎。下面在 application.conf 文件中配置一个派发器:
my-dispatcher {
  # "Dispatcher" is the name of the event-based dispatcher type
  type = Dispatcher
  # Which kind of ExecutionService to use
  executor = "fork-join-executor"
  # Configuration for the fork-join pool
  fork-join-executor {
    # Lower bound on the number of threads for the factor-based parallelism
    parallelism-min = 2
    # Parallelism (threads) ... ceil(available CPUs * factor)
    parallelism-factor = 2.0
    # Upper bound on the number of threads for the factor-based parallelism
    parallelism-max = 10
  }
  # Throughput defines the maximum number of messages processed for one actor
  # before the thread switches to another actor.
  # Set to 1 for as fair as possible.
  throughput = 100
}
import com.center.akka.simple.actor.SimpleActor; import com.center.akka.simple.command.Command; import akka.actor.ActorRef; import akka.actor.ActorSystem; import akka.actor.Props; public class DispatchersTest { public static void main(String[] args) throws InterruptedException { ActorSystem system = ActorSystem.create( "MySystem"); //为 Actor 指定派发器 ActorRef myActor = system.actorOf(Props.create(SimpleActor. class ).withDispatcher("my-dispatcher" ), "myactor" ); myActor.tell( new Command("CMD 1" ), ActorRef.noSender()); Thread. sleep(2000); system.shutdown(); } }
import com.typesafe.config.Config; import akka.actor.ActorSystem; import akka.actor.PoisonPill; import akka.dispatch.PriorityGenerator; import akka.dispatch.UnboundedPriorityMailbox; public class MyPrioMailbox extends UnboundedPriorityMailbox { public MyPrioMailbox(ActorSystem.Settings settings, Config config) { // needed for reflective instantiation // Create a new PriorityGenerator, lower prio means more important super (new PriorityGenerator() { @Override public int gen(Object message) { if (message.equals("highpriority" )) return 0; // ' highpriority messages should be treated first if possible else if (message.equals( "lowpriority")) return 2; // ' lowpriority messages should be treated last if possible else if (message.equals(PoisonPill.getInstance())) return 3; // PoisonPill when no other left else return 1; // By default they go between high and low prio } }); } }
# Dispatcher whose actors use the custom priority mailbox implementation.
prio-dispatcher {
  mailbox-type = "com.center.akka.dispatchers.MyPrioMailbox"
}
测试类:
import com.center.akka.simple.actor.SimpleActor;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.PoisonPill;
import akka.actor.Props;

/**
 * Small demo that creates a {@code SimpleActor} bound to the dispatcher
 * configured under the id {@code "prio-dispatcher"} and sends it a mix of
 * string messages followed by a {@link PoisonPill}.
 */
public class PrioMailboxTest {

    /**
     * Runs the demo.
     *
     * @param args command-line arguments (not used)
     * @throws InterruptedException if the main thread is interrupted while sleeping
     */
    public static void main(String[] args) throws InterruptedException {
        ActorSystem system = ActorSystem.create("MySystem");
        ActorRef myActor =
                system.actorOf(Props.create(SimpleActor.class).withDispatcher("prio-dispatcher"));

        // Use the explicit "no sender" reference instead of a literal null,
        // consistent with how DispatchersTest sends its message.
        final ActorRef noSender = ActorRef.noSender();

        myActor.tell("lowpriority", noSender);
        myActor.tell("lowpriority", noSender);
        myActor.tell("highpriority", noSender);
        myActor.tell("pigdog", noSender);
        myActor.tell("pigdog2", noSender);
        myActor.tell("pigdog3", noSender);
        myActor.tell("highpriority", noSender);
        myActor.tell(PoisonPill.getInstance(), noSender);

        // Wait two seconds before shutting down; presumably this gives the
        // actor time to drain its mailbox.
        Thread.sleep(2000);
        system.shutdown();
    }
}
[INFO] [05/20/2015 17:07:37.459] [MySystem-prio-dispatcher-5] [akka://MySystem/user/$a] SimpleActor constructor
[INFO] [05/20/2015 17:07:37.459] [MySystem-prio-dispatcher-5] [akka://MySystem/user/$a] Received Command: highpriority
[INFO] [05/20/2015 17:07:37.460] [MySystem-prio-dispatcher-5] [akka://MySystem/user/$a] Received Command: highpriority
[INFO] [05/20/2015 17:07:37.460] [MySystem-prio-dispatcher-5] [akka://MySystem/user/$a] Received Command: pigdog
[INFO] [05/20/2015 17:07:37.460] [MySystem-prio-dispatcher-5] [akka://MySystem/user/$a] Received Command: pigdog2
[INFO] [05/20/2015 17:07:37.460] [MySystem-prio-dispatcher-5] [akka://MySystem/user/$a] Received Command: pigdog3
[INFO] [05/20/2015 17:07:37.460] [MySystem-prio-dispatcher-5] [akka://MySystem/user/$a] Received Command: lowpriority
[INFO] [05/20/2015 17:07:37.460] [MySystem-prio-dispatcher-5] [akka://MySystem/user/$a] Received Command: lowpriority
标签:
原文地址:http://blog.csdn.net/liuchangqing123/article/details/45873101