码迷,mamicode.com
首页 > 其他好文 > 详细

Spark版本定制第13天:Driver容错

时间:2016-05-24 11:45:51      阅读:151      评论:0      收藏:0      [点我收藏+]

标签:

本期内容

1、ReceivedBlockTracker容错安全性

2、DStreamGraph和JobGenerator容错安全性

  一切不能进行实时流处理的数据都是无效的数据。在流处理时代,SparkStreaming有着强大吸引力,而且发展前景广阔,加之Spark的生态系统,Streaming可以方便调用其他的诸如SQL,MLlib等强大框架,它必将一统天下。

  Spark Streaming运行时与其说是Spark Core上的一个流式处理框架,不如说是Spark Core上的一个最复杂的应用程序。如果可以掌握Spark streaming这个复杂的应用程序,那么其他的再复杂的应用程序都不在话下了。这里选择Spark Streaming作为版本定制的切入点也是大势所趋。

  从数据层面,ReceivedBlockTracker为整个Spark Streaming应用程序记录元数据信息。

  从调度层面,DStreamGraph和JobGenerator是Spark Streaming调度的核心,记录当前调度到哪一进度,和业务有关。

 先从ReceiverTracker角度出发

  

  如果开启WAL,则将元数据写入WAL,加入ReceivedBlockQueue

def addBlock(receivedBlockInfo: ReceivedBlockInfo): Boolean = {
 try {
   val writeResult = writeToLog(BlockAdditionEvent(receivedBlockInfo))
   if (writeResult) {
     synchronized {
       getReceivedBlockQueue(receivedBlockInfo.streamId) += receivedBlockInfo
     }
     logDebug(s"Stream ${receivedBlockInfo.streamId} received " +
       s"block ${receivedBlockInfo.blockStoreResult.blockId}")
   } else {
     logDebug(s"Failed to acknowledge stream ${receivedBlockInfo.streamId} receiving " +
       s"block ${receivedBlockInfo.blockStoreResult.blockId} in the Write Ahead Log.")
   }
   writeResult
 } catch {
   case NonFatal(e) =>
     logError(s"Error adding block $receivedBlockInfo", e)
     false
 }
}

  这里来看writeToLog方法

private def writeToLog(record: ReceivedBlockTrackerLogEvent): Boolean = {
    if (isWriteAheadLogEnabled) {
      logTrace(s"Writing record: $record")
      try {
        writeAheadLogOption.get.write(ByteBuffer.wrap(Utils.serialize(record)),
          clock.getTimeMillis())
        true
      } catch {
        case NonFatal(e) =>
          logWarning(s"Exception thrown while writing record: $record to the WriteAheadLog.", e)
          false
      }
    } else {
      true
    }
  }

  再来看看JobScheduler为为分配batch分配block

def allocateBlocksToBatch(batchTime: Time): Unit = synchronized {
    if (lastAllocatedBatchTime == null || batchTime > lastAllocatedBatchTime) {
      val streamIdToBlocks = streamIds.map { streamId =>
          (streamId, getReceivedBlockQueue(streamId).dequeueAll(x => true))
      }.toMap
      val allocatedBlocks = AllocatedBlocks(streamIdToBlocks)
      if (writeToLog(BatchAllocationEvent(batchTime, allocatedBlocks))) {
        timeToAllocatedBlocks.put(batchTime, allocatedBlocks)
        lastAllocatedBatchTime = batchTime
      } else {
        logInfo(s"Possibly processed batch $batchTime need to be processed again in WAL recovery")
      }
    } else {
      logInfo(s"Possibly processed batch $batchTime need to be processed again in WAL recovery")
    }
  }

  这里此writeToLog虽然每次都调用,但是方法内部还是会判断WAL是否开启。

  再进入到writeToLog内部

private def writeToLog(record: ReceivedBlockTrackerLogEvent): Boolean = {
    if (isWriteAheadLogEnabled) {
      logTrace(s"Writing record: $record")
      try {
        writeAheadLogOption.get.write(ByteBuffer.wrap(Utils.serialize(record)),
          clock.getTimeMillis())
        true
      } catch {
        case NonFatal(e) =>
          logWarning(s"Exception thrown while writing record: $record to the WriteAheadLog.", e)
          false
      }
    } else {
      true
    }
  }

  写入完成之后,ReceiverTracker部分容错就是数据部分完成

  接下来看看从job生成的角度,也就是调度层面看看容错

  根据DStreamGraph每次根据一个固定的batchINterval定时生成Job后都要调用DoCheckpoint

  

private def doCheckpoint(time: Time, clearCheckpointDataLater: Boolean) {
    if (shouldCheckpoint && (time - graph.zeroTime).isMultipleOf(ssc.checkpointDuration)) {
      logInfo("Checkpointing graph for time " + time)
      ssc.graph.updateCheckpointData(time)
      checkpointWriter.write(new Checkpoint(ssc, time), clearCheckpointDataLater)
    }
  }

  在改方法中会根据更新时间每一个的checkpoint,调用updateCheckPointData方法

def updateCheckpointData(time: Time) {
    logInfo("Updating checkpoint data for time " + time)
    this.synchronized {
      outputStreams.foreach(_.updateCheckpointData(time))
    }
    logInfo("Updated checkpoint data for time " + time)
  }
  private[streaming] def updateCheckpointData(currentTime: Time) {
    logDebug("Updating checkpoint data for time " + currentTime)
    checkpointData.update(currentTime)
    dependencies.foreach(_.updateCheckpointData(currentTime))
    logDebug("Updated checkpoint data for time " + currentTime + ": " + checkpointData)
  }

  总结来说,ReceivedBlockTracker处理数据层面,它通过WAL的方式

  而DStreamGraph和JobGenerator是从调度层面出发,通过checkpoint的方式

 

备注:

资料来源于:DT_大数据梦工厂(Spark发行版本定制)

更多私密内容,请关注微信公众号:DT_Spark

如果您对大数据Spark感兴趣,可以免费听由王家林老师每天晚上20:00开设的Spark永久免费公开课,地址YY房间号:68917580

Spark版本定制第13天:Driver容错

标签:

原文地址:http://www.cnblogs.com/pzwxySpark/p/Spark13.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!