========== Spark SQL ==========
1、Spark SQL 是 Spark 的一个模块,可以和 RDD 进行混合编程、支持标准的数据源、可以集成和替代 Hive、可以提供 JDBC、ODBC 服务器功能。
2、Spark SQL 的特点:
(1)和 Spark Core 的无缝集成,可以在写整个 RDD 应用的时候,配合 Spark SQL 来实现逻辑。
(2)统一的数据访问方式,Spark SQL 提供标准化的 SQL 查询。
(3)Hive 的集成,Spark SQL 通过内嵌的 Hive 或者连接外部已经部署好的 Hive 实例,实现了对 Hive 语法的集成和操作。
(4)标准化的连接方式,Spark SQL 可以通过启动 thrift Server 来支持 JDBC、ODBC 的访问,即将自己作为一个 BI Server 来使用。
3、Spark SQL 可以执行 SQL 语句,也可以执行 HQL 语句,将运行的结果作为 Dataset 和 DataFrame(将查询出来的结果转换成 RDD,类似于 hive 将 sql 语句转换成 mapreduce)。
4、Spark SQL 的计算速度(Spark sql 比 Hive 快了至少一个数量级,尤其是在 Tungsten 成熟以后会更加无可匹敌),Spark SQL 推出的 DataFrame 可以让数据仓库直接使用机器学习、图计算等复杂的算法库来对数据仓库进行复杂深度数据价值的挖掘。
5、老版本中使用 hivecontext,现在使用 sparkSession。
========== Spark SQL 的数据抽象 ==========
0、RDD(Spark1.0)-> DataFrame(Spark1.3)-> DataSet(Spark1.6)
1、Spark SQL 提供了 DataFrame 和 DataSet 数据抽象。
2、DataFrame 就是 RDD + Schema,可以认为是一张二维表格。DataFrame 也是懒执行的、不可变的。DataFrame 性能上比 RDD 要高。
3、DataFrame 是一个弱类型的数据对象,DataFrame 的劣势是在编译期不进行表格中的字段的类型检查。在运行期进行检查。类似于 java.sql.ResultSet 类,只能通过 getString 这种方式来获取具体数据。
4、DataSet 是 Spark 最新的数据抽象,Spark 的发展会逐步将 DataSet 作为主要的数据抽象,弱化 RDD 和 DataFrame。DataSet 包含了 DataFrame 所有的优化机制。除此之外提供了以样例类为 Schema 模型的强类型。
5、type DataFrame = Dataset[Row]
6、DataFrame 和 DataSet 都有可控的内存管理机制,所有数据都保存在非堆内存
上,节省了大量空间之外,还摆脱了GC的限制。都使用了 catalyst 进行 SQL 的优化。可以使得不太会使用 RDD 的工程师写出相对高效的代码。
7、RDD 和 DataFrame 和 DataSet 之间可以进行数据转换。
========== Spark SQL 的初探 -- 客户端查询 ==========
1、你可以通过 spark-shell 或者 spark-sql 来操作 Spark SQL,注意:spark 作为 SparkSession 的变量名,sc 作为 SparkContext 的变量名。
2、你可以通过 Spark 提供的方法读取 JSON 文件,将 JSON 文件转换成 DataFrame。
3、你可以通过 DataFrame 提供的 API 来操作 DataFrame 里面的数据。
4、你可以通过将 DataFrame 注册成为一个临时表的方式,来通过 Spark.sql 方法运行标准的 SQL 语句来查询。
小细节:
show() --> 表格
collect() --> RDD 打印
========== IDEA 创建 Spark SQL 程序 ==========
1、Spark SQL 读取 json 需要 json 文件中一行是一个 json 对象。
2、通过创建 SparkSession 来使用 SparkSQL:
示例代码如下:
package com.atguigu.sparksql
import org.apache.spark.sql.SparkSession
import org.slf4j.LoggerFactory
object HelloWorld {
val logger = LoggerFactory.getLogger(HelloWorld.getClass)
def main(args: Array[String]) {
// 创建 SparkSession 并设置 App 名称
val spark = SparkSession
.builder()
.appName("Spark SQL basic example")
.config("spark.some.config.option", "some-value")
.getOrCreate()
// 通过隐式转换将 RDD 操作添加到 DataFrame 上(将 RDD 转成 DataFrame)
import spark.implicits._
// 通过 spark.read 操作读取 JSON 数据
val df = spark.read.json("examples/src/main/resources/people.json")
// show 操作类似于 Action,将 DataFrame 直接打印到 Console 上
df.show()
// DSL 风格的使用方式:属性的获取方法 $
df.filter($"age" > 21).show()
//将 DataFrame 注册为表
df.createOrReplaceTempView("persons")
// 执行 Spark SQL 查询操作
spark.sql("select * from perosns where age > 21").show()
// 关闭资源
spark.stop()
}
}
========== DataFrame 查询方式 ==========
1、DataFrame 支持两种查询方式:一种是 DSL 风格,另外一种是 SQL 风格。
DSL 风格:
(1)你需要引入 import spark.implicit._ 这个隐式转换,可以将 DataFrame 隐式转换成 RDD。
示例:
df.select("name").show()
df.filter($"age" > 25).show()
SQL 风格:
(1)你需要将 DataFrame 注册成一张表格,如果你通过 createOrReplaceTempView 这种方式来创建,那么该表当前 Session 有效,如果你通过 createGlobalTempView 来创建,那么该表跨 Session 有效,但是 SQL 语句访问该表的时候需要加上前缀 global_temp.xxx。
(2)你需要通过 sparkSession.sql 方法来运行你的 SQL 语句。
示例:
一个 SparkContext 可以多次创建 SparkSession。
// Session 内可访问,一个 SparkSession 结束后,表自动删除。
df.createOrReplaceTempView("persons") // 使用表名不需要任何前缀
// 应用级别内可访问,一个 SparkContext 结束后,表自动删除。
df.createGlobalTempView("persons") // 使用表名需要加上“global_temp.” 前缀,比如:global_temp.persons
========== DataSet 创建方式 ==========
1、定义一个 DataSet,首先你需要先定义一个 case 类。
========== RDD、DataFrame、DataSet 之间的转换总结 ==========
1、RDD -> DataFrame : rdd.map(para => (para(0).trim(), para(1).trim().toInt)).toDF("name", "age") // RDD -> 元组 -> toDF()(注意:这是第一种方式)
2、DataFrame -> RDD : df.rdd 注意输出类型
:res2: Array[org.apache.spark.sql.Row] = Array([Michael,29], [Andy,30], [Justin,19])
1、 RDD -> DataSet : rdd.map(para => Person(para(0).trim(), para(1).trim().toInt)).toDS() // 需要先定义样例类 -> toDS()
2、 DataSet -> RDD : ds.rdd注意输出类型
:res5: Array[Person] = Array(Person(Michael,29), Person(Andy,30), Person(Justin,19))
1、 DataFrame -> DataSet : df.as[Person] // 传入类型
2、 DataSet -> DataFrame : ds.toDF()
========== DataFrame 的 Schema 的获取方式 ==========
RDD -> DataFram 的三种方式:
// 将没有包含 case 类的 RDD 转换成 DataFrame
rdd.map(para => (para(0).trim(), para(1).trim().toInt)).toDF("name", "age") // RDD -> 元组 -> toDF()(注意:这是第一种方式)
// 将包含有 case 类的 RDD 转换成 DataFrame,注意:需要我们先定义 case 类
// 通过反射的方式来设置 Schema 信息,适合于编译期能确定列的情况
rdd.map(attributes => Person(attributes(0), attributes(1).trim().toInt)).toDF() // 样例类-> RDD -> toDF()(注意:这是第二种方式)
// 通过编程的方式来设置 Schema 信息,适合于编译期不能确定列的情况(注意:这是第三种方式)
val schemaString = "name age" // 实际开发中 schemaString 是动态生成的
val fields = schemaString.map(fieldName => StructField(fieldName, StringType, nullable = true))
val schema = StructType(fields)
val rdd[Row] = rdd.map(attributes => Row(attributes(0.trim), attributes(1).trim))
val peopeDF = spark.createDataFrame(rdd[Row], schema)
========== 对于 DataFrame Row 对象的访问方式 ==========
1、由 DataFrame = Dataset[Row] 可知, DataFrame 里面每一行都是 Row 对象。
2、如果需要访问 Row 对象中的每一个元素,可以通过索引 row(0);也可以通过列名 row.getAsString 或者索引 row.getAsInt。
========== 应用 UDF 函数(用户自定义函数) ==========
1、通过 spark.udf.register(funcName, func) 来注册一个 UDF 函数,name 是 UDF 调用时的标识符,即函数名,fun 是一个函数,用于处理字段。
2、你需要将一个 DF 或者 DS 注册为一个临时表。
3、通过 spark.sql 去运行一个 SQL 语句,在 SQL 语句中可以通过 funcName(列名) 方式来应用 UDF 函数。
示例代码如下:
scala> val df = spark.read.json("examples/src/main/resources/people.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
scala> df.show()
scala> spark.udf.register("addName", (x: String) => "Name:" + x)
res5: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function1>,StringType,Some(List(StringType)))
scala> df.createOrReplaceTempView("people")
scala> spark.sql("select addName(name), age from people").show()
scala> spark.sql("select addName(name) as newName, age from people").show()
========== 应用 UDAF 函数(用户自定义聚合函数) ==========
1、弱类型用户自定义聚合函数
步骤如下:
(1)新建一个 Class 继承UserDefinedAggregateFunction,然后复写方法:
// 聚合函数需要输入参数的数据类型
override def inputSchema: StructType = ???
// 聚合缓冲区中值的数据类型
override def bufferSchema: StructType = ???
// 返回值的数据类型
override def dataType: DataType = ???
// 对于相同的输入一直有相同的输出
override def deterministic: Boolean = true
// 用于初始化你的数据结构
override def initialize(buffer: MutableAggregationBuffer): Unit = ???
// 相同 Execute 间的数据合并(同一分区)
override def update(buffer: MutableAggregationBuffer, input: Row): Unit = ???
// 不同 Execute 间的数据合并(不同分区)
override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = ???
// 计算最终结果
override def evaluate(buffer: Row): Any = ???
(2)你需要通过 spark.udf.resigter 去注册你的 UDAF 函数。
(3)需要通过 spark.sql 去运行你的 SQL 语句,可以通过 select UDAF(列名) 来应用你的用户自定义聚合函数。
2、强类型的用户自定义聚合函数
步骤如下:
(1)新建一个class,继承Aggregator[Employee, Average, Double]
其中 Employee 是在应用聚合函数的时候传入的对象,Average 是聚合函数在运行的时候内部需要的数据结构,Double 是聚合函数最终需要输出的类型。这些可以根据自己的业务需求去调整。
复写相对应的方法:
// 用于定义一个聚合函数内部需要的数据结构
override def zero: Average = ???
// 针对每个分区内部每一个输入来更新你的数据结构
override def reduce(b: Average, a: Employee): Average = ???
// 用于对于不同分区的结构进行聚合
override def merge(b1: Average, b2: Average): Average = ???
// 计算输出
override def finish(reduction: Average): Double = ???
// 设定之间值类型的编码器,要转换成 case 类
// Encoders.product 是进行 scala 元组和 case 类转换的编码器
override def bufferEncoder: Encoder[Average] = ???
// 设定最终输出值的编码器
override def outputEncoder: Encoder[Double] = ???
2、新建一个 UDAF 实例,通过 DF 或者 DS 的 DSL 风格语法去应用。
========== Spark SQL 的输入和输出 ==========
1、对于 Spark SQL 的输入需要使用 sparkSession.read 方法
(1)通用模式 sparkSession.read.format("json").load("path") 支持的类型有:parquet、json、text、csv、orc、jdbc、......
(2)专业模式 sparkSession.read.json("path") 或 csv 或 ... 即直接指定类型
2、对于 Spark SQL 的输出需要使用 sparkSession.write 方法
(1)通用模式 dataFrame.write.format("json").save("path") 支持的类型有:parquet、json、text、csv、orc、jdbc、......
(2)专业模式 dataFrame.write.csv("path") 或 json 或 ... 即直接指定类型
3、如果使用通用模式,则 spark 默认的 parquet 是默认格式,那么 sparkSession.read.load 它加载的默认是 parquet 格式;dataFrame.write.save 也是默认保存成 parquet 格式。
4、注意
:如果需要保存成一个 text 文件,那么需要 dataFrame 里面只有一列数据。
========== Spark SQL 与 Hive 的集成 ==========
内置 Hive
1、Spark 内置有 Hive,Spark 2.1.1 内置的 Hive 是 1.2.1。
2、如果要使用内嵌的 Hive,什么都不用做,直接用就可以了。但是呢,此时的我们只能创建表,如果查询表的话会报错,原因是:本地有 spark-warehouse 目录,而其他机器节点没有 spark-warehouse 目录。解决办法如下:
3、需要将 core-site.xml 和 hdfs-site.xml 拷贝到 spark 的 conf 目录下,然后分发至其他机器节点。如果 spark 路径下发现有 metastore_db 和 spark-warehouse,删除掉。然后重启集群。
4、在第一次启动创建 metastore 的时候,需要指定 spark.sql.warehouse.dir 这个参数,
比如:bin/spark-shell --conf spark.sql.warehouse.dir=hdfs://hadoop102:9000/spark_warehouse
5、注意
:如果在 load 数据的时候,需要先将数据放到 HDFS 上。
外部 Hive
1、需要将 hive-site.xml 拷贝到 spark 的 conf 目录下,然后分发至其他机器节点。
2、如果 hive 的 metestore 使用的是 mysql 数据库,那么需要将 mysql 的 jdbc 驱动包放到 spark 的 jars 目录下。
3、可以通过 spark-sql 或者 spark-shell 来进行 sql 的查询,完成和 hive 的连接。
hive、spark、hdfs 关系:
spark 文件中有两个文件夹:spark-warehouse、metastore_db,当我们拷贝 hive-site.xml 文件到 spark 的 conf 目录后,会读取 Hive 中的 warehouse 文件,获取到 hive 中的表格数据。