
Spark Streaming with Spark JDBC External Data Sources: A Worked Example

Date: 2015-01-26 15:03:00

Scenario: use Spark Streaming to receive real-time data and run queries that join it against a table in a relational database.

Technologies used: Spark Streaming + Spark JDBC External Data Sources
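
The example assumes a city table already exists in the test database on hadoop000, with an integer id column and a string name column (matching the "select id, name from city" query below). A minimal setup sketch using plain JDBC, with hypothetical sample rows, assuming the MySQL driver is on the classpath:

import java.sql.DriverManager

object CitySetup {
  def main(args: Array[String]) {
    // Connection details copied from the streaming example below;
    // the table layout and rows are hypothetical test data
    val conn = DriverManager.getConnection("jdbc:mysql://hadoop000:3306/test", "root", "root")
    try {
      val stmt = conn.createStatement()
      stmt.executeUpdate("create table if not exists city (id int primary key, name varchar(50))")
      stmt.executeUpdate("insert into city (id, name) values (1, 'beijing'), (2, 'shanghai')")
    } finally {
      conn.close()
    }
  }
}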

Code skeleton:

package com.luogankun.spark.streaming

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{ Seconds, StreamingContext }
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.SparkContext

case class Student(id: Int, name: String, cityId: Int)

object HDFSStreaming {
  def main(args: Array[String]) {

    val location = args(0)  // HDFS directory to monitor

    val sparkConf = new SparkConf().setAppName("HDFS JDBC Streaming")
    val sc = new SparkContext(sparkConf)
    val ssc = new StreamingContext(sc, Seconds(5))

    val sqlContext = new HiveContext(sc)
    import sqlContext.createSchemaRDD

    import com.luogankun.spark.jdbc._

    // Use the External Data Sources API to read the city table from MySQL
    val cities = sqlContext.jdbcTable("jdbc:mysql://hadoop000:3306/test", "root", "root",
      "select id, name from city")

    // Register the cities SchemaRDD as the temporary table "city"
    cities.registerTempTable("city")

    val inputs = ssc.textFileStream(location)
    inputs.foreachRDD(rdd => {
      if (rdd.partitions.length > 0) {
        // Register each batch received from the stream as the temporary table "student"
        rdd.map(_.split("\t"))
          .map(x => Student(x(0).toInt, x(1), x(2).toInt))
          .registerTempTable("student")

        // Join the streaming data with the MySQL-backed city table
        sqlContext.sql(
          "select s.id, s.name, s.cityId, c.name from student s join city c on s.cityId = c.id")
          .collect().foreach(println)
      }
    })

    ssc.start()
    ssc.awaitTermination()
  }
}

 

Script to submit the job to the Spark cluster:

spark-submit --name SparkSubmit_Demo --class com.luogankun.spark.streaming.HDFSStreaming --master spark://hadoop000:7077 \
--executor-memory 1G --total-executor-cores 1 /home/spark/lib/streaming.jar hdfs://hadoop000:8020/data/hdfs
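
When testing, keep in mind that textFileStream only processes files that appear in the monitored directory after the streaming job has started, and it expects them to appear atomically (write elsewhere, then rename into place). A minimal sketch that pushes a test file of hypothetical tab-separated student rows (id, name, cityId — the format the split("\t") above expects) into the watched HDFS path:

import java.net.URI
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

object PushTestFile {
  def main(args: Array[String]) {
    val fs = FileSystem.get(new URI("hdfs://hadoop000:8020"), new Configuration())
    // Write to a temp path first, then rename into the monitored directory,
    // so the file becomes visible to the stream in one atomic step
    val tmp = new Path("/tmp/student.txt")
    val out = fs.create(tmp)
    out.write("1\tluogankun\t1\n2\ttom\t2\n".getBytes("UTF-8"))  // hypothetical rows
    out.close()
    fs.rename(tmp, new Path("/data/hdfs/student.txt"))
    fs.close()
  }
}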

 


Original article: http://www.cnblogs.com/luogankun/p/4250297.html
