标签:dep unit gis length 依赖 play new 方法 list
DStream内部其实是RDD序列,所有的DStream操作最终都转换为RDD操作。通过分析源码,可以进一步窥探这种转换是如何进行的。
DStream有一些与RDD类似的基础属性:
DStream的操作分为两类,一类是Transformation操作,对应RDD的Transformation操作。以flatMap为例,DStream中的flatMap不过是返回一个新的DStream派生类FlatMappedDStream,这一点跟RDD的flatMap非常类似。DStream的flatMap定义如下:
/** * Return a new DStream by applying a function to all elements of this DStream, * and then flattening the results */ def flatMap[U: ClassTag](flatMapFunc: T => TraversableOnce[U]): DStream[U] = ssc.withScope { new FlatMappedDStream(this, context.sparkContext.clean(flatMapFunc)) }
而FlatMappedDStream的实现也很简单,主要作用是像RDD一样维护计算关系链,完整定义如下:
private[streaming] class FlatMappedDStream[T: ClassTag, U: ClassTag]( parent: DStream[T], flatMapFunc: T => TraversableOnce[U] ) extends DStream[U](parent.ssc) { override def dependencies: List[DStream[_]] = List(parent) override def slideDuration: Duration = parent.slideDuration override def compute(validTime: Time): Option[RDD[U]] = { parent.getOrCompute(validTime).map(_.flatMap(flatMapFunc)) } }
其中compute调用DStream的getOrCompute方法用于读取RDD的内存,要么放到缓存中,要么调用接口函数compute计算生成。
DStream另外一类操作是OutPut操作,Output操作才会触发DStream的实际执行,作用非常类似于RDD的Action操作,类如print操作,定义如下:
/** * Print the first ten elements of each RDD generated in this DStream. This is an output * operator, so this DStream will be registered as an output stream and there materialized. */ def print(): Unit = ssc.withScope { print(10) } /** * Print the first num elements of each RDD generated in this DStream. This is an output * operator, so this DStream will be registered as an output stream and there materialized. */ def print(num: Int): Unit = ssc.withScope { def foreachFunc: (RDD[T], Time) => Unit = { (rdd: RDD[T], time: Time) => { val firstNum = rdd.take(num + 1) // scalastyle:off println println("-------------------------------------------") println(s"Time: $time") println("-------------------------------------------") firstNum.take(num).foreach(println) if (firstNum.length > num) println("...") println() // scalastyle:on println } } foreachRDD(context.sparkContext.clean(foreachFunc), displayInnerRDDOps = false) }
DStream.print调用了RDD.take方法,而后者是一个Action操作,是不是所有的DStream输出操作最后都调用一个RDD的Action操作呢,看看saveAsTextFiles和saveAsObjectFiles,它们没有直接调用RDD Action操作【而是先调用一下rdd.saveAsTextFile】,然后通过foreachRDD来实现的,传入的函数中调用了RDD的Action。saveAsTextFiles的定义如下:
/** * Save each RDD in this DStream as at text file, using string representation * of elements. The file name at each batch interval is generated based on * `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix". */ def saveAsTextFiles(prefix: String, suffix: String = ""): Unit = ssc.withScope { val saveFunc = (rdd: RDD[T], time: Time) => { val file = rddToFileName(prefix, suffix, time) rdd.saveAsTextFile(file) } this.foreachRDD(saveFunc, displayInnerRDDOps = false) }
相比之下,另外一个最灵活的Output操作foreachRDD完全依赖传入的函数来实现功能,所以对于foreachRDD的使用至少要包含一个RDD Action调用。因为Spark Streaming的调度是由Output方法触发的,每个周期调用一次所有定义的Output方法。Output内部再调用RDD Action最终完成计算,否则程序只会接收数据,然后丢弃,不执行计算。
标签:dep unit gis length 依赖 play new 方法 list
原文地址:https://www.cnblogs.com/yszd/p/13344251.html