标签:
本来只是想写一个 ForkJoin 的示例,但写着写着就加入了 akka, future 的元素, 是在解决问题的过程中逐渐引入的。我觉得这种学习的方式很好,就是在解决一个问题的过程中,可以综合地探索和学习到很多不同的东西。传统的学习讲究"循序渐进"的方式,但是"跳跃式+快速试错"也许是学习新技术的更好的方法。 :)
原本是想实现十亿个不重复整数的排序, 由于文件外排序没有解决,因此,暂时实现的是一千万个不重复数,可以一次性加载到 2G 的内存里。
一、 任务拆分
首先要进行任务拆分。要实现一千万个不重复整数的排序, 可以拆分为三个子任务: (1) 生成一千万的不重复整数并写入文件 NumberGeneratorTask; (2) 从文件读取并检测确实生成的是一千万个不重复的整数 CheckUnduplicatedNumbersActor; (3) 从文件读取整数进行排序和排序检测 BigfileSortActor。接下来逐一实现这些子任务。入口如下。这里使用了 Akka 的框架及 Java ForkJoin 线程池实例。
package scalastudy.concurrent.forkjoin import java.util.concurrent.{TimeUnit, ForkJoinPool} import akka.actor.{Props, ActorSystem} import scalastudy.concurrent.actors.{BigFileSortActor, CheckUnduplicatedNumbersActor, StatWordActor} import scalastudy.concurrent.config.ActorSystemFactory /** * Created by shuqin on 16/5/18. */ object BillionNumberSort extends App { val numbers = 10000000 // 在 [0, 2^31-1] 生成 numbers 个不重复的整数 launch() def launch(): Unit = { val system:ActorSystem = ActorSystemFactory.newInstance() val bigfileSortActor = system.actorOf(Props(new BigFileSortActor(numbers))) val checkNumberActor = system.actorOf(Props(new CheckUnduplicatedNumbersActor(numbers, bigfileSortActor)), name="checkNumberActor") val numGenTask = new NumberGeneratorTask(numbers, 0, Integer.MAX_VALUE, checkNumberActor) val pool = new ForkJoinPool() pool.execute(numGenTask) pool.shutdown; pool.awaitTermination(420, TimeUnit.SECONDS); pool.shutdownNow } }
二、 生成一千万个不重复整数 NumberGeneratorTask
显然,这个子任务是可以采用 ForkJoin 来完成的。 ForkJoin 是分治思想的框架性实现, 将原问题分解为同样性质的多个子问题,然后将子问题的解组合起来得到原问题的解。通常采用二分法。实现上,通常会采用递归结构, 注意递归不要太深。 NumberGeneratorTask 的实现如下:
package scalastudy.concurrent.forkjoin import java.util.concurrent.RecursiveAction import akka.actor.ActorRef import zzz.study.algorithm.select.RandomSelector /** * Created by shuqin on 16/5/19. * * 在 [start, end] 选出 num 个不重复的整数 * */ class NumberGeneratorTask(num:Int, start:Int, end:Int, checkNumberActor: ActorRef) extends RecursiveAction { // 每次生成不超过 threshold 个不重复的整数数组; // 该值不能过小, 否则会因递归层次过深导致内存不足. private val threshold = 500 override def compute(): Unit = { // println("Select: " + num + " unduplicated numbers from [" + start + " " + end + ")"); if (num <= threshold) { if (num > end - start+1) { checkNumberActor ! start.to(end).toList } else { val randInts = RandomSelector.selectMDisorderedRandInts2(num, end-start+1) checkNumberActor ! randInts.map(i=>i+start).toList } } else { val middle = start/2 + end/2 val leftTask = new NumberGeneratorTask(num/2, start, middle, checkNumberActor) val rightTask = new NumberGeneratorTask((num+1)/2, middle+1, end, checkNumberActor) //println("Left: [" + start + "-" + middle + "," + num/2 + "]") //println("Right: [" + (middle+1) + "-" + end + "," + (num+1)/2 + "]") leftTask.fork rightTask.fork leftTask.join rightTask.join checkNumberActor ! (start, end) } } }
三、 检测生成的一千万个整数不重复 CheckUnduplicatedNumbersActor
嗯,看上去没有什么特别的技巧。有三点值得注意: (1) 这里涉及到 Actor 通信; 怎样判断整数生成任务完成从而可以开始检测了呢?在 NumberGeneratorTask 生成最后一组整数时并回退到最开始的调用层时,就会发送 (0, Integer.MAX_VALUE) 作为信号, 而 CheckUnduplicatedNumbersActor 则通过 case (0, Integer.MAX_VALUE) 可以匹配到这一点; (2) 在这里也使用了策略模式。对于一千万个整数来说,内存占用 40M 左右, 2G 内存是装滴下的, 若是十亿个整数,那么就需要 4G,就不能一次性加载了。因此这里定义了个接口并实现了一次性加载策略。 读者感兴趣可以实现多次加载策略,以应对内存不够的情形; (3) Source.fromFile(filename).getLines 这里返回的是迭代器, 如果内存不够用的话,就必须使用这个方法,而不是 Source.fromFile(filename).getLines.toList , 后者会将所有行全部加载到内存中从而导致 OutOfMemoryError .
package scalastudy.concurrent.actors import java.io.{File, PrintWriter} import akka.actor.{ActorRef, Actor} import scala.io.Source import scala.collection.immutable.{List} import scala.collection.mutable.Set import scalastudy.utils.{PathConstants} /** * Created by shuqin on 16/5/19. */ class CheckUnduplicatedNumbersActor(numbers:Int, bigfileSortActor: ActorRef) extends Actor { val filename = PathConstants.projPath + "/data/"+numbers+".txt" val fwResult = new PrintWriter(new File(filename)) var count = 0; def checkUnduplicatedNumbers(): Unit = { println("Expected: " + numbers + " , Actual Received: " + count) assert(count == numbers) assert(new OnceLoadStrategy().checkUnduplicatedNumbersInFile(filename) == true) println("checkUnduplicatedNumbers passed.") } override def receive: Receive = { case numberList: List[Int] => fwResult.write(numberList.mkString(" ") + "\n"); count += numberList.length case (0, Integer.MAX_VALUE) => println("Reach End.") fwResult.flush fwResult.close checkUnduplicatedNumbers bigfileSortActor ! filename } /** * 一次性加载所有数到内存, 适用于内存可以装下所有数的情况 * 比如 10000000 个整数占用 40M 空间, 2G 内存是绰绰有余的, 但十亿占用 4G 空间失效 */ class OnceLoadStrategy extends CheckUnduplicatedStrategy { def checkUnduplicatedNumbersInFile(filename:String):Boolean = { var numbersInFile = 0 val unDupNumberSet = Set[Int]() Source.fromFile(filename).getLines. foreach { line => val numbersInLine = line.split("\\s+").map(Integer.parseInt(_)).toSet numbersInFile += numbersInLine.size; unDupNumberSet ++= numbersInLine } println("Expected: " + numbers + " , Actual In File: " + numbersInFile) println("Unduplicated numbers in File: " + unDupNumberSet.size) unDupNumberSet.size == numbers } } trait CheckUnduplicatedStrategy { def checkUnduplicatedNumbersInFile(filename:String):Boolean } }
四、 大文件排序 BigfileSortActor
Oh, 终于进入正题了。大文件排序当然采用归并排序了。 在这个实现里,值得注意的是采用了 Future 全异步框架。 可以看到:
(1) def produceFuture(line:String): Future[List[List[Int]]] 将文件的每一行(包含 threshold 个整数)转化为一个对行内整数排序的 Future, 可以在后续获取结果; 对于一个文件,就是获得了 futureTasks = List[Future[List[List[Int]]]] ; List[List[Int]] 是为了让后面的 Reduce 语法上走得通。
(2) val sortedListsFuture = Future.reduce(futureTasks)(cat(_,_)) 将 List[Future[List[List[Int]]]] 整合成一个 TotalFuture, 这个 TotalFuture 的结果是 futureTasks 里面的所有 Future 结果的连接; 每一个 Future 的结果是一个已排序的列表; 那么 TotalFuture 的结果是一个已排序列表的列表。
(3) 注意到下面这行代码: 是将一个 Future A 转化为另一个 Future B. 其中 B 的结果是基于 A. 在本例中,即是将已排序列表的列表合并为最终列表,但仍然返回的是 Future 而不是最终列表。为什么要这么写, 而不是将 sortedListsFuture 的结果取出来再合并呢? 这是由于之前的所有动作都是异步的。 如果应用只是取排序的结果,那么也没什么; 但如果应用要将 sortedListsFuture 的结果写入文件呢? 进而还要做一下排序检测? 那么, 就不得不在后面加入 TimeUnit.SECONDS.sleep(n) 的代码, 让主线程休息一会了(因为前面整个是异步的, 在 sortedListsFuture 还没完成时,后面的代码就会被执行了)! 而且你得不断估计前面的排序/合并操作究竟大约需要多少时间从而不断调整休眠的时间! 之前就是这样实现的! 但这样并不符合 Future 异步框架的初衷! 因此后面,我突然觉得要写成全异步的, 也体验到了写成全异步应用的滋味~~ :) 要求确实是有点高,需要不断从 Future 转换成新的 Future ~~ 同时你也发现, Scala Future 也提供了一个帮助编写全异步框架的 API ~~
sortedListsFuture map { value:List[List[Int]] => CollectionUtil.mergeKOrderedList(value) }
(4) 由于后面将排序结果写入文件以及从文件检测排序是否 OK 都是同步的,因此,可以在排序 Future 完成后执行。 注意到 Future 的非阻塞写法: f.onComplete { case Success(result) => doWith(result) ; case Failure(ex) => doWith(ex) }
(5) 为了将列表链接起来,也试错了好几次: (x :: y :: Nil).flatten ; 如果写成 reduce(_ :: _ :: Nil) 是会报错的; 写成 reduce(_.flatten :: _.flatten :: Nil) 最终会合并成两个列表不符合预期。
package scalastudy.concurrent.actors import java.io.{File, PrintWriter} import java.util.concurrent.atomic.AtomicInteger import java.util.concurrent.{TimeUnit, Callable, Executors} import akka.actor.{Props, ActorSystem, Actor} import akka.dispatch.Futures import scala.collection.mutable.ListBuffer import scala.concurrent.{Future} import scala.io.Source import scala.util.{Failure, Success} import scalastudy.utils.{CollectionUtil, PathConstants, DefaultFileUtil} import scala.concurrent.ExecutionContext.Implicits.global /** * Created by shuqin on 16/5/20. */ class BigFileSortActor(numbers: Int) extends Actor { override def receive: Receive = { case filename:String => println("Received File: " + filename) sortFile(filename) } def produceFuture(line:String): Future[List[List[Int]]] = { val origin = line.split("\\s+").map( s => Integer.parseInt(s)).toList Future { List(origin.sorted) } } def cat(x: List[List[Int]],y:List[List[Int]]): List[List[Int]] = { return (x :: y :: Nil).flatten } def obtainSortedFuture(futureTasks:List[Future[List[List[Int]]]]):Future[List[Int]] = { val sortedListsFuture = Future.reduce(futureTasks)(cat(_,_)) sortedListsFuture map { value:List[List[Int]] => CollectionUtil.mergeKOrderedList(value) } } def checkSorted(filename:String): Unit = { var last = 0 var count = 0 Source.fromFile(filename + ".sorted.txt").getLines().toList.foreach { line => val number = Integer.parseInt(line.trim) assert(number > last) count += 1 last = number } assert(count == numbers) println("test sorted passed.") } def sortFile(filename:String):Unit = { val futureTasks = DefaultFileUtil.readFileLines(filename).map(produceFuture(_)) println("task numbers: " + futureTasks.size) val allNumberSortedFuture = obtainSortedFuture(futureTasks) allNumberSortedFuture.onComplete { case Success(value:List[Int]) => println("sort finished.") writeSorted(value, filename) checkSorted(filename) case Failure(ex) => println("Sort failed: " + ex.getMessage) } } def writeSorted(allNumberSorted: List[Int], filename: String): Unit = { val fwResult = new PrintWriter(new File(filename + ".sorted.txt")) fwResult.write(allNumberSorted.mkString("\n")) fwResult.flush fwResult.close } } object BigFileSortActorTest { def main(args:Array[String]):Unit = { val numbers = 10000000 val system = ActorSystem("BigFileSortActorTest") val bigFileSortActor = system.actorOf(Props(new BigFileSortActor(numbers)),name="bigFileSortActor") bigFileSortActor ! PathConstants.projPath + "/data/" + numbers +".txt" TimeUnit.SECONDS.sleep(640) system.shutdown } }
五、 辅助类
(1) CollectionUtil 实现了一个二路有序列表合并和多路有序列表合并。 其中多路有序列表合并后续可以优化成并行的。Scala 的 List 是一个列表 (head::(tail::Nil)), 空列表可以用 List(), Nil 来表示; 取元素可以用 List(index) 或 List.head , List.tail ;
(2) 从 N 个数中选出不重复的 M 个数参见 RandomSelector 的实现。
package scalastudy.utils import scala.collection.mutable import scala.collection.mutable.{ListBuffer, Map} /** * Created by lovesqcc on 16-4-2. */ object CollectionUtil { def main(args: Array[String]): Unit = { testMerge val map = Map("shuqin" -> 31, "fanfan" -> 26, "yanni" -> 28) sortByValue(map).foreach { println } } def testMerge(): Unit = { assert(CollectionUtil.merge(Nil, Nil) == List()) assert(CollectionUtil.merge(List(), Nil) == List()) assert(CollectionUtil.merge(List(), List()) == List()) assert(CollectionUtil.merge(List(), List(1,3)) == List(1,3)) assert(CollectionUtil.merge(List(4,2), List()) == List(4,2)) assert(CollectionUtil.merge(List(4,2), Nil) == List(4,2)) assert(CollectionUtil.merge(List(2,4), List(1,3)) == List(1,2,3,4)) assert(CollectionUtil.merge(List(2,4), List(1,3,5)) == List(1,2,3,4,5)) assert(CollectionUtil.merge(List(2,4,6), List(1,3)) == List(1,2,3,4,6)) assert(CollectionUtil.mergeKOrderedList(Nil) == List()) assert(CollectionUtil.mergeKOrderedList(List()) == List()) assert(CollectionUtil.mergeKOrderedList(List(List())) == List()) assert(CollectionUtil.mergeKOrderedList(List(List(1,2))) == List(1,2)) assert(CollectionUtil.mergeKOrderedList(List(List(), List())) == List()) assert(CollectionUtil.mergeKOrderedList(List(List(), List(1,3))) == List(1,3)) assert(CollectionUtil.mergeKOrderedList(List(List(2,4), List())) == List(2,4)) assert(CollectionUtil.mergeKOrderedList(List(List(2,4), List(1,3))) == List(1,2,3,4)) assert(CollectionUtil.mergeKOrderedList(List(List(2,4), List(1,3,5))) == List(1,2,3,4,5)) assert(CollectionUtil.mergeKOrderedList(List(List(2,4,6), List(1,3))) == List(1,2,3,4,6)) assert(CollectionUtil.mergeKOrderedList(List(List(2,4,7), List(1,6), List(3,5))) == List(1,2,3,4,5,6,7)) assert(CollectionUtil.mergeKOrderedList(List(List(2,4,9), List(1,7), List(3,6), List(5,8))) == List(1,2,3,4,5,6,7,8,9)) println("test merge list passed.") } /** * 对指定 Map 按值排序 */ def sortByValue(m: Map[String,Int]): Map[String,Int] = { val sortedm = new mutable.LinkedHashMap[String,Int] m.toList.sortWith{case(kv1,kv2) => kv1._2 > kv2._2}.foreach { t => sortedm(t._1) = t._2 } return sortedm } /** * 合并两个有序列表 */ def merge(xList: List[Int], yList: List[Int]): List[Int] = { if (xList.isEmpty) { return yList } if (yList.isEmpty) { return xList } val result = ListBuffer[Int]() var xListC = xList var yListC = yList while (!xListC.isEmpty && !yListC.isEmpty ) { if (xListC.head < yListC.head) { result.append(xListC.head) xListC = xListC.tail } else { result.append(yListC.head) yListC = yListC.tail } } if (xListC.isEmpty) { result.appendAll(yListC) } if (yListC.isEmpty) { result.appendAll(xListC) } result.toList } /** * 合并k个有序列表 */ def mergeKOrderedList(klists: List[List[Int]]): List[Int] = { if (klists.isEmpty) { return List[Int]() } var nlist = klists.size if (nlist == 1) { return klists.head } var klistp = klists; val kbuf = ListBuffer[List[Int]]() while (nlist > 1) { for (i <- 0 to nlist/2-1) { kbuf.insert(i, merge(klistp(2*i), klistp(2*i+1))) if (nlist%2 == 1) { kbuf.append(klistp(nlist-1)) } } nlist = nlist - nlist/2 klistp = kbuf.toList } kbuf.toList.head } }
package zzz.study.algorithm.select; import java.util.Arrays; import java.util.Collection; import java.util.HashSet; import java.util.Random; import java.util.Set; import java.util.TreeSet; public class RandomSelector { private RandomSelector() { } private static Random rand = new Random(47); /** * bigRandInt: 返回一个非常大的随机整数,该整数的二进制位数不小于 bits */ public static int bigRandInt(int bits) { if (bits >= 32 || bits <= 0) { throw new IllegalArgumentException("参数 " + bits + " 错误,必须为小于 32 的正整数!"); } int baseNum = 1 << (bits - 1); return rand.nextInt(Integer.MAX_VALUE - baseNum) + baseNum; } /** * randRange: 生成给定范围的随机整数 * @param low 范围下限 * @param high 范围上限(不包含) * @return 给定范围的随机整数 */ public static int randRange(int low, int high) { if (high <= low) { throw new IllegalArgumentException("参数 [" + low + "," + high + "] 错误,第一个参数必须小于第二个参数!"); } return bigRandInt(30) % (high-low) + low; } /** * selectMOrderedRandInts : 从指定集合中随机选择指定数目的整数,并以有序输出 * @param m 需要选取的整数数目 * @param n 指定整数集合 [0:n-1] * @return 随机选取的有序整数列表 */ public static int[] selectMOrderedRandInts(int m, int n) { checkParams(m, n); int[] result = new int[m]; int remaining = n; int selector = m; for (int k=0, i=0; i < n; i++) { if ((bigRandInt(30) % remaining) < selector) { result[k++] = i; selector--; } remaining--; } return result; } /** * selectMOrderedRandInts2 : 从指定集合中随机选择指定数目的整数,并以有序输出 * @param m 需要选取的整数数目 * @param n 指定整数集合 [0:n-1] * @return 随机选取的有序整数列表 */ public static int[] selectMOrderedRandInts2(int m, int n) { checkParams(m, n); Set<Integer> holder = new TreeSet<Integer>(); while (holder.size() < m) { holder.add(bigRandInt(30) % n); } return collectionToArray(holder); } /** * selectMOrderedRandInts3 : 从指定集合中随机选择指定数目的整数,并以有序输出 * @param m 需要选取的整数数目 * @param n 指定整数集合 [0:n-1] * @return 随机选取的有序整数列表 */ public static int[] selectMOrderedRandInts3(int m, int n) { checkParams(m, n); int[] arr = selectMDisorderedRandInts3(m, n); Arrays.sort(arr); return arr; } /** * selectMDisorderedRandInts2: 从指定整数集合中随机选择指定数目的整数,并以无序输出 * @param m 需要选取的整数数目 * @param n 指定整数集合 [0:n-1] * @return 随机选取的无序整数列表 */ public static int[] selectMDisorderedRandInts2(int m, int n) { checkParams(m, n); Set<Integer> intSet = new HashSet<Integer>(); while (intSet.size() < m) { intSet.add(bigRandInt(30) % n); } return collectionToArray(intSet); } /** * selectMDisorderedRandInts3: 从指定整数集合中随机选择指定数目的整数,并以无序输出 * @param m 需要选取的整数数目 * @param n 指定整数集合 [0:n-1] * @return 随机选取的无序整数列表 */ public static int[] selectMDisorderedRandInts3(int m, int n) { checkParams(m, n); int[] arr = new int[n]; for (int i=0; i < n; i++) { arr[i] = i; } for (int k=0; k < m; k++) { int j = randRange(k, n); int tmp = arr[k]; arr[k] = arr[j]; arr[j] = tmp; } return Arrays.copyOf(arr, m); } public static void checkParams(int m, int n) { if (m > n || m <= 0 || n <= 0 ) { throw new IllegalArgumentException("参数 [" + m + "," + n + "] 错误,必须均为正整数,且第一个参数必须小于或等于第二个参数!"); } } /** * collectionToArray : 将指定整数集合转化为整型数组列表 * @param collection 指定整数集合 * @return 要返回的整型数组列表,若给定集合为空,则返回 null */ public static int[] collectionToArray(Collection<Integer> collection) { if (collection == null || collection.size() == 0) { return null; } int[] result = new int[collection.size()]; int k = 0; for (Integer integer : collection) { result[k] = integer; k++; } return result; } /** * printArray: 打印数组的便利方法,每打印十个数换行 * @param arr 指定要打印的数组 */ public static void printArray(int[] arr) { for (int i=0; i < arr.length; i++) { System.out.printf("%d%c", arr[i], i%10==9 ? ‘\n‘ : ‘ ‘); } } }
本文原创, 转载请注明出处,谢谢! :)
混合使用 ForkJoin, Akka, Future 实现一千万个不重复整数的排序
标签:
原文地址:http://www.cnblogs.com/lovesqcc/p/5540415.html