码迷,mamicode.com
首页 > 数据库 > 详细

Spark学习五:spark sql

时间:2016-05-09 07:02:30      阅读:450      评论:0      收藏:0      [点我收藏+]

标签:

Spark学习五:spark sql

标签(空格分隔): Spark


一,概述:

技术分享

技术分享

技术分享

二,Spark的发展历史

技术分享

技术分享

技术分享

技术分享

技术分享

三,Spark sql和hive对比

技术分享

技术分享

技术分享

四,spark sql 架构

技术分享

五,sprk sql访问hive数据

技术分享

技术分享

hive-site.xml需要拷贝到spark的conf目录下面

启动方式一:

//启动应用
bin/spark-shell --driver-class-path jars/mysql-connector-java-5.1.27-bin.jar --master local[2]
sqlContext.sql("show databases").show()

技术分享

sqlContext.sql("use default").show()

sqlContext.sql("show tables").show()

启动方式二:

//启动应用
bin/spark-sql --driver-class-path jars/mysql-connector-java-5.1.27-bin.jar --master local[2]
show databases;

技术分享

//缓存
cache table emp;
//取消缓存
uncache table emp;

技术分享

六,catalyst

技术分享

技术分享

技术分享

七,thriftserver

启动服务

sbin/start-thriftserver.sh --master local[2] --driver-class-path jars/mysql-connector-java-5.1.27-bin.jar

启动beeline客户端

bin/beeline
beeline> !connect jdbc:hive2://localhost:10000

技术分享

八,Dataframe

技术分享

技术分享

技术分享

技术分享

技术分享

技术分享

技术分享

技术分享

技术分享

技术分享

技术分享

技术分享

技术分享

九,加载外部数据源

1,加载json数据

val json_df=sqlContext.jsonFile("hdfs://study.com.cn:8020/spark/people.json")

json_df.show()

2,加载hive数据

sqlContext.table("default").show()

3,加载parquet格式数据

val parquet_df=sqlContext.jsonFile("hdfs://study.com.cn:8020/spark/users.parquet")
parquet_df.show()

4,jdbc方式获取数据

val df = sqlContext.jdbc("jdbc:mysql://localhost:3306/db_0306?user=root&password=123456", "my_user")

val mysql_df = sqlContext.load("jdbc", Map("url" -> "jdbc:mysql://localhost:3306/db_0306?user=root&password=123456","dbtable" -> "my_user"))

5,读取text file
第一种方式:

case class Person(name:String,age:Int)
val people_rdd = sc.textFile("spark/sql/people.txt")
val rowRdd = people_rdd.map(x => x.split(",")).map(x => Person(x(0), x(1).trim.toInt))
val people_df=rowRdd.toDF()

第二种方式:

val people_rdd = sc.textFile("spark/sql/people.txt")
import org.apache.spark.sql._
val rowRdd = people_rdd.map(x => x.split(",")).map(x => Row(x(0), x(1).trim.toInt))

import org.apache.spark.sql.types._
val schema = StructType(Array(StructField("name",StringType, true), StructField("age", IntegerType, false)))

val rdd2df = sqlContext.createDataFrame(rowRdd, schema)

测试:

Spark SQL强大诞生了,

Hive Table
emp
MySQL Table
dept

针对上述两个表进行join,

val hive_emp_df = sqlContext.table("db_0228.emp")
val mysql_dept_df = sqlContext.jdbc("jdbc:mysql://localhost:3306/db_0306?user=root&password=123456", "tb_dept")
val join_df = hive_emp_df.join(mysql_dept_df, hive_emp_df("deptno") === mysql_dept_df("deptno"))
join_df.show

案例分析

SQLLogAnalyzer.scala

package com.ibeifeng.bigdata.spark.app

import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}


/**
 * Created by XuanYu on 2016/4/17.
 */

object SQLLogAnalyzer {
  def main(args: Array[String]) {

    // create SparkConf instance
    val sparkConf = new SparkConf()
      .setAppName("SQLLogAnalyzer")
      .setMaster("local[2]")
    // create SparkContext instance
    val sc = new SparkContext(sparkConf)

    // create SQLcontext instance
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._

    // ==============================================================
    // input files
    val logFile = "hdfs://bigdata-senior01.ibeifeng.com:8020/user/beifeng/apache.access.log" //

    //create rdd
    val accessLogs_df = sc.textFile(logFile)
      /**
       *  filter log datas
       */
      .filter(ApacheAccessLog.isValidateLogLine)
      /**
       * parse log
       */
      .map(log => ApacheAccessLog.parseLogLine(log))
      .toDF()

    accessLogs_df.registerTempTable("accessLogs")

    // cache
    accessLogs_df.cache()

// =======================================================================================

    // compute
    val avgContentSize = sqlContext.sql("select avg(contentSize) from accessLogs").first().get(0)
    val minContentSize = sqlContext.sql("select min(contentSize) from accessLogs").first().get(0)
    val maxcontentSize = sqlContext.sql("select max(contentSize) from accessLogs").first().get(0)

    // println
    println("Content Size Avg: %s, Min: %s , Max: %s".format(
      avgContentSize, minContentSize, maxcontentSize
    ))

    //
    accessLogs_df.unpersist()

    val avg_df = accessLogs_df.agg("contentSize" -> "avg")
    val min_df = accessLogs_df.agg("contentSize" -> "min")
    val max_df = accessLogs_df.agg("contentSize" -> "max")

    // println
    println(" === Content Size Avg: %s, Min: %s , Max: %s".format(
      avg_df.first().get(0),min_df.first().get(0),max_df.first().get(0)
    ))

    // ==============================================================

    // stop SparkContext
    sc.stop()
  }

}

Spark学习五:spark sql

标签:

原文地址:http://blog.csdn.net/youfashion/article/details/51348759

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!