码迷,mamicode.com
首页 > 其他好文 > 详细

版本定制第5课:基于案例一节课贯通Spark Streaming流计算框架的运行源码

时间:2016-05-13 02:55:25      阅读:154      评论:0      收藏:0      [点我收藏+]

标签:

本期内容:
1、在线动态计算分类最热门商品案例回顾与演示
2、基于案例贯通Spark Streaming的运行源码
第一部分案例:
package com.dt.spark.sparkstreaming

import com.robinspark.utils.ConnectionPool
import org.apache.spark.SparkConf
import org.apache.spark.sql.Row
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.types.{IntegerTypeStringTypeStructFieldStructType}
import org.apache.spark.streaming.{SecondsStreamingContext}

/**
  * 使用Spark Streaming+Spark SQL来在线动态计算电商中不同类别中最热门的商品排名,例如手机这个类别下面最热门的三种手机、电视这个类别
  下最热门的三种电视,该实例在实际生产环境下具有非常重大的意义;
  *
  * @author DT大数据梦工厂
  新浪微博:http://weibo.com/ilovepains/
  *
  *
  *   实现技术:Spark Streaming+Spark SQL,之所以Spark Streaming能够使用MLsqlgraphx等功能是因为有foreachRDDTransform
  * 等接口,这些接口中其实是基于RDD进行操作,所以以RDD为基石,就可以直接使用Spark其它所有的功能,就像直接调用API一样简单。
  *  假设说这里的数据的格式:user item category,例如Rocky Samsung Android
  */
