Akka实现WordCount(Scala):
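架构图（消息流）: MasterActor 把每行文本发送给 MapActor → MapActor 生成 MapData 发送给 ReduceActor → ReduceActor 生成 ReduceData 发送给 AggregateActor 累加；查询时 MasterActor 把 Result 直接转发给 AggregateActor，由它把最终统计结果回复给请求方。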
项目结构:
pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>com.citi.sky</groupId>
  <artifactId>AkkaPJ</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <packaging>jar</packaging>

  <name>AkkaPJ</name>
  <url>http://maven.apache.org</url>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <!-- All scala-lang artifacts must use the same Scala version. -->
    <scala.version>2.11.6</scala.version>
    <!-- All Akka modules (akka-actor, akka-testkit, ...) must be the same Akka release;
         mixing releases risks binary incompatibilities between modules. -->
    <akka.version>2.3.6</akka.version>
  </properties>

  <dependencies>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.12</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.scala-lang</groupId>
      <artifactId>scala-library</artifactId>
      <version>${scala.version}</version>
    </dependency>
    <dependency>
      <groupId>org.scala-lang</groupId>
      <artifactId>scala-compiler</artifactId>
      <version>${scala.version}</version>
    </dependency>
    <dependency>
      <groupId>org.scala-lang</groupId>
      <artifactId>scala-reflect</artifactId>
      <version>${scala.version}</version>
    </dependency>
    <dependency>
      <groupId>com.typesafe.akka</groupId>
      <artifactId>akka-actor_2.11</artifactId>
      <version>${akka.version}</version>
    </dependency>
    <dependency>
      <groupId>com.typesafe.akka</groupId>
      <artifactId>akka-testkit_2.11</artifactId>
      <version>${akka.version}</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.scalatest</groupId>
      <artifactId>scalatest_2.11</artifactId>
      <version>3.0.4</version>
      <scope>test</scope>
    </dependency>
  </dependencies>

  <build>
    <plugins>
      <!-- NOTE: org.scala-tools:maven-scala-plugin is no longer maintained; its successor
           is net.alchim31.maven:scala-maven-plugin. Kept here to avoid changing the build. -->
      <plugin>
        <groupId>org.scala-tools</groupId>
        <artifactId>maven-scala-plugin</artifactId>
        <version>2.15.2</version>
        <executions>
          <execution>
            <goals>
              <goal>compile</goal>
              <goal>testCompile</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>
</project>
消息:
/** Output of MapActor for one input line: one `WordCount(word, 1)` per token. */
final case class MapData(dataList: List[WordCount])

/** Output of ReduceActor: per-word totals computed from a single `MapData`. */
final case class ReduceData(reduceDataList: Map[String, Int])

/** Query message; AggregateActor answers it with the aggregated `Map[String, Int]`. */
final case class Result()

/** A word (`key`) paired with how many times it was counted. */
final case class WordCount(key: String, count: Int)
Actors:
MasterActor:
import akka.actor.Actor
import akka.actor.Props
import com.citi.dw.messages.Result

/**
 * Entry actor of the word-count pipeline.
 *
 * On construction it creates the child chain MapActor -> ReduceActor -> AggregateActor.
 * It then routes incoming messages:
 *  - a `String` line goes to the MapActor, which starts the map/reduce/aggregate flow;
 *  - a `Result` query is forwarded to the AggregateActor with the original sender kept,
 *    so the aggregate's reply goes back to whoever asked;
 *  - anything else is reported on stdout.
 */
class MasterActor extends Actor {
  // Children are created downstream-first because each one is constructed with a
  // reference to the actor it feeds.
  private val aggregateActor =
    context.actorOf(Props(classOf[AggregateActor]), "aggregateActor")
  private val reduceActor =
    context.actorOf(Props(classOf[ReduceActor], aggregateActor), "reduceActor")
  private val mapActor =
    context.actorOf(Props(classOf[MapActor], reduceActor), "mapActor")

  def receive: Actor.Receive = {
    case line: String =>
      mapActor ! line
    case query: Result =>
      // NOTE(review): Result skips the Map/Reduce hops, so it is not ordered after lines
      // still in flight; the caller currently waits (Thread.sleep) before asking.
      aggregateActor forward query
    case _ =>
      println("MasterActor receive wrong message.")
  }
}
MapActor:
import akka.actor.Actor
import com.citi.dw.messages.MapData
import com.citi.dw.messages.WordCount
import scala.collection.mutable.ListBuffer
import akka.actor.ActorRef

/**
 * Map stage: turns each text line into a `MapData` and sends it to `reduceActor`.
 *
 * @param reduceActor receiver of the `MapData` produced for every line
 */
class MapActor(val reduceActor: ActorRef) extends Actor {
  def receive: Actor.Receive = {
    case msg: String =>
      reduceActor ! evaluateExpression(msg)
    case _ => println("MapActor receive wrong message.")
  }

  /**
   * Splits `line` on whitespace and emits `WordCount(word, 1)` for each token, in order.
   * Punctuation stays attached to its token (e.g. "Sky." is a different word from "Sky").
   */
  private[this] def evaluateExpression(line: String): MapData = {
    // "\\s+" treats runs of spaces as one separator. The filter drops the leading empty
    // token that input starting with whitespace produces, so "" is never counted.
    val words = line.split("\\s+").filter(_.nonEmpty)
    MapData(words.map(word => WordCount(word, 1)).toList)
  }
}
ReduceActor:
import akka.actor.Actor
import com.citi.dw.messages.MapData
import com.citi.dw.messages.ReduceData
import com.citi.dw.messages.WordCount
import scala.collection.mutable.HashMap
import akka.actor.ActorRef

/**
 * Reduce stage: collapses one `MapData` into per-word totals and sends the resulting
 * `ReduceData` to `aggregateActor`.
 *
 * @param aggregateActor receiver of the `ReduceData` produced for every `MapData`
 */
class ReduceActor(val aggregateActor: ActorRef) extends Actor {
  def receive: Actor.Receive = {
    case msg: MapData =>
      aggregateActor ! reduce(msg.dataList)
    case _ => println("ReduceActor receive wrong message.")
  }

  /** Sums `count` over all entries that share the same `key`. */
  private[this] def reduce(dataList: List[WordCount]): ReduceData = {
    // Every entry contributes its own count, including the first occurrence of a key.
    // This stays correct even if the mapper emits counts other than 1.
    val totals = dataList.foldLeft(Map.empty[String, Int]) {
      case (acc, WordCount(key, count)) => acc.updated(key, acc.getOrElse(key, 0) + count)
    }
    ReduceData(totals)
  }
}
AggregateActor:
import akka.actor.Actor
import com.citi.dw.messages.ReduceData
import scala.collection.mutable.HashMap
import com.citi.dw.messages.Result
import akka.actor.ActorRef

/**
 * Aggregate stage: merges every `ReduceData` into a running word-count table.
 * It replies to a `Result` query with an immutable snapshot of that table.
 */
class AggregateActor extends Actor {
  // Running totals. The reference never changes (val); only the map's contents do.
  private[this] val finalReduceMap = HashMap[String, Int]()

  def receive: Actor.Receive = {
    case msg: ReduceData =>
      aggregateAndReduce(msg.reduceDataList)
    case _: Result =>
      // Reply with an immutable copy so the asker cannot observe later updates.
      sender() ! finalReduceMap.toMap
    case _ => println("AggregateActor receive wrong message.")
  }

  /** Adds each (word, count) pair of `reduceList` to the running totals. */
  private[this] def aggregateAndReduce(reduceList: Map[String, Int]): Unit =
    for ((key, count) <- reduceList)
      finalReduceMap(key) = finalReduceMap.getOrElse(key, 0) + count
}
主程序:
import akka.actor.ActorSystem
import akka.actor.Props
import com.citi.dw.actors.MasterActor
import com.citi.dw.messages.Result
import akka.pattern.ask
import scala.concurrent.duration._
import akka.util.Timeout
import scala.util._
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Await

/**
 * Demo entry point for the Akka word count.
 *
 * Sends a few text lines to the master actor, then asks it for the aggregated totals.
 * It prints one `(word,count)` pair per line and finally shuts the actor system down.
 */
object AkkaWordCount extends App {
  // `5.seconds` rather than the postfix form `5 seconds`, which needs scala.language.postfixOps.
  implicit val timeout: Timeout = Timeout(5.seconds)

  val system = ActorSystem("WordCountAkka")
  try {
    val master = system.actorOf(Props(classOf[MasterActor]), "master")

    Seq(
      "Hi! Hi!",
      "My name is Sky. I am so so so happy to be here ",
      "Today, I am going to introduce word count for Akka ",
      "I hope hope It is helpful to you ",
      "Thank you "
    ).foreach(line => master ! line)

    // MasterActor forwards Result straight to AggregateActor, skipping the map/reduce
    // hops, so nothing orders the query after the lines above. This sleep is only a
    // best-effort wait for the pipeline to drain.
    Thread.sleep(1000)

    val future = (master ? Result()).mapTo[Map[String, Int]]
    val result = Await.result(future, timeout.duration)

    // foreach (not map) because printing is a side effect. The explicit tuple keeps the
    // "(word,count)" output without relying on println argument auto-tupling.
    result.foreach { case (word, count) => println((word, count)) }
  } finally {
    // Shut down even if the ask times out; otherwise the actor system's threads
    // may keep the JVM running.
    system.shutdown()
  }
}
运行结果:
(for,1)
(name,1)
(count,1)
(is,2)
(am,2)
(My,1)
(going,1)
(so,3)
(introduce,1)
(Sky.,1)
(I,3)
(to,3)
(Hi!,2)
(you,2)
(here,1)
(happy,1)
(Thank,1)
(hope,2)
(Today,,1)
(helpful,1)
(Akka,1)
(It,1)
(be,1)
(word,1)