码迷,mamicode.com
首页 > 其他好文 > 详细

spark 数据写入到 hbase

时间:2015-06-23 14:58:59      阅读:516      评论:0      收藏:0      [点我收藏+]

标签:

1)spark把数据写入到hbase需要用到:PairRddFunctions的saveAsHadoopDataset方法,这里用到了 implicit conversion,需要我们引入

import org.apache.spark.SparkContext._

2)spark写入hbase,实质是借用了org.apache.hadoop.hbase.mapreduce.TableInputFormat这个对象,用其内部的recorderWriter将数据写入hbase

同时,也借用了hadoop的JobConf,配置和写MR的配置方式一样

3)请看下面代码,这里使用sparksql从hive里面读出数据,经过处理,写入到hbase

    //创建jobConf
    val conf = HBaseConfiguration.create()
    val jobConf = new JobConf(conf)
    jobConf.setOutputFormat(classOf[TableOutputFormat])
    jobConf.set(TableOutputFormat.OUTPUT_TABLE,"test")

    //创建hiveContext
    val sparkConf = new SparkConf().setAppName("test")
    val sc = new SparkContext(sparkConf)
    @transient  val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)
    sqlContext.setConf("spark.sql.shuffle.partitions","3")

    //保存到hbase
    val rdd = sqlContext.sql("select C1,C2,C3 from test")
      .map(row => {
       val c1 = row(0).asInstanceOf[String]
       val c2 = row(1).asInstanceOf[String]
       val c3 = row(2).asInstanceOf[String]
       val p = new Put(Bytes.toBytes(c1))
       p.add(Bytes.toBytes("f"),Bytes.toBytes("c2"),Bytes.toBytes(c2))
       p.add(Bytes.toBytes("f"),Bytes.toBytes("c3"),Bytes.toBytes(c3))
       (new ImmutableBytesWritable,p) 
     }).saveAsHadoopDataset(jobConf)

 

spark 数据写入到 hbase

标签:

原文地址:http://www.cnblogs.com/smartcuning/p/4595239.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!