object OnlineTheTop3ItemForEachCategory2DB {
  def main(args: Array[String]){
    /**
      * 1步:创建Spark的配置对象SparkConf,设置Spark程序的运行时的配置信息,
      例如说通过setMaster来设置程序要链接的Spark集群的MasterURL,如果设置
      local,则代表Spark程序在本地运行,特别适合于机器配置条件非常差(例如
      只有1G的内存)的初学者       *
      */
    val conf = new SparkConf() //创建SparkConf对象
    conf.setAppName("OnlineTheTop3ItemForEachCategory2DB"//设置应用程序的名称,在程序运行的监控界面可以看到名称
    conf.setMaster("spark://Master:7077"//此时,程序在Spark集群
    //conf.setMaster("local[2]")
    //设置batchDuration时间间隔来控制Job生成的频率并且创建Spark Streaming执行的入口
    val ssc = new StreamingContext(confSeconds(5))

    ssc.checkpoint("/root/Documents/SparkApps/checkpoint")

    val userClickLogsDStream = ssc.socketTextStream("Master"9999)

    val formattedUserClickLogsDStream = userClickLogsDStream.map(clickLog =>
        (clickLog.split(" ")(2) + "_" + clickLog.split(" ")(1)1))

//    val categoryUserClickLogsDStream = formattedUserClickLogsDStream.reduceByKeyAndWindow((v1:Int, v2: Int) => v1 + v2,
//      (v1:Int, v2: Int) => v1 - v2, Seconds(60), Seconds(20))

    val categoryUserClickLogsDStream = formattedUserClickLogsDStream.reduceByKeyAndWindow(_+_,
      _-_Seconds(60)Seconds(20))

    categoryUserClickLogsDStream.foreachRDD { rdd => {
      if (rdd.isEmpty()) {
        println("No data inputted!!!")
      } else {
        val categoryItemRow = rdd.map(reducedItem => {
          val category = reducedItem._1.split("_")(0)
          val item = reducedItem._1.split("_")(1)
          val click_count = reducedItem._2
          Row(categoryitemclick_count)
        })

        val structType = StructType(Array(
          StructField("category"StringTypetrue),
          StructField("item"StringTypetrue),
          StructField("click_count"IntegerTypetrue)
        ))

        val hiveContext = new HiveContext(rdd.context)
        val categoryItemDF = hiveContext.createDataFrame(categoryItemRowstructType)

        categoryItemDF.registerTempTable("categoryItemTable")

        val reseltDataFram = hiveContext.sql("SELECT category,item,click_count FROM (SELECT category,item,click_count,row_number()" +
          " OVER (PARTITION BY category ORDER BY click_count DESC) rank FROM categoryItemTable) subquery " +
          " WHERE rank <= 3")
        reseltDataFram.show()

        val resultRowRDD = reseltDataFram.rdd

        resultRowRDD.foreachPartition { partitionOfRecords => {

          if (partitionOfRecords.isEmpty){
            println("This RDD is not null but partition is null")
          } else {
            // ConnectionPool is a static, lazily initialized pool of connections
            val connection = ConnectionPool.getConnection()
            partitionOfRecords.foreach(record => {
              val sql = "insert into categorytop3(category,item,client_count) values(‘" + record.getAs("category") + "‘,‘" +
                record.getAs("item") + "‘," + record.getAs("click_count") + ")"
              val stmt = connection.createStatement();
              stmt.executeUpdate(sql);

            })
            ConnectionPool.returnConnection(connection) // return to the pool for future reuse

          }
        }
        }
      }
    }
    }
    /**
      * StreamingContext调用start方法的内部其实是会启动JobSchedulerStart方法,进行消息循环,在JobScheduler
      * start内部会构造JobGeneratorReceiverTacker,并且调用JobGeneratorReceiverTackerstart方法:
      *   1JobGenerator启动后会不断的根据batchDuration生成一个个的Job
      *   2ReceiverTracker启动后首先在Spark Cluster中启动Receiver(其实是在Executor中先启动ReceiverSupervisor),在Receiver收到
      *   数据后会通过ReceiverSupervisor存储到Executor并且把数据的Metadata信息发送给Driver中的ReceiverTracker,在ReceiverTracker
      *   内部会通过ReceivedBlockTracker来管理接受到的元数据信息
      每个BatchInterval会产生一个具体的Job,其实这里的Job不是Spark Core中所指的Job,它只是基于DStreamGraph而生成的RDD
      * DAG而已,从Java角度讲,相当于Runnable接口实例,此时要想运行Job需要提交给JobScheduler,在JobScheduler中通过线程池的方式找到一个
      单独的线程来提交Job到集群运行(其实是在线程中基于RDDAction触发真正的作业的运行),为什么使用线程池呢?
      *   1,作业不断生成,所以为了提升效率,我们需要线程池;这和在Executor中通过线程池执行Task有异曲同工之妙;
      *   2,有可能设置了JobFAIR公平调度的方式,这个时候也需要多线程的支持;
      *
      */
    ssc.start()
    ssc.awaitTermination()
  }
}
第二部分源码解析:

1、构建StreamingContext时传递SparkConf参数(或者自己Configuration)在内部创建SparkContext
def this(conf: SparkConfbatchDuration: Duration) = {
  this(StreamingContext.createNewSparkContext(conf)nullbatchDuration)
}

2、事实说明SparkStreaming就是SparkCore上的一个应用程序
private[streaming] def createNewSparkContext(conf: SparkConf): SparkContext = {
  new SparkContext(conf)
}

3、创建Socket输入流
def socketTextStream(
    hostname: String,
    port: Int,
    storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
  ): ReceiverInputDStream[String] = withNamedScope("socket text stream") {
  socketStream[String](hostnameportSocketReceiver.bytesToLinesstorageLevel)
}

4、创建SocketInputDStream
def socketStream[T: ClassTag](
    hostname: String,
    port: Int,
    converter: (InputStream) => Iterator[T],
    storageLevel: StorageLevel
  ): ReceiverInputDStream[T] = {
  new SocketInputDStream[T](thishostnameportconverterstorageLevel)
}

5、SocketInputDstream继承ReceiverInputDStream,通过构建Receiver来接收数据
private[streaming]
class SocketInputDStream[T: ClassTag](
    ssc_ : StreamingContext,
    host: String,
    port: Int,
    bytesToObjects: InputStream => Iterator[T],
    storageLevel: StorageLevel
  ) extends ReceiverInputDStream[T](ssc_) {

  def getReceiver(): Receiver[T] = {
    new SocketReceiver(hostportbytesToObjectsstorageLevel)
  }
}

5.1、ReceiverInputDStream
abstract class ReceiverInputDStream[T: ClassTag](ssc_ : StreamingContext)
  extends InputDStream[T](ssc_) {

abstract class InputDStream[T: ClassTag] (ssc_ : StreamingContext)
  extends DStream[T](ssc_) {

5.2、DStream
/*
* DStreams internally is characterized by a few basic properties:
*  - A list of other DStreams that the DStream depends on
*  - A time interval at which the DStream generates an RDD
*  - A function that is used to generate an RDD after each time interval
*/
     1)、依赖于其他DStream
     2)、什么时候依据DStream,依赖关系的模板,构成RDD之间的依赖
     3)、基于DStream它有一个Function,Function 基于Batch Interval(time Interval)生成RDD,这个和定时器有关系

abstract class DStream[T: ClassTag] (
    @transient private[streaming] var ssc: StreamingContext
extends Serializable with Logging {

6、SocketReceiver对象在onStart中创建Thread启动run方法调用执行receive接收数据。
  def onStart() {
    // Start the thread that receives data over a connection
    new Thread("Socket Receiver") {
      setDaemon(true)
      override def run() { receive() }
    }.start()
  }

7、创建一个Socket connection连接接收数据
  /** Create a socket connection and receive data until receiver is stopped */
  def receive() {
    var socket: Socket = null
    try {
      logInfo("Connecting to " + host + ":" + port)
      socket = new Socket(hostport)
      logInfo("Connected to " + host + ":" + port)
      val iterator = bytesToObjects(socket.getInputStream())
      while(!isStopped && iterator.hasNext) {
        store(iterator.next)
      }
      if (!isStopped()) {
        restart("Socket data stream had no more data")
      } else {
        logInfo("Stopped receiving")
      }

8、总体流程:在StreamingContext调用start方法的内部其实是会启动JobScheduler的Start方法,进行消息循环,在JobScheduler的start内部会构造JobGenerator和ReceiverTacker,并且调用JobGenerator和ReceiverTacker的start方法:
     1)、JobGenerator启动后会不断的根据batchDuration生成一个个的Job
     2)、ReceiverTracker启动后首先在Spark Cluster中启动Receiver(其实是在Executor中先启动ReceiverSupervisor),
在Receiver收到 数据后会通过ReceiverSupervisor存储到Executor并且把数据的Metadata信息发送给Driver中的ReceiverTracker,
在ReceiverTracker 内部会通过ReceivedBlockTracker来管理接受到的元数据信息 每个BatchInterval会产生一个具体的Job(这里的Job主要是封装了业务逻辑例如上面实例中的代码),其实这里的Job不是Spark Core中所指的Job,它只是基于DStreamGraph而生成的RDD 的DAG而已,
从Java角度讲,相当于Runnable接口实例,此时要想运行Job需要提交给JobScheduler,在JobScheduler中通过线程池的方式找到一个 单独的线程来提交Job到集群运行(其实是在线程中基于RDD的Action触发真正的作业的运行),
     为什么使用线程池呢?
          a)、作业不断生成,所以为了提升效率,我们需要线程池;这和在Executor中通过线程池执行Task有异曲同工之妙;
          b)、有可能设置了Job的FAIR公平调度的方式,这个时候也需要多线程的支持;
                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                               
8.1、StreamingContext.start
// Start the streaming scheduler in a new thread, so that thread local properties
// like call sites and job groups can be reset without affecting those of the
// current thread.
ThreadUtils.runInNewThread("streaming-start") {
  sparkContext.setCallSite(startSite.get)
  sparkContext.clearJobGroup()
  sparkContext.setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL"false")
  scheduler.start()
}

