标签:style blog http io ar os java sp for
/** Spark SQL源代码分析系列文章*/
自从去年Spark Submit 2013 Michael Armbrust分享了他的Catalyst,到至今1年多了,Spark SQL的贡献者从几人到了几十人,并且发展速度异常迅猛,究其原因,个人觉得有下面2点:
1、整合:将SQL类型的查询语言整合到 Spark 的核心RDD概念里。这样能够应用于多种任务,流处理,批处理,包含机器学习里都能够引入Sql。前一段时间測试过Shark,而且对Spark SQL也进行了一些測试,可是还是忍不住对Spark SQL一探到底,就从源码的角度来看一下Spark SQL的核心运行流程吧。
1. val sqlContext = new org.apache.spark.sql.SQLContext(sc) 2. import sqlContext._ 3.case class Person(name: String, age: Int) 4.val people = sc.textFile("examples/src/main/resources/people.txt").map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt)) 5.people.registerAsTable("people") 6.val teenagers = sql("SELECT name FROM people WHERE age >= 13 AND age <= 19") 7.teenagers.map(t => "Name: " + t(0)).collect().foreach(println)
一个存储<tableName,logicalPlan>的map结构,查找关系的文件夹,注冊表,注销表,查询表和逻辑计划关系的类。
Parse 传入的sql来对语法分词,构建语法树,返回一个logical plan
logical plan的语法分析器
logical Plan的优化器
逻辑计划,由catalyst的TreeNode组成,能够看到有3种语法树
包括不同策略的优化策略来优化物理运行计划
sql运行的环境上下文
就是这些对象组成了Spark SQL的执行时,看起来非常酷,有静态的metadata存储,有分析器、优化器、逻辑计划、物理计划、执行执行时。
那这些对象是怎么相互协作来运行sql语句的呢?
核心组件都是绿色的方框,每一步流程的结果都是蓝色的框框,调用的方法是橙色的框框。
先概括一下,大致的运行流程是:
Parse SQL -> Analyze Logical Plan -> Optimize Logical Plan -> Generate Physical Plan -> Prepareed Spark Plan -> Execute SQL -> Generate RDD
更详细的运行流程:
sql or hql -> sql parser(parse)生成 unresolved logical plan -> analyzer(analysis)生成analyzed logical plan -> optimizer(optimize)optimized logical plan -> spark planner(use strategies to plan)生成physical plan -> 採用不同Strategies生成spark plan -> spark plan(prepare) prepared spark plan -> call toRDD(execute()函数调用) 运行sql生成RDD
/** * Executes a SQL query using Spark, returning the result as a SchemaRDD. * * @group userf */ def sql(sqlText: String): SchemaRDD = new SchemaRDD(this, parseSql(sqlText))结果是会生成一个逻辑计划
@transient protected[sql] val parser = new catalyst.SqlParser protected[sql] def parseSql(sql: String): LogicalPlan = parser(sql)
override def collect(): Array[Row] = queryExecution.executedPlan.executeCollect()我们能够非常清晰的看到运行步骤:
protected abstract class QueryExecution { def logical: LogicalPlan lazy val analyzed = analyzer(logical) //首先分析器会分析逻辑计划 lazy val optimizedPlan = optimizer(analyzed) //随后优化器去优化分析后的逻辑计划 // TODO: Don't just pick the first one... lazy val sparkPlan = planner(optimizedPlan).next() //依据策略生成plan物理计划 // executedPlan should not be used to initialize any SparkPlan. It should be // only used for execution. lazy val executedPlan: SparkPlan = prepareForExecution(sparkPlan) //最后生成已经准备好的Spark Plan /** Internal version of the RDD. Avoids copies and has no schema */ lazy val toRdd: RDD[Row] = executedPlan.execute() //最后调用toRDD方法运行任务将结果转换为RDD protected def stringOrError[A](f: => A): String = try f.toString catch { case e: Throwable => e.toString } def simpleString: String = stringOrError(executedPlan) override def toString: String = s"""== Logical Plan == |${stringOrError(analyzed)} |== Optimized Logical Plan == |${stringOrError(optimizedPlan)} |== Physical Plan == |${stringOrError(executedPlan)} """.stripMargin.trim }
原创文章:转载请注明出自:http://blog.csdn.net/oopsoom/article/details/37658021
标签:style blog http io ar os java sp for
原文地址:http://www.cnblogs.com/hrhguanli/p/4084490.html