1.1 示例代码DEMO:
实时计算的WorldCount:
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.storage.StorageLevel
object NetworkWordCount {
def main(args: Array[String]) {
if (args.length < 3) {
System.err.println("Usage: NetworkWordCount <master> <hostname> <port>\n" +
"In local mode, <master> should be ‘local[n]‘ with n > 1")
System.exit(1)
}
// Create the context with a 1 second batch size
val ssc = new StreamingContext("local[4]", "NetworkWordCount", Seconds(1),
System.getenv("SPARK_HOME"))
// Create a NetworkInputDStream on target ip:port and count the
// words in input stream of \n delimited text (eg. generated by ‘nc‘)
val lines = ssc.socketTextStream(args(1), args(2).toInt, StorageLevel.MEMORY_ONLY_SER)
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
wordCounts.print()
ssc.start()
ssc.awaitTermination()
}
}
以上代码能实时的接受和处理网络传输过来的文本,并计算单词个数。
1.2 下面基于以上代码DEMO分析记录下Spark streaming的启动过程的源码,以及字节流处理过程的代源码。
以上代码DEMO涉及的启动流程时序如下: 简述:
(1)从Streaming WorldCount的编程示例可看出Spark Streaming和Spark的编程API很像。Spark Streaming是构建Spark程序框架的基础上的。从时序图的StreamingContext和SparkContext的交互可以看出来StreamingContext里面封装了SparkContext。部分关键代码如下:
//包权限,包内可见,包外不可见
//StreamingContext 里面包装的还是一个SparkContext
class StreamingContext private[streaming] (
sc_ : SparkContext,
cp_ : Checkpoint,
batchDur_ : Duration
) extends Logging {
def this(conf: SparkConf, batchDuration: Duration) = {
this(StreamingContext.createNewSparkContext(conf), null, batchDuration)
}
private[streaming] def createNewSparkContext(
master: String,
appName: String,
sparkHome: String,
jars: Seq[String],
environment: Map[String, String]
): SparkContext = {
val conf = SparkContext.updatedConf(
new SparkConf(), master, appName, sparkHome, jars, environment)
createNewSparkContext(conf)
}
...
上面是StreamingContext的部分构造函数的代码,可见其中对SparkContext的封装以及CreateNewSparkContext对于SparkContext的构造过程。从代码角度看展示了StreamingContext是针对于SparkContext的封装,结合DEMO从框架启动阶段来讲,当完成StreamingContext的实例化时候也就是完成了程序的配置初始化过程。
(2)结合Demo,StreamingContext实例化之后则是从StreamingContext获取了一个SocketTextStream,代码如下:
val lines = ssc.socketTextStream(args(1), args(2).toInt, StorageLevel.MEMORY_ONLY_SER)
其中args(1),args(2)分别对应于后面所说的Receiver监听的Ip和端口号,随后的参数从英文名可以看出来定义的是存储级别(这里定义仅存储在内存)。
DEMO中的代码简单解释下,然后还是结合时序图和源码来看下:
StreamingContext.SocketTextStream方法如下:
def socketTextStream(
hostname: String,
port: Int,
storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
): ReceiverInputDStream[String] = {
socketStream[String](hostname, port, SocketReceiver.bytesToLines, storageLevel)
}
方法返回的是ReceiverInputDStreaming。这里引出了DStream的概念(拓展总结一下),根据DStream的代码注释,DStream的解释如下:Dstream本质就是离散化的stream,将stream离散化成一组RDD的list,所以基本的操作仍然是以RDD为基础。
InputDStream 涉及的类继承结构:
上述类继承结构图展示了DInputStream,根据类的名字可以大致看出来DStream的作用,比如FileInputDStream就是处理文件流的DStream,kalfkaInputDSteam就是处理kalfka消息的DStream;DStream本身封装了按时间片离散化了的Stream(数据流);看如下DStream的代码片段:
abstract class DStream[T: ClassTag] (
@transient private[streaming] var ssc: StreamingContext
) extends Serializable with Logging {
@transient
private[streaming] var generatedRDDs = new HashMap[Time, RDD[T]] ()
...
如代码所示,generatedRDDS是DStream的一个成员变量HashMap,其中key是时间片段,value是RDD。从DStream持有的这个成员变量就可以看出来DStream的本质,就是按照时间片存储了一系列的数据流。
/**
* Retrieve a precomputed RDD of this DStream, or computes the RDD. This is an internal
* method that should not be called directly.
*/
private[streaming] def getOrCompute(time: Time): Option[RDD[T]] = {
// If this DStream was not initialized (i.e., zeroTime not set), then do it
// If RDD was already generated, then retrieve it from HashMap
generatedRDDs.get(time) match {
// If an RDD was already generated and is being reused, then
// probably all RDDs in this DStream will be reused and hence should be cached
case Some(oldRDD) => Some(oldRDD)
// if RDD was not generated, and if the time is valid
// (based on sliding time of this DStream), then generate the RDD
case None => {
if (isTimeValid(time)) {
compute(time) match {
case Some(newRDD) =>
if (storageLevel != StorageLevel.NONE) {
newRDD.persist(storageLevel)
logInfo("Persisting RDD " + newRDD.id + " for time " +
time + " to " + storageLevel + " at time " + time)
}
if (checkpointDuration != null &&
(time - zeroTime).isMultipleOf(checkpointDuration)) {
newRDD.checkpoint()
logInfo("Marking RDD " + newRDD.id + " for time " + time +
" for checkpointing at time " + time)
}
generatedRDDs.put(time, newRDD)
Some(newRDD)
case None =>
None
}
} else {
None
}
}
}
以上这段代码则是按时间片构建和检索RDD的主要函数,由于逻辑比较简单就不多说!
DStream不在深入,还是回到正题,上面的DEMO代码从SteamingContext拿到了一个ReceiverInputDStream,ReceiverInputDStream封装了SocketReceiver对象,用来从网络中读取数据流,其实代码逻辑跟下去,其实就是拿到了一个SocketInputStream,然后按时间片将流离散化的存储在DStream定义的HashMap中。这段涉及的代码如下:
def socketTextStream(
hostname: String,
port: Int,
storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
): ReceiverInputDStream[String] = {
socketStream[String](hostname, port, SocketReceiver.bytesToLines, storageLevel)
}
def socketStream[T: ClassTag](
hostname: String,
port: Int,
converter: (InputStream) => Iterator[T],
storageLevel: StorageLevel
): ReceiverInputDStream[T] = {
new SocketInputDStream[T](this, hostname, port, converter, storageLevel)
}
以上就是客户端DEMO(Driver)获取socketTextStream的过程,总结一下就是通过StreamingContext提供的API获取了一个SocketInputStreamInputDStream。至此返回到DEMO示例,除了WordCount的部分处理逻辑外,就是StreamingContext的启动逻辑了。通过Spark的只是可以知道RDD只是构建了代码的逻辑以及依赖关系,但并没有真正的执行,同理这里也是同样的道理。下面就是关键的StreamingContext的启动逻辑了。下面继续总结。
(3)回到时序图,我们可以看出StreamingContext的start方法,直接或间接依次触发了JobScheduler、StreamingListenerBus、ReceiverTracker、JobGenerator的方法。下面结合源码看一下:
StreamingContext调用了核心的JobScheduler的start方法如下:
def start(): Unit = synchronized {
if (eventActor != null) return // scheduler has already been started
logDebug("Starting JobScheduler")
eventActor = ssc.env.actorSystem.actorOf(Props(new Actor {
def receive = {
case event: JobSchedulerEvent => processEvent(event)
}
}), "JobScheduler")
/**
* 启动listenerBus
*/
listenerBus.start()
receiverTracker = new ReceiverTracker(ssc)
//启动receiverTracker
receiverTracker.start()
jobGenerator.start()
logInfo("Started JobScheduler")
}
上面代码简单总结一下:首先生成了一个eventActor对象(类型是Akka Actor; Akka的简单定义:http://baike.baidu.com/view/5912486.htm?fr=aladdin,具体可以深入学下Scala)。eventActor的事件处理逻辑里面调用了processEvent方法,源码如下:
private def processEvent(event: JobSchedulerEvent) {
try {
event match {
case JobStarted(job) => handleJobStart(job)
case JobCompleted(job) => handleJobCompletion(job)
case ErrorReported(m, e) => handleError(m, e)
}
} catch {
case e: Throwable =>
reportError("Error in job scheduler", e)
}
}
//这个是间接调用 捕获job的启动事件
private def handleJobStart(job: Job) {
val jobSet = jobSets.get(job.time)
if (!jobSet.hasStarted) {
listenerBus.post(StreamingListenerBatchStarted(jobSet.toBatchInfo))
}
jobSet.handleJobStart(job)
logInfo("Starting job " + job.id + " from job set of time " + jobSet.time)
SparkEnv.set(ssc.env)
}
//捕获job的完成事件
private def handleJobCompletion(job: Job) {
job.result match {
case Success(_) =>
val jobSet = jobSets.get(job.time)
jobSet.handleJobCompletion(job)
logInfo("Finished job " + job.id + " from job set of time " + jobSet.time)
if (jobSet.hasCompleted) {
jobSets.remove(jobSet.time)
jobGenerator.onBatchCompletion(jobSet.time)
logInfo("Total delay: %.3f s for time %s (execution: %.3f s)".format(
jobSet.totalDelay / 1000.0, jobSet.time.toString,
jobSet.processingDelay / 1000.0
))
listenerBus.post(StreamingListenerBatchCompleted(jobSet.toBatchInfo))
}
case Failure(e) =>
reportError("Error running job " + job, e)
}
}
//捕获job的ErrorReport事件
private def handleError(msg: String, e: Throwable) {
logError(msg, e)
ssc.waiter.notifyError(e)
}
至此可以看到JobScheduler作为Job的调度器,所通过Akka框架的eventActor对象可以实时监听到Job的启动,停止以及错误等信息。并且通过以上列出的几个Handle*方法可以看出来在处理完开始结束时间后,将事件信息做了简单的封装后传输到了ListenerBus,让注册在监听器总线上的监听器都能针对Job的状态变化及时作出反应。这个ListenerBus的设计还是十分巧妙的。
EventActor启动完毕后就启动了StreamingListenerBus对象。涉及的ListenerBus.start的核心源码如下:
//启动函数
def start() {
listenerThread.start()
}
//启动一条Daemon线程
val listenerThread = new Thread("StreamingListenerBus") {
setDaemon(true)
override def run() {
while (true) {
val event = eventQueue.take
event match {
case receiverStarted: StreamingListenerReceiverStarted =>
listeners.foreach(_.onReceiverStarted(receiverStarted))
case receiverError: StreamingListenerReceiverError =>
listeners.foreach(_.onReceiverError(receiverError))
case receiverStopped: StreamingListenerReceiverStopped =>
listeners.foreach(_.onReceiverStopped(receiverStopped))
case batchSubmitted: StreamingListenerBatchSubmitted =>
listeners.foreach(_.onBatchSubmitted(batchSubmitted))
case batchStarted: StreamingListenerBatchStarted =>
listeners.foreach(_.onBatchStarted(batchStarted))
case batchCompleted: StreamingListenerBatchCompleted =>
listeners.foreach(_.onBatchCompleted(batchCompleted))
case StreamingListenerShutdown =>
// Get out of the while loop and shutdown the daemon thread
return
case _ =>
}
}
}
通过以上代码可以看出监听器总线真实代码如其名,其start方法启动了一条Deamon线程,来监听各种事件,然后将时间捕捉后发送给注册在总线上的监听器,其设计的巧妙可见一斑。下面在没忍住贴上ListenerBus完整的源码:
/** Asynchronously passes StreamingListenerEvents to registered StreamingListeners. */
private[spark] class StreamingListenerBus() extends Logging {
private val listeners = new ArrayBuffer[StreamingListener]()
with SynchronizedBuffer[StreamingListener]
/* Cap the capacity of the SparkListenerEvent queue so we get an explicit error (rather than
* an OOM exception) if it‘s perpetually being added to more quickly than it‘s being drained. */
private val EVENT_QUEUE_CAPACITY = 10000
private val eventQueue = new LinkedBlockingQueue[StreamingListenerEvent](EVENT_QUEUE_CAPACITY)
private var queueFullErrorMessageLogged = false
val listenerThread = new Thread("StreamingListenerBus") {
setDaemon(true)
override def run() {
while (true) {
val event = eventQueue.take
event match {
case receiverStarted: StreamingListenerReceiverStarted =>
listeners.foreach(_.onReceiverStarted(receiverStarted))
case receiverError: StreamingListenerReceiverError =>
listeners.foreach(_.onReceiverError(receiverError))
case receiverStopped: StreamingListenerReceiverStopped =>
listeners.foreach(_.onReceiverStopped(receiverStopped))
case batchSubmitted: StreamingListenerBatchSubmitted =>
listeners.foreach(_.onBatchSubmitted(batchSubmitted))
case batchStarted: StreamingListenerBatchStarted =>
listeners.foreach(_.onBatchStarted(batchStarted))
case batchCompleted: StreamingListenerBatchCompleted =>
listeners.foreach(_.onBatchCompleted(batchCompleted))
case StreamingListenerShutdown =>
// Get out of the while loop and shutdown the daemon thread
return
case _ =>
}
}
}
}
def start() {
listenerThread.start()
}
def addListener(listener: StreamingListener) {
listeners += listener
}
def post(event: StreamingListenerEvent) {
val eventAdded = eventQueue.offer(event)
if (!eventAdded && !queueFullErrorMessageLogged) {
logError("Dropping StreamingListenerEvent because no remaining room in event queue. " +
"This likely means one of the StreamingListeners is too slow and cannot keep up with the " +
"rate at which events are being started by the scheduler.")
queueFullErrorMessageLogged = true
}
}
/**
* Waits until there are no more events in the queue, or until the specified time has elapsed.
* Used for testing only. Returns true if the queue has emptied and false is the specified time
* elapsed before the queue emptied.
*/
def waitUntilEmpty(timeoutMillis: Int): Boolean = {
val finishTime = System.currentTimeMillis + timeoutMillis
while (!eventQueue.isEmpty) {
if (System.currentTimeMillis > finishTime) {
return false
}
/* Sleep rather than using wait/notify, because this is used only for testing and wait/notify
* add overhead in the general case. */
Thread.sleep(10)
}
true
}
def stop(): Unit = post(StreamingListenerShutdown)
}
通过这个监听器总线的完整源码,可以看到里面一个监听器数组,以及一个事件的阻塞队列,以及事件处理程序,也就是将事件通知到注册上来的每个监听器。这段代码这是看着就舒服。是一个很好的代码模式呀,赞一个!
陶醉至此继续看ReceiverTracker(ReceiverTracker是Receiver监管程序)的启动过程,首先看下ReceiverTracker的start方法:
/** Start the actor and receiver execution thread. */
def start() = synchronized {
if (actor != null) {
throw new SparkException("ReceiverTracker already started")
}
if (!receiverInputStreams.isEmpty) {
actor = ssc.env.actorSystem.actorOf(Props(new ReceiverTrackerActor),
"ReceiverTracker")
receiverExecutor.start()
logInfo("ReceiverTracker started")
}
}
解读一下首先还是new了一个Akka的Actor对象,肯定是用来处理一些事件呀,看代码:
/** Actor to receive messages from the receivers. */
private class ReceiverTrackerActor extends Actor {
def receive = {
case RegisterReceiver(streamId, typ, host, receiverActor) =>
registerReceiver(streamId, typ, host, receiverActor, sender)
sender ! true
case AddBlock(receivedBlockInfo) =>
addBlocks(receivedBlockInfo)
case ReportError(streamId, message, error) =>
reportError(streamId, message, error)
case DeregisterReceiver(streamId, message, error) =>
deregisterReceiver(streamId, message, error)
sender ! true
}
}
这个Actor主要是监听从Receiver发送过来的一些消息,包括Receiver的注册、注销,以及接受数据流的存储AddBlock。再次可见JobTracker真是统揽全局的组件。
ReceiverTracker start方法初始化监听Receiver的Actor后,下面的 receiverExecutor.start()
方法的调用才是真正的启动Receiver的核心逻辑。看代码:
从这个代码片段可以看出receiverExecutor是ReceiverLaucher的实例。其Start逻辑如下:
def start() {
thread.start()
}
/** This thread class runs all the receivers on the cluster. */
class ReceiverLauncher {
@transient val env = ssc.env
@transient val thread = new Thread() {
override def run() {
try {
SparkEnv.set(env)
//启动Receiver
startReceivers()
} catch {
case ie: InterruptedException => logInfo("ReceiverLauncher interrupted")
}
}
}
/**
* Get the receivers from the ReceiverInputDStreams, distributes them to the
* worker nodes as a parallel collection, and runs them.
*/
private def startReceivers() {
val receivers = receiverInputStreams.map(nis => {
val rcvr = nis.getReceiver()
rcvr.setReceiverId(nis.id)
rcvr
})
// Right now, we only honor preferences if all receivers have them
val hasLocationPreferences = receivers.map(_.preferredLocation.isDefined).reduce(_ && _)
// Create the parallel collection of receivers to distributed them on the worker nodes
val tempRDD =
if (hasLocationPreferences) {
val receiversWithPreferences = receivers.map(r => (r, Seq(r.preferredLocation.get)))
ssc.sc.makeRDD[Receiver[_]](receiversWithPreferences)
} else {
ssc.sc.makeRDD(receivers, receivers.size)
}
// Function to start the receiver on the worker node
val startReceiver = (iterator: Iterator[Receiver[_]]) => {
if (!iterator.hasNext) {
throw new SparkException(
"Could not start receiver as object not found.")
}
val receiver = iterator.next()
val executor = new ReceiverSupervisorImpl(receiver, SparkEnv.get)
executor.start()
executor.awaitTermination()
}
// Run the dummy Spark job to ensure that all slaves have registered.
// This avoids all the receivers to be scheduled on the same node.
if (!ssc.sparkContext.isLocal) {
ssc.sparkContext.makeRDD(1 to 50, 50).map(x => (x, 1)).reduceByKey(_ + _, 20).collect()
}
// Distribute the receivers and start them
logInfo("Starting " + receivers.length + " receivers")
ssc.sparkContext.runJob(tempRDD, startReceiver)
logInfo("All of the receivers have been terminated")
}
可以ReceiverLauncher的start方法同样启动一条线程,异步的从InputStreams获取所有的Receiver,然后将Receiver封装成ReceiverSuperior逐个启动。当然这么这个是笼统的总结,从细节上再看下:
上面方法中是从receiverInputStreams获取了所有的Receiver,ReceiverInputStreams其实是这个样子的。
private val inputStreams = new ArrayBuffer[InputDStream[_]]()
def getReceiverInputStreams() = this.synchronized {
inputStreams.filter(_.isInstanceOf[ReceiverInputDStream[_]])
.map(_.asInstanceOf[ReceiverInputDStream[_]])
.toArray
}
一个InputDStream数组经过下面函数处理后得到的数组,主要是校验一下是否是ReceiverInputDStream类型。
从封装在InputDStream中拿到所有的Receiver结合后,然后将Receiver封装成RDD分发到多个work结点上(可见Spark Streaming的巧妙,将Receiver像RDD一样分发)代码如下:
//将Receiver封装成RDD并分发到work结点上。
val tempRDD =
if (hasLocationPreferences) {
val receiversWithPreferences = receivers.map(r => (r, Seq(r.preferredLocation.get)))
ssc.sc.makeRDD[Receiver[_]](receiversWithPreferences)
} else {
ssc.sc.makeRDD(receivers, receivers.size)
}
//在worker结点上循环遍历并将Receiver封装成ReceiverSupervisor然后逐个启动。
val startReceiver = (iterator: Iterator[Receiver[_]]) => {
if (!iterator.hasNext) {
throw new SparkException(
"Could not start receiver as object not found.")
}
val receiver = iterator.next()
val executor = new ReceiverSupervisorImpl(receiver, SparkEnv.get)
executor.start()
executor.awaitTermination()
}
这里有个细节是每个Receiver其实都是封装成了ReceiverSupervisor,然后才启动的。ReceiverSupervisor其实是Receiver的监管程序,其可以用来处理Receiver启动之外的一些事件逻辑,至此补充一张ReceiverTracker以及ReceverSupervisor相关的时序图:
由于Receiver都是封装在ReceiverSupervisor里的,那就看一下ReceiverSupervisor的start方法。
/** Start the supervisor */
def start() {
onStart()
startReceiver()
}
override protected def onStart() {
blockGenerator.start()
}
/** Start receiver */
def startReceiver(): Unit = synchronized {
try {
logInfo("Starting receiver")
receiver.onStart()
logInfo("Called receiver onStart")
onReceiverStart()
receiverState = Started
} catch {
case t: Throwable =>
stop("Error starting receiver " + streamId, Some(t))
}
}
/**
* 发送RegisterServer 消息给Driver报告自己启动了。
*/
override protected def onReceiverStart() {
val msg = RegisterReceiver(
streamId, receiver.getClass.getSimpleName, Utils.localHostName(), actor)
val future = trackerActor.ask(msg)(askTimeout)
Await.result(future, askTimeout)
}
结合时序图以及源码,可以看出ReceiverSupervisor的start方法触发了BlockGenerator的start方法以及Receiver的onstart方法。那么一下补充一张有关BlockGenerator的相关调用时序以及Receiver的类继承结构图。
SocketReceiver接受并保存数据的处理流程。
Receiver的类继承结构图:
根据以上Receiver的类结构图,可以看出来Receiver的继承堆栈结构,我们只看下和本次Demo相关的onstart方法(因为ReceiverSupervisor调用了Recevier的onstart方法,并且这里涉及的Receiver是SocketReceiver)。
private[streaming]
class SocketReceiver[T: ClassTag](
host: String,
port: Int,
bytesToObjects: InputStream => Iterator[T],
storageLevel: StorageLevel
) extends Receiver[T](storageLevel) with Logging {
def onStart() {
// Start the thread that receives data over a connection
new Thread("Socket Receiver") {
setDaemon(true)
override def run() { receive() }
}.start()
}
def onStop() {
// There is nothing much to do as the thread calling receive()
// is designed to stop by itself isStopped() returns false
}
/** Create a socket connection and receive data until receiver is stopped */
def receive() {
var socket: Socket = null
try {
logInfo("Connecting to " + host + ":" + port)
socket = new Socket(host, port)
logInfo("Connected to " + host + ":" + port)
val iterator = bytesToObjects(socket.getInputStream())
while(!isStopped && iterator.hasNext) {
store(iterator.next)
}
logInfo("Stopped receiving")
restart("Retrying connecting to " + host + ":" + port)
} catch {
case e: java.net.ConnectException =>
restart("Error connecting to " + host + ":" + port, e)
case t: Throwable =>
restart("Error receiving data", t)
} finally {
if (socket != null) {
socket.close()
logInfo("Closed socket to " + host + ":" + port)
}
}
}
}
从以上代码可以看出SocketReceiver的onStart方法也是启动了一条后台线程,循环的监听Socket的另一端发送过来的字节流,然后调用store方法保存起来,供后续的Action进行处理。那就在看下Store方法:
/**
* Store a single item of received data to Spark‘s memory.
* These single items will be aggregated together into data blocks before
* being pushed into Spark‘s memory.
*/
def store(dataItem: T) {
executor.pushSingle(dataItem)
}
结合上端代码和并结合上面“socketReceiver接受并保存数据”的时序图,即可追溯到SocketReceiver接受数据并有BlockManager报错的过程。其中ReceiverSupervisor重载了BlockManager的“+”号方法,同时也利用了阻塞队列的机制来保存数据。最后通过BlockManager将接受到的流数据存储起来。以上过程就不做详细分解和结合时序图还是比较容易理解的。
(4)最后单拿出来再看下JobGenerator的start方法:
/** Start generation of jobs */
def start(): Unit = synchronized {
if (eventActor != null) return // generator has already been started
eventActor = ssc.env.actorSystem.actorOf(Props(new Actor {
def receive = {
case event: JobGeneratorEvent => processEvent(event)
}
}), "JobGenerator")
if (ssc.isCheckpointPresent) {
restart()
} else {
startFirstTime()
}
}
/** Starts the generator for the first time */
private def startFirstTime() {
val startTime = new Time(timer.getStartTime())
graph.start(startTime - graph.batchDuration)
timer.start(startTime.milliseconds)
logInfo("Started JobGenerator at " + startTime)
}
/** Restarts the generator based on the information in checkpoint */
private def restart() {
// If manual clock is being used for testing, then
// either set the manual clock to the last checkpointed time,
// or if the property is defined set it to that time
if (clock.isInstanceOf[ManualClock]) {
val lastTime = ssc.initialCheckpoint.checkpointTime.milliseconds
val jumpTime = ssc.sc.conf.getLong("spark.streaming.manualClock.jump", 0)
clock.asInstanceOf[ManualClock].setTime(lastTime + jumpTime)
}
val batchDuration = ssc.graph.batchDuration
// Batches when the master was down, that is,
// between the checkpoint and current restart time
val checkpointTime = ssc.initialCheckpoint.checkpointTime
val restartTime = new Time(timer.getRestartTime(graph.zeroTime.milliseconds))
val downTimes = checkpointTime.until(restartTime, batchDuration)
logInfo("Batches during down time (" + downTimes.size + " batches): "
+ downTimes.mkString(", "))
// Batches that were unprocessed before failure
val pendingTimes = ssc.initialCheckpoint.pendingTimes.sorted(Time.ordering)
logInfo("Batches pending processing (" + pendingTimes.size + " batches): " +
pendingTimes.mkString(", "))
// Reschedule jobs for these times
val timesToReschedule = (pendingTimes ++ downTimes).distinct.sorted(Time.ordering)
logInfo("Batches to reschedule (" + timesToReschedule.size + " batches): " +
timesToReschedule.mkString(", "))
timesToReschedule.foreach(time =>
jobScheduler.submitJobSet(JobSet(time, graph.generateJobs(time)))
)
// Restart the timer
timer.start(restartTime.milliseconds)
logInfo("Restarted JobGenerator at " + restartTime)
}
以上代码则是设计了Job的生成以及启动过程,具体信息暂时做个checkPoint有待下次分解。
原文地址:http://blog.csdn.net/lantian0802/article/details/38784261