We know that Spark can keep the RDDs it has computed in memory and reuse them when they are needed again. How does Spark actually do this? This article walks through the source code to explain how a cached RDD gets reused.
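Before diving into the internals, here is the user-facing behaviour we are about to trace, as a minimal sketch (the app name, data, and storage level are purely illustrative):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel

object CacheReuseSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("cache-reuse").setMaster("local[2]"))

    // Mark the RDD as persistent; nothing is computed or stored yet.
    val doubled = sc.parallelize(1 to 100000).map(_ * 2).persist(StorageLevel.MEMORY_ONLY)

    doubled.count()  // first action: partitions are computed and stored as blocks
    doubled.count()  // second action: the scheduler locates the cached blocks
                     // (via getCacheLocs) instead of recomputing the partitions

    sc.stop()
  }
}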
The previous article explained Spark's execution mechanism: the DAGScheduler is responsible for breaking an action down into stages. In DAGScheduler.getMissingParentStages, Spark makes its first use of previously computed RDDs, and the function it relies on is DAGScheduler.getCacheLocs:
private val cacheLocs = new HashMap[Int, Array[Seq[TaskLocation]]]

private def getCacheLocs(rdd: RDD[_]): Array[Seq[TaskLocation]] = {
  if (!cacheLocs.contains(rdd.id)) {
    val blockIds = rdd.partitions.indices.map(index => RDDBlockId(rdd.id, index)).toArray[BlockId]
    val locs = BlockManager.blockIdsToBlockManagers(blockIds, env, blockManagerMaster)
    cacheLocs(rdd.id) = blockIds.map { id =>
      locs.getOrElse(id, Nil).map(bm => TaskLocation(bm.host, bm.executorId))
    }
  }
  cacheLocs(rdd.id)
}
The DAGScheduler keeps partition location information in cacheLocs, keyed by RDD id. Let's look at the branch taken when an RDD's locations are not cached yet: it first builds a BlockId for each partition, then calls BlockManager.blockIdsToBlockManagers to turn those block IDs into a map from BlockId to Seq[BlockManagerId]; each BlockManagerId carries the location of the partition (every partition is stored as one block, and blocks can also hold other data such as broadcast variables).
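To make the block IDs concrete: an RDDBlockId is rendered as rdd_<rddId>_<partitionIndex>, so for a hypothetical RDD with id 3 and four partitions the generated IDs would look like this sketch (values are illustrative only):

import org.apache.spark.storage.{BlockId, RDDBlockId}

// Hypothetical RDD with id 3 and 4 partitions.
val blockIds: Array[BlockId] = (0 until 4).map(i => RDDBlockId(3, i)).toArray[BlockId]
blockIds.map(_.name)  // Array("rdd_3_0", "rdd_3_1", "rdd_3_2", "rdd_3_3")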
According to the comments, every node (both the master and the workers) runs a BlockManager that manages all of that node's storage (RDD blocks, broadcast data, and so on). The master and the workers communicate through the Akka actor system (see my other article for an introduction), namely via BlockManagerMasterActor and BlockManagerSlaveActor. Let's continue with BlockManager.blockIdsToBlockManagers:
def blockIdsToBlockManagers(
    blockIds: Array[BlockId],
    env: SparkEnv,
    blockManagerMaster: BlockManagerMaster = null): Map[BlockId, Seq[BlockManagerId]] = {

  // blockManagerMaster != null is used in tests
  assert(env != null || blockManagerMaster != null)
  val blockLocations: Seq[Seq[BlockManagerId]] = if (blockManagerMaster == null) {
    env.blockManager.getLocationBlockIds(blockIds)
  } else {
    blockManagerMaster.getLocations(blockIds)
  }

  val blockManagers = new HashMap[BlockId, Seq[BlockManagerId]]
  for (i <- 0 until blockIds.length) {
    blockManagers(blockIds(i)) = blockLocations(i)
  }
  blockManagers.toMap
}
The BlockManager is created inside SparkEnv. A SparkEnv likewise runs on every node, and at creation time it is specialised into a driver env or an executor env (the same class, wired up differently, just like the BlockManager). When the SparkEnv is created, the BlockManager on the driver gets a BlockManagerMasterActor, while the BlockManager on each executor gets a reference to that actor. The code above ends up calling BlockManagerMaster.getLocations to resolve the BlockManagerIds for each BlockId, and packages the result into a Map. Next, let's look at BlockManagerMaster.getLocations and its helper askDriverWithReply:
def getLocations(blockId: BlockId): Seq[BlockManagerId] = {
  askDriverWithReply[Seq[BlockManagerId]](GetLocations(blockId))
}

private def askDriverWithReply[T](message: Any): T = {
  AkkaUtils.askWithReply(message, driverActor, AKKA_RETRY_ATTEMPTS, AKKA_RETRY_INTERVAL_MS,
    timeout)
}
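AkkaUtils.askWithReply is essentially the Akka ask pattern wrapped in a retry loop. Conceptually it does something like the following sketch (this is not Spark's actual implementation; error handling is reduced to the minimum):

import scala.concurrent.Await
import scala.concurrent.duration.FiniteDuration
import akka.actor.ActorRef
import akka.pattern.ask
import akka.util.Timeout

// Conceptual ask-with-retry: send `message`, wait up to `timeout` for a reply,
// and try again a few times before giving up.
def askWithRetry[T](actor: ActorRef, message: Any, attempts: Int, timeout: FiniteDuration): T = {
  implicit val askTimeout: Timeout = Timeout(timeout)
  var lastError: Throwable = null
  var attempt = 0
  while (attempt < attempts) {
    attempt += 1
    try {
      return Await.result(actor.ask(message), timeout).asInstanceOf[T]
    } catch {
      case e: Exception => lastError = e  // e.g. a timeout; fall through and retry
    }
  }
  throw new RuntimeException(s"No reply to $message after $attempts attempts", lastError)
}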
So this code simply sends a GetLocations message to the BlockManagerMasterActor and waits for the reply. The BlockManagerMasterActor holds all of the storage bookkeeping: blockManagerInfo records the storage state of every executor, blockManagerIdByExecutor maps each executor to the BlockManagerId of its BlockManager, and blockLocations records every location of every block (which covers the locations of all cached partitions). Below is the part of BlockManagerMasterActor that answers location queries:
override def receiveWithLogging = {
  case GetLocations(blockId) =>
    sender ! getLocations(blockId)
  case ... =>
}

private def getLocations(blockId: BlockId): Seq[BlockManagerId] = {
  if (blockLocations.containsKey(blockId)) blockLocations.get(blockId).toSeq else Seq.empty
}
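For reference, the three bookkeeping structures mentioned above are declared inside BlockManagerMasterActor roughly as follows (paraphrased from the Spark 1.x sources; BlockManagerInfo is the master's internal per-block-manager record, JHashMap is java.util.HashMap, and exact types may differ between versions):

// Per registered BlockManager (driver and each executor): memory usage and the blocks it holds.
private val blockManagerInfo = new mutable.HashMap[BlockManagerId, BlockManagerInfo]

// executorId -> the BlockManagerId registered by that executor.
private val blockManagerIdByExecutor = new mutable.HashMap[String, BlockManagerId]

// blockId -> the set of block managers that currently hold a copy of that block.
private val blockLocations = new JHashMap[BlockId, mutable.HashSet[BlockManagerId]]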
Since the BlockManagerMasterActor keeps the locations of every block, it can answer directly. At this point it is clear that all block location information lives on the master node. The above is the complete path Spark follows to find a persisted RDD, but it does not cover the whole storage mechanism, so let's look at some of the other code.
Because all block metadata lives on the master, merely querying block locations never touches the executors. So let's analyse RDD.unpersist: it calls SparkContext.unpersistRDD, which in turn calls env.blockManager.master.removeRdd. Its removal flow is easiest to follow through the closely related removeBlock path on BlockManagerMaster:
def removeBlock(blockId: BlockId) {
  askDriverWithReply(RemoveBlock(blockId))
}

private def askDriverWithReply[T](message: Any): T = {
  AkkaUtils.askWithReply(message, driverActor, AKKA_RETRY_ATTEMPTS, AKKA_RETRY_INTERVAL_MS,
    timeout)
}
Just as in the earlier example, this sends a RemoveBlock message to the BlockManagerMasterActor:
override def receiveWithLogging = {
  case RemoveBlock(blockId) =>
    removeBlockFromWorkers(blockId)
    sender ! true
  case ... =>
}

private def removeBlockFromWorkers(blockId: BlockId) {
  val locations = blockLocations.get(blockId)
  if (locations != null) {
    locations.foreach { blockManagerId: BlockManagerId =>
      val blockManager = blockManagerInfo.get(blockManagerId)
      if (blockManager.isDefined) {
        // Remove the block from the slave's BlockManager.
        // Doesn't actually wait for a confirmation and the message might get lost.
        // If message loss becomes frequent, we should add retry logic here.
        blockManager.get.slaveActor.ask(RemoveBlock(blockId))(akkaTimeout)
      }
    }
  }
}
This code first looks up, via blockLocations, the BlockManagerIds of every location that holds the block, then uses blockManagerInfo to reach each executor's BlockManagerSlaveActor and sends it a RemoveBlock message:
override def receiveWithLogging = {
  case RemoveBlock(blockId) =>
    doAsync[Boolean]("removing block " + blockId, sender) {
      blockManager.removeBlock(blockId)
      true
    }
  case ... =>
}
When the BlockManagerSlaveActor receives the message, it calls blockManager.removeBlock:
def removeBlock(blockId: BlockId, tellMaster: Boolean = true): Unit = {
  logInfo(s"Removing block $blockId")
  val info = blockInfo.get(blockId).orNull
  if (info != null) {
    info.synchronized {
      // Removals are idempotent in disk store and memory store. At worst, we get a warning.
      val removedFromMemory = memoryStore.remove(blockId)
      val removedFromDisk = diskStore.remove(blockId)
      val removedFromTachyon = if (tachyonInitialized) tachyonStore.remove(blockId) else false
      if (!removedFromMemory && !removedFromDisk && !removedFromTachyon) {
        logWarning(s"Block $blockId could not be removed as it was not found in either " +
          "the disk, memory, or tachyon store")
      }
      blockInfo.remove(blockId)
      if (tellMaster && info.tellMaster) {
        val status = getCurrentBlockStatus(blockId, info)
        reportBlockStatus(blockId, info, status)
      }
    }
  } else {
    // The block has already been removed; do nothing.
    logWarning(s"Asked to remove block $blockId, which does not exist")
  }
}
This code calls the remove method of the three stores (MemoryStore, DiskStore, and the Tachyon store) to do the actual work and, if requested, reports the updated block status back to the master. Note that it is the Tachyon store (the OFF_HEAP storage level) that keeps data off-heap and out of reach of the Java GC; the MemoryStore keeps its blocks on the JVM heap. That wraps up this walkthrough of Spark's storage management.
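As a final note for completeness: which of these stores a block ends up in is decided by the StorageLevel chosen at persist time. A quick user-level sketch (assuming `sc` is an already created SparkContext; an RDD has only one storage level, so the alternatives are shown as comments):

import org.apache.spark.storage.StorageLevel

val nums = sc.parallelize(1 to 1000)   // `sc` is assumed to exist

nums.persist(StorageLevel.MEMORY_ONLY)        // blocks live in the MemoryStore (JVM heap)
// nums.persist(StorageLevel.MEMORY_AND_DISK) // spill to the DiskStore when memory runs out
// nums.persist(StorageLevel.OFF_HEAP)        // blocks go to the off-heap (Tachyon) store

nums.count()      // materialise the blocks
nums.unpersist()  // walks the block-removal path analysed above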
Original post: http://www.cnblogs.com/OddLearner/p/4181300.html