Tags: spark sql, spark, distributed computing, sql, catalyst
We have already walked through Spark SQL's core execution flow (see the earlier post on the Spark SQL core execution flow). Now let's examine the responsibilities of each core component along that flow.
This post starts at the entry point: how SQL text is parsed into a logical plan. The core component involved is SqlParser, a SQL parser implemented with Scala's parser combinators, which wraps its parse results as Catalyst TreeNodes. The Catalyst framework itself will be introduced in a later post.
First, the overall flow: a SQL string goes through the SQL Parser and comes out as an Unresolved Logical Plan.
In the source code:
def sql(sqlText: String): SchemaRDD = new SchemaRDD(this, parseSql(sqlText)) // e.g. sql("select name,value from temp_shengli") instantiates a SchemaRDD

protected[sql] def parseSql(sql: String): LogicalPlan = parser(sql) // hands the text to the SqlParser instance

class SqlParser extends StandardTokenParsers with PackratParsers {

  def apply(input: String): LogicalPlan = { // apply receives the sql text as input
    // Special-case out set commands since the value fields can be
    // complex to handle without RegexParsers. Also this approach
    // is clearer for the several possible cases of set commands.
    if (input.trim.toLowerCase.startsWith("set")) {
      input.trim.drop(3).split("=", 2).map(_.trim) match {
        case Array("") => // "set"
          SetCommand(None, None)
        case Array(key) => // "set key"
          SetCommand(Some(key), None)
        case Array(key, value) => // "set key=value"
          SetCommand(Some(key), Some(value))
      }
    } else {
      phrase(query)(new lexical.Scanner(input)) match {
        case Success(r, x) => r
        case x => sys.error(x.toString)
      }
    }
  }
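Before stepping through it, here is the set branch's string handling in isolation (a standalone sketch of mine; SetSplitDemo and the keys are made up, only the trim/drop/split logic comes from the source above):

object SetSplitDemo {
  // Mimics the set branch: drop the leading "set", split on the first "=", trim both parts.
  def splitSet(input: String): Array[String] =
    input.trim.drop(3).split("=", 2).map(_.trim)

  def main(args: Array[String]): Unit = {
    println(splitSet("set").toList)            // List("")           => SetCommand(None, None)
    println(splitSet("set somekey").toList)    // List(somekey)      => SetCommand(Some(key), None)
    println(splitSet("set somekey=42").toList) // List(somekey, 42)  => SetCommand(Some(key), Some(value))
  }
}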
1. When we call sql("select name,value from temp_shengli"), we are actually constructing a new SchemaRDD.
2. Constructing the SchemaRDD invokes parseSql, which hands the SQL text to a SqlParser instance; applying the parser to the input triggers its apply method.
3. The apply method branches:
3.1 If the SQL command starts with set, a SetCommand is produced, similar to parameter setting in Hive. SetCommand is in fact a LeafNode in Catalyst's TreeNode hierarchy and also extends LogicalPlan; the Catalyst TreeNode library will be covered in detail in a later post.
3.2 The key part is the else block, which holds the core SQL-parsing code of SqlParser:
phrase(query)(new lexical.Scanner(input)) match {
  case Success(r, x) => r
  case x => sys.error(x.toString)
}

The phrase method may look unfamiliar, so let's first look at how SqlParser fits into the class hierarchy.
SqlParser extends StandardTokenParsers and PackratParsers from Scala's parser combinator library, which in turn mix in the Parsers trait. So SqlParser gains both tokenizing (lexing) support and the combinator operators (expressions like p ~> q, introduced below).
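To make those combinators concrete before diving deeper, here is a minimal, self-contained StandardTokenParsers example of my own (the grammar and all names are hypothetical, not Spark code); it uses the same ingredients SqlParser does: a reserved word, ident, ~>, ^^, and phrase over a lexical.Scanner:

import scala.util.parsing.combinator.syntactical.StandardTokenParsers

// A hypothetical mini-grammar: "greet <name>" produces a greeting string.
object MiniParser extends StandardTokenParsers {
  lexical.reserved += "greet" // register the keyword with the lexer

  // "greet" ~> ident keeps only the right-hand result; ^^ maps it to a value.
  def greeting: Parser[String] = "greet" ~> ident ^^ { name => "Hello, " + name }

  def parse(input: String): String =
    phrase(greeting)(new lexical.Scanner(input)) match {
      case Success(result, _) => result
      case failure => sys.error(failure.toString)
    }
}

SqlParser works the same way, only with a much larger grammar whose results are LogicalPlan nodes instead of strings.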
The phrase method:
/** A parser generator delimiting whole phrases (i.e. programs).
 *
 *  `phrase(p)` succeeds if `p` succeeds and no input is left over after `p`.
 *
 *  @param p the parser that must consume all input for the resulting parser
 *           to succeed.
 *  @return  a parser that has the same result as `p`, but that only succeeds
 *           if `p` consumed all the input.
 */
def phrase[T](p: Parser[T]) = new Parser[T] {
  def apply(in: Input) = lastNoSuccessVar.withValue(None) {
    p(in) match {
      case s @ Success(out, in1) =>
        if (in1.atEnd)
          s
        else
          lastNoSuccessVar.value filterNot { _.next.pos < in1.pos } getOrElse Failure("end of input expected", in1)
      case ns => lastNoSuccessVar.value.getOrElse(ns)
    }
  }
}
Note the Success class that shows up here; the else block earlier also ultimately returns a Success:
/** The success case of `ParseResult`: contains the result and the remaining input.
 *
 *  @param result The parser's output
 *  @param next   The parser's remaining input
 */
case class Success[+T](result: T, override val next: Input) extends ParseResult[T] {

From the source we can see that Success wraps the parser's current result, result, together with the remaining, not-yet-parsed input.
So the code above checks in1.atEnd on the Success result: if the input stream has been fully consumed, it returns s, the Success object, which carries SqlParser's parse output.
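With the hypothetical MiniParser sketched above, this atEnd behavior is easy to observe, for instance in the Scala REPL:

// Fully consumed input yields Success; leftover tokens trip the atEnd branch
// and phrase reports (roughly) "end of input expected".
println(MiniParser.parse("greet shengli"))  // Hello, shengli
// MiniParser.parse("greet shengli oops")   // error: end of input expected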
Inside SqlParser, phrase takes two arguments:
The first is query, a grammar rule that produces a LogicalPlan.
The second is the input, scanned by the lexical scanner.
SqlParser's parse flow is therefore: the lexical scanner tokenizes the input (recognizing SQL keywords), and the query rule parses any SQL that fits the grammar.
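The two stages can even be watched separately. Again using the hypothetical MiniParser, the Scanner alone turns text into a token stream before any grammar rule runs (REPL-style sketch):

import scala.util.parsing.input.Reader

// Walk the token stream produced by the lexer, before any grammar rule runs.
var in: Reader[MiniParser.lexical.Token] = new MiniParser.lexical.Scanner("greet shengli")
while (!in.atEnd) {
  println(in.first) // a Keyword token for greet, then an identifier token for shengli
  in = in.rest
}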
protected case class Keyword(str: String)

In the Spark 1.0.0 version I am using, only the following SQL reserved words are supported:
protected val ALL = Keyword("ALL")
protected val AND = Keyword("AND")
protected val AS = Keyword("AS")
protected val ASC = Keyword("ASC")
protected val APPROXIMATE = Keyword("APPROXIMATE")
protected val AVG = Keyword("AVG")
protected val BY = Keyword("BY")
protected val CACHE = Keyword("CACHE")
protected val CAST = Keyword("CAST")
protected val COUNT = Keyword("COUNT")
protected val DESC = Keyword("DESC")
protected val DISTINCT = Keyword("DISTINCT")
protected val FALSE = Keyword("FALSE")
protected val FIRST = Keyword("FIRST")
protected val FROM = Keyword("FROM")
protected val FULL = Keyword("FULL")
protected val GROUP = Keyword("GROUP")
protected val HAVING = Keyword("HAVING")
protected val IF = Keyword("IF")
protected val IN = Keyword("IN")
protected val INNER = Keyword("INNER")
protected val INSERT = Keyword("INSERT")
protected val INTO = Keyword("INTO")
protected val IS = Keyword("IS")
protected val JOIN = Keyword("JOIN")
protected val LEFT = Keyword("LEFT")
protected val LIMIT = Keyword("LIMIT")
protected val MAX = Keyword("MAX")
protected val MIN = Keyword("MIN")
protected val NOT = Keyword("NOT")
protected val NULL = Keyword("NULL")
protected val ON = Keyword("ON")
protected val OR = Keyword("OR")
protected val OVERWRITE = Keyword("OVERWRITE")
protected val LIKE = Keyword("LIKE")
protected val RLIKE = Keyword("RLIKE")
protected val UPPER = Keyword("UPPER")
protected val LOWER = Keyword("LOWER")
protected val REGEXP = Keyword("REGEXP")
protected val ORDER = Keyword("ORDER")
protected val OUTER = Keyword("OUTER")
protected val RIGHT = Keyword("RIGHT")
protected val SELECT = Keyword("SELECT")
protected val SEMI = Keyword("SEMI")
protected val STRING = Keyword("STRING")
protected val SUM = Keyword("SUM")
protected val TABLE = Keyword("TABLE")
protected val TRUE = Keyword("TRUE")
protected val UNCACHE = Keyword("UNCACHE")
protected val UNION = Keyword("UNION")
protected val WHERE = Keyword("WHERE")

Based on these reserved words, a SqlLexical is generated via reflection.
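Quoting the Spark 1.0 source from memory (so treat the exact form as approximate), the reflection that collects the keywords looks like this:

// Collect the str of every Keyword-returning member above via Java reflection.
protected val reservedWords =
  this.getClass
    .getMethods
    .filter(_.getReturnType == classOf[Keyword])
    .map(_.invoke(this).asInstanceOf[Keyword].str)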
override val lexical = new SqlLexical(reservedWords)

SqlLexical uses its Scanner parser to read the input; the resulting token stream is then fed to query.
protected lazy val query: Parser[LogicalPlan] = (
  select * (
    UNION ~ ALL ^^^ { (q1: LogicalPlan, q2: LogicalPlan) => Union(q1, q2) } |
    UNION ~ opt(DISTINCT) ^^^ { (q1: LogicalPlan, q2: LogicalPlan) => Distinct(Union(q1, q2)) }
  )
  | insert | cache
)

Sure enough, the result is a Parser whose result type is LogicalPlan.
Take select a,b from c union all select e,f from g as an example. The * here is a repetition combinator, so any number of union all clauses can be chained, each pair being folded into a Union node.
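In Scala's parser combinators, p * sep is chainl1(p, sep): it parses one or more ps separated by sep and folds the results left-to-right using the function that sep yields, which is exactly how ^^^ { (q1, q2) => Union(q1, q2) } above merges successive SELECTs. A standalone toy grammar of my own (not Spark code) showing the same shape:

import scala.util.parsing.combinator.syntactical.StandardTokenParsers

// Toy arithmetic grammar demonstrating p * sep (i.e. chainl1).
object ChainDemo extends StandardTokenParsers {
  lexical.delimiters ++= Seq("+", "-")

  def num: Parser[Int] = numericLit ^^ (_.toInt)

  // num repeated, separated by + or -, folded left-associatively --
  // the same shape as `select * (UNION ~ ALL ^^^ ...)` in query.
  def expr: Parser[Int] =
    num * ( "+" ^^^ { (a: Int, b: Int) => a + b }
          | "-" ^^^ { (a: Int, b: Int) => a - b } )
}

// ChainDemo.phrase(ChainDemo.expr)(new ChainDemo.lexical.Scanner("1 + 2 - 3"))
// parses to Success(0, ...): evaluated as (1 + 2) - 3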
protected lazy val select: Parser[LogicalPlan] =
  SELECT ~> opt(DISTINCT) ~ projections ~
  opt(from) ~ opt(filter) ~
  opt(grouping) ~
  opt(having) ~
  opt(orderBy) ~
  opt(limit) <~ opt(";") ^^ {
    case d ~ p ~ r ~ f ~ g ~ h ~ o ~ l =>
      val base = r.getOrElse(NoRelation)
      val withFilter = f.map(f => Filter(f, base)).getOrElse(base)
      val withProjection = g.map { g =>
        Aggregate(assignAliases(g), assignAliases(p), withFilter)
      }.getOrElse(Project(assignAliases(p), withFilter))
      val withDistinct = d.map(_ => Distinct(withProjection)).getOrElse(withProjection)
      val withHaving = h.map(h => Filter(h, withDistinct)).getOrElse(withDistinct)
      val withOrder = o.map(o => Sort(o, withHaving)).getOrElse(withHaving)
      val withLimit = l.map { l => Limit(l, withOrder) }.getOrElse(withOrder)
      withLimit
  }

I'll call this the select rule. Note how the plan is assembled bottom-up: the FROM relation is the base, WHERE adds a Filter, GROUP BY an Aggregate (otherwise a plain Project), then Distinct, the HAVING Filter, Sort, and finally Limit.
It handles a query such as: select game_id, user_name from game_log where date<='2014-07-19' and user_name='shengli' group by game_id having game_id > 1 order by game_id limit 50
The projections rule parses the select list: a comma-separated sequence of expressions, each with an optional alias:

protected lazy val projections: Parser[Seq[Expression]] = repsep(projection, ",")

protected lazy val projection: Parser[Expression] =
  expression ~ (opt(AS) ~> opt(ident)) ^^ {
    case e ~ None => e
    case e ~ Some(a) => Alias(e, a)()
  }
The from rule simply delegates to relations:

protected lazy val from: Parser[LogicalPlan] = FROM ~> relations
The relation rules then build the FROM-clause plan: a bare table name becomes an UnresolvedRelation, a parenthesized query becomes a Subquery, and joins become Join nodes (defaulting to Inner):

protected lazy val relation: Parser[LogicalPlan] =
  joinedRelation |
  relationFactor

protected lazy val relationFactor: Parser[LogicalPlan] =
  ident ~ (opt(AS) ~> opt(ident)) ^^ {
    case tableName ~ alias => UnresolvedRelation(None, tableName, alias)
  } |
  "(" ~> query ~ ")" ~ opt(AS) ~ ident ^^ {
    case s ~ _ ~ _ ~ a => Subquery(a, s)
  }

protected lazy val joinedRelation: Parser[LogicalPlan] =
  relationFactor ~ opt(joinType) ~ JOIN ~ relationFactor ~ opt(joinConditions) ^^ {
    case r1 ~ jt ~ _ ~ r2 ~ cond =>
      Join(r1, r2, joinType = jt.getOrElse(Inner), cond)
  }
Subquery itself is a UnaryNode whose output re-qualifies the child's output attributes with the subquery alias:

case class Subquery(alias: String, child: LogicalPlan) extends UnaryNode {
  override def output = child.output.map(_.withQualifiers(alias :: Nil))
  override def references = Set.empty
}
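Putting it all together: on a Spark 1.0 classpath the parser can be driven directly. A quick sketch (the table name is made up, and the plan stays unresolved until the analyzer runs):

import org.apache.spark.sql.catalyst.SqlParser

object ParseDemo {
  def main(args: Array[String]): Unit = {
    val parser = new SqlParser
    // apply() runs phrase(query) over the scanned input, as shown above.
    val plan = parser("select name, value from temp_shengli where value > 1")
    println(plan) // roughly: Project -> Filter -> UnresolvedRelation(temp_shengli)
  }
}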
Original post: http://blog.csdn.net/oopsoom/article/details/37943507