标签:
本期要点:
1、探讨Spark Streaming Job架构和运行机制
2、探讨Spark Streaming 容错机制
关于SparkStreaming我们在前面的博客中其实有所探讨,SparkStreaming是运行在SparkCode之前的一个子框架,下面我们通过一个简单例子来逐一探讨SparkStreaming运行机制和架构
//新浪微博:http://weibo.com/ilovepains/
SparkConf conf = new SparkConf().setMaster("spark://Master:7077").setAppName("WordCountOnline");
JavaStreamingContext jsc = new JavaStreamingContext(conf, Durations.seconds(5));
JavaReceiverInputDStream lines = jsc.socketTextStream("Master", 9999);
JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
@Override
public Iterable<String> call(String line) throws Exception {
return Arrays.asList(line.split(" "));
}
});
JavaPairDStream<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() {
@Override
public Tuple2<String, Integer> call(String word) throws Exception {
return new Tuple2<String, Integer>(word, 1);
}
});
JavaPairDStream<String, Integer> wordsCount = pairs.reduceByKey(new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer v1, Integer v2) throws Exception {
return v1 + v2;
}
});
wordsCount.print();
jsc.start();
jsc.awaitTermination();
jsc.close();
这是一个SparkStreaming单词记数的例子
在SparkStreaming程序中是StreamingContext是SparkStreaming应用程序所有功能的起始点和程序调度的核心,我们来看一下StreamingContext初始化的部分源码:
//StreamingContext.scala 183行
private[streaming] val scheduler = new JobScheduler(this)
我们可以看到在构建StreamingContext的时候,StreamingContext初始化了JobScheduler,而在JobScheduler中又初始化了JobGenerator,同时定义了receiverTracker变量,如下
//JobScheduler.scala 50行
private val jobGenerator = new JobGenerator(this)
val clock = jobGenerator.clock
val listenerBus = new StreamingListenerBus()
// These two are created only when scheduler starts.
// eventLoop not being null means the scheduler has been started and not stopped
var receiverTracker: ReceiverTracker = null
下面我们来看jsc.socketTextStream(“Master”, 9999)创建DStream背后的部分源码:
// StreamingContext.scala 327行
def socketTextStream(
hostname: String,
port: Int,
storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
): ReceiverInputDStream[String] = withNamedScope("socket text stream") {
socketStream[String](hostname, port, SocketReceiver.bytesToLines, storageLevel)
}
// StreamingContext.scala 345行
def socketStream[T: ClassTag](
hostname: String,
port: Int,
converter: (InputStream) => Iterator[T],
storageLevel: StorageLevel
): ReceiverInputDStream[T] = {
new SocketInputDStream[T](this, hostname, port, converter, storageLevel)
}
从上面我们可以看到StreamingContext对socketStream方法进行了方法重载,最终调用的是SocketInputDStream,那我们接着来看一下SocketInputDStream
private[streaming]
class SocketInputDStream[T: ClassTag](
ssc_ : StreamingContext,
host: String,
port: Int,
bytesToObjects: InputStream => Iterator[T],
storageLevel: StorageLevel
) extends ReceiverInputDStream[T](ssc_) {
def getReceiver(): Receiver[T] = {
new SocketReceiver(host, port, bytesToObjects, storageLevel)
}
}
在SocketInputDStream中定了接受数据的getReceiver方法,当然咋们看到的这些都处于方法定义或者对象初始化的阶段,还没真正开始执行
那现在我们接着来看jsc.start()开始启动程序执行方法
def start(): Unit = synchronized {
state match {
case INITIALIZED =>
startSite.set(DStream.getCreationSite())
StreamingContext.ACTIVATION_LOCK.synchronized {
StreamingContext.assertNoOtherContextIsActive()
try {
validate()
// Start the streaming scheduler in a new thread, so that thread local properties
// like call sites and job groups can be reset without affecting those of the
// current thread.
ThreadUtils.runInNewThread("streaming-start") {
sparkContext.setCallSite(startSite.get)
sparkContext.clearJobGroup()
sparkContext.setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, "false")
scheduler.start()
}
state = StreamingContextState.ACTIVE
} catch {
case NonFatal(e) =>
logError("Error starting the context, marking it as stopped", e)
scheduler.stop(false)
state = StreamingContextState.STOPPED
throw e
}
StreamingContext.setActiveContext(this)
}
shutdownHookRef = ShutdownHookManager.addShutdownHook(
StreamingContext.SHUTDOWN_HOOK_PRIORITY)(stopOnShutdown)
// Registering Streaming Metrics at the start of the StreamingContext
assert(env.metricsSystem != null)
env.metricsSystem.registerSource(streamingSource)
uiTab.foreach(_.attach())
logInfo("StreamingContext started")
case ACTIVE =>
logWarning("StreamingContext has already been started")
case STOPPED =>
throw new IllegalStateException("StreamingContext has already been stopped")
}
}
我们可以开到jsc.start(),其实做了很多工作,但我们重点关注一下:scheduler.start()
def start(): Unit = synchronized {
if (eventLoop != null) return // scheduler has already been started
logDebug("Starting JobScheduler")
eventLoop = new EventLoop[JobSchedulerEvent]("JobScheduler") {
override protected def onReceive(event: JobSchedulerEvent): Unit = processEvent(event)
override protected def onError(e: Throwable): Unit = reportError("Error in job scheduler", e)
}
eventLoop.start()
// attach rate controllers of input streams to receive batch completion updates
for {
inputDStream <- ssc.graph.getInputStreams
rateController <- inputDStream.rateController
} ssc.addStreamingListener(rateController)
listenerBus.start(ssc.sparkContext)
//JobScheduler.scala 80行
receiverTracker = new ReceiverTracker(ssc)
inputInfoTracker = new InputInfoTracker(ssc)
receiverTracker.start()
//JobScheduler.scala 83行
jobGenerator.start()
logInfo("Started JobScheduler")
}
我现在可以看到在JobScheduler的start方法中receiverTracker得到了初始化,并且调用了其start方法
//ReceiverTracker.scala 149行
def start(): Unit = synchronized {
if (isTrackerStarted) {
throw new SparkException("ReceiverTracker already started")
}
if (!receiverInputStreams.isEmpty) {
endpoint = ssc.env.rpcEnv.setupEndpoint(
"ReceiverTracker", new ReceiverTrackerEndpoint(ssc.env.rpcEnv))
if (!skipReceiverLaunch) launchReceivers()
logInfo("ReceiverTracker started")
trackerState = Started
}
}
//ReceiverTracker.scala 413行
private def launchReceivers(): Unit = {
val receivers = receiverInputStreams.map(nis => {
val rcvr = nis.getReceiver()
rcvr.setReceiverId(nis.id)
rcvr
})
runDummySparkJob()
logInfo("Starting " + receivers.length + " receivers")
endpoint.send(StartAllReceivers(receivers))
}
至此我们可以看到,在StreamingContext执行start方法时会调用JobScheduler的start方法,而在JobScheduler的start方法中会初始化ReceiverTracker并执行其start方法,ReceiverTracker执行start方法时最终是通过rpc通信的方式通知Worker中的excutor进程开始不断接受数据,并将元数据信息汇报给driver
下面我们接着回到JobScheduler.scala 83行,看jobGenerator.start()方法:
//JobGenerator.scala 79行,
def start(): Unit = synchronized {
if (eventLoop != null) return // generator has already been started
// Call checkpointWriter here to initialize it before eventLoop uses it to avoid a deadlock.
// See SPARK-10125
checkpointWriter
eventLoop = new EventLoop[JobGeneratorEvent]("JobGenerator") {
override protected def onReceive(event: JobGeneratorEvent): Unit = processEvent(event)
override protected def onError(e: Throwable): Unit = {
jobScheduler.reportError("Error in job generator", e)
}
}
eventLoop.start()
if (ssc.isCheckpointPresent) {
restart()
} else {
startFirstTime()
}
}
到这块已经完成了SparkStreaming启动ReceiverTracker接受数据并且通过JobGenerator Job生成器产生Job,运行在cluster之上
当然我们在程序当中可以看到源码当中其实有很多线程池的使用,笔者认为其中最大的好处在于可以减少创建新线程的时间消耗而又可以达到对线程的高度复用(类似于数据库的连接池是一个道理)
Spark Streaming底层实际上就是RDD的集合,基于这种特性,它的容错机制主要就是两种:一是checkpoint,二是基于lineage(血统)的容错。当然如果lineage链条过于复杂和冗长,这时候就需要做checkpoint
由于RDD的依赖关系,如果stage之间都是窄依赖,此时一般基于lineage容错,方便高效。在stage之间如果是宽依赖,而宽依赖一般会产生shuffle操作,这时候我们就需要考虑checkpoint了
解密SparkStreaming运行机制和架构进阶之Job和容错(第三篇)
标签:
原文地址:http://blog.csdn.net/xiaojun220/article/details/51336727