Overview
Preparation
local-cluster mode
MASTER=local-cluster[1,2,1024] bin/spark-shell
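The three numbers in the local-cluster master URL are usually read as [number of workers, cores per worker, memory per worker in MB]. As a hedged illustration (the names below are not from the post), the same pseudo-distributed setup can also be requested from application code instead of the MASTER environment variable:

import org.apache.spark.{SparkConf, SparkContext}

// Illustrative only: build the pseudo-distributed master URL programmatically.
val conf = new SparkConf()
  .setAppName("local-cluster-demo")          // hypothetical application name
  .setMaster("local-cluster[1,2,1024]")      // [workers, cores per worker, memory per worker (MB)]
val sc = new SparkContext(conf)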
Analysis of the Driver Programme initialization process
Main source files involved in the initialization process
Detailed walkthrough of the initialization steps
private[spark] val env = SparkEnv.create(
  conf,
  "<driver>",
  conf.get("spark.driver.host"),
  conf.get("spark.driver.port").toInt,
  isDriver = true,
  isLocal = isLocal)
SparkEnv.set(env)
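Once the driver's SparkEnv is registered, other components reach it through the SparkEnv.get accessor; the shuffle-fetch code quoted near the end of this walkthrough relies on exactly this. A minimal sketch:

// Sketch: looking up process-wide services from the registered SparkEnv.
val env              = SparkEnv.get
val blockManager     = env.blockManager        // block storage, used when fetching shuffle output
val mapOutputTracker = env.mapOutputTracker    // map-output locations, queried by reduce tasks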
private[spark] var taskScheduler = SparkContext.createTaskScheduler(this, master, appName)
taskScheduler.start()

override def start() {
  backend.start()

  if (!isLocal && conf.getBoolean("spark.speculation", false)) {
    logInfo("Starting speculative execution thread")
    import sc.env.actorSystem.dispatcher
    sc.env.actorSystem.scheduler.schedule(SPECULATION_INTERVAL milliseconds,
        SPECULATION_INTERVAL milliseconds) {
      checkSpeculatableTasks()
    }
  }
}
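The speculation branch above only runs when spark.speculation is enabled. A hedged configuration sketch (property names as used in this era of Spark; the interval here is in milliseconds):

import org.apache.spark.SparkConf

// Illustrative configuration only: turn on the speculative-execution thread
// started by the scheduler's start() shown above.
val conf = new SparkConf()
  .set("spark.speculation", "true")
  .set("spark.speculation.interval", "100")   // how often checkSpeculatableTasks() runs, in ms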
@volatile private[spark] var dagScheduler = new DAGScheduler(taskScheduler)
dagScheduler.start()

ui.start()

The RDD transformation process
sc.textFile("README.md").flatMap(line=>line.split(" ")).map(word => (word, 1)).reduceByKey(_ + _)
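Before breaking this one-liner into steps, a quick, hedged way to see the RDD chain it builds is toDebugString (the exact output format differs across Spark versions):

val wordCount = sc.textFile("README.md")
  .flatMap(line => line.split(" "))
  .map(word => (word, 1))
  .reduceByKey(_ + _)

// Prints the lineage: HadoopRDD -> MappedRDD -> FlatMappedRDD -> MappedRDD -> shuffle stage.
println(wordCount.toDebugString)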
Step 1: val rawFile = sc.textFile("README.md")

scala> sc.textFile("README.md")
14/04/23 13:11:48 WARN SizeEstimator: Failed to check whether UseCompressedOops is set; assuming yes
14/04/23 13:11:48 INFO MemoryStore: ensureFreeSpace(119741) called with curMem=0, maxMem=311387750
14/04/23 13:11:48 INFO MemoryStore: Block broadcast_0 stored as values to memory (estimated size 116.9 KB, free 296.8 MB)
14/04/23 13:11:48 DEBUG BlockManager: Put block broadcast_0 locally took 277 ms
14/04/23 13:11:48 DEBUG BlockManager: Put for block broadcast_0 without replication took 281 ms
res0: org.apache.spark.rdd.RDD[String] = MappedRDD[1] at textFile at <console>:13

Step 2: val splittedText = rawFile.flatMap(line => line.split(" "))
def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] =
  new FlatMappedRDD(this, sc.clean(f))

Step 3: val wordCount = splittedText.map(word => (word, 1))
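For comparison, the sibling map transformation used in step 3 is defined in RDD.scala in essentially the same shape (quoted from memory of this generation of Spark, so treat it as indicative):

def map[U: ClassTag](f: T => U): RDD[U] = new MappedRDD(this, sc.clean(f))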
Step 4: val reduceJob = wordCount.reduceByKey(_ + _). This is the most involved step.

implicit def rddToPairRDDFunctions[K: ClassTag, V: ClassTag](rdd: RDD[(K, V)]) =
  new PairRDDFunctions(rdd)

def reduceByKey(func: (V, V) => V): RDD[(K, V)] = {
  reduceByKey(defaultPartitioner(self), func)
}

def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = {
  combineByKey[V]((v: V) => v, func, func, partitioner)
}

def combineByKey[C](createCombiner: V => C,
    mergeValue: (C, V) => C,
    mergeCombiners: (C, C) => C,
    partitioner: Partitioner,
    mapSideCombine: Boolean = true,
    serializerClass: String = null): RDD[(K, C)] = {
  if (getKeyClass().isArray) {
    if (mapSideCombine) {
      throw new SparkException("Cannot use map-side combining with array keys.")
    }
    if (partitioner.isInstanceOf[HashPartitioner]) {
      throw new SparkException("Default partitioner cannot partition array keys.")
    }
  }
  val aggregator = new Aggregator[K, V, C](createCombiner, mergeValue, mergeCombiners)
  if (self.partitioner == Some(partitioner)) {
    self.mapPartitionsWithContext((context, iter) => {
      new InterruptibleIterator(context, aggregator.combineValuesByKey(iter, context))
    }, preservesPartitioning = true)
  } else if (mapSideCombine) {
    val combined = self.mapPartitionsWithContext((context, iter) => {
      aggregator.combineValuesByKey(iter, context)
    }, preservesPartitioning = true)
    val partitioned = new ShuffledRDD[K, C, (K, C)](combined, partitioner)
      .setSerializer(serializerClass)
    partitioned.mapPartitionsWithContext((context, iter) => {
      new InterruptibleIterator(context, aggregator.combineCombinersByKey(iter, context))
    }, preservesPartitioning = true)
  } else {
    // Don't apply map-side combiner.
    val values = new ShuffledRDD[K, V, (K, V)](self, partitioner).setSerializer(serializerClass)
    values.mapPartitionsWithContext((context, iter) => {
      new InterruptibleIterator(context, aggregator.combineValuesByKey(iter, context))
    }, preservesPartitioning = true)
  }
}
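To make the three function parameters of combineByKey concrete, here is a hedged sketch of step 4 written against combineByKey directly; for (word, 1) pairs all three functions collapse to integer addition, which is why reduceByKey(_ + _) can delegate as shown above:

// Equivalent to wordCount.reduceByKey(_ + _), spelled out per parameter.
val reduced = wordCount.combineByKey(
  (v: Int) => v,                  // createCombiner: first value seen for a key in a partition
  (c: Int, v: Int) => c + v,      // mergeValue: fold further values into the per-partition combiner
  (c1: Int, c2: Int) => c1 + c2   // mergeCombiners: merge per-partition results after the shuffle
)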
res1: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[8] at reduceByKey at <console>:13

Summary of RDD transformations
Analysis of the execution process
Classification of dataset operations
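The classification in question is the usual transformation/action split: transformations (textFile, flatMap, map, reduceByKey) only record lineage and build new RDDs lazily, while actions submit a job through SparkContext.runJob. A minimal hedged illustration reusing the word-count pipeline:

// Transformations: nothing executes yet, only the RDD graph is built.
val counts = sc.textFile("README.md")
  .flatMap(_.split(" "))
  .map(word => (word, 1))
  .reduceByKey(_ + _)

// Action: this hands the job to the DAGScheduler/TaskScheduler and runs tasks.
counts.count()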
Function calls during Task execution
override def compute(split: Partition, context: TaskContext) = firstParent[T].iterator(split, context).map(f)
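The compute implementation above never materializes the parent's data; it only wraps the parent's iterator. A toy, non-Spark illustration of the same lazy chaining with plain Scala iterators:

// Stand-ins for HadoopRDD -> FlatMappedRDD -> MappedRDD; nothing runs until the
// final iterator is consumed, mirroring how the task drives the whole chain.
val lines = Iterator("a b", "b c")            // records from the "parent"
val words = lines.flatMap(_.split(" "))       // FlatMappedRDD.compute analogue
val pairs = words.map(word => (word, 1))      // MappedRDD.compute analogue

println(pairs.toList)                         // consuming the chain triggers all the work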
Stack trace output

at org.apache.spark.rdd.HadoopRDD.getJobConf(HadoopRDD.scala:111)
at org.apache.spark.rdd.HadoopRDD$$anon$1.<init>(HadoopRDD.scala:154)
at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:149)
at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:64)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:232)
at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:232)
at org.apache.spark.rdd.FlatMappedRDD.compute(FlatMappedRDD.scala:33)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:232)
at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:232)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:34)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:232)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:161)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:102)
at org.apache.spark.scheduler.Task.run(Task.scala:53)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:211)

ResultTask

override def runTask(context: TaskContext): U = {
  metrics = Some(context.taskMetrics)
  try {
    func(context, rdd.iterator(split, context))
  } finally {
    context.executeOnCompleteCallbacks()
  }
}

Passing the computation results
On the reduce side, the fetcher first asks the MapOutputTracker where each map output lives, then pulls the corresponding blocks through the BlockManager:

val blockManager = SparkEnv.get.blockManager
val startTime = System.currentTimeMillis
val statuses = SparkEnv.get.mapOutputTracker.getServerStatuses(shuffleId, reduceId)
logDebug("Fetching map output location for shuffle %d, reduce %d took %d ms".format(
  shuffleId, reduceId, System.currentTimeMillis - startTime))

val blockFetcherItr = blockManager.getMultiple(blocksByAddress, serializer)
val itr = blockFetcherItr.flatMap(unpackBlock)
Apache Spark Source Code Walkthrough, Part 3 -- An Analysis of Function Call Relationships During Task Execution
Original article: http://www.cnblogs.com/CRXY/p/4544475.html