
Using Spark with HBase



Writing to HBase from Spark

  To write data to HBase from Spark, we use the PairRDDFunctions.saveAsHadoopDataset method:

package cn.com.win

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapred.JobConf
import org.apache.log4j.Logger
import org.apache.spark.{SparkConf, SparkContext}

object TestHbase {

  def main(args: Array[String]) {

    val log = Logger.getLogger("TestHbase")
    // Initialize Spark
    val conf = new SparkConf().setMaster("local[2]").setAppName("testHbase")
    val sc = new SparkContext(conf)

    // Define the HBase configuration
    val hconf = HBaseConfiguration.create()
    val jobConf = new JobConf(hconf, this.getClass)
    // Specify the output format and the output table name
    jobConf.setOutputFormat(classOf[TableOutputFormat])
    jobConf.set(TableOutputFormat.OUTPUT_TABLE, "wifiTarget")

    val arr = Array(("tjiloaB#3#20190520", 10, 11), ("tjiloaB#3#20190521", 12, 22), ("tjiloaB#3#20190522", 13, 42))
    val rdd = sc.parallelize(arr)
    val localData = rdd.map(convert)
    localData.saveAsHadoopDataset(jobConf)
    sc.stop()
  }

  // Convert each record into an (ImmutableBytesWritable, Put) pair
  def convert(triple: (String, Int, Int)) = {
    val p = new Put(Bytes.toBytes(triple._1))
    p.addColumn(Bytes.toBytes("wifiTargetCF"), Bytes.toBytes("inNum"), Bytes.toBytes(triple._2))
    p.addColumn(Bytes.toBytes("wifiTargetCF"), Bytes.toBytes("outNum"), Bytes.toBytes(triple._3))
    (new ImmutableBytesWritable, p)
  }
}
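  The example above uses the old mapred API. The same write can also go through the newer mapreduce API with PairRDDFunctions.saveAsNewAPIHadoopDataset; the following is a minimal sketch, assuming the same sc, arr, and convert as in the example above:

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
import org.apache.hadoop.mapreduce.Job

// Configure the new-API TableOutputFormat (note: mapreduce, not mapred)
val hconf = HBaseConfiguration.create()
hconf.set(TableOutputFormat.OUTPUT_TABLE, "wifiTarget")
val job = Job.getInstance(hconf)
job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])
job.setOutputKeyClass(classOf[ImmutableBytesWritable])
job.setOutputValueClass(classOf[Put])

// Reuse the convert function from the example above
sc.parallelize(arr).map(convert).saveAsNewAPIHadoopDataset(job.getConfiguration)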

  Execution result: (screenshot omitted)

Reading from HBase with Spark

  To read from HBase, we mainly use the newAPIHadoopRDD API provided by SparkContext, which loads the table contents into Spark as RDDs.

Reading specific columns:

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Scan
import org.apache.hadoop.hbase.filter.PrefixFilter
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.protobuf.ProtobufUtil
import org.apache.hadoop.hbase.util.{Base64, Bytes}
import org.apache.spark.{SparkConf, SparkContext}

/**
  * AUTHOR Guozy
  * DATE   2020/2/7-0:33
  **/
object TestHbase2 {
  def main(args: Array[String]): Unit = {

    // Initialize Spark
    val conf = new SparkConf().setMaster("local[2]").setAppName("testHbase")
    conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    val sc = new SparkContext(conf)

    val scan = new Scan()
    val filter = new PrefixFilter("tjiloaB#3#20190520".getBytes())
    scan.setFilter(filter)
    val hconf = HBaseConfiguration.create()
    hconf.set(TableInputFormat.INPUT_TABLE, "wifiTarget")
    hconf.set(TableInputFormat.SCAN, convertScanToString(scan))

    val dataRdd = sc.newAPIHadoopRDD(hconf, classOf[TableInputFormat],
      classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
      classOf[org.apache.hadoop.hbase.client.Result])

    // Cache before the first action so later reads reuse the data
    dataRdd.cache()
    val count = dataRdd.count()
    println("dataRdd Count is " + count)

    dataRdd.map(_._2).filter(!_.isEmpty).take(20).foreach { result =>
      val key = Bytes.toString(result.getRow)
      val innum = Bytes.toInt(result.getValue(Bytes.toBytes("wifiTargetCF"), Bytes.toBytes("inNum")))
      val outnum = Bytes.toInt(result.getValue(Bytes.toBytes("wifiTargetCF"), Bytes.toBytes("outNum")))
      println(s"key:${key},inNum:${innum},outNum:${outnum}")
    }
    sc.stop()
  }

  /**
    * Convert the Scan into a Base64-encoded String so it can be passed through the Configuration
    */
  def convertScanToString(scan: Scan): String = {
    val proto = ProtobufUtil.toScan(scan)
    Base64.encodeBytes(proto.toByteArray)
  }
}

Run result: (screenshot omitted)
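The PrefixFilter above restricts which rows are read, but the scan still ships every column of each row to Spark. Since only the two counter columns are needed here, the Scan itself can be narrowed with addColumn; a minimal sketch, assuming the same hconf and convertScanToString as above:

val scan = new Scan()
scan.setFilter(new PrefixFilter("tjiloaB#3#20190520".getBytes()))
// Fetch only the two counter columns instead of the whole row
scan.addColumn(Bytes.toBytes("wifiTargetCF"), Bytes.toBytes("inNum"))
scan.addColumn(Bytes.toBytes("wifiTargetCF"), Bytes.toBytes("outNum"))
hconf.set(TableInputFormat.SCAN, convertScanToString(scan))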

Iterating over all columns:

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Scan
import org.apache.hadoop.hbase.filter.PrefixFilter
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.protobuf.ProtobufUtil
import org.apache.hadoop.hbase.util.{Base64, Bytes}
import org.apache.spark.{SparkConf, SparkContext}

/**
  * AUTHOR Guozy
  * DATE   2020/2/7-0:33
  **/
object TestHbase2 {
  def main(args: Array[String]): Unit = {

    // Initialize Spark
    val conf = new SparkConf().setMaster("local[2]").setAppName("testHbase")
    conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    val sc = new SparkContext(conf)

    val scan = new Scan()
    val filter = new PrefixFilter("tjiloaB#3#20190520".getBytes())
    scan.setFilter(filter)
    val hconf = HBaseConfiguration.create()
    hconf.set(TableInputFormat.INPUT_TABLE, "wifiTarget")
    hconf.set(TableInputFormat.SCAN, convertScanToString(scan))

    val dataRdd = sc.newAPIHadoopRDD(hconf, classOf[TableInputFormat],
      classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
      classOf[org.apache.hadoop.hbase.client.Result])

    // Cache before the first action so later reads reuse the data
    dataRdd.cache()
    val count = dataRdd.count()
    println("dataRdd Count is " + count)

    dataRdd.map(_._2).filter(!_.isEmpty).take(20).foreach { result =>
      val key = Bytes.toString(result.getRow)
      val cells = result.listCells().iterator()
      while (cells.hasNext) {
        val cell = cells.next()
        // Each cell carries its own qualifier and value; read both from the cell
        val qualifier = Bytes.toString(cell.getQualifierArray, cell.getQualifierOffset, cell.getQualifierLength)
        val value = Bytes.toInt(cell.getValueArray, cell.getValueOffset, cell.getValueLength)
        println(s"key:${key},${qualifier}:${value}")
      }
    }
    sc.stop()
  }

  /**
    * Convert the Scan into a Base64-encoded String so it can be passed through the Configuration
    */
  def convertScanToString(scan: Scan): String = {
    val proto = ProtobufUtil.toScan(scan)
    Base64.encodeBytes(proto.toByteArray)
  }
}

Run result: (screenshot omitted)
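The offset-based accessors in the loop above work, but HBase also provides CellUtil helpers that copy the qualifier and value out of the cell's backing array. A small sketch of the same extraction, as an alternative:

import org.apache.hadoop.hbase.{Cell, CellUtil}
import org.apache.hadoop.hbase.util.Bytes

// Render one cell as "qualifier=value" without handling array offsets by hand
def cellToString(cell: Cell): String = {
  val qualifier = Bytes.toString(CellUtil.cloneQualifier(cell))
  val value = Bytes.toInt(CellUtil.cloneValue(cell))
  s"$qualifier=$value"
}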


Note: when importing, the package for TableInputFormat is org.apache.hadoop.hbase.mapreduce, not org.apache.hadoop.hbase.mapred.
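The two Hadoop APIs live in different packages and pair with different Spark calls, as the examples above show:

// Old mapred API: pairs with JobConf and saveAsHadoopDataset
import org.apache.hadoop.hbase.mapred.TableOutputFormat
// New mapreduce API: pairs with sc.newAPIHadoopRDD
import org.apache.hadoop.hbase.mapreduce.TableInputFormat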

 


Original article: https://www.cnblogs.com/Gxiaobai/p/12275218.html
