码迷,mamicode.com
首页 > 其他好文 > 详细

Spark版本定制第6天:Job动态生成和深度思考

时间:2016-05-22 13:50:09      阅读:144      评论:0      收藏:0      [点我收藏+]

标签:

本期内容:

1 Job动态生成

2 深度思考

  一切不能进行实时流处理的数据都是无效的数据。在流处理时代,SparkStreaming有着强大吸引力,而且发展前景广阔,加之Spark的生态系统,Streaming可以方便调用其他的诸如SQL,MLlib等强大框架,它必将一统天下。

  Spark Streaming运行时与其说是Spark Core上的一个流式处理框架,不如说是Spark Core上的一个最复杂的应用程序。如果可以掌握Spark streaming这个复杂的应用程序,那么其他的再复杂的应用程序都不在话下了。这里选择Spark Streaming作为版本定制的切入点也是大势所趋。

  

  在Spark Streaming里,总体负责动态作业调度的具体类是JobScheduler

private[streaming] 
class JobScheduler(val ssc: StreamingContext) extends Logging 

  在JobScheduler中有两个重要的成员:JobGenerator和ReceiverTracker,而在JobGenerator中有两个至关重要的成员就是RecurringTimer和EventLoop。

  当JobGenerator启动的时候会调用startFirstTime方法,当然如果不是第一次启动就会restart。

  if (ssc.isCheckpointPresent) { 
    restart()      
  } else { 
    startFirstTime() 
  }

  在 这个方法中:会启动DStream和定时器

private def startFirstTime() { 
  val startTime = new Time(timer.getStartTime()) 
  graph.start(startTime - graph.batchDuration) 
  timer.start(startTime.milliseconds) 
  logInfo("Started JobGenerator at " + startTime) 
} 

  定时器负责在每一个时间间隔batchInterval,在EventLoop循环中发送一次消息。在EventLoop接收到消息后就会启动run方法在消息队列总来执行

override def run(): Unit = { 
  try { 
    while (!stopped.get) { 
      val event = eventQueue.take() 
      try { 
        onReceive(event) 
      } catch { 
        case NonFatal(e) => { 
          try { 
            onError(e) 
          } catch { 
            case NonFatal(e) => logError("Unexpected error in " + name, e) 
          } 
        } 
      } 
    } 
  } catch { 
    case ie: InterruptedException => // exit even if eventQueue is not empty 
    case NonFatal(e) => logError("Unexpected error in " + name, e) 
  } 
} 

  此时在消息中会执行generateJobs方法来不断地生成job,至此,job完成了生成的过程。

private def generateJobs(time: Time) { 
  // Set the SparkEnv in this thread, so that job generation code can access the environment 
  // Example: BlockRDDs are created in this thread, and it needs to access BlockManager 
  // Update: This is probably redundant after threadlocal stuff in SparkEnv has been removed. 
  SparkEnv.set(ssc.env) 
  Try { 
    jobScheduler.receiverTracker.allocateBlocksToBatch(time) // allocate received blocks to batch 
    graph.generateJobs(time) // generate jobs using allocated block 
  } match { 
    case Success(jobs) => 
      val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time) 
      jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos)) 
    case Failure(e) => 
      jobScheduler.reportError("Error generating jobs for time " + time, e) 
  } 
  eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = false)) 
} 

  

在job生成过程中主要包含了以下4个步骤

  1. 要求ReceiverTracker将目前已收到的数据进行一次allocate,即将上次batch切分后的数据切分到到本次新的batch里

  2. 要求DStreamGraph复制出一套新的 RDD DAG 的实例。整个DStreamGraph.generateJobs(time)遍历结束的返回值是Seq[Job]

  3. 将第2步生成的本 batch 的 RDD DAG,和第1步获取到的 meta 信息,一同提交给JobScheduler异步执行这里我们提交的是将 (a) time (b) Seq[job] (c) 块数据的meta信息。这三者包装为一个JobSet,然后调用JobScheduler.submitJobSet(JobSet)提交给JobScheduler。这里的向JobScheduler提交过程与JobScheduler接下来在jobExecutor里执行过程是异步分离的,因此本步将非常快即可返回。

  4. 只要提交结束(不管是否已开始异步执行),就马上对整个系统的当前运行状态做一个checkpoint这里做checkpoint也只是异步提交一个DoCheckpoint消息请求,不用等 checkpoint 真正写完成即可返回这里也简单描述一下 checkpoint 包含的内容,包括已经提交了的、但尚未运行结束的JobSet等实际运行时信息。

 

 

备注:

资料来源于:DT_大数据梦工厂(Spark发行版本定制)

更多私密内容,请关注微信公众号:DT_Spark

如果您对大数据Spark感兴趣,可以免费听由王家林老师每天晚上20:00开设的Spark永久免费公开课,地址YY房间号:68917580

Spark版本定制第6天:Job动态生成和深度思考

标签:

原文地址:http://www.cnblogs.com/pzwxySpark/p/Spark6.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!