A note recording the full execution flow of a Spark Streaming job. The source quoted below is from the master branch; 1.2.1 has already been released and should differ little from it.
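For orientation, here is the kind of driver program this flow serves: the standard socket word count (MinimalStreamingApp and the port are illustrative; it expects something like nc -lk 9999 feeding localhost:9999). Everything traced below happens once ssc.start() is called.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._  // pair operations such as reduceByKey (needed on 1.2)

object MinimalStreamingApp {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("MinimalStreamingApp")
    val ssc = new StreamingContext(conf, Seconds(1))
    // one receiver; expects a server on localhost:9999
    val lines = ssc.socketTextStream("localhost", 9999)
    lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).print()
    ssc.start()            // everything traced below starts here
    ssc.awaitTermination()
  }
}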
1. A streaming program starts from StreamingContext.start(), which performs the necessary state checks and then starts the JobScheduler.
StreamingContext.scala
def start(): Unit = synchronized {
if (state == Started) {
throw new SparkException("StreamingContext has already been started")
}
if (state == Stopped) {
throw new SparkException("StreamingContext has already been stopped")
}
//check that a batch duration has been set and the graph is valid
validate()
sparkContext.setCallSite(DStream.getCreationSite())
//step 1: the entry point, start the scheduler
scheduler.start()
//track the lifecycle: Initialized at construction, Started here, Stopped at the end
state = Started
}
2. JobScheduler.scala
Starts the listener bus, the event actor and the receivers, then calls JobGenerator to produce the Job objects that will be executed.
//once started, waits for messages and processes jobs
def start(): Unit = synchronized {
if (eventActor != null) return // scheduler has already been started
logDebug("Starting JobScheduler")
eventActor = ssc.env.actorSystem.actorOf(Props(new Actor {
def receive = {
case event: JobSchedulerEvent => processEvent(event)
}
}), "JobScheduler")
//step 2: start listenerBus, receiverTracker and jobGenerator
listenerBus.start() //a StreamingListenerBus instance
receiverTracker = new ReceiverTracker(ssc)
receiverTracker.start()
jobGenerator.start()
logInfo("Started JobScheduler")
}
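Both JobScheduler above and JobGenerator below follow the same pattern: an Akka actor serves as a single-threaded event loop, and all work is driven by fire-and-forget messages. A standalone sketch of that pattern (hypothetical event types; assumes the Akka 2.3 API that Spark 1.x ships with, where newer Akka would use system.terminate() instead of shutdown()):

import akka.actor.{Actor, ActorSystem, Props}

// hypothetical events standing in for JobSchedulerEvent / JobGeneratorEvent
sealed trait SchedulerEvent
case class BatchStarted(time: Long) extends SchedulerEvent
case class BatchCompleted(time: Long) extends SchedulerEvent

class SchedulerActor extends Actor {
  def receive = {
    case event: SchedulerEvent => println(s"processing $event")
  }
}

object ActorLoopDemo extends App {
  val system = ActorSystem("demo")
  val eventActor = system.actorOf(Props[SchedulerActor], "JobScheduler")
  eventActor ! BatchStarted(1000L)    // fire-and-forget, like eventActor ! GenerateJobs(...)
  eventActor ! BatchCompleted(1000L)
  Thread.sleep(500)                   // let the messages drain
  system.shutdown()
}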
3. JobGenerator.scala
The timer's constructor uses an anonymous function to initialize the callback.
private val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds,
  longTime => eventActor ! GenerateJobs(new Time(longTime)), "JobGenerator")
/** Start generation of jobs */
def start(): Unit = synchronized {
if (eventActor != null) return // generator has already been started
eventActor = ssc.env.actorSystem.actorOf(Props(new Actor {
def receive = {
case event: JobGeneratorEvent => processEvent(event) //dispatch on the message type
}
}), "JobGenerator")
if (ssc.isCheckpointPresent) {
restart()
} else {
//first start: begin generating jobs
startFirstTime()
}
}
/** Starts the generator for the first time */
private def startFirstTime() {
val startTime = new Time(timer.getStartTime())
//start the DStreamGraph; its zero time is one batch interval before the first batch
graph.start(startTime - graph.batchDuration)
//step 3: start the RecurringTimer; its callback is longTime => eventActor ! GenerateJobs(new Time(longTime))
timer.start(startTime.milliseconds)
logInfo("Started JobGenerator at " + startTime)
}
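A note on timer.getStartTime(): it rounds the current time up to the next multiple of the batch interval, and startFirstTime then starts the graph one interval earlier so the first batch covers a full window. A sketch of that rounding (my reading of RecurringTimer.getStartTime, not the verbatim source):

// round "now" up to the next multiple of the batch interval; the
// graph's zero time is then alignedStartTime(...) - batchDuration
def alignedStartTime(now: Long, period: Long): Long =
  (now / period + 1) * period

println(alignedStartTime(10300L, 2000L))  // 12000: the first batch time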
4. RecurringTimer.scala
start() launches a daemon thread that keeps driving job generation over the stream data, invoking the callback that JobGenerator passed when constructing the RecurringTimer.
/**
* Start at the given start time.
 * The daemon thread repeatedly invokes the callback, longTime => eventActor ! GenerateJobs(new Time(longTime)), sending one message per batch.
*/
def start(startTime: Long): Long = synchronized {
nextTime = startTime
//step 4: start the daemon thread
thread.start()
logInfo("Started timer for " + name + " at time " + nextTime)
nextTime
}
private val thread = new Thread("RecurringTimer - " + name) {
//daemon thread
setDaemon(true)
override def run() { loop }
}
/**
* Repeatedly call the callback every interval.
*/
private def loop() {
try {
while (!stopped) {
clock.waitTillTime(nextTime)
//step 4 (cont.): send the GenerateJobs message via the Akka actor
callback(nextTime)
prevTime = nextTime
nextTime += period
logDebug("Callback for " + name + " called at time " + prevTime)
}
} catch {
case e: InterruptedException =>
}
}
}
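Stripped of the Clock abstraction and graceful-stop handling, the timer is just a daemon thread sleeping until the next multiple of the period. A self-contained model (SimpleRecurringTimer is mine, not Spark's):

// a daemon thread that sleeps until nextTime and fires `callback`
class SimpleRecurringTimer(period: Long, callback: Long => Unit, name: String) {
  @volatile private var stopped = false
  private var nextTime = 0L
  private val thread = new Thread("SimpleRecurringTimer - " + name) {
    setDaemon(true)
    override def run(): Unit = {
      while (!stopped) {
        val sleep = nextTime - System.currentTimeMillis()
        if (sleep > 0) Thread.sleep(sleep)
        callback(nextTime)   // where Spark sends GenerateJobs(new Time(nextTime))
        nextTime += period
      }
    }
  }
  def start(startTime: Long): Long = synchronized {
    nextTime = startTime
    thread.start()
    nextTime
  }
  def stop(): Unit = { stopped = true }
}

object TimerDemo extends App {
  val t = new SimpleRecurringTimer(1000, time => println(s"tick at $time"), "demo")
  t.start(System.currentTimeMillis() + 1000)
  Thread.sleep(3500)   // let a few ticks fire
  t.stop()
}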
5. JobGenerator.scala
/** Processes all events */
private def processEvent(event: JobGeneratorEvent) {
logDebug("Got event " + event)
event match {
case GenerateJobs(time) => generateJobs(time)
case ClearMetadata(time) => clearMetadata(time)
case DoCheckpoint(time) => doCheckpoint(time)
case ClearCheckpointData(time) => clearCheckpointData(time)
}
}
/** Generate jobs and perform checkpoint for the given `time`. */
//invoked once per batch interval for as long as the stream runs
private def generateJobs(time: Time) {
// Set the SparkEnv in this thread, so that job generation code can access the environment
// Example: BlockRDDs are created in this thread, and it needs to access BlockManager
// Update: This is probably redundant after threadlocal stuff in SparkEnv has been removed.
SparkEnv.set(ssc.env)
Try {
//step 5: allocate received blocks to this batch, preparing for the submission in step 7
jobScheduler.receiverTracker.allocateBlocksToBatch(time) // allocate received blocks to batch
//step 6: generate one job per output stream via outputStream.generateJob
graph.generateJobs(time) // generate jobs using allocated block
} match {
case Success(jobs) =>
val receivedBlockInfos =
jobScheduler.receiverTracker.getBlocksOfBatch(time).mapValues { _.toArray }
//step 7: submit the jobs
jobScheduler.submitJobSet(JobSet(time, jobs, receivedBlockInfos))
case Failure(e) =>
jobScheduler.reportError("Error generating jobs for time " + time, e)
}
eventActor ! DoCheckpoint(time)
}
In step 6, execution reaches DStreamGraph.scala:
def generateJobs(time: Time): Seq[Job] = {
logDebug("Generating jobs for time " + time)
val jobs = this.synchronized {
outputStreams.flatMap(outputStream => outputStream.generateJob(time))
}
logDebug("Generated " + jobs.length + " jobs for time " + time)
jobs
}
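Where do those outputStreams come from? Each output operation on a DStream, such as print(), saveAsTextFiles() or foreachRDD(), registers an output stream on the graph, and it is these registered streams whose generateJob(time) gets called above. For example, reusing the lines stream from the driver sketch at the top:

// registers an output DStream; on every batch its generateJob(time)
// produces a Job that runs this function over the batch's RDD
lines.foreachRDD { rdd => println("batch had " + rdd.count() + " lines") }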
DStream.scala
/** Method that generates a RDD for the given time */
/** The core function of DStream: every concrete subclass must implement compute(). Each
kind of DStream implements its own logic here and returns the RDD computed for the given time. */
def compute (validTime: Time): Option[RDD[T]]
/**
* Generate a SparkStreaming job for the given time. This is an internal method that
* should not be called directly. This default implementation creates a job
* that materializes the corresponding RDD. Subclasses of DStream may override this
* to generate their own jobs.
*/
private[streaming] def generateJob(time: Time): Option[Job] = {
getOrCompute(time) match {
//note: the Some(rdd) matched here is the RDD returned by getOrCompute
case Some(rdd) => {
val jobFunc = () => {
val emptyFunc = { (iterator: Iterator[T]) => {} }
context.sparkContext.runJob(rdd, emptyFunc)
}
Some(new Job(time, jobFunc))
}
case None => None
}
}
/**
* Get the RDD corresponding to the given time; either retrieve it from cache
* or compute-and-cache it.
 * This is the RDD that SparkContext.runJob ultimately runs; it is the core of the stream.
 * Each batch's RDD is keyed by its Time and cached in a HashMap.
*/
private[streaming] def getOrCompute(time: Time): Option[RDD[T]] = {
// If RDD was already generated, then retrieve it from HashMap,
// or else compute the RDD
generatedRDDs.get(time).orElse {
// Compute the RDD if time is valid (e.g. correct time in a sliding window)
// of RDD generation, else generate nothing.
if (isTimeValid(time)) {
// For each new batch time, the DStream calls the subclass's compute() to produce a new RDD
// Set the thread-local property for call sites to this DStream's creation site
// such that RDDs generated by compute gets that as their creation site.
// Note that this `getOrCompute` may get called from another DStream which may have
// set its own call site. So we store its call site in a temporary variable,
// set this DStream's creation site, generate RDDs and then restore the previous call site.
val prevCallSite = ssc.sparkContext.getCallSite()
ssc.sparkContext.setCallSite(creationSite)
// Disable checks for existing output directories in jobs launched by the streaming
// scheduler, since we may need to write output to an existing directory during checkpoint
// recovery; see SPARK-4835 for more details. We need to have this call here because
// compute() might cause Spark jobs to be launched.
val rddOption = PairRDDFunctions.disableOutputSpecValidation.withValue(true) {
compute(time)
}
ssc.sparkContext.setCallSite(prevCallSite)
rddOption.foreach { case newRDD =>
// Register the generated RDD for caching and checkpointing
if (storageLevel != StorageLevel.NONE) {
newRDD.persist(storageLevel)
logDebug(s"Persisting RDD ${newRDD.id} for time $time to $storageLevel")
}
if (checkpointDuration != null && (time - zeroTime).isMultipleOf(checkpointDuration)) {
newRDD.checkpoint()
logInfo(s"Marking RDD ${newRDD.id} for time $time for checkpointing")
}
generatedRDDs.put(time, newRDD)
}
rddOption
} else {
None
}
}
}
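The memoization in getOrCompute is worth isolating: each batch time maps to at most one RDD, and repeated lookups for the same time are served from generatedRDDs. A toy model of just that pattern (ToyStream is illustrative, not Spark code; the real map stores RDDs keyed by Time):

import scala.collection.mutable

// one result per batch time, computed at most once
class ToyStream[T](compute: Long => Option[T]) {
  private val generated = mutable.HashMap[Long, T]()
  def getOrCompute(time: Long): Option[T] =
    generated.get(time).orElse {
      val result = compute(time)
      result.foreach(generated.put(time, _))
      result
    }
}

object ToyStreamDemo extends App {
  var calls = 0
  val s = new ToyStream[String](t => { calls += 1; Some("batch-" + t) })
  s.getOrCompute(1000L)
  s.getOrCompute(1000L)  // served from the map, compute not called again
  println(calls)         // 1
}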
6. JobScheduler.scala
def submitJobSet(jobSet: JobSet) {
if (jobSet.jobs.isEmpty) {
logInfo("No jobs added for time " + jobSet.time)
} else {
jobSets.put(jobSet.time, jobSet)
//step 8: execute the JobSet's jobs on the jobExecutor thread pool
jobSet.jobs.foreach(job => jobExecutor.execute(new JobHandler(job)))
logInfo("Added jobs for time " + jobSet.time)
}
}
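jobExecutor here is a fixed-size thread pool; Spark sizes it from spark.streaming.concurrentJobs (default 1), so by default the jobs of a batch run one at a time. A minimal sketch of the fan-out:

import java.util.concurrent.Executors

object JobExecutorDemo extends App {
  // a pool of size 1 mirrors the default serial execution
  val jobExecutor = Executors.newFixedThreadPool(1)
  val jobs = (1 to 3).map { i =>
    new Runnable { def run(): Unit = println("running job " + i) }
  }
  jobs.foreach(jobExecutor.execute)  // like jobSet.jobs.foreach(... new JobHandler(job))
  jobExecutor.shutdown()             // finish queued jobs, then exit
}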
7. JobScheduler.scala
private class JobHandler(job: Job) extends Runnable {
def run() {
//step 9.1: tell the scheduler the job has started
eventActor ! JobStarted(job)
// Disable checks for existing output directories in jobs launched by the streaming scheduler,
// since we may need to write output to an existing directory during checkpoint recovery;
// see SPARK-4835 for more details.
PairRDDFunctions.disableOutputSpecValidation.withValue(true) {
//step 9.2: run the job itself (ultimately SparkContext.runJob)
job.run()
}
//step 9.3: tell the scheduler the job has completed
eventActor ! JobCompleted(job)
}
}
Original post: http://chengyanbin.blog.51cto.com/3900113/1618014