码迷,mamicode.com
首页 > 其他好文 > 详细

Spark读取Hbase中的数据_云帆大数据分享

时间:2015-04-07 20:01:18      阅读:285      评论:0      收藏:0      [点我收藏+]

标签:hadoop   云计算   spark   hdfs   大数据   mapreduce   

Spark读取Hbase中的数据

 大家可能都知道很熟悉Spark的两种常见的数据读取方式(存放到RDD中):(1)、调用parallelize函数直接从集合中获取数据,并存入RDD中;Java版本如下:

1

JavaRDD<Integer> myRDD = sc.parallelize(Arrays.asList(1,2,3));

Scala版本如下:

1

val myRDD= sc.parallelize(List(1,2,3))

  这种方式很简单,很容易就可以将一个集合中的数据变成RDD的初始化值;更常见的是(2)、从文本中读取数据到RDD中,这个文本可以是纯文本文件、可以是sequence文件;可以存放在本地(file://)、可以存放在HDFShdfs://)上,还可以存放在S3上。其实对文件来说,Spark支持Hadoop所支持的所有文件类型和文件存放位置。Java版如下:

01

/////////////////////////////////////////////////////////////////////

 

 

05

 http://www.yfteach.com

 

06


 

07

 云帆大数据博客,专注于hadoop、hive、spark、shark、flume的技术博客,大量的干货

 

08

 微信公共帐号:yfteach

 

09

/////////////////////////////////////////////////////////////////////

 

10

import org.apache.spark.SparkConf;

 

11

import org.apache.spark.api.java.JavaRDD;

 

12

import org.apache.spark.api.java.JavaSparkContext;

 

13


 

14

SparkConf conf = new SparkConf().setAppName("Simple Application");

 

15

JavaSparkContext sc = new JavaSparkContext(conf);

 

16

sc.addFile("wyp.data");

 

17

JavaRDD<String> lines = sc.textFile(SparkFiles.get("wyp.data"));

Scala版本如下:

1

import org.apache.spark.SparkContext

 

2

import org.apache.spark.SparkConf

 

3


 

4

val conf = new SparkConf().setAppName("Simple Application")

 

5

val sc = new SparkContext(conf)

 

6

sc.addFile("spam.data")

 

7

val inFile = sc.textFile(SparkFiles.get("spam.data"))

  在实际情况下,我们需要的数据可能不是简单的存放在HDFS文本中,我们需要的数据可能就存放在Hbase中,那么我们如何用Spark来读取Hbase中的数据呢?本文的所有测试是基于Hadoop 2.2.0Hbase 0.98.2Spark 0.9.1,不同版本可能代码的编写有点不同。本文只是简单地用Spark来读取Hbase中的数据,如果需要对Hbase进行更强的操作,本文可能不能帮你。话不多说,Spark操作Hbase的核心的Java版本代码如下:

01

import org.apache.hadoop.conf.Configuration;

 

02

import org.apache.hadoop.hbase.HBaseConfiguration;

 

03

import org.apache.hadoop.hbase.client.Result;

 

04

import org.apache.hadoop.hbase.client.Scan;

 

05

import org.apache.hadoop.hbase.io.ImmutableBytesWritable;

 

06

import org.apache.hadoop.hbase.mapreduce.TableInputFormat;

 

07

import org.apache.hadoop.hbase.protobuf.ProtobufUtil;

 

08

import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;

 

09

import org.apache.hadoop.hbase.util.Base64;

 

10

import org.apache.hadoop.hbase.util.Bytes;

 

11

import org.apache.spark.api.java.JavaPairRDD;

 

12

import org.apache.spark.api.java.JavaSparkContext;

 

13


31


23


 

24

JavaSparkContext sc = new JavaSparkContext(master, "hbaseTest",

 

25

                System.getenv("SPARK_HOME"), System.getenv("JARS"));

 

26


 

27

Configuration conf = HBaseConfiguration.create();

 

28

Scan scan = new Scan();

 

29

scan.addFamily(Bytes.toBytes("cf"));

 

30

scan.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("airName"));

 

 

32

