标签:scala 超人学院 hadoop
概要
本篇主要阐述在TaskRunner中执行的task其业务逻辑是如何被调用到的,另外试图讲清楚运行着的task其输入的数据从哪获取,处理的结果返回到哪里,如何返回。
准备
1. spark已经安装完毕
2. spark运行在local mode或local-cluster mode
local-cluster
mode
local-cluster模式也称为伪分布式,可以使用如下指令运行
MASTER=local[1,2,1024]
bin/spark-shell
[1,2,1024] 分别表示,executor number, corenumber和内存大小,其中内存大小不应小于默认的512M
DriverProgramme的初始化过程分析初始化过程的涉及的主要源文件
1. SparkContext.scala 整个初始化过程的入口
2. SparkEnv.scala 创建BlockManager, MapOutputTrackerMaster, ConnectionManager,CacheManager
3. DAGScheduler.scala 任务提交的入口,即将Job划分成各个stage的关键
4. TaskSchedulerImpl.scala 决定每个stage可以运行几个task,每个task分别在哪个executor上运行
5. SchedulerBackend
1. 最简单的单机运行模式的话,看LocalBackend.scala
2. 如果是集群模式,看源文件SparkDeploySchedulerBackend
初始化过程步骤详解
步骤1: 根据初始化入参生成SparkConf,再根据SparkConf来创建SparkEnv, SparkEnv中主要包含以下关键性组件 1. BlockManager 2. MapOutputTracker 3. ShuffleFetcher 4.ConnectionManager
private[spark] val env
= SparkEnv.create(
conf, "",
conf.get("spark.driver.host"),
conf.get("spark.driver.port").toInt,
isDriver = true,
isLocal = isLocal) SparkEnv.set(env)
步骤2:创建TaskScheduler,根据Spark的运行模式来选择相应的SchedulerBackend,同时启动taskscheduler,这一步至为关键
private[spark] var taskScheduler
= SparkContext.createTaskScheduler(this,
master, appName) taskScheduler.start()
TaskScheduler.start目的是启动相应的SchedulerBackend,并启动定时器进行检测
override def start()
{ backend.start()
if (!isLocal
&& conf.getBoolean("spark.speculation", false))
{ logInfo("Starting
speculative execution thread")
import sc.env.actorSystem.dispatcher
sc.env.actorSystem.scheduler.schedule(SPECULATION_INTERVAL milliseconds,
SPECULATION_INTERVAL milliseconds) {
checkSpeculatableTasks()
} } }
步骤3:以上一步中创建的TaskScheduler实例为入参创建DAGScheduler并启动运行
@volatile private[spark] var dagScheduler
= new DAGScheduler(taskScheduler) dagScheduler.start()
步骤4:启动WEB UI
ui.start()
RDD的转换过程
还是以最简单的wordcount为例说明rdd的转换过程
sc.textFile("README.md").flatMap(line=>line.split("
")).map(word => (word, 1)).reduceByKey(_
+ _)
上述一行简短的代码其实发生了很复杂的RDD转换,下面仔细解释每一步的转换过程和转换结果
步骤1:val
rawFile = sc.textFile("README.md")
textFile先是生成hadoopRDD,然后再通过map操作生成MappedRDD,如果在spark-shell中执行上述语句,得到的结果可以证明所做的分析
scala> sc.textFile("README.md")14/04/23 13:11:48 WARN
SizeEstimator: Failed to check whether UseCompressedOops is set; assuming yes14/04/23 13:11:48 INFO
MemoryStore: ensureFreeSpace(119741)
called with curMem=0,
maxMem=31138775014/04/23 13:11:48 INFO
MemoryStore: Block broadcast_0 stored as values to memory (estimated size 116.9 KB,
free 296.8 MB)14/04/2313:11:48 DEBUG
BlockManager: Put block broadcast_0 locally took 277 ms14/04/23 13:11:48 DEBUG
BlockManager: Put for block
broadcast_0 without replication took 281 msres0:
org.apache.spark.rdd.RDD[String] = MappedRDD[1]
at textFile at :13
步骤2:
valsplittedText = rawFile.flatMap(line => line.split(" "))
flatMap将原来的MappedRDD转换成为FlatMappedRDD
def flatMap[U:
ClassTag](f: T => TraversableOnce[U]): RDD[U]=new FlatMappedRDD(this,sc.clean(f))
步骤3:val
wordCount = splittedText.map(word => (word, 1))
利用word生成相应的键值对,上一步的FlatMappedRDD被转换成为MappedRDD
步骤4:val
reduceJob = wordCount.reduceByKey(_ + _),这一步最复杂
步骤2,3中使用到的operation全部定义在RDD.scala中,而这里使用到的reduceByKey却在RDD.scala中见不到踪迹。reduceByKey的定义出现在源文件PairRDDFunctions.scala
细心的你一定会问reduceByKey不是MappedRDD的属性和方法啊,怎么能被MappedRDD调用呢?其实这背后发生了一个隐式的转换,该转换将MappedRDD转换成为PairRDDFunctions
implicit def rddToPairRDDFunctions[K:
ClassTag, V: ClassTag](rdd: RDD[(K, V)]) =
newPairRDDFunctions(rdd)
这种隐式的转换是scala的一个语法特征,如果想知道的更多,请用关键字"scalaimplicit method"进行查询,会有不少的文章对此进行详尽的介绍。
接下来再看一看reduceByKey的定义
def reduceByKey(func:
(V, V) => V): RDD[(K, V)] = {
reduceByKey(defaultPartitioner(self), func) } def reduceByKey(partitioner:
Partitioner, func: (V, V) => V): RDD[(K, V)] = {
combineByKey[V]((v: V) => v, func, func, partitioner) } def combineByKey[C](createCombiner:
V => C, mergeValue:
(C, V) => C, mergeCombiners:
(C, C) => C, partitioner:
Partitioner, mapSideCombine:
Boolean = true,
serializerClass: String = null):
RDD[(K, C)] = { if (getKeyClass().isArray)
{ if (mapSideCombine)
{ throw new SparkException("Cannot
use map-side combining with array keys.")
} if (partitioner.isInstanceOf[HashPartitioner])
{ throw new SparkException("Default
partitioner cannot partition array keys.")
} }
val aggregator
= new Aggregator[K,
V, C](createCombiner, mergeValue, mergeCombiners)
if (self.partitioner
== Some(partitioner)) {
self.mapPartitionsWithContext((context, iter) => {
newInterruptibleIterator(context,
aggregator.combineValuesByKey(iter, context))
}, preservesPartitioning = true)
} else if (mapSideCombine)
{ val combined
= self.mapPartitionsWithContext((context, iter) => {
aggregator.combineValuesByKey(iter, context)
}, preservesPartitioning = true)
valpartitioned
= new ShuffledRDD[K,
C, (K, C)](combined, partitioner)
.setSerializer(serializerClass)
partitioned.mapPartitionsWithContext((context, iter) => {
new InterruptibleIterator(context,
aggregator.combineCombinersByKey(iter, context))
}, preservesPartitioning = true)
} else {
// Don‘t apply map-side combiner.
valvalues
= new ShuffledRDD[K,
V, (K, V)](self, partitioner).setSerializer(serializerClass)
values.mapPartitionsWithContext((context, iter) => {
newInterruptibleIterator(context,
aggregator.combineValuesByKey(iter, context))
}, preservesPartitioning = true)
} }
reduceByKey最终会调用combineByKey, 在这个函数中PairedRDDFunctions会被转换成为ShuffleRDD,当调用mapPartitionsWithContext之后,shuffleRDD被转换成为MapPartitionsRDD
Log输出能证明我们的分析
res1: org.apache.spark.rdd.RDD[(String,
Int)] = MapPartitionsRDD[8]
at reduceByKey at :13RDD转换小结
小结一下整个RDD转换过程
HadoopRDD->MappedRDD->FlatMappedRDD->MappedRDD->PairRDDFunctions->ShuffleRDD->MapPartitionsRDD
整个转换过程好长啊,这一切的转换都发生在任务提交之前。
运行过程分析数据集操作分类
在对任务运行过程中的函数调用关系进行分析之前,我们也来探讨一个偏理论的东西,作用于RDD之上的Transformantion为什么会是这个样子?
对这个问题的解答和数学搭上关系了,从理论抽象的角度来说,任务处理都可归结为“input->processing->output"。input和output对应于数据集dataset.
在此基础上作一下简单的分类
1. one-one 一个dataset在转换之后还是一个dataset,而且dataset的size不变,如map
2. one-one 一个dataset在转换之后还是一个dataset,但size发生更改,这种更改有两种可能:扩大或缩小,如flatMap是size增大的操作,而subtract是size变小的操作
3. many-one 多个dataset合并为一个dataset,如combine, join
4. one-many 一个dataset分裂为多个dataset, 如groupBy
Task运行期的函数调用
task的提交过程参考本系列中的第二篇文章。本节主要讲解当task在运行期间是如何一步步调用到作用于RDD上的各个operation
-
TaskRunner.run
-
Task.run
-
Task.runTask (Task是一个基类,有两个子类,分别为ShuffleMapTask和ResultTask)
-
RDD.iterator
-
RDD.computeOrReadCheckpoint
或许当看到RDD.compute函数定义时,还是觉着f没有被调用,以MappedRDD的compute定义为例
override def compute(split:
Partition, context: TaskContext) =
firstParent[T].iterator(split, context).map(f)
堆栈输出 80
at org.apache.spark.rdd.HadoopRDD.getJobConf(HadoopRDD.scala:111) 81
at org.apache.spark.rdd.HadoopRDD$$anon$1.(HadoopRDD.scala:154) 82
at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:149) 83
at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:64) 84
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241) 85
at org.apache.spark.rdd.RDD.iterator(RDD.scala:232) 86
at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31) 87
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241) 88
at org.apache.spark.rdd.RDD.iterator(RDD.scala:232) 89
at org.apache.spark.rdd.FlatMappedRDD.compute(FlatMappedRDD.scala:33) 90
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241) 91
at org.apache.spark.rdd.RDD.iterator(RDD.scala:232) 92
at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31) 93
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241) 94
at org.apache.spark.rdd.RDD.iterator(RDD.scala:232) 95
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:34) 96
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241) 97
at org.apache.spark.rdd.RDD.iterator(RDD.scala:232) 98
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:161) 99
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:102)100
at org.apache.spark.scheduler.Task.run(Task.scala:53)101
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:211)ResultTask
compute的计算过程对于ShuffleMapTask比较复杂,绕的圈圈比较多,对于ResultTask就直接许多。
override def runTask(context:
TaskContext): U = {
metrics = Some(context.taskMetrics)
try{
func(context, rdd.iterator(split, context))
} finally {
context.executeOnCompleteCallbacks()
} }
计算结果的传递
上面的分析知道,wordcount这个job在最终提交之后,被DAGScheduler分为两个stage,第一个Stage是shuffleMapTask,第二个Stage是ResultTask.
那么ShuffleMapTask的计算结果是如何被ResultTask取得的呢?这个过程简述如下
1. ShffuleMapTask将计算的状态(注意不是具体的数据)包装为MapStatus返回给DAGScheduler
2. DAGScheduler将MapStatus保存到MapOutputTrackerMaster中
3. ResultTask在执行到ShuffleRDD时会调用BlockStoreShuffleFetcher的fetch方法去获取数据
1. 第一件事就是咨询MapOutputTrackerMaster所要取的数据的location
2. 根据返回的结果调用BlockManager.getMultiple获取真正的数据
BlockStoreShuffleFetcher的fetch函数伪码
val blockManager
= SparkEnv.get.blockManager
val startTime
= System.currentTimeMillis
val statuses
= SparkEnv.get.mapOutputTracker.getServerStatuses(shuffleId, reduceId)
logDebug("Fetching map output location for shuffle %d, reduce %d took %d ms".format(
shuffleId, reduceId, System.currentTimeMillis - startTime))
val blockFetcherItr
= blockManager.getMultiple(blocksByAddress, serializer)
val itr
= blockFetcherItr.flatMap(unpackBlock)
注意上述代码中的getServerStatuses及getMultiple,一个是询问数据的位置,一个是去获取真正的数据。
更多精彩内容请关注:http://bbs.superwu.cn
关注超人学院微信二维码:
关注超人学院java免费学习交流群:
Apache Spark源码走读之3 -- Task运行期之函数调用关系分析
标签:scala 超人学院 hadoop
原文地址:http://blog.csdn.net/crxy2014/article/details/46311211