This post walks through the source of the first Akka example built with SBT: MapActor, ReduceActor, and AggregateActor. MapActor counts the words in each line of input and sends the result to ReduceActor, which performs a local aggregation and then hands it off to AggregateActor.
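The actors exchange a handful of message types (Word, MapData, ReduceData, Result) whose definitions are not shown in the original post. A minimal sketch of what they might look like, using Java collections to match the actor code below; these definitions are an assumption, not the author's originals:

import java.util.{ArrayList, HashMap}

// Hypothetical message definitions; the original post does not show them.
class Word(val word: String, val count: Integer)              // one token and its count
class MapData(val dataList: ArrayList[Word])                  // output of MapActor
class ReduceData(val reduceDataMap: HashMap[String, Integer]) // output of ReduceActor
class Result                                                  // asks AggregateActor to print the totals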
The example is as follows:
import java.util.{ArrayList, HashMap, StringTokenizer}
import scala.collection.JavaConversions._
import akka.actor.{Actor, ActorRef}

class MapActor(reduceActor: ActorRef) extends Actor {
  // Words filtered out before counting.
  val STOP_WORDS_LIST = List("a", "is")

  def receive: Receive = {
    case message: String =>
      // Count the words in the line and send the result on to the ReduceActor.
      reduceActor ! evaluateExpression(message)
    case _ =>
  }

  def evaluateExpression(line: String): MapData = {
    val dataList = new ArrayList[Word]
    val parser = new StringTokenizer(line)
    val defaultCount: Integer = 1
    while (parser.hasMoreTokens()) {
      val word = parser.nextToken().toLowerCase()
      if (!STOP_WORDS_LIST.contains(word)) {
        dataList.add(new Word(word, defaultCount))
      }
    }
    new MapData(dataList)
  }
}
class ReduceActor(aggregateActor: ActorRef) extends Actor {
  def receive: Receive = {
    case message: MapData =>
      // Aggregate the word counts locally and forward them to the AggregateActor.
      aggregateActor ! reduce(message.dataList)
    case _ =>
  }

  def reduce(dataList: ArrayList[Word]): ReduceData = {
    val reduceMap = new HashMap[String, Integer]
    for (wc <- dataList) {
      val word = wc.word
      if (reduceMap.containsKey(word)) {
        reduceMap.put(word, reduceMap.get(word) + 1)
      } else {
        reduceMap.put(word, 1)
      }
    }
    new ReduceData(reduceMap)
  }
}
class AggregateActor extends Actor {
  val finalReduceMap = new HashMap[String, Integer]

  def receive: Receive = {
    case message: ReduceData =>
      // Merge the locally reduced counts into the global map.
      aggregateInMemoryReduce(message.reduceDataMap)
    case message: Result =>
      // Print the final word counts on request.
      System.out.println(finalReduceMap.toString())
  }

  def aggregateInMemoryReduce(reducedList: HashMap[String, Integer]): Unit = {
    var count: Integer = 0
    for (key <- reducedList.keySet) {
      if (finalReduceMap.containsKey(key)) {
        count = reducedList.get(key) + finalReduceMap.get(key)
        finalReduceMap.put(key, count)
      } else {
        finalReduceMap.put(key, reducedList.get(key))
      }
    }
  }
}
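To run the three actors end to end, they have to be created from the last stage backwards, because each actor takes the next stage's ActorRef as a constructor argument. A minimal driver sketch assuming the hypothetical message classes above; the system name, actor names, and sample input are illustrative:

import akka.actor.{ActorSystem, Props}

object WordCountApp extends App {
  val system = ActorSystem("WordCountSystem")   // illustrative name

  // Wire the chain from the last stage backwards.
  val aggregateActor = system.actorOf(Props[AggregateActor], "aggregate")
  val reduceActor    = system.actorOf(Props(new ReduceActor(aggregateActor)), "reduce")
  val mapActor       = system.actorOf(Props(new MapActor(reduceActor)), "map")

  // MapActor counts words, ReduceActor sums them locally,
  // AggregateActor accumulates the global totals.
  mapActor ! "Akka is an actor model implementation"
  mapActor ! "Akka makes concurrency easier"

  Thread.sleep(1000)            // crude wait so all messages are processed
  aggregateActor ! new Result   // ask AggregateActor to print the final map

  Thread.sleep(1000)
  system.shutdown()             // Akka 2.3-era API, matching the 2015 post
}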
The DT大数据梦工厂 Scala video series passed 100 episodes with today's update! Through thick and thin, thanks to all the big data enthusiasts for your continued learning and support! (All videos, PPTs, and code for episodes 1-100 of the DT大数据梦工厂 Scala series are on Baidu Cloud: http://url.cn/fSFPjS )
Original article: http://www.cnblogs.com/tom-lee/p/4800163.html