try {

 

33

        String tableName = "flight_wap_order_log";

 

34

        conf.set(TableInputFormat.INPUT_TABLE, tableName);

 

35

        ClientProtos.Scan proto = ProtobufUtil.toScan(scan);

 

36

        String ScanToString = Base64.encodeBytes(proto.toByteArray());

 

37

        conf.set(TableInputFormat.SCAN, ScanToString);

 

38


 

39

        JavaPairRDD<ImmutableBytesWritable, Result> myRDD =

 

40

                sc.newAPIHadoopRDD(conf,  TableInputFormat.class,

 

41

                ImmutableBytesWritable.class, Result.class);

 

42


 

43

catch (Exception e) {

 

44

            e.printStackTrace();

 

45

}

这样本段代码段是从Hbase表名为flight_wap_order_log的数据库中读取cf列簇上的airName一列的数据,这样我们就可以对myRDD进行相应的操作:

1

System.out.println(myRDD.count());

本段代码需要在pom.xml文件加入以下依赖:

01

<dependency>

 

02

        <groupId>org.apache.spark</groupId>

 

03

        <artifactId>spark-core_2.10</artifactId>

 

04

        <version>0.9.1</version>

 

05

</dependency>

 

06


 

07

<dependency>

 

08

        <groupId>org.apache.hbase</groupId>

 

09

        <artifactId>hbase</artifactId>

 

10

        <version>0.98.2-hadoop2</version>

 

11

</dependency>

 

12


 

13

<dependency>

 

14

        <groupId>org.apache.hbase</groupId>

 

15

        <artifactId>hbase-client</artifactId>

 

16

        <version>0.98.2-hadoop2</version>

 

17

</dependency>

 

18


 

19

<dependency>

 

20

        <groupId>org.apache.hbase</groupId>

 

21

        <artifactId>hbase-common</artifactId>

 

22

        <version>0.98.2-hadoop2</version>

 

23

</dependency>

 

24


 

25

<dependency>

 

26

        <groupId>org.apache.hbase</groupId>

 

27

        <artifactId>hbase-server</artifactId>

 

28

        <version>0.98.2-hadoop2</version>

 

29

</dependency>

Scala版如下:

01

import org.apache.spark._

 

02

import org.apache.spark.rdd.NewHadoopRDD

 

03

import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor}

 

04

import org.apache.hadoop.hbase.client.HBaseAdmin

 

05

import org.apache.hadoop.hbase.mapreduce.TableInputFormat

 

06


22

    val conf = HBaseConfiguration.create()

15

/////////////////////////////////////////////////////////////////////

 

16


 

17

object HBaseTest {

 

18

  def main(args: Array[String]) {

 

19

    val sc = new SparkContext(args(0), "HBaseTest",

 

20

      System.getenv("SPARK_HOME"), SparkContext.jarOfClass(this.getClass))

 

21


 

 

23

    conf.set(TableInputFormat.INPUT_TABLE, args(1))

 

24


 

25

    val hBaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],

 

26

      classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],

 

27

      classOf[org.apache.hadoop.hbase.client.Result])

 

28


 

29

    hBaseRDD.count()

 

30


 

31

    System.exit(0)

 

32

  }

 

33

}

我们需要在加入如下依赖:

1

libraryDependencies ++= Seq(

 

2

        "org.apache.spark" % "spark-core_2.10" % "0.9.1",

 

3

        "org.apache.hbase" % "hbase" % "0.98.2-hadoop2",

 

4

        "org.apache.hbase" % "hbase-client" % "0.98.2-hadoop2",

 

5

        "org.apache.hbase" % "hbase-common" % "0.98.2-hadoop2",

 

6

        "org.apache.hbase" % "hbase-server" % "0.98.2-hadoop2"

 

7

)

  在测试的时候,需要配置好HbaseHadoop环境,否则程序会出现问题,特别是让程序找到Hbase-site.xml配置文件

云帆大数据学院www.cloudyhadoop.com

详情请加入QQ374152400 ,咨询课程顾问!

关注云帆教育微信公众号yfteach,第一时间获取公开课信息。

 

 

 

 

 

 

 

 

 

 

 


本文出自 “云帆大数据” 博客,请务必保留此出处http://yfteach01.blog.51cto.com/9428662/1629668

Spark读取Hbase中的数据_云帆大数据分享

标签:hadoop   云计算   spark   hdfs   大数据   mapreduce   

原文地址:http://yfteach01.blog.51cto.com/9428662/1629668

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!