补充:线程本地存储,线程ThreadLocal每个线程有自己的私有属性,设置线程的私有属性不会影响当前线程或其他线程
9、JobScheduler.start 创建EventLoop消息线程并启动
def start(): Unit = synchronized {
  if (eventLoop != nullreturn // scheduler has already been started

  logDebug("Starting JobScheduler")
  eventLoop new EventLoop[JobSchedulerEvent]("JobScheduler") {
    override protected def onReceive(event: JobSchedulerEvent): Unit = processEvent(event)

    override protected def onError(e: Throwable): Unit = reportError("Error in job scheduler"e)
  }
  eventLoop.start()

9.1、EventLoop中创建Thread线程接收和发送消息,调用JobScheduler中的processEvent方法
private[spark] abstract class EventLoop[E](name: Stringextends Logging {

  private val eventQueue: BlockingQueue[E] = new LinkedBlockingDeque[E]()

  private val stopped new AtomicBoolean(false)

  private val eventThread new Thread(name) {
    setDaemon(true)

    override def run(): Unit = {
      try {
        while (!stopped.get) {
          val event = eventQueue.take()
          try {
            onReceive(event)
          } catch {

9.2、会接受不同的任务,JobScheduler是整个Job的调度器,它本身用了一个线程循环,去监听不同的Job启动、Job完成、Job失败等任务(消息驱动系统)
private def processEvent(event: JobSchedulerEvent) {
  try {
    event match {
      case JobStarted(jobstartTime) => handleJobStart(jobstartTime)
      case JobCompleted(jobcompletedTime) => handleJobCompletion(jobcompletedTime)
      case ErrorReported(me) => handleError(me)
    }
  } catch {

10、JobScheduler.start
// attach rate controllers of input streams to receive batch completion updates
for {
  inputDStream <- ssc.graph.getInputStreams
  rateController <- inputDStream.rateController
} ssc.addStreamingListener(rateController)

10.1、多个InputStream
inputDStream <- ssc.graph.getInputStreams

10.2、RateController控制输入的速度
// Keep track of the freshest rate for this stream using the rateEstimator
protected[streaming] val rateController: Option[RateController] = None

11、JobScheduler.start
listenerBus.start(ssc.sparkContext)
receiverTracker new ReceiverTracker(ssc)
inputInfoTracker new InputInfoTracker(ssc)
receiverTracker.start()
jobGenerator.start()

11.1、StreamingListenerBus
override def onPostEvent(listener: StreamingListenerevent: StreamingListenerEvent): Unit = {
  event match {
    case receiverStarted: StreamingListenerReceiverStarted =>
      listener.onReceiverStarted(receiverStarted)
    case receiverError: StreamingListenerReceiverError =>
      listener.onReceiverError(receiverError)
    case receiverStopped: StreamingListenerReceiverStopped =>
      listener.onReceiverStopped(receiverStopped)
    case batchSubmitted: StreamingListenerBatchSubmitted =>
      listener.onBatchSubmitted(batchSubmitted)
    case batchStarted: StreamingListenerBatchStarted =>
      listener.onBatchStarted(batchStarted)
    case batchCompleted: StreamingListenerBatchCompleted =>
      listener.onBatchCompleted(batchCompleted)
    case outputOperationStarted: StreamingListenerOutputOperationStarted =>
      listener.onOutputOperationStarted(outputOperationStarted)
    case outputOperationCompleted: StreamingListenerOutputOperationCompleted =>
      listener.onOutputOperationCompleted(outputOperationCompleted)
    case _ =>
  }
}

11.2、receiverTracker.start(),ReceiveTracker是通过发Job的方式到集群的Executor上启动Receiver
/** Start the endpoint and receiver execution thread. */
def start(): Unit = synchronized {
  if (isTrackerStarted) {
    throw new SparkException("ReceiverTracker already started")
  }

  if (!receiverInputStreams.isEmpty) {
    endpoint = ssc.env.rpcEnv.setupEndpoint(
      "ReceiverTracker"new ReceiverTrackerEndpoint(ssc.env.rpcEnv))
    if (!skipReceiverLaunch) launchReceivers()
    logInfo("ReceiverTracker started")
    trackerState Started
  }
}

11.2.1、创建一个ReceiverTrackerEndpoint消息通信体
override def receive: PartialFunction[Any, Unit] = {
  // Local messages
  case StartAllReceivers(receivers) =>
    val scheduledLocations = schedulingPolicy.scheduleReceivers(receiversgetExecutors)
    for (receiver <- receivers) {
      val executors = scheduledLocations(receiver.streamId)
      updateReceiverScheduledExecutors(receiver.streamIdexecutors)
      receiverPreferredLocations(receiver.streamId) = receiver.preferredLocation
      startReceiver(receiverexecutors)
    }
  case RestartReceiver(receiver) =>
    // Old scheduled executors minus the ones that are not active any more
    val oldScheduledExecutors = getStoredScheduledExecutors(receiver.streamId)
    val scheduledLocations = if (oldScheduledExecutors.nonEmpty) {
        // Try global scheduling again
        oldScheduledExecutors
      } else {
        val oldReceiverInfo = receiverTrackingInfos(receiver.streamId)
        // Clear "scheduledLocations" to indicate we are going to do local scheduling
        val newReceiverInfo = oldReceiverInfo.copy(
          state = ReceiverState.INACTIVEscheduledLocations = None)
        receiverTrackingInfos(receiver.streamId) = newReceiverInfo
        schedulingPolicy.rescheduleReceiver(
          receiver.streamId,
          receiver.preferredLocation,
          receiverTrackingInfos,
          getExecutors)
      }
    // Assume there is one receiver restarting at one time, so we don‘t need to update
    // receiverTrackingInfos
    startReceiver(receiverscheduledLocations)
 
11.2.1.1、ReceiverSchedulingPolicy.scheduleReceivers,从下面的代码中可以看出来在那些Executor上启动Receiver,以及怎么具体在Executor上启动Receiver
// Firstly, we need to respect "preferredLocation". So if a receiver has "preferredLocation",
// we need to make sure the "preferredLocation" is in the candidate scheduled executor list.
for (i <- until receivers.length) {
  // Note: preferredLocation is host but executors are host_executorId
  receivers(i).preferredLocation.foreach { host =>
    hostToExecutors.get(host) match {
      case Some(executorsOnHost) =>
        // preferredLocation is a known host. Select an executor that has the least receivers in
        // this host
        val leastScheduledExecutor =
          executorsOnHost.minBy(executor => numReceiversOnExecutor(executor))
        scheduledLocations(i) += leastScheduledExecutor
        numReceiversOnExecutor(leastScheduledExecutor) =
          numReceiversOnExecutor(leastScheduledExecutor) + 1
      case None =>
        // preferredLocation is an unknown host.
        // Note: There are two cases:
        // 1. This executor is not up. But it may be up later.
        // 2. This executor is dead, or it‘s not a host in the cluster.
        // Currently, simply add host to the scheduled executors.

        // Note: host could be `HDFSCacheTaskLocation`, so use `TaskLocation.apply` to handle
        // this case
        scheduledLocations(i) += TaskLocation(host)
    }
  }
}

补充:ReceiverTracker本身不直接监管Receiver,它是Driver级别的可间接地,用ReceiverSupervisor监控那台机器上Executor中的Receiver。

11.2.2、if (!skipReceiverLaunch) launchReceivers()
/**
 * Get the receivers from the ReceiverInputDStreams, distributes them to the
 * worker nodes as a parallel collection, and runs them.
 */
private def launchReceivers(): Unit = {
  val receivers = receiverInputStreams.map(nis => {
    val rcvr = nis.getReceiver()
    rcvr.setReceiverId(nis.id)
    rcvr
  })

  runDummySparkJob()

  logInfo("Starting " + receivers.length + " receivers")
  endpoint.send(StartAllReceivers(receivers))
}

11.2.2.1运行了一个Dummy的作业,确保所有的Slaves正常工作,保证所有的Receiver都在一台机器上
/**
 * Run the dummy Spark job to ensure that all slaves have registered. This avoids all the
 * receivers to be scheduled on the same node.
 *
 * TODO Should poll the executor number and wait for executors according to
 * "spark.scheduler.minRegisteredResourcesRatio" and
 * "spark.scheduler.maxRegisteredResourcesWaitingTime" rather than running a dummy job.
 */
private def runDummySparkJob(): Unit = {
  if (!ssc.sparkContext.isLocal) {
    ssc.sparkContext.makeRDD(to 5050).map(x => (x1)).reduceByKey(_ + _20).collect()
  }
  assert(getExecutors.nonEmpty)
}

11.2.2.2、endpoint.send(StartAllReceivers(receivers)
// endpoint is created when generator starts.
// This not being null means the tracker has been started and not stopped
private var endpoint: RpcEndpointRef = null

endpoint = ssc.env.rpcEnv.setupEndpoint(
  "ReceiverTracker"new ReceiverTrackerEndpoint(ssc.env.rpcEnv))

ReceiverTrackerEndpoint
override def receive: PartialFunction[Any, Unit] = {
  // Local messages
  case StartAllReceivers(receivers) =>
    val scheduledLocations = schedulingPolicy.scheduleReceivers(receiversgetExecutors)
    for (receiver <- receivers) {
      val executors = scheduledLocations(receiver.streamId)
      updateReceiverScheduledExecutors(receiver.streamIdexecutors)
      receiverPreferredLocations(receiver.streamId) = receiver.preferredLocation
      startReceiver(receiverexecutors)
    }

startReceiver
// Function to start the receiver on the worker node
val startReceiverFunc: Iterator[Receiver[_]] => Unit =
  (iterator: Iterator[Receiver[_]]) => {
    if (!iterator.hasNext) {
      throw new SparkException(
        "Could not start receiver as object not found.")
    }
    if (TaskContext.get().attemptNumber() == 0) {
      val receiver = iterator.next()
      assert(iterator.hasNext == false)
      val supervisor = new ReceiverSupervisorImpl(
        receiverSparkEnv.getserializableHadoopConf.valuecheckpointDirOption)
      supervisor.start()
      supervisor.awaitTermination()
    } else {
      // It‘s restarted by TaskScheduler, but we want to reschedule it again. So exit it.
    }
  }

逆天的设计啊
  // Create the RDD using the scheduledLocations to run the receiver in a Spark job
  val receiverRDD: RDD[Receiver[_]] =
    if (scheduledLocations.isEmpty) {
      ssc.sc.makeRDD(Seq(receiver)1)
    } else {
      val preferredLocations = scheduledLocations.map(_.toString).distinct
      ssc.sc.makeRDD(Seq(receiver -> preferredLocations))
    }
  receiverRDD.setName(s"Receiver $receiverId")
  ssc.sparkContext.setJobDescription(s"Streaming job running receiver $receiverId")
  ssc.sparkContext.setCallSite(Option(ssc.getStartSite()).getOrElse(Utils.getCallSite()))

  val future = ssc.sparkContext.submitJob[Receiver[_], Unit, Unit](
    receiverRDDstartReceiverFuncSeq(0)(__) => Unit, ())
  // We will keep restarting the receiver job until ReceiverTracker is stopped
  future.onComplete {
    case Success(_) =>
      if (!shouldStartReceiver) {
        onReceiverJobFinish(receiverId)
      } else {
        logInfo(s"Restarting Receiver $receiverId")
        self.send(RestartReceiver(receiver))
      }
    case Failure(e) =>
      if (!shouldStartReceiver) {
        onReceiverJobFinish(receiverId)
      } else {
        logError("Receiver has been stopped. Try to restart it."e)
        logInfo(s"Restarting Receiver $receiverId")
        self.send(RestartReceiver(receiver))
      }
  }(submitJobThreadPool)
  logInfo(s"Receiver ${receiver.streamId} started")
}

ReceiverSupervisorImpl.startReceiver
/** Start receiver */
def startReceiver(): Unit = synchronized {
  try {
    if (onReceiverStart()) {
      logInfo("Starting receiver")
      receiverState Started
      receiver.onStart()
      logInfo("Called receiver onStart")
    } else {
      // The driver refused us
      stop("Registered unsuccessfully because Driver refused to start receiver " streamIdNone)
    }

override protected def onReceiverStart(): Boolean = {
  val msg = RegisterReceiver(
    streamIdreceiver.getClass.getSimpleNamehostexecutorIdendpoint)
  trackerEndpoint.askWithRetry[Boolean](msg)
}

11.3、JobScheduler.start  jobGenerator.start()
/** Start generation of jobs */
def start(): Unit = synchronized {
  if (eventLoop != nullreturn // generator has already been started

  // Call checkpointWriter here to initialize it before eventLoop uses it to avoid a deadlock.
  // See SPARK-10125
  checkpointWriter

  eventLoop new EventLoop[JobGeneratorEvent]("JobGenerator") {
    override protected def onReceive(event: JobGeneratorEvent): Unit = processEvent(event)

    override protected def onError(e: Throwable): Unit = {
      jobScheduler.reportError("Error in job generator"e)
    }
  }
  eventLoop.start()

  if (ssc.isCheckpointPresent) {
    restart()
  } else {
    startFirstTime()
  }
}

根据时间间隔不断发送消息
/** Processes all events */
private def processEvent(event: JobGeneratorEvent) {
  logDebug("Got event " + event)
  event match {
    case GenerateJobs(time) => generateJobs(time)
    case ClearMetadata(time) => clearMetadata(time)
    case DoCheckpoint(timeclearCheckpointDataLater) =>
      doCheckpoint(timeclearCheckpointDataLater)
    case ClearCheckpointData(time) => clearCheckpointData(time)
  }
}

/** Generate jobs and perform checkpoint for the given `time`.  */
private def generateJobs(time: Time) {
  // Set the SparkEnv in this thread, so that job generation code can access the environment
  // Example: BlockRDDs are created in this thread, and it needs to access BlockManager
  // Update: This is probably redundant after threadlocal stuff in SparkEnv has been removed.
  SparkEnv.set(ssc.env)
  Try {
    jobScheduler.receiverTracker.allocateBlocksToBatch(time) // allocate received blocks to batch
    graph.generateJobs(time) // generate jobs using allocated block
  match {
    case Success(jobs) =>
      val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time)
      jobScheduler.submitJobSet(JobSet(timejobsstreamIdToInputInfos))
    case Failure(e) =>
      jobScheduler.reportError("Error generating jobs for time " + timee)
  }
  eventLoop.post(DoCheckpoint(timeclearCheckpointDataLater = false))
}

def submitJobSet(jobSet: JobSet) {
  if (jobSet.jobs.isEmpty) {
    logInfo("No jobs added for time " + jobSet.time)
  } else {
    listenerBus.post(StreamingListenerBatchSubmitted(jobSet.toBatchInfo))
    jobSets.put(jobSet.timejobSet)
    jobSet.jobs.foreach(job => jobExecutor.execute(new JobHandler(job)))
    logInfo("Added jobs for time " + jobSet.time)
  }
}

/**
 * Executes the given task sometime in the future.  The task
 * may execute in a new thread or in an existing pooled thread.
 *
 * If the task cannot be submitted for execution, either because this
 * executor has been shutdown or because its capacity has been reached,
 * the task is handled by the current {@code RejectedExecutionHandler}.
 *
 * @param command the task to execute
 * @throws RejectedExecutionException at discretion of
 *         {@code RejectedExecutionHandler}, if the task
 *         cannot be accepted for execution
 * @throws NullPointerException if {@code command} is null
 */
public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    /*
     * Proceed in 3 steps:
     *
     * 1. If fewer than corePoolSize threads are running, try to
     * start a new thread with the given command as its first
     * task.  The call to addWorker atomically checks runState and
     * workerCount, and so prevents false alarms that would add
     * threads when it shouldn‘t, by returning false.
     *
     * 2. If a task can be successfully queued, then we still need
     * to double-check whether we should have added a thread
     * (because existing ones died since last checking) or that
     * the pool shut down since entry into this method. So we
     * recheck state and if necessary roll back the enqueuing if
     * stopped, or start a new thread if there are none.
     *
     * 3. If we cannot queue task, then we try to add a new
     * thread.  If it fails, we know we are shut down or saturated
     * and so reject the task.
     */
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    else if (!addWorker(command, false))
        reject(command);
}

  private class JobHandler(job: Job) extends Runnable with Logging {
    import JobScheduler._

    def run() {
      try {
        val formattedTime = UIUtils.formatBatchTime(
          job.time.millisecondsssc.graph.batchDuration.millisecondsshowYYYYMMSS = false)
        val batchUrl = s"/streaming/batch/?id=${job.time.milliseconds}"
        val batchLinkText = s"[output operation ${job.outputOpId}, batch time ${formattedTime}]"

        ssc.sc.setJobDescription(
          s"""Streaming job from <a href="$batchUrl">$batchLinkText</a>""")
        ssc.sc.setLocalProperty(BATCH_TIME_PROPERTY_KEYjob.time.milliseconds.toString)
        ssc.sc.setLocalProperty(OUTPUT_OP_ID_PROPERTY_KEYjob.outputOpId.toString)

        // We need to assign `eventLoop` to a temp variable. Otherwise, because
        // `JobScheduler.stop(false)` may set `eventLoop` to null when this method is running, then
        // it‘s possible that when `post` is called, `eventLoop` happens to null.
        var _eventLoop = eventLoop
        if (_eventLoop != null) {
          _eventLoop.post(JobStarted(jobclock.getTimeMillis()))
          // Disable checks for existing output directories in jobs launched by the streaming
          // scheduler, since we may need to write output to an existing directory during checkpoint
          // recovery; see SPARK-4835 for more details.
          PairRDDFunctions.disableOutputSpecValidation.withValue(true) {
            job.run()
          }
          _eventLoop = eventLoop
          if (_eventLoop != null) {
            _eventLoop.post(JobCompleted(jobclock.getTimeMillis()))
          }
        } else {
          // JobScheduler has been stopped.
        }
      } finally {
        ssc.sc.setLocalProperty(JobScheduler.BATCH_TIME_PROPERTY_KEYnull)
        ssc.sc.setLocalProperty(JobScheduler.OUTPUT_OP_ID_PROPERTY_KEYnull)
      }
    }
  }
}

private def processEvent(event: JobSchedulerEvent) {
  try {
    event match {
      case JobStarted(jobstartTime) => handleJobStart(jobstartTime)
      case JobCompleted(jobcompletedTime) => handleJobCompletion(jobcompletedTime)
      case ErrorReported(me) => handleError(me)
    }
  } catch {
    case e: Throwable =>
      reportError("Error in job scheduler"e)
  }
}
 
private def handleJobStart(job: JobstartTime: Long) {
  val jobSet = jobSets.get(job.time)
  val isFirstJobOfJobSet = !jobSet.hasStarted
  jobSet.handleJobStart(job)
  if (isFirstJobOfJobSet) {
    // "StreamingListenerBatchStarted" should be posted after calling "handleJobStart" to get the
    // correct "jobSet.processingStartTime".
    listenerBus.post(StreamingListenerBatchStarted(jobSet.toBatchInfo))
  }
  job.setStartTime(startTime)
  listenerBus.post(StreamingListenerOutputOperationStarted(job.toOutputOperationInfo))
  logInfo("Starting job " + job.id + " from job set of time " + jobSet.time)
}

  private class JobHandler(job: Job) extends Runnable with Logging {
    import JobScheduler._

    def run() {
      try {
        val formattedTime = UIUtils.formatBatchTime(
          job.time.millisecondsssc.graph.batchDuration.millisecondsshowYYYYMMSS = false)
        val batchUrl = s"/streaming/batch/?id=${job.time.milliseconds}"
        val batchLinkText = s"[output operation ${job.outputOpId}, batch time ${formattedTime}]"

        ssc.sc.setJobDescription(
          s"""Streaming job from <a href="$batchUrl">$batchLinkText</a>""")
        ssc.sc.setLocalProperty(BATCH_TIME_PROPERTY_KEYjob.time.milliseconds.toString)
        ssc.sc.setLocalProperty(OUTPUT_OP_ID_PROPERTY_KEYjob.outputOpId.toString)

        // We need to assign `eventLoop` to a temp variable. Otherwise, because
        // `JobScheduler.stop(false)` may set `eventLoop` to null when this method is running, then
        // it‘s possible that when `post` is called, `eventLoop` happens to null.
        var _eventLoop = eventLoop
        if (_eventLoop != null) {
          _eventLoop.post(JobStarted(jobclock.getTimeMillis()))
          // Disable checks for existing output directories in jobs launched by the streaming
          // scheduler, since we may need to write output to an existing directory during checkpoint
          // recovery; see SPARK-4835 for more details.
          PairRDDFunctions.disableOutputSpecValidation.withValue(true) {
            job.run()
          }
          _eventLoop = eventLoop
          if (_eventLoop != null) {
            _eventLoop.post(JobCompleted(jobclock.getTimeMillis()))
          }
        } else {
          // JobScheduler has been stopped.
        }
      } finally {
        ssc.sc.setLocalProperty(JobScheduler.BATCH_TIME_PROPERTY_KEYnull)
        ssc.sc.setLocalProperty(JobScheduler.OUTPUT_OP_ID_PROPERTY_KEYnull)
      }
    }
  }
}


资料来源于:王家林(Spark版本定制班课程)
新浪微博:http://www.weibo.com/ilovepains


版本定制第5课:基于案例一节课贯通Spark Streaming流计算框架的运行源码

标签:

原文地址:http://blog.csdn.net/lhui798/article/details/51340828

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!