Contents of this issue:
1. The inner workings of JobScheduler
2. Deeper thoughts on JobScheduler
Abstract: JobScheduler is the core of the entire Spark Streaming scheduling process; its role corresponds to that of DAGScheduler, the scheduling hub of Spark Core!
Q: Where is the JobScheduler created?
A: The JobScheduler is instantiated when the StreamingContext is instantiated, as can be seen around line 183 of the StreamingContext source:
private[streaming] val scheduler = new JobScheduler(this)
Q: Why does Spark Streaming need at least two threads?
A: The two threads specified via setMaster mean that the program needs at least two threads at runtime: one thread loops continuously to receive data, while the other threads, whose number we specify ourselves, are the processing threads used to run the jobs. See the StreamingContext start() method below (a minimal driver sketch follows it):
def start(): Unit = synchronized {
  state match {
    case INITIALIZED =>
      startSite.set(DStream.getCreationSite())
      StreamingContext.ACTIVATION_LOCK.synchronized {
        StreamingContext.assertNoOtherContextIsActive()
        try {
          validate()

          // Start the streaming scheduler in a new thread, so that thread local properties
          // like call sites and job groups can be reset without affecting those of the
          // current thread.
          // This thread is started internally by Spark Streaming and drives the
          // scheduling of the whole job pipeline
          ThreadUtils.runInNewThread("streaming-start") {
            sparkContext.setCallSite(startSite.get)
            sparkContext.clearJobGroup()
            sparkContext.setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, "false")
            scheduler.start()
          }
          state = StreamingContextState.ACTIVE
        } catch {
          case NonFatal(e) =>
            logError("Error starting the context, marking it as stopped", e)
            scheduler.stop(false)
            state = StreamingContextState.STOPPED
            throw e
        }
        StreamingContext.setActiveContext(this)
      }
      shutdownHookRef = ShutdownHookManager.addShutdownHook(
        StreamingContext.SHUTDOWN_HOOK_PRIORITY)(stopOnShutdown)
      // Registering Streaming Metrics at the start of the StreamingContext
      assert(env.metricsSystem != null)
      env.metricsSystem.registerSource(streamingSource)
      uiTab.foreach(_.attach())
      logInfo("StreamingContext started")
    case ACTIVE =>
      logWarning("StreamingContext has already been started")
    case STOPPED =>
      throw new IllegalStateException("StreamingContext has already been stopped")
  }
}
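As a side note on the thread requirement above, here is a minimal driver sketch (the application name, batch interval, and socket source are illustrative, not taken from the original post):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object TwoThreadsDemo {
  def main(args: Array[String]): Unit = {
    // "local[2]" reserves at least two threads: one runs the Receiver in a loop,
    // the other is left free to process the jobs generated for each batch.
    val conf = new SparkConf().setMaster("local[2]").setAppName("TwoThreadsDemo")
    val ssc = new StreamingContext(conf, Seconds(5))

    val lines = ssc.socketTextStream("localhost", 9999)
    lines.print()

    ssc.start()            // enters the start() method shown above
    ssc.awaitTermination()
  }
}

With "local[1]" the single thread would be consumed by the Receiver and no batch would ever be processed, which is exactly why at least two threads are required.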
Step into the JobScheduler source:
/**
 * This class schedules jobs to be run on Spark. It uses the JobGenerator to generate
 * the jobs and runs them using a thread pool.
 */
// JobScheduler is in charge of the logical-level Jobs and of running them at the
// physical level on top of Spark.
private[streaming] class JobScheduler(val ssc: StreamingContext) extends Logging {

  // The jobSets map keeps storing the received Jobs, keyed by batch time
  private val jobSets: java.util.Map[Time, JobSet] = new ConcurrentHashMap[Time, JobSet]

  // Degree of parallelism for running jobs, 1 by default; to change it, set this
  // value in the Spark configuration or in the application itself
  private val numConcurrentJobs = ssc.conf.getInt("spark.streaming.concurrentJobs", 1)

  // Turning logical-level jobs into physically running jobs is done through this
  // newDaemonFixedThreadPool thread pool
  private val jobExecutor =
    ThreadUtils.newDaemonFixedThreadPool(numConcurrentJobs, "streaming-job-executor")

  // Instantiate the JobGenerator
  private val jobGenerator = new JobGenerator(this)
  val clock = jobGenerator.clock
  val listenerBus = new StreamingListenerBus()

  // The following three members (receiverTracker, inputInfoTracker, eventLoop) are
  // instantiated only when the JobScheduler starts.
  // These two are created only when scheduler starts.
  // eventLoop not being null means the scheduler has been started and not stopped
  var receiverTracker: ReceiverTracker = null
  // A tracker to track all the input stream information as well as processed record number
  var inputInfoTracker: InputInfoTracker = null

  private var eventLoop: EventLoop[JobSchedulerEvent] = null

  def start(): Unit = synchronized {
    if (eventLoop != null) return // scheduler has already been started

    logDebug("Starting JobScheduler")
    eventLoop = new EventLoop[JobSchedulerEvent]("JobScheduler") {
      override protected def onReceive(event: JobSchedulerEvent): Unit = processEvent(event)
      override protected def onError(e: Throwable): Unit = reportError("Error in job scheduler", e)
    }
    eventLoop.start()

    // attach rate controllers of input streams to receive batch completion updates
    for {
      inputDStream <- ssc.graph.getInputStreams
      rateController <- inputDStream.rateController
    } ssc.addStreamingListener(rateController)

    listenerBus.start(ssc.sparkContext)
    receiverTracker = new ReceiverTracker(ssc)
    inputInfoTracker = new InputInfoTracker(ssc)
    receiverTracker.start()
    jobGenerator.start()
    logInfo("Started JobScheduler")
  }

Q: Why would we want to change this concurrency level?
A: Sometimes an application has multiple output operations, which produces multiple jobs within the same batchDuration. Those jobs do not need to wait for one another, so setting spark.streaming.concurrentJobs lets them execute concurrently. Jobs belonging to different batches can also run concurrently, because the thread pool contains multiple threads.
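A minimal sketch of how this knob could be set from the application side, following the Q&A above (the value 4 and the app name are illustrative; whether raising it helps depends on the number of independent output operations):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Allow up to 4 batch jobs to run in the thread pool at the same time.
// spark.streaming.concurrentJobs defaults to 1, as seen in the JobScheduler source above.
val conf = new SparkConf()
  .setMaster("local[4]")
  .setAppName("ConcurrentJobsDemo")
  .set("spark.streaming.concurrentJobs", "4")
val ssc = new StreamingContext(conf, Seconds(5))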
Next, starting from the application's output operation print(), let us work backwards to see how a Job is generated:
1. Step into the print() call in the application; it jumps to DStream's print():
/**
* Print the first ten elements of each RDD generated in this DStream. This is an output
* operator, so this DStream will be registered as an output stream and there materialized.
*/
def print(): Unit = ssc.withScope {
print(10)
}
2. Step into the parameterized print(num) overload called above:
/**
* Print the first num elements of each RDD generated in this DStream. This is an output
* operator, so this DStream will be registered as an output stream and there materialized.
*/
def print(num: Int): Unit = ssc.withScope {
  def foreachFunc: (RDD[T], Time) => Unit = {
    (rdd: RDD[T], time: Time) => {
      val firstNum = rdd.take(num + 1)
      // scalastyle:off println
      println("-------------------------------------------")
      println("Time: " + time)
      println("-------------------------------------------")
      firstNum.take(num).foreach(println)
      if (firstNum.length > num) println("...")
      println()
      // scalastyle:on println
    }
  }
  foreachRDD(context.sparkContext.clean(foreachFunc), displayInnerRDDOps = false)
}
From the code above we can conclude that when Spark Streaming finally executes, it still performs ordinary logical-level operations on RDDs!
3. Step into the foreachRDD method called above:
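To make this concrete, here is a sketch of what print() boils down to when written by hand against the public foreachRDD API (assuming a DStream named lines, as in the earlier driver sketch):

// Hand-written equivalent of print(): for every batch, the generated RDD is
// handled with ordinary RDD operations; take() is a plain RDD action.
lines.foreachRDD { (rdd, time) =>
  val first10 = rdd.take(10)
  println(s"----- Time: $time -----")
  first10.foreach(println)
}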
/**
* Apply a function to each RDD in this DStream. This is an output operator, so
* 'this' DStream will be registered as an output stream and therefore materialized.
* @param foreachFunc foreachRDD function
* @param displayInnerRDDOps Whether the detailed callsites and scopes of the RDDs generated
* in the `foreachFunc` to be displayed in the UI. If `false`, then
* only the scopes and callsites of `foreachRDD` will override those
* of the RDDs on the display.
*/
private def foreachRDD(
    foreachFunc: (RDD[T], Time) => Unit,
    displayInnerRDDOps: Boolean): Unit = {
  new ForEachDStream(this,
    context.sparkContext.clean(foreachFunc, false), displayInnerRDDOps).register()
}
4. Step into the ForEachDStream class referenced above, where we find the generateJob method:
/**
* An internal DStream used to represent output operations like DStream.foreachRDD.
* @param parent Parent DStream
* @param foreachFunc Function to apply on each RDD generated by the parent DStream
* @param displayInnerRDDOps Whether the detailed callsites and scopes of the RDDs generated
* by `foreachFunc` will be displayed in the UI; only the scope and
* callsite of `DStream.foreachRDD` will be displayed.
*/
private[streaming]
class ForEachDStream[T: ClassTag] (
    parent: DStream[T],
    foreachFunc: (RDD[T], Time) => Unit,
    displayInnerRDDOps: Boolean
  ) extends DStream[Unit](parent.ssc) {

  override def dependencies: List[DStream[_]] = List(parent)

  override def slideDuration: Duration = parent.slideDuration

  override def compute(validTime: Time): Option[RDD[Unit]] = None

  // Generates a Job at every batch interval
  override def generateJob(time: Time): Option[Job] = {
    parent.getOrCompute(time) match {
      case Some(rdd) =>
        val jobFunc = () => createRDDWithLocalProperties(time, displayInnerRDDOps) {
          // The RDD generated for this batch time; since this is an output operation,
          // it is the last RDD in the lineage. Next we only need to find out where
          // ForEachDStream's generateJob is called to know how the Job is finally produced.
          foreachFunc(rdd, time)
        }
        Some(new Job(time, jobFunc))
      case None => None
    }
  }
}
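For reference, the Job created here is only a thin wrapper around that closure. A simplified sketch is shown below; the real class is private[streaming] and also carries an id, an output-op id and timing fields, so treat this as an approximation rather than the actual source:

import scala.util.Try
import org.apache.spark.streaming.Time

// Simplified: a streaming Job just remembers the batch time and the closure to run.
// Calling run() is what eventually executes the foreachFunc(rdd, time) built above.
class Job(val time: Time, func: () => _) {
  private var _result: Try[_] = null

  def run(): Unit = {
    _result = Try(func())
  }

  def result: Try[_] = _result
}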
5. In the previous lecture we derived the following flow:
StreamingContext.start() --> JobScheduler.start() --> ReceiverTracker.start() --> JobGenerator.start() --> EventLoop --> processEvent() --> generateJobs() --> jobScheduler.receiverTracker.allocateBlocksToBatch(time) --> graph.generateJobs(time)
The final call, graph.generateJobs, is a method of DStreamGraph; step into it:
def generateJobs(time: Time): Seq[Job] = {
  logDebug("Generating jobs for time " + time)
  val jobs = this.synchronized {
    // At this point each outputStream is a ForEachDStream
    outputStreams.flatMap { outputStream =>
      val jobOption = outputStream.generateJob(time)
      jobOption.foreach(_.setCallSite(outputStream.creationSite))
      jobOption
    }
  }
  logDebug("Generated " + jobs.length + " jobs for time " + time)
  jobs
}

private val outputStreams = new ArrayBuffer[DStream[_]]()
By inspecting the subclass hierarchy of DStream together with ForEachDStream's generateJob above, we find that ForEachDStream is the only DStream subclass that overrides DStream's generateJob!
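The register() call seen inside foreachRDD (step 3) is what puts the ForEachDStream into this outputStreams buffer. Paraphrasing the DStream and DStreamGraph sources (details may differ slightly between Spark versions):

// In DStream: register this DStream as an output stream of its DStreamGraph
private[streaming] def register(): DStream[T] = {
  ssc.graph.addOutputStream(this)
  this
}

// In DStreamGraph: remember the output stream so that generateJobs(time) can
// later call generateJob(time) on it for every batch
def addOutputStream(outputStream: DStream[_]) {
  this.synchronized {
    outputStream.setGraph(this)
    outputStreams += outputStream
  }
}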
This leads to the final conclusion:
The real Jobs are generated by ForEachDStream's generateJob; at this point a job is still logical-level. It is turned into a physical-level invocation starting from the generateJobs method of JobGenerator:
/** Generate jobs and perform checkpoint for the given `time`. */
private def generateJobs(time: Time) {
  // Set the SparkEnv in this thread, so that job generation code can access the environment
  // Example: BlockRDDs are created in this thread, and it needs to access BlockManager
  // Update: This is probably redundant after threadlocal stuff in SparkEnv has been removed.
  SparkEnv.set(ssc.env)
  Try {
    jobScheduler.receiverTracker.allocateBlocksToBatch(time) // allocate received blocks to batch
    graph.generateJobs(time) // generate jobs using allocated block
  } match {
    case Success(jobs) =>
      val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time)
      jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))
    case Failure(e) =>
      jobScheduler.reportError("Error generating jobs for time " + time, e)
  }
  eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = false))
}
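The JobSet submitted here bundles exactly what the summary at the end calls "(batch time, Jobs, meta information of the data sources)". A rough sketch, with approximate field names (the real case class also tracks submission and processing times for the streaming UI):

// Rough sketch of JobSet: the batch time, the jobs generated for that batch,
// and the per-input-stream metadata recorded by InputInfoTracker.
case class JobSet(
    time: Time,
    jobs: Seq[Job],
    streamIdToInputInfo: Map[Int, StreamInputInfo] = Map.empty)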
Step into the jobScheduler.submitJobSet method:
// Turning logical-level jobs into physically running jobs is done through the
// newDaemonFixedThreadPool thread pool
private val jobExecutor =
  ThreadUtils.newDaemonFixedThreadPool(numConcurrentJobs, "streaming-job-executor")

def submitJobSet(jobSet: JobSet) {
  if (jobSet.jobs.isEmpty) {
    logInfo("No jobs added for time " + jobSet.time)
  } else {
    listenerBus.post(StreamingListenerBatchSubmitted(jobSet.toBatchInfo))
    jobSets.put(jobSet.time, jobSet)
    jobSet.jobs.foreach(job => jobExecutor.execute(new JobHandler(job)))
    logInfo("Added jobs for time " + jobSet.time)
  }
}
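Each task handed to jobExecutor is a JobHandler, an inner Runnable of JobScheduler. A simplified sketch of its run() method follows; the real source also manages batch-time and output-op local properties and guards against a null eventLoop, which is omitted here for brevity:

// Simplified: post JobStarted, run the job body (this is where the RDD action is
// actually triggered and handed off to DAGScheduler), then post JobCompleted.
private class JobHandler(job: Job) extends Runnable with Logging {
  def run() {
    eventLoop.post(JobStarted(job, clock.getTimeMillis()))
    job.run()   // executes the jobFunc built in ForEachDStream.generateJob
    eventLoop.post(JobCompleted(job, clock.getTimeMillis()))
  }
}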
This makes the whole lifecycle of job generation and execution clear. To summarize:
From the previous lecture we know that JobScheduler contains two core components, JobGenerator and ReceiverTracker, which are responsible for generating Jobs and for receiving the source data respectively.
Once ReceiverTracker starts, it causes the Receivers running on the Executors to start and receive data, and ReceiverTracker records the meta information of the data received by those Receivers.
Once JobGenerator starts, every BatchDuration it invokes DStreamGraph to generate the RDD graph and the corresponding Jobs.
The thread pool inside JobScheduler then submits the encapsulating JobSet object (batch time, Jobs, meta information of the data sources). Each Job wraps the business logic; running it triggers the action on the last RDD,
and DAGScheduler then actually schedules that Job for execution on the Spark cluster.
Special thanks to Mr. Wang Jialin for his distinctive style of teaching.
Wang Jialin's contact card:
China's foremost Spark expert
Sina Weibo: http://weibo.com/ilovepains
WeChat public account: DT_Spark
Blog: http://blog.sina.com.cn/ilovepains
QQ: 1740415547
YY classroom: live lectures every day at 20:00 on channel 68917580
Original post: http://www.cnblogs.com/game-bigdata/p/5516110.html