标签:leo 科学 stopped nts hdfs cer efault 多少 local
val rdd = sc.textFile("hdfs://Master.hdp:9000/wc").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).sortBy(_._2,false).collect
rdd.saveAsTextFile("hdfs://Master.hdp:9000/out01")
思考:在spark的wordcount过程一共产生多少个RDD?
通过该命令(scala> rdd.toDebugString)可以查看RDD的依赖关系
(6个,除了图中的五个,rdd.saveAsTextFile也还会产生一个RDD)
接下来一步步分析(通过查看spark源码进行分析)
(1) sc.textFile("hdfs://Master.hdp:9000/wc")
产生两个RDD:HadoopRDD -> MapPartitinsRDD
查看Spark的textFile的源码:
/** * Read a text file from HDFS, a local file system (available on all nodes), or any * Hadoop-supported file system URI, and return it as an RDD of Strings. */ def textFile( path: String, minPartitions: Int = defaultMinPartitions): RDD[String] = withScope { assertNotStopped() hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text], minPartitions).map(pair => pair._2.toString).setName(path) }
参数:路径,分区数(默认分区:如果是从HDFS里面读数据,分区的数量由切片数量决定,hadoop2.0一个切片默认128M)
调用hadoopFile方法: 传入path,读入方式是以InputFormat方式读取(也可以自定义读取方式),
classOf[LongWritable], classOf[Text]参数:(读取数据的类型)hadoop是以K—V形式读取数据
LongWritable是偏移量(这是hadoop对Int类型的序列化),Text是每一行的数据
在hadoopFile方法中,new了一个 HadoopRDD,这里产生了第一个RDD。
def hadoopFile[K, V]( path: String, inputFormatClass: Class[_ <: InputFormat[K, V]], keyClass: Class[K], valueClass: Class[V], minPartitions: Int = defaultMinPartitions): RDD[(K, V)] = withScope { assertNotStopped() // A Hadoop configuration can be about 10 KB, which is pretty big, so broadcast it. val confBroadcast = broadcast(new SerializableConfiguration(hadoopConfiguration)) val setInputPathsFunc = (jobConf: JobConf) => FileInputFormat.setInputPaths(jobConf, path) new HadoopRDD( this, confBroadcast, Some(setInputPathsFunc), inputFormatClass, keyClass, valueClass, minPartitions).setName(path) }
再回到textFlie方法中,在调用hadoopFile方法产生一个hadoopRDD,又调用.map方法
hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
minPartitions).map(pair => pair._2.toString).setName(path)
看下源码中hadoopRDD究竟是什么?
class HadoopRDD[K, V]( sc: SparkContext, broadcastedConf: Broadcast[SerializableConfiguration], initLocalJobConfFuncOpt: Option[JobConf => Unit], inputFormatClass: Class[_ <: InputFormat[K, V]], keyClass: Class[K], valueClass: Class[V], minPartitions: Int) extends RDD[(K, V)](sc, Nil) with Logging { if (initLocalJobConfFuncOpt.isDefined) { sparkContext.clean(initLocalJobConfFuncOpt.get) } def this( sc: SparkContext, conf: JobConf, inputFormatClass: Class[_ <: InputFormat[K, V]], keyClass: Class[K], valueClass: Class[V], minPartitions: Int) = { this( sc, sc.broadcast(new SerializableConfiguration(conf)) .asInstanceOf[Broadcast[SerializableConfiguration]], initLocalJobConfFuncOpt = None, inputFormatClass, keyClass, valueClass, minPartitions) }
hadoopRDD中存放的是key和value,key是偏移量,value才是我们需要的值所以接着调用.map(pair => pair._2.toString)取出我们需要的value值,map过程产生第二个RDD
def map[U: ClassTag](f: T => U): RDD[U] = withScope { val cleanF = sc.clean(f) new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF)) }
可以看到map方法是会new 一个新的MapPartitionsRDD,传入的参数是一个hadoopRDD,pid是分区ID,iter是一个迭代器
(2).flatMap(_.split(" ")) //产生一个RDD :MapPartitinsRDD
/**
* Return a new RDD by first applying a function to all elements of this
* RDD, and then flattening the results.
*/
def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] = withScope {
val cleanF = sc.clean(f)
new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.flatMap(cleanF))
}
//iter是每个分区数据的迭代器
//clean方法是对数据进行检测,确定数据没有问题(比如说是否序列化等等)
/**
* Clean a closure to make it ready to serialized and send to tasks
* (removes unreferenced variables in $outer‘s, updates REPL variables)
* If <tt>checkSerializable</tt> is set, <tt>clean</tt> will also proactively
* check to see if <tt>f</tt> is serializable and throw a <tt>SparkException</tt>
* if not.
*
* @param f the closure to clean
* @param checkSerializable whether or not to immediately check <tt>f</tt> for serializability
* @throws SparkException if <tt>checkSerializable</tt> is set but <tt>f</tt> is not
* serializable
*/
private[spark] def clean[F <: AnyRef](f: F, checkSerializable: Boolean = true): F = {
ClosureCleaner.clean(f, checkSerializable)
f
}
(3).map((_, 1))//产生一个RDD MapPartitionsRDD
源代码同上
(4).reduceByKey(_+_)//产生一个RDD ShuffledRDD
/** * Merge the values for each key using an associative reduce function. This will also perform * the merging locally on each mapper before sending results to a reducer, similarly to a * "combiner" in MapReduce. */ def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = self.withScope { combineByKeyWithClassTag[V]((v: V) => v, func, func, partitioner) }
// reduceByKey调用了combineByKeyWithClassTag
/**
* :: Experimental ::
* Generic function to combine the elements for each key using a custom set of aggregation
* functions. Turns an RDD[(K, V)] into a result of type RDD[(K, C)], for a "combined type" C
* Note that V and C can be different -- for example, one might group an RDD of type
* (Int, Int) into an RDD of type (Int, Seq[Int]). Users provide three functions:
*
* - `createCombiner`, which turns a V into a C (e.g., creates a one-element list)
* - `mergeValue`, to merge a V into a C (e.g., adds it to the end of a list)
* - `mergeCombiners`, to combine two C‘s into a single one.
*
* In addition, users can control the partitioning of the output RDD, and whether to perform
* map-side aggregation (if a mapper can produce multiple items with the same key).
*/
@Experimental
def combineByKeyWithClassTag[C](
createCombiner: V => C,
mergeValue: (C, V) => C,
mergeCombiners: (C, C) => C,
partitioner: Partitioner,
mapSideCombine: Boolean = true,
serializer: Serializer = null)(implicit ct: ClassTag[C]): RDD[(K, C)] = self.withScope {
require(mergeCombiners != null, "mergeCombiners must be defined") // required as of Spark 0.9.0
if (keyClass.isArray) {
if (mapSideCombine) {
throw new SparkException("Cannot use map-side combining with array keys.")
}
if (partitioner.isInstanceOf[HashPartitioner]) {
throw new SparkException("Default partitioner cannot partition array keys.")
}
}
val aggregator = new Aggregator[K, V, C](
self.context.clean(createCombiner),
self.context.clean(mergeValue),
self.context.clean(mergeCombiners))
if (self.partitioner == Some(partitioner)) {
self.mapPartitions(iter => {
val context = TaskContext.get()
new InterruptibleIterator(context, aggregator.combineValuesByKey(iter, context))
}, preservesPartitioning = true)
} else {
new ShuffledRDD[K, V, C](self, partitioner)
.setSerializer(serializer)
.setAggregator(aggregator)
.setMapSideCombine(mapSideCombine)
}
}
// 重点::new ShuffledRDD
ShuffledRDD进行聚合,先局部聚合,再全局聚合
(5).saveAsTextFile("hdfs://Master.hdp:9000/out01")//产生一个RDD: mapPartitions
重点:spark向HDFS中写入数据,通过流的方式写入,如果一次一个数据就打开流写一次太不科学了,
通过每次将一个分区的数据以流的方式传入到HDFS中再关闭流,所以该方法中调用.mapPartitions,又产生了一个mapPartitionsRDD
/** * Save this RDD as a text file, using string representations of elements. */ def saveAsTextFile(path: String): Unit = withScope { // https://issues.apache.org/jira/browse/SPARK-2075 // // NullWritable is a `Comparable` in Hadoop 1.+, so the compiler cannot find an implicit // Ordering for it and will use the default `null`. However, it‘s a `Comparable[NullWritable]` // in Hadoop 2.+, so the compiler will call the implicit `Ordering.ordered` method to create an // Ordering for `NullWritable`. That‘s why the compiler will generate different anonymous // classes for `saveAsTextFile` in Hadoop 1.+ and Hadoop 2.+. // // Therefore, here we provide an explicit Ordering `null` to make sure the compiler generate // same bytecodes for `saveAsTextFile`. val nullWritableClassTag = implicitly[ClassTag[NullWritable]] val textClassTag = implicitly[ClassTag[Text]] val r = this.mapPartitions { iter =>
val text = new Text() iter.map { x => text.set(x.toString) (NullWritable.get(), text) } } RDD.rddToPairRDDFunctions(r)(nullWritableClassTag, textClassTag, null) .saveAsHadoopFile[TextOutputFormat[NullWritable, Text]](path) }
/**
* Return a new RDD by applying a function to each partition of this RDD.
*
* `preservesPartitioning` indicates whether the input function preserves the partitioner, which
* should be `false` unless this is a pair RDD and the input function doesn‘t modify the keys.
*/
def mapPartitions[U: ClassTag]( //Iterator是一个分区的数据
f: Iterator[T] => Iterator[U],
preservesPartitioning: Boolean = false): RDD[U] = withScope {
val cleanedF = sc.clean(f)
new MapPartitionsRDD(
this,
(context: TaskContext, index: Int, iter: Iterator[T]) => cleanedF(iter),
preservesPartitioning)
}
标签:leo 科学 stopped nts hdfs cer efault 多少 local
原文地址:http://www.cnblogs.com/Khaleesi-yu/p/7425116.html