标签:
场景:使用Spark Streaming接收实时数据与关系型数据库中的表进行相关的查询操作;
使用技术:Spark Streaming + Spark JDBC External DataSources
代码雏形:
package com.luogankun.spark.streaming import org.apache.spark.SparkConf import org.apache.spark.streaming.{ Seconds, StreamingContext } import org.apache.spark.sql.hive.HiveContext import org.apache.spark.SparkContext case class Student(id: Int, name: String, cityId: Int) object HDFSStreaming { def main(args: Array[String]) { val location = args(0) //HDFS文件路径 val sparkConf = new SparkConf().setAppName("HDFS JDBC Streaming") val sc = new SparkContext(sparkConf) val ssc = new StreamingContext(sc, Seconds(5)) val sqlContext = new HiveContext(sc) import sqlContext.createSchemaRDD import com.luogankun.spark.jdbc._
//使用External Data Sources处理MySQL中的数据 val cities = sqlContext.jdbcTable("jdbc:mysql://hadoop000:3306/test", "root","root","select id, name from city")
//将cities RDD注册成city临时表 cities.registerTempTable("city") val inputs = ssc.textFileStream(location) inputs.foreachRDD(rdd => { if (rdd.partitions.length > 0) {
//将Streaming中接收到的数据注册成student临时表 rdd.map(_.split("\t")).map(x => Student(x(0).toInt, x(1), x(2).toInt)).registerTempTable("student");
//关联Streaming和MySQL表进行查询操作 sqlContext.sql("select s.id, s.name, s.cityId, c.name from student s join city_table c on s.cityId=c.id").collect().foreach(println) } }) ssc.start() ssc.awaitTermination() } }
提交到Spark集群处理脚本:
spark-submit --name SparkSubmit_Demo --class com.luogankun.spark.streaming.HDFSStreaming --master spark://hadoop000:7077 \ --executor-memory 1G --total-executor-cores 1 /home/spark/lib/streaming.jar hdfs://hadoop000:8020/data/hdfs
Spark Streaming结合Spark JDBC External DataSouces处理案例
标签:
原文地址:http://www.cnblogs.com/luogankun/p/4250297.html