本文主要关注ShuffledRDD的Shuffle Read是如何从其他的node上读取数据的。
上文讲到了获取如何获取的策略都在org.apache.spark.storage.BlockFetcherIterator.BasicBlockFetcherIterator#splitLocalRemoteBlocks中。可以见注释。
protected def splitLocalRemoteBlocks(): ArrayBuffer[FetchRequest] = { // Make remote requests at most maxBytesInFlight / 5 in length; the reason to keep them // smaller than maxBytesInFlight is to allow multiple, parallel fetches from up to 5 // nodes, rather than blocking on reading output from one node. // 为了快速的得到数据,每次都会启动5个线程去最多5个node上取数据; // 每次请求的数据不会超过spark.reducer.maxMbInFlight(默认值为48MB) / 5。 // 这样做的原因有几个: // 1. 避免占用目标机器的过多带宽,在千兆网卡为主流的今天,带宽还是比较重要的。 // 如果一个连接将要占用48M的带宽,这个Network IO可能会成为瓶颈。 // 2. 请求数据可以平行化,这样请求数据的时间可以大大减少。请求数据的总时间就是那个请求最长的。 // 如果不是并行请求,那么总时间将是所有的请求时间之和。 // 而设置spark.reducer.maxMbInFlight,也是为了不要占用过多的内存 val targetRequestSize = math.max(maxBytesInFlight / 5, 1L) logInfo("maxBytesInFlight: " + maxBytesInFlight + ", targetRequestSize: " + targetRequestSize) // Split local and remote blocks. Remote blocks are further split into FetchRequests of size // at most maxBytesInFlight in order to limit the amount of data in flight. val remoteRequests = new ArrayBuffer[FetchRequest] var totalBlocks = 0 for ((address, blockInfos) <- blocksByAddress) { // address实际上是executor_id totalBlocks += blockInfos.size if (address == blockManagerId) { //数据在本地,那么直接走local read // Filter out zero-sized blocks localBlocksToFetch ++= blockInfos.filter(_._2 != 0).map(_._1) _numBlocksToFetch += localBlocksToFetch.size } else { val iterator = blockInfos.iterator var curRequestSize = 0L var curBlocks = new ArrayBuffer[(BlockId, Long)] while (iterator.hasNext) { // blockId 是org.apache.spark.storage.ShuffleBlockId, // 格式:"shuffle_" + shuffleId + "_" + mapId + "_" + reduceId val (blockId, size) = iterator.next() // Skip empty blocks if (size > 0) { //过滤掉为大小为0的文件 curBlocks += ((blockId, size)) remoteBlocksToFetch += blockId _numBlocksToFetch += 1 curRequestSize += size } else if (size < 0) { throw new BlockException(blockId, "Negative block size " + size) } if (curRequestSize >= targetRequestSize) { // 避免一次请求的数据量过大 // Add this FetchRequest remoteRequests += new FetchRequest(address, curBlocks) curBlocks = new ArrayBuffer[(BlockId, Long)] logDebug(s"Creating fetch request of $curRequestSize at $address") curRequestSize = 0 } } // Add in the final request if (!curBlocks.isEmpty) { // 将剩余的请求放到最后一个request中。 remoteRequests += new FetchRequest(address, curBlocks) } } } logInfo("Getting " + _numBlocksToFetch + " non-empty blocks out of " + totalBlocks + " blocks") remoteRequests }
原文地址:http://blog.csdn.net/anzhsoft/article/details/41620329