
(Custom Edition) Lesson 9: Spark Streaming Source Code Walkthrough — A Thorough Study of the Elegant Driver-Side Implementation of the Receiver's Full Lifecycle


Tags: Spark Streaming, Receiver analysis

Topics covered in this session:

    1. Design considerations for how Receivers are launched

    2. A thorough source-level analysis of how Receivers are launched

Part 1: Design considerations for how Receivers are launched
1. Spark Streaming uses Receivers to receive data continuously from external data sources and report it to the Driver, so that each batch duration can generate Jobs based on the reported data.
2. Receivers are started when the Spark Streaming application starts, so to find where Receivers are launched we should look at how Spark Streaming itself starts up.
3. Receivers and InputDStreams correspond one to one; by default there is usually only one Receiver.

How is a Receiver launched?
1. From Spark Core's point of view, Spark Core knows nothing about starting Receivers; just as everything running on top of the Linux kernel is just an application, a Receiver is launched as an ordinary Job.

2. In the common case there is only one Receiver, but you can create InputDStreams for different data sources; DStreamGraph keeps them in an array (see the snippet below, followed by a small usage sketch).

final private[streaming] class DStreamGraph extends Serializable with Logging {

  private val inputStreams = new ArrayBuffer[InputDStream[_]]()    // array of input streams
  private val outputStreams = new ArrayBuffer[DStream[_]]()
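To ground the discussion, here is a minimal application sketch, assuming two hypothetical socket sources, that creates two InputDStreams and therefore two Receivers; the host names and ports are made up for illustration.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object TwoReceiversSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("TwoReceiversSketch")
    val ssc = new StreamingContext(conf, Seconds(5))

    // Each socketTextStream call registers one ReceiverInputDStream in the DStreamGraph,
    // and each ReceiverInputDStream produces exactly one Receiver at runtime.
    val lines1 = ssc.socketTextStream("host-a", 9999)   // hypothetical source 1
    val lines2 = ssc.socketTextStream("host-b", 9998)   // hypothetical source 2

    lines1.union(lines2).count().print()

    ssc.start()              // the Receiver launch path analyzed below begins here
    ssc.awaitTermination()
  }
}

With this program the inputStreams array above holds two entries, and the tracker described later will launch one Receiver Job per entry.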
3. When a Receiver is launched, a Job is started. That Job contains RDD transformations and an action, and it has only one partition. What makes this partition special is that it holds a single element: the Receiver to be started.
4. Problems with this approach:
a) If there are multiple InputDStreams, multiple Receivers must be started, and each Receiver then corresponds to one partition. Ideally the Receivers would be started on different machines, but from Spark Core's point of view this is just an ordinary application: it cannot see anything special about a Receiver, so it schedules the Job in the normal way and may well start several Receivers on one Executor, which leads to load imbalance.
b) Starting a Receiver can fail, yet as long as the cluster is alive a Receiver should never stay failed.
c) At runtime, a single partition means a single Task by default, and that Task can also fail, so a Receiver launched as an ordinary Task can go down with it.

It follows that a failed Receiver has serious consequences. So how does Spark Streaming guard against these problems? Let's trace where Receivers are created.

Here are the answers up front; the source code below works through the details:
a) Spark uses one Job to start one Receiver, which gives the best possible load balancing.
b) Spark Streaming itself specifies which Executors each Receiver runs on.
c) If a Receiver fails to start, the Job is not considered failed; the Receiver is restarted internally.

Part 2: A thorough source-level analysis of how Receivers are launched

Next we walk through the code step by step to see how a Receiver is launched.

1. When writing an application we always call StreamingContext's start method, which is where job launch begins. Let's look at its source first:

def start(): Unit = synchronized {
  state match {
    case INITIALIZED =>
      startSite.set(DStream.getCreationSite())
      StreamingContext.ACTIVATION_LOCK.synchronized {
        StreamingContext.assertNoOtherContextIsActive()
        try {
          validate()

          // Start the streaming scheduler in a new thread, so that thread local properties
          // like call sites and job groups can be reset without affecting those of the
          // current thread.
          ThreadUtils.runInNewThread("streaming-start") {
            sparkContext.setCallSite(startSite.get)
            sparkContext.clearJobGroup()
            sparkContext.setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, "false")
            scheduler.start()
            // JobScheduler.start() runs in this child thread, both to do local
            // initialization and to avoid blocking the main thread.
          }
          state = StreamingContextState.ACTIVE
        } catch {
          case NonFatal(e) =>
            logError("Error starting the context, marking it as stopped", e)
            scheduler.stop(false)
            state = StreamingContextState.STOPPED
            throw e
        }
        StreamingContext.setActiveContext(this)
      }
      shutdownHookRef = ShutdownHookManager.addShutdownHook(
        StreamingContext.SHUTDOWN_HOOK_PRIORITY)(stopOnShutdown)
      // Registering Streaming Metrics at the start of the StreamingContext
      assert(env.metricsSystem != null)
      env.metricsSystem.registerSource(streamingSource)
      uiTab.foreach(_.attach())
      logInfo("StreamingContext started")
    case ACTIVE =>
      logWarning("StreamingContext has already been started")
    case STOPPED =>
      throw new IllegalStateException("StreamingContext has already been stopped")
  }
}

2. When start is called above, JobScheduler's start() method runs, and inside it the receiverTracker is started. Source:

def start(): Unit = synchronized {
  if (eventLoop != null) return // scheduler has already been started

  logDebug("Starting JobScheduler")
  eventLoop = new EventLoop[JobSchedulerEvent]("JobScheduler") {
    override protected def onReceive(event: JobSchedulerEvent): Unit = processEvent(event)
    override protected def onError(e: Throwable): Unit = reportError("Error in job scheduler", e)
  }
  eventLoop.start()

  // attach rate controllers of input streams to receive batch completion updates
  for {
    inputDStream <- ssc.graph.getInputStreams
    rateController <- inputDStream.rateController
  } ssc.addStreamingListener(rateController)

  listenerBus.start(ssc.sparkContext)
  receiverTracker = new ReceiverTracker(ssc)
  inputInfoTracker = new InputInfoTracker(ssc)
  receiverTracker.start()   // launches the Receivers
  jobGenerator.start()
  logInfo("Started JobScheduler")
}

3. Now look at receiverTracker's start() method. It sets up an RPC endpoint. Why? Because the ReceiverTracker monitors every Receiver in the cluster, and each Receiver in turn reports back to the ReceiverTrackerEndpoint: its state, the data it has received, its lifecycle events, and so on.

/** Start the endpoint and receiver execution thread. */
def start(): Unit = synchronized {
  if (isTrackerStarted) {
    throw new SparkException("ReceiverTracker already started")
  }

  if (!receiverInputStreams.isEmpty) {
    // Receivers are launched only when there are receiver-based input streams
    endpoint = ssc.env.rpcEnv.setupEndpoint(
      "ReceiverTracker", new ReceiverTrackerEndpoint(ssc.env.rpcEnv)) // receives status reports
    if (!skipReceiverLaunch) launchReceivers() // kick off the Receivers
    logInfo("ReceiverTracker started")
    trackerState = Started
  }
}

4. The concrete Receiver instances are obtained from the ReceiverInputDStreams (on the Driver side) and then distributed to the worker nodes. One ReceiverInputDStream produces exactly one Receiver.

/**
 * Get the receivers from the ReceiverInputDStreams, distributes them to the
 * worker nodes as a parallel collection, and runs them.
 */
private def launchReceivers(): Unit = {
  val receivers = receiverInputStreams.map(nis => {
    // one input source produces exactly one Receiver
    val rcvr = nis.getReceiver()
    rcvr.setReceiverId(nis.id)
    rcvr
  })

  runDummySparkJob()   // run a dummy job so the Receivers end up on different executors

  logInfo("Starting " + receivers.length + " receivers")
  endpoint.send(StartAllReceivers(receivers))
}
5. runDummySparkJob() makes sure all the executors are registered and alive, and keeps the receivers from all being scheduled onto a single node.
private def runDummySparkJob(): Unit = {
  if (!ssc.sparkContext.isLocal) {
    ssc.sparkContext.makeRDD(1 to 50, 50).map(x => (x, 1)).reduceByKey(_ + _, 20).collect()
  }
  assert(getExecutors.nonEmpty)
}

ReceiverInputDStream's getReceiver() method obtains the Receiver object, which is then sent to a worker node, instantiated there, and starts receiving data. This method must be implemented by each subclass.

/**
* Gets the receiver object that will be sent to the worker nodes
* to receive data. This method needs to defined by any specific implementation
* of a ReceiverInputDStream.
*/
def getReceiver(): Receiver[T]

ReceiverInputDStream is an abstract class, so getReceiver must be implemented in a concrete subclass, for example SocketInputDStream:

private[streaming]
class SocketInputDStream[T: ClassTag](
    ssc_ : StreamingContext,
    host: String,
    port: Int,
    bytesToObjects: InputStream => Iterator[T],
    storageLevel: StorageLevel
  ) extends ReceiverInputDStream[T](ssc_) {

  def getReceiver(): Receiver[T] = {
    new SocketReceiver(host, port, bytesToObjects, storageLevel)   // hand back a SocketReceiver
  }
}

private[streaming]
class SocketReceiver[T: ClassTag](
    host: String,
    port: Int,
    bytesToObjects: InputStream => Iterator[T],
    storageLevel: StorageLevel
  ) extends Receiver[T](storageLevel) with Logging {

  def onStart() {
    // Start the thread that receives data over a connection
    new Thread("Socket Receiver") {
      setDaemon(true)
      override def run() { receive() }   // start the thread and call receive()
    }.start()
  }

In receive(), a socket connection is opened and data is received:

  /** Create a socket connection and receive data until receiver is stopped */
  def receive() {
    var socket: Socket = null
    try {
      logInfo("Connecting to " + host + ":" + port)
      socket = new Socket(host, port)   // create the socket from the host and port the application passed in
      logInfo("Connected to " + host + ":" + port)
      val iterator = bytesToObjects(socket.getInputStream())   // receive the data
      while(!isStopped && iterator.hasNext) {
        store(iterator.next)   // store each received record
      }
      if (!isStopped()) {
        restart("Socket data stream had no more data")
      } else {
        logInfo("Stopped receiving")
      }
    } catch {
      case e: java.net.ConnectException =>
        restart("Error connecting to " + host + ":" + port, e)
      case NonFatal(e) =>
        logWarning("Error receiving data", e)
        restart("Error receiving data", e)
    } finally {
      if (socket != null) {
        socket.close()
        logInfo("Closed socket to " + host + ":" + port)
      }
    }
  }
}
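The same pattern applies when you implement your own receiver-based source: subclass Receiver, start a background thread in onStart, and hand records to Spark with store. Below is a minimal sketch under that assumption; TickReceiver and its interval parameter are invented for illustration and are not part of Spark.

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

// Hypothetical receiver that emits an increasing counter, just to show the contract.
class TickReceiver(intervalMs: Long)
  extends Receiver[Long](StorageLevel.MEMORY_AND_DISK_2) {

  override def onStart(): Unit = {
    // Like SocketReceiver, do the blocking work on a daemon thread so onStart returns quickly.
    new Thread("Tick Receiver") {
      setDaemon(true)
      override def run(): Unit = {
        var tick = 0L
        while (!isStopped()) {
          store(tick)            // handed to the ReceiverSupervisor, which stores and reports it
          tick += 1
          Thread.sleep(intervalMs)
        }
      }
    }.start()
  }

  override def onStop(): Unit = {
    // The receive loop checks isStopped(), so nothing else needs to be cleaned up here.
  }
}

An application would wire this in with ssc.receiverStream(new TickReceiver(1000)), which creates the corresponding ReceiverInputDStream and, at runtime, one Receiver launched exactly as described in this article.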
6. The source of ReceiverTrackerEndpoint's receive handler:
override def receive: PartialFunction[Any, Unit] = {
  // Local messages
  case StartAllReceivers(receivers) =>
    // receivers are the Receivers to start; getExecutors returns the Executors in the cluster
    val scheduledLocations = schedulingPolicy.scheduleReceivers(receivers, getExecutors)
    for (receiver <- receivers) {
      val executors = scheduledLocations(receiver.streamId)
      updateReceiverScheduledExecutors(receiver.streamId, executors)
      receiverPreferredLocations(receiver.streamId) = receiver.preferredLocation
      startReceiver(receiver, executors)   // loop over the receivers, starting one at a time
    }
  case RestartReceiver(receiver) =>
    // Old scheduled executors minus the ones that are not active any more
    val oldScheduledExecutors = getStoredScheduledExecutors(receiver.streamId)
    val scheduledLocations = if (oldScheduledExecutors.nonEmpty) {
        // Try global scheduling again
        oldScheduledExecutors
      } else {
        val oldReceiverInfo = receiverTrackingInfos(receiver.streamId)
        // Clear "scheduledLocations" to indicate we are going to do local scheduling
        val newReceiverInfo = oldReceiverInfo.copy(
          state = ReceiverState.INACTIVE, scheduledLocations = None)
        receiverTrackingInfos(receiver.streamId) = newReceiverInfo
        schedulingPolicy.rescheduleReceiver(
          receiver.streamId,
          receiver.preferredLocation,
          receiverTrackingInfos,
          getExecutors)
      }
    // Assume there is one receiver restarting at one time, so we don't need to update
    // receiverTrackingInfos
    startReceiver(receiver, scheduledLocations)
  case c: CleanupOldBlocks =>
    receiverTrackingInfos.values.flatMap(_.endpoint).foreach(_.send(c))
  case UpdateReceiverRateLimit(streamUID, newRate) =>
    for (info <- receiverTrackingInfos.get(streamUID); eP <- info.endpoint) {
      eP.send(UpdateRateLimit(newRate))
    }
  // Remote messages
  case ReportError(streamId, message, error) =>
    reportError(streamId, message, error)
}
As this shows, Spark Streaming itself decides which Executors the receivers run on, rather than leaving it to Spark Core's Task-level scheduling. The StartAllReceivers message carrying all the receivers is sent to the ReceiverTrackerEndpoint, and in the for loop each receiver is assigned its executors and passed to startReceiver. A deliberately simplified sketch of that assignment step follows.
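The sketch below only illustrates the idea behind scheduleReceivers; it is not the actual ReceiverSchedulingPolicy (which also weighs preferred locations and current load), and the ExecutorLocation case class and roundRobinSchedule function are invented for the example.

// Hypothetical, simplified stand-in for receiver-to-executor assignment:
// spread receiver ids over the available executors round-robin.
case class ExecutorLocation(host: String, executorId: String)

def roundRobinSchedule(
    receiverIds: Seq[Int],
    executors: Seq[ExecutorLocation]): Map[Int, Seq[ExecutorLocation]] = {
  require(executors.nonEmpty, "no executors registered yet")
  receiverIds.zipWithIndex.map { case (id, i) =>
    id -> Seq(executors(i % executors.size))   // one candidate executor per receiver
  }.toMap
}

// For example, with receivers 0, 1, 2 and executors on host-a and host-b,
// receivers 0 and 2 land on host-a and receiver 1 lands on host-b:
// roundRobinSchedule(Seq(0, 1, 2),
//   Seq(ExecutorLocation("host-a", "1"), ExecutorLocation("host-b", "2")))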

A Receiver is started as a Job! You may wonder: without an RDD, where would the Job come from? Inside startReceiver, the Receiver is first wrapped in an RDD:

val receiverRDD: RDD[Receiver[_]] =
  if (scheduledLocations.isEmpty) {
    ssc.sc.makeRDD(Seq(receiver), 1)
  } else {
    val preferredLocations = scheduledLocations.map(_.toString).distinct
    ssc.sc.makeRDD(Seq(receiver -> preferredLocations))
  }

Once wrapped in an RDD, the RDD is submitted to the cluster to run:

val future = ssc.sparkContext.submitJob[Receiver[_], Unit, Unit](
  receiverRDD, startReceiverFunc, Seq(0), (_, _) => Unit, ())

The task is sent to an executor, which takes the "Receiver" out of the RDD and runs startReceiverFunc on it:

// Function to start the receiver on the worker node
val startReceiverFunc: Iterator[Receiver[_]] => Unit =
  (iterator: Iterator[Receiver[_]]) => {
    if (!iterator.hasNext) {
      throw new SparkException(
        "Could not start receiver as object not found.")
    }
    if (TaskContext.get().attemptNumber() == 0) {
      val receiver = iterator.next()
      assert(iterator.hasNext == false)
      val supervisor = new ReceiverSupervisorImpl(   // registers the Receiver
        receiver, SparkEnv.get, serializableHadoopConf.value, checkpointDirOption)
      supervisor.start()   // starts the Receiver
      supervisor.awaitTermination()
    } else {
      // It's restarted by TaskScheduler, but we want to reschedule it again. So exit it.
    }
  }

This function creates a ReceiverSupervisorImpl object, which manages the concrete Receiver.

First the supervisor registers the Receiver with the ReceiverTracker:
override protected def onReceiverStart(): Boolean = {
 val msg = RegisterReceiver(
   streamId, receiver.getClass.getSimpleName, host, executorId, endpoint)
 trackerEndpoint.askWithRetry[Boolean](msg)
}

If registration succeeds, the Receiver is started through supervisor.start():

/** Start the supervisor */
def start() {
  onStart()
  startReceiver()   // starts the Receiver
}

Back in ReceiverTracker's startReceiver method: whenever the Job corresponding to a Receiver ends, normally or abnormally, while the ReceiverTracker itself has not been stopped, a RestartReceiver message is sent to the ReceiverTrackerEndpoint.

// We will keep restarting the receiver job until ReceiverTracker is stopped
future.onComplete {
  case Success(_) =>
    if (!shouldStartReceiver) {
      onReceiverJobFinish(receiverId)
    } else {
      logInfo(s"Restarting Receiver $receiverId")
      self.send(RestartReceiver(receiver))
    }
  case Failure(e) =>
    if (!shouldStartReceiver) {
      onReceiverJobFinish(receiverId)
    } else {
      logError("Receiver has been stopped. Try to restart it.", e)
      logInfo(s"Restarting Receiver $receiverId")
      self.send(RestartReceiver(receiver))
    }
}(submitJobThreadPool)
logInfo(s"Receiver ${receiver.streamId} started")

A new executor is then chosen for the Receiver and it is run again; this repeats until the ReceiverTracker is stopped.

Spark starts Receivers by submitting Jobs, and an application may have many Receivers. So does one Job start one Receiver, or does a single Job start all of them?
In ReceiverTracker's receive handler, the first argument to startReceiver is a single receiver: the for loop takes the receivers out one by one and calls startReceiver for each, so one Job starts exactly one Receiver.
If a Receiver fails to start, the Job is not treated as failed; instead a message is sent back to the ReceiverTrackerEndpoint to restart the Receiver. This guarantees that Receivers always get started, whereas a Receiver launched as an ordinary Task would be limited by the task retry count when it fails.

A simple flow diagram: [image not available]

Notes:

1. DT Big Data Dream Factory WeChat public account: DT_Spark
2. IMF 8 p.m. big data hands-on YY live channel: 68917580
3. Sina Weibo: http://www.weibo.com/ilovepains

This article is from the "DT_Spark Big Data Dream Factory" blog; please keep the original source: http://18610086859.blog.51cto.com/11484530/1775759
