Tags: spark catalyst sql spark sql shark
Following the previous article in this series (Spark SQL Catalyst源码分析之Physical Plan), this post looks at how a physical plan is actually turned into an RDD, i.e. what happens behind:

lazy val toRdd: RDD[Row] = executedPlan.execute()

A SparkPlan node falls into roughly four kinds of operators: the BasicOperators, plus the somewhat more involved Join, Aggregate and Sort operators. Let's walk through the basic ones first. Project evaluates its projectList against every row produced by its child:
case class Project(projectList: Seq[NamedExpression], child: SparkPlan) extends UnaryNode {
override def output = projectList.map(_.toAttribute)
override def execute() = child.execute().mapPartitions { iter => // apply the projection to each partition of the child's output
@transient val reusableProjection = new MutableProjection(projectList)
iter.map(reusableProjection)
}
}

Looking at the definition of MutableProjection, it is exactly the familiar two-step process of binding references to a schema and then evaluating the expressions:

case class MutableProjection(expressions: Seq[Expression]) extends (Row => Row) {
def this(expressions: Seq[Expression], inputSchema: Seq[Attribute]) =
this(expressions.map(BindReferences.bindReference(_, inputSchema))) //bound schema
private[this] val exprArray = expressions.toArray
private[this] val mutableRow = new GenericMutableRow(exprArray.size) // the reusable output Row
def currentValue: Row = mutableRow
def apply(input: Row): Row = {
var i = 0
while (i < exprArray.length) {
mutableRow(i) = exprArray(i).eval(input) // evaluate each expression against the input Row to fill in the output Row
i += 1
}
mutableRow // return the (reused) output Row
}
}
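To see the mechanism in isolation, here is a minimal, self-contained sketch of the same idea in plain Scala. The Expr, BoundRef, Upper and Projection names are invented for illustration and are not the real Catalyst classes; the point is that references are bound to ordinals once, and a single mutable output row is reused while lazily mapping over the input iterator.

object ProjectionSketch {
  type Row = Array[Any]

  sealed trait Expr { def eval(input: Row): Any }

  // a "bound" reference: the attribute has already been resolved to an ordinal in the input schema
  case class BoundRef(ordinal: Int) extends Expr {
    def eval(input: Row): Any = input(ordinal)
  }

  // a trivial derived expression, standing in for an arbitrary projected expression
  case class Upper(child: Expr) extends Expr {
    def eval(input: Row): Any = child.eval(input).toString.toUpperCase
  }

  // same shape as MutableProjection: evaluate every expression into one reused output row
  class Projection(exprs: Seq[Expr]) extends (Row => Row) {
    private val exprArray = exprs.toArray
    private val mutableRow = new Array[Any](exprArray.length)
    def apply(input: Row): Row = {
      var i = 0
      while (i < exprArray.length) {
        mutableRow(i) = exprArray(i).eval(input)
        i += 1
      }
      mutableRow
    }
  }

  def main(args: Array[String]): Unit = {
    val rows: Seq[Row] = Seq(Array("alice", 30), Array("bob", 25))
    val project = new Projection(Seq(Upper(BoundRef(0)), BoundRef(1)))
    // map lazily over an iterator, as the operator does with each partition's iterator;
    // because the output row is reused, each row must be consumed before the next one overwrites it
    rows.iterator.map(project).foreach(r => println(r.mkString(", "))) // ALICE, 30 then BOB, 25
  }
}

The lazy iterator matters: the reused row is only safe because mapPartitions feeds downstream operators one row at a time.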
Filter is just as direct: the predicate expression is evaluated against each row of the child, and only the rows for which it returns true are kept:

case class Filter(condition: Expression, child: SparkPlan) extends UnaryNode {
override def output = child.output
override def execute() = child.execute().mapPartitions { iter =>
iter.filter(condition.eval(_).asInstanceOf[Boolean]) // evaluate the predicate expression against each input row
}
}
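The same pattern can be sketched against the public RDD API. FilterSketch, the sample data and the local[2] master below are illustrative assumptions; a plain function stands in for condition.eval, and mapPartitions/filter keeps only the rows where it evaluates to true.

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object FilterSketch {
  type Row = Array[Any]

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("filter-sketch").setMaster("local[2]"))
    val child: RDD[Row] = sc.parallelize(Seq[Row](Array("alice", 30), Array("bob", 17)))

    // the predicate plays the part of condition.eval(row).asInstanceOf[Boolean]
    val condition: Row => Any = row => row(1).asInstanceOf[Int] >= 18
    val adults = child.mapPartitions(iter => iter.filter(row => condition(row).asInstanceOf[Boolean]))

    adults.collect().foreach(r => println(r.mkString(", "))) // alice, 30
    sc.stop()
  }
}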
Sample simply delegates to RDD.sample on the child's output:

case class Sample(fraction: Double, withReplacement: Boolean, seed: Long, child: SparkPlan)
extends UnaryNode
{
override def output = child.output
// TODO: How to pick seed?
override def execute() = child.execute().sample(withReplacement, fraction, seed)
}
Union combines the outputs of several child plans by handing their RDDs to SparkContext.union:

case class Union(children: Seq[SparkPlan])(@transient sqlContext: SQLContext) extends SparkPlan {
// TODO: attributes output by union should be distinct for nullability purposes
override def output = children.head.output
override def execute() = sqlContext.sparkContext.union(children.map(_.execute())) // union the RDDs produced by the child plans
override def otherCopyArgs = sqlContext :: Nil
}
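A tiny illustration of that call using the public API (the object name, data and local master are made up for the example): the physical Union does nothing more than pass the children's RDDs to SparkContext.union.

import org.apache.spark.{SparkConf, SparkContext}

object UnionSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("union-sketch").setMaster("local[2]"))
    // two stand-ins for children.map(_.execute())
    val childOutputs = Seq(sc.parallelize(Seq(1, 2)), sc.parallelize(Seq(3, 4)))
    // Union.execute() boils down to exactly this call on the children's RDDs
    val unioned = sc.union(childOutputs)
    println(unioned.collect().mkString(", ")) // 1, 2, 3, 4
    sc.stop()
  }
}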
Limit is a bit more interesting. Calling take directly is only possible on the driver (executeCollect); inside execute(), Limit instead takes at most limit rows in every partition, shuffles the survivors into a single partition, and takes limit rows again there:

case class Limit(limit: Int, child: SparkPlan)(@transient sqlContext: SQLContext)
extends UnaryNode {
// TODO: Implement a partition local limit, and use a strategy to generate the proper limit plan:
// partition local limit -> exchange into one partition -> partition local limit again
override def otherCopyArgs = sqlContext :: Nil
override def output = child.output
override def executeCollect() = child.execute().map(_.copy()).take(limit) // collect to the driver with a plain take(limit)
override def execute() = {
val rdd = child.execute().mapPartitions { iter =>
val mutablePair = new MutablePair[Boolean, Row]()
iter.take(limit).map(row => mutablePair.update(false, row)) // first take at most limit rows in each partition
}
val part = new HashPartitioner(1)
val shuffled = new ShuffledRDD[Boolean, Row, Row, MutablePair[Boolean, Row]](rdd, part) // shuffle the surviving rows into a single partition
shuffled.setSerializer(new SparkSqlSerializer(new SparkConf(false)))
shuffled.mapPartitions(_.take(limit).map(_._2)) // finally take(limit) within that single partition
}
}
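Here is a rough sketch of that two-phase limit written with the public RDD API instead of ShuffledRDD and MutablePair (the object name, data and local master are illustrative; the SparkContext._ import is what the Spark 1.0/1.1 line discussed here needs for the pair-RDD operations): take limit rows per partition, shuffle everything under a constant key into one partition, and take again.

import org.apache.spark.SparkContext._
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}

object LimitSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("limit-sketch").setMaster("local[4]"))
    val child = sc.parallelize(1 to 1000, 8)
    val limit = 5

    val limited = child
      .mapPartitions(_.take(limit))        // phase 1: at most `limit` rows survive in each partition
      .map(row => (false, row))            // a constant key, mirroring MutablePair[Boolean, Row]
      .partitionBy(new HashPartitioner(1)) // shuffle all survivors into a single partition
      .values                              // drop the dummy key again
      .mapPartitions(_.take(limit))        // phase 2: the final take(limit) inside that one partition

    // five rows come back; exactly which five depends on shuffle fetch order,
    // just as with the real Limit operator
    println(limited.collect().mkString(", "))
    sc.stop()
  }
}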
TakeOrdered covers the ORDER BY ... LIMIT n case and uses RDD.takeOrdered together with a RowOrdering built from the sort order:

case class TakeOrdered(limit: Int, sortOrder: Seq[SortOrder], child: SparkPlan)
(@transient sqlContext: SQLContext) extends UnaryNode {
override def otherCopyArgs = sqlContext :: Nil
override def output = child.output
@transient
lazy val ordering = new RowOrdering(sortOrder) // the sort is expressed as a RowOrdering over the sort expressions
override def executeCollect() = child.execute().map(_.copy()).takeOrdered(limit)(ordering)
// TODO: Terminal split should be implemented differently from non-terminal split.
// TODO: Pick num splits based on |limit|.
override def execute() = sqlContext.sparkContext.makeRDD(executeCollect(), 1)
}
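A small sketch of what executeCollect() amounts to, with an ordinary Ordering standing in for RowOrdering(sortOrder) (the object name, data and local master are illustrative assumptions):

import org.apache.spark.{SparkConf, SparkContext}

object TakeOrderedSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("take-ordered-sketch").setMaster("local[2]"))
    val child = sc.parallelize(Seq(("carol", 42), ("alice", 30), ("bob", 17)))

    // an Ordering on the sort key, standing in for RowOrdering(sortOrder)
    val byAge: Ordering[(String, Int)] = Ordering.by(_._2)

    // takeOrdered merges per-partition results on the driver, which is what
    // executeCollect() relies on; execute() then wraps the result in a 1-partition RDD
    val youngestTwo = child.takeOrdered(2)(byAge)
    println(youngestTwo.mkString(", ")) // (bob,17), (alice,30)
    sc.stop()
  }
}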
Sort supports both global and partition-local sorting. When global is true it declares an OrderedDistribution requirement on its child (satisfied by inserting an Exchange), after which every partition only needs to be sorted locally:

case class Sort(
sortOrder: Seq[SortOrder],
global: Boolean,
child: SparkPlan)
extends UnaryNode {
override def requiredChildDistribution =
if (global) OrderedDistribution(sortOrder) :: Nil else UnspecifiedDistribution :: Nil
@transient
lazy val ordering = new RowOrdering(sortOrder) // the ordering used to sort the rows
override def execute() = attachTree(this, "sort") {
// TODO: Optimize sorting operation?
child.execute()
.mapPartitions(
iterator => iterator.map(_.copy()).toArray.sorted(ordering).iterator, // materialize each partition, sort it with the given ordering, and return an iterator
preservesPartitioning = true)
}
override def output = child.output
}
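The per-partition step can be sketched with the public RDD API as follows (object name, data and local master are assumptions for the example); without the preceding range-partitioning Exchange this only sorts each partition on its own.

import org.apache.spark.{SparkConf, SparkContext}

object SortSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("sort-sketch").setMaster("local[2]"))
    val child = sc.parallelize(Seq(5, 3, 9, 1, 7, 2), 2)
    val ordering = implicitly[Ordering[Int]] // stand-in for RowOrdering(sortOrder)

    // the per-partition step of Sort.execute(): materialize the partition, sort it,
    // hand back an iterator, and keep the existing partitioning
    val locallySorted = child.mapPartitions(
      iter => iter.toArray.sorted(ordering).iterator,
      preservesPartitioning = true)

    // prints "3, 5, 9" and "1, 2, 7": each partition is sorted on its own;
    // a global sort additionally needs the range-partitioning Exchange first
    locallySorted.glom().collect().foreach(p => println(p.mkString(", ")))
    sc.stop()
  }
}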
Finally, ExistingRdd is how an ordinary RDD of Scala Products (for example case classes) is turned into an RDD[Row]: every field is converted to a Catalyst value and written into a reused GenericMutableRow:

object ExistingRdd {
def convertToCatalyst(a: Any): Any = a match {
case o: Option[_] => o.orNull // unwrap Options; None becomes null
case s: Seq[Any] => s.map(convertToCatalyst)
case p: Product => new GenericRow(p.productIterator.map(convertToCatalyst).toArray) // nested Products become nested rows
case other => other
}
def productToRowRdd[A <: Product](data: RDD[A]): RDD[Row] = {
data.mapPartitions { iterator =>
if (iterator.isEmpty) {
Iterator.empty
} else {
val bufferedIterator = iterator.buffered
val mutableRow = new GenericMutableRow(bufferedIterator.head.productArity) // one reusable row, sized by the product's arity
bufferedIterator.map { r =>
var i = 0
while (i < mutableRow.length) {
mutableRow(i) = convertToCatalyst(r.productElement(i)) // convert each field to its Catalyst representation
i += 1
}
mutableRow
}
}
}
}
def fromProductRdd[A <: Product : TypeTag](productRdd: RDD[A]) = {
ExistingRdd(ScalaReflection.attributesFor[A], productToRowRdd(productRdd))
}
}
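A trimmed-down, Spark-free sketch of the conversion logic (ExistingRddSketch and Person are hypothetical; an Array[Any] stands in for GenericRow/GenericMutableRow): Options are unwrapped, nested Products become nested rows, and everything else passes through unchanged.

object ExistingRddSketch {
  case class Person(name: String, age: Int, nickname: Option[String])

  // same shape as ExistingRdd.convertToCatalyst, with Array[Any] standing in for GenericRow
  def convertToCatalyst(a: Any): Any = a match {
    case o: Option[_] => o.orNull                                 // None becomes null
    case s: Seq[_]    => s.map(convertToCatalyst)
    case p: Product   => p.productIterator.map(convertToCatalyst).toArray
    case other        => other
  }

  def main(args: Array[String]): Unit = {
    val people = Seq(Person("alice", 30, Some("al")), Person("bob", 17, None))
    // mirror the body of productToRowRdd, minus the reused mutable row
    val rows = people.map { p =>
      Array.tabulate(p.productArity)(i => convertToCatalyst(p.productElement(i)))
    }
    rows.foreach(r => println(r.mkString(", "))) // alice, 30, al  then  bob, 17, null
  }
}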
Original article: http://blog.csdn.net/oopsoom/article/details/38274621