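In Spark Streaming, every ReceiverInputDStream provides a Receiver that receives the data. There can be many Receivers, running on different worker nodes, and all of them are managed by the ReceiverTracker.
In ReceiverTracker's start method, a message endpoint, ReceiverTrackerEndpoint, is created: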
/** Start the endpoint and receiver execution thread. */
def start(): Unit = synchronized {
  if (isTrackerStarted) {
    throw new SparkException("ReceiverTracker already started")
  }
  if (!receiverInputStreams.isEmpty) {
    endpoint = ssc.env.rpcEnv.setupEndpoint(
      "ReceiverTracker", new ReceiverTrackerEndpoint(ssc.env.rpcEnv))
    if (!skipReceiverLaunch) launchReceivers()
    logInfo("ReceiverTracker started")
    trackerState = Started
  }
}
It then calls the launchReceivers() method:
/**
 * Get the receivers from the ReceiverInputDStreams, distributes them to the
 * worker nodes as a parallel collection, and runs them.
 */
private def launchReceivers(): Unit = {
  val receivers = receiverInputStreams.map(nis => {
    val rcvr = nis.getReceiver()
    rcvr.setReceiverId(nis.id)
    rcvr
  })
  runDummySparkJob()
  logInfo("Starting " + receivers.length + " receivers")
  endpoint.send(StartAllReceivers(receivers))
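}
The code above first gets a Receiver from each ReceiverInputDStream and sets the receiver's ID to the stream's ID. Each input DStream corresponds to exactly one Receiver, and a Spark Streaming application can have multiple input DStreams.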
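The mapping in launchReceivers() can be illustrated with a plain-Scala model that has no Spark dependency. SketchInputStream and SketchReceiver are hypothetical stand-ins for ReceiverInputDStream and Receiver. They keep only the ID handling, so this is a sketch of the idea rather than Spark's classes:

```scala
// Hypothetical stand-in for Receiver: only the stream id is modelled.
final class SketchReceiver {
  private var id: Int = -1
  def setReceiverId(receiverId: Int): Unit = { id = receiverId }
  def streamId: Int = id
}

// Hypothetical stand-in for ReceiverInputDStream: one receiver per stream.
final class SketchInputStream(val id: Int) {
  def getReceiver(): SketchReceiver = new SketchReceiver
}

object LaunchReceiversSketch {
  // Same shape as the map in launchReceivers(): fetch each stream's
  // receiver and tag it with that stream's id.
  def collectReceivers(streams: Seq[SketchInputStream]): Seq[SketchReceiver] =
    streams.map { s =>
      val rcvr = s.getReceiver()
      rcvr.setReceiverId(s.id)
      rcvr
    }
}
```

With three input streams, the driver ends up with three receivers whose streamId values match the stream IDs. That shared ID is what later lets scheduledLocations(receiver.streamId) look up each receiver's executors.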
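Note that it also runs a method called runDummySparkJob(). As the name suggests, this is a dummy job. Its main purpose is to spread the receivers across different workers as evenly as possible: running a real job with 50 tasks and a shuffle makes the executors register with the driver before scheduling starts, so the receivers do not all land on the same node.
You might ask: the Master already knows which workers are in the cluster, so why not simply use the Executors on those workers? The problem is that an Executor on a worker may have died without the Master knowing, and a receiver could then be assigned to an Executor that cannot run it. After runDummySparkJob(), the Executors obtained through the BlockManager master are ones that have actually registered and are "alive" at that point.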
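The idea can be modelled in plain Scala. The model assumes that the driver's view is a map from registered block-manager IDs to memory status, which is the shape getMemoryStatus returns. SketchBlockManagerId and SketchExecutorLocation are hypothetical simplifications of BlockManagerId and ExecutorCacheTaskLocation. The driver ID "driver" is an assumption about the value of SparkContext.DRIVER_IDENTIFIER:

```scala
// Hypothetical simplifications of BlockManagerId and ExecutorCacheTaskLocation.
final case class SketchBlockManagerId(executorId: String, host: String)
final case class SketchExecutorLocation(host: String, executorId: String)

object ExecutorDiscoverySketch {
  // Assumed value of SparkContext.DRIVER_IDENTIFIER.
  val DriverId = "driver"

  // Only block managers that have registered with the driver appear in
  // `registered`; the driver's own entry is dropped, as in getExecutors.
  def executors(registered: Map[SketchBlockManagerId, (Long, Long)]): Seq[SketchExecutorLocation] =
    registered.keys
      .filter(_.executorId != DriverId)
      .map(b => SketchExecutorLocation(b.host, b.executorId))
      .toSeq
}
```

In this model, an executor that has never registered is simply absent from the map, so no receiver can be scheduled on it.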
How is this implemented?
private def runDummySparkJob(): Unit = {
  if (!ssc.sparkContext.isLocal) {
    ssc.sparkContext.makeRDD(1 to 50, 50).map(x => (x, 1)).reduceByKey(_ + _, 20).collect()
  }
  assert(getExecutors.nonEmpty)
}
private def getExecutors: Seq[ExecutorCacheTaskLocation] = {
  if (ssc.sc.isLocal) {
    val blockManagerId = ssc.sparkContext.env.blockManager.blockManagerId
    Seq(ExecutorCacheTaskLocation(blockManagerId.host, blockManagerId.executorId))
  } else {
    ssc.sparkContext.env.blockManager.master.getMemoryStatus.filter { case (blockManagerId, _) =>
      blockManagerId.executorId != SparkContext.DRIVER_IDENTIFIER // Ignore the driver location
    }.map { case (blockManagerId, _) =>
      ExecutorCacheTaskLocation(blockManagerId.host, blockManagerId.executorId)
    }.toSeq
  }
}
launchReceivers() then sends a StartAllReceivers message to ReceiverTrackerEndpoint, which handles it as follows:
case StartAllReceivers(receivers) =>
  val scheduledLocations = schedulingPolicy.scheduleReceivers(receivers, getExecutors)
  for (receiver <- receivers) {
    val executors = scheduledLocations(receiver.streamId)
    updateReceiverScheduledExecutors(receiver.streamId, executors)
    receiverPreferredLocations(receiver.streamId) = receiver.preferredLocation
    startReceiver(receiver, executors)
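  }
schedulingPolicy.scheduleReceivers computes the candidate executors for every receiver. The for loop then records each receiver's assigned executors and preferred location, and calls startReceiver for it.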
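The scheduling policy itself is not shown here. The plain round-robin below is a hedged sketch that deals receivers out over the executors in turn. It is a deliberate simplification and not Spark's ReceiverSchedulingPolicy, which, roughly speaking, also honours each receiver's preferredLocation and balances load across hosts. SketchExecutor and scheduleRoundRobin are hypothetical names:

```scala
// Hypothetical executor descriptor.
final case class SketchExecutor(host: String, executorId: String)

object RoundRobinSchedulingSketch {
  // Maps each streamId to the executors it may run on. With no executors
  // every list is empty, which corresponds to the scheduledLocations.isEmpty
  // branch in startReceiver (no location preference).
  def scheduleRoundRobin(streamIds: Seq[Int], executors: Seq[SketchExecutor]): Map[Int, Seq[SketchExecutor]] =
    if (executors.isEmpty) streamIds.map(id => id -> Seq.empty[SketchExecutor]).toMap
    else streamIds.zipWithIndex.map { case (id, i) =>
      id -> Seq(executors(i % executors.size))
    }.toMap
}
```

Each receiver is keyed by streamId, which matches how the loop above looks up scheduledLocations(receiver.streamId).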
A Receiver is launched as a Spark job! You may wonder where a job can come from without an RDD. The answer is that startReceiver first wraps the Receiver in an RDD:
val receiverRDD: RDD[Receiver[_]] =
  if (scheduledLocations.isEmpty) {
    ssc.sc.makeRDD(Seq(receiver), 1)
  } else {
    val preferredLocations = scheduledLocations.map(_.toString).distinct
    ssc.sc.makeRDD(Seq(receiver -> preferredLocations))
  }
This RDD contains exactly one "record", and that record is the Receiver object itself. Because the job ships the Receiver object to a remote executor, the class must be serializable:
abstract class Receiver[T](val storageLevel: StorageLevel) extends Serializable
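Plain Java serialization, the mechanism behind Spark's default JavaSerializer, shows why Serializable matters. The round trip below is a sketch, and SketchShippedReceiver is a hypothetical class, not Spark's Receiver. A Serializable object survives the trip to bytes and back. A field marked @transient (here a Thread, which is not serializable) is not shipped and arrives as null:

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Hypothetical receiver-like class. Like Receiver, it is Serializable so it
// can be shipped inside a task.
class SketchShippedReceiver(val storageLevel: String) extends Serializable {
  // Executor-local state: Thread is not serializable, so without @transient
  // writeObject would throw NotSerializableException.
  @transient var worker: Thread = null
}

object SerializationSketch {
  // Serialize to bytes and read back, roughly what happens to the
  // Receiver when its task is sent to an executor.
  def roundTrip[T <: Serializable](obj: T): T = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    try out.writeObject(obj) finally out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    try in.readObject().asInstanceOf[T] finally in.close()
  }
}
```

The copy that comes back is a distinct object with the same storageLevel. State that only makes sense on the executor has to be rebuilt there, which is the job of onStart().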
After wrapping the Receiver in an RDD, startReceiver submits the RDD to the cluster as a job:
val future = ssc.sparkContext.submitJob[Receiver[_], Unit, Unit]( receiverRDD, startReceiverFunc, Seq(0), (_, _) => Unit, ())
The task is sent to an executor, which takes the Receiver out of the RDD partition and applies startReceiverFunc to it:
val startReceiverFunc: Iterator[Receiver[_]] => Unit =
  (iterator: Iterator[Receiver[_]]) => {
    if (!iterator.hasNext) {
      throw new SparkException(
        "Could not start receiver as object not found.")
    }
    if (TaskContext.get().attemptNumber() == 0) {
      val receiver = iterator.next()
      assert(iterator.hasNext == false)
      val supervisor = new ReceiverSupervisorImpl(
        receiver, SparkEnv.get, serializableHadoopConf.value, checkpointDirOption)
      supervisor.start()
      supervisor.awaitTermination()
    } else {
      // It's restarted by TaskScheduler, but we want to reschedule it again. So exit it.
    }
  }
The function creates a ReceiverSupervisorImpl object, which manages the concrete Receiver on the executor.
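Two details of startReceiverFunc deserve attention. First, supervisor.awaitTermination() blocks, so the task keeps running for as long as the receiver runs, and each receiver occupies a task slot on its executor. Second, the attemptNumber check means that only the first attempt starts the receiver. If the TaskScheduler retries the task, the function returns immediately and the job ends, leaving ReceiverTracker, not the TaskScheduler, to decide where the receiver goes next. The sketch below models only this gating logic. runStartFunc is a hypothetical name, and its attempt parameter stands in for TaskContext.get().attemptNumber():

```scala
object StartReceiverFuncSketch {
  // Models the control flow of startReceiverFunc. `attempt` stands in for
  // TaskContext.get().attemptNumber(); `start` stands in for creating the
  // supervisor and blocking in supervisor.awaitTermination().
  // Returns true if the receiver was started on this attempt.
  def runStartFunc[R](iterator: Iterator[R], attempt: Int)(start: R => Unit): Boolean = {
    if (!iterator.hasNext) {
      throw new IllegalStateException("Could not start receiver as object not found.")
    }
    if (attempt == 0) {
      val receiver = iterator.next()
      require(!iterator.hasNext, "the RDD should hold exactly one receiver")
      start(receiver)
      true
    } else {
      // A task-level retry: do nothing and let the job finish, so the
      // tracker can reschedule the receiver itself.
      false
    }
  }
}
```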
The supervisor first registers the Receiver with the ReceiverTracker:
override protected def onReceiverStart(): Boolean = {
  val msg = RegisterReceiver(
    streamId, receiver.getClass.getSimpleName, host, executorId, endpoint)
  trackerEndpoint.askWithRetry[Boolean](msg)
}
If registration succeeds, it starts the Receiver:
def startReceiver(): Unit = synchronized {
  try {
    if (onReceiverStart()) {
      logInfo("Starting receiver")
      receiverState = Started
      receiver.onStart()
      logInfo("Called receiver onStart")
    } else {
      // The driver refused us
      stop("Registered unsuccessfully because Driver refused to start receiver " + streamId, None)
    }
  } catch {
    case NonFatal(t) =>
      stop("Error starting receiver " + streamId, Some(t))
  }
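}
Back in ReceiverTracker's startReceiver method: when the receiver job completes, whether it succeeded or failed, and the receiver should still be running (shouldStartReceiver is true), the endpoint sends itself a RestartReceiver message: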
future.onComplete {
  case Success(_) =>
    if (!shouldStartReceiver) {
      onReceiverJobFinish(receiverId)
    } else {
      logInfo(s"Restarting Receiver $receiverId")
      self.send(RestartReceiver(receiver))
    }
  case Failure(e) =>
    if (!shouldStartReceiver) {
      onReceiverJobFinish(receiverId)
    } else {
      logError("Receiver has been stopped. Try to restart it.", e)
      logInfo(s"Restarting Receiver $receiverId")
      self.send(RestartReceiver(receiver))
    }
}(submitJobThreadPool)
On receiving RestartReceiver, the endpoint chooses an executor for the Receiver again and reruns it. This repeats until the receiver is up and running.
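The restart behaviour can be modelled synchronously. The sketch below is a hypothetical simplification. Spark reacts asynchronously through future.onComplete and a RestartReceiver message, and chooses the next executor through its scheduling policy. Here, each new attempt simply moves to the next executor in a fixed list. A job that returns, successfully or not, while shouldStartReceiver() is still true triggers another attempt, and an attempt whose receiver stays up ends the loop:

```scala
import scala.util.{Failure, Success, Try}

object RestartLoopSketch {
  // What the driver observes for one receiver job.
  sealed trait JobOutcome
  // The receiver is up, so the job (blocked in awaitTermination) has not returned.
  case object StillRunning extends JobOutcome
  // The job returned; the receiver is no longer running.
  final case class Finished(result: Try[Unit]) extends JobOutcome

  // Returns the executors tried, in order.
  def superviseReceiver(
      executors: IndexedSeq[String],
      shouldStartReceiver: () => Boolean,
      runJob: String => JobOutcome): List[String] = {
    require(executors.nonEmpty, "need at least one executor")
    var tried = List.empty[String]
    var next = 0
    var running = false
    while (!running && shouldStartReceiver()) {
      val exec = executors(next % executors.size)
      tried = exec :: tried
      runJob(exec) match {
        case StillRunning => running = true
        case Finished(Success(_)) => next += 1 // returned but still wanted: restart
        case Finished(Failure(_)) => next += 1 // failed: restart on another executor
      }
    }
    tried.reverse
  }
}
```

If the job on the first executor fails and the second attempt stays up, the model tries both executors in order and then stops. If shouldStartReceiver() is false from the start, nothing is launched, which corresponds to the onReceiverJobFinish branch above.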
Notes:
1. DT大数据梦工厂 (DT Big Data Dream Factory) WeChat official account: DT_Spark
2. IMF 8 p.m. big data hands-on YY live channel: 68917580
3. Sina Weibo: http://www.weibo.com/ilovepains
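This article is from the "叮咚" (Dingdong) blog; please keep this source attribution: http://lqding.blog.51cto.com/9123978/1773912
Lesson 9: Spark Streaming source code walkthrough: a thorough study of the full lifecycle of the Receiver's elegant implementation on the Driver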