码迷,mamicode.com
首页 > 其他好文 > 详细

Akka学习 实现workcount

时间:2018-07-05 23:15:58      阅读:190      评论:0      收藏:0      [点我收藏+]

标签:学习   常用   buffer   hashmap   case   ceo   flat   turn   while   

package com.dcx.scala.actor import akka.actor.{Actor, ActorRef, ActorSystem, Props} import scala.collection.mutable.HashMap import scala.collection.mutable.ListBuffer import scala.io.Source /** * 思路: * 要有个Server * 要有个Client去通信,client统计文本后把(qy,3)输出给Server;Server再把所有的qy聚合,放到ListBuffer中 */ object AkkaWordCount { // 可变长List val list = new ListBuffer[HashMap[String,Int]] def main(args: Array[String]): Unit = { // 输入数据文本 val files: Array[String] = Array("D:\\tmp\\input\\word3.txt", "D:\\tmp\\input\\word3.txt", "D:\\tmp\\input\\word3.txt") //存放接收到的每个actor处理的结果数据 //存放有actor返回结果的Future数据 //拿ActorSystem是一个静态工厂 val weChatApp = ActorSystem("WeChatApp") //拿到两个Actor的通信地址 val akkaServerRef: ActorRef = weChatApp.actorOf(Props[AkkaServer],"jianjian1") val clientRef: ActorRef = weChatApp.actorOf(Props(new Client(akkaServerRef)),"jianjian") for (file <- files) { clientRef ! file } // 让该线程先睡一下,过早进入死循环会导致list没有3个,一直循环不出来 Thread.sleep(1000) // 如果list把三个文件都放满了,就退出循环 while(true){ if(list.size == 3){ // 输出list println(list(list.size -1)) return } } } } //把每次聚合后的值都发送给AkkaServer class Client(val serverRef:ActorRef) extends Actor { override def receive: Receive = { { // 偏函数 常用作模式匹配 // case filePath: String => { //// map阶段 // val list: List[String] = Source.fromFile(filePath).getLines().toList // val words: List[String] = list.flatMap(_.split(" ")) // val res: Map[String, Int] = words.map((_, 1)).groupBy(_._1).mapValues(_.size) // //异步发送结果数据 res发送到Server,去模式匹配 // serverRef ! res // } case filePath:String => { val list: List[String] = Source.fromFile(filePath).getLines().toList val words: List[String] = list.flatMap(_.split(" ")) // 得出: (qy,3) 格式 val res: Map[String, Int] = words.map((_, 1)).groupBy(_._1).mapValues(_.size) serverRef ! res } } } } import scala.collection.mutable.HashMap class AkkaServer extends Actor { private var hashMap: HashMap[String, Int] = new HashMap[String, Int] override def receive: Receive = { case context: Map[String, Int] =>{ // (qy,3) context.map( (map:(String,Int)) => { // 聚合 val value: Any = hashMap.getOrElse(map._1,None) if(value != None){ hashMap(map._1) = value.asInstanceOf[Int] + map._2 }else{ hashMap(map._1) = map._2 } } ) AkkaWordCount.list += hashMap } } }

Akka学习 实现workcount

标签:学习   常用   buffer   hashmap   case   ceo   flat   turn   while   

原文地址:http://blog.51cto.com/dangchuxiang/2136838

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!