Most readers are already familiar with Spark's two common ways of loading data into an RDD. (1) Call the parallelize function to build an RDD directly from an in-memory collection. The Java version looks like this:
JavaRDD<Integer> myRDD = sc.parallelize(Arrays.asList(1, 2, 3));
The Scala version:
val myRDD = sc.parallelize(List(1, 2, 3))
This approach is simple and makes it easy to turn a collection into the initial contents of an RDD. The more common case is (2): reading data from files into an RDD. The files can be plain text files or SequenceFiles, and they can live on the local filesystem (file://), on HDFS (hdfs://), or even on S3. In fact, Spark supports every file format and storage location that Hadoop supports. The Java version:
import org.apache.spark.SparkConf;
import org.apache.spark.SparkFiles;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

SparkConf conf = new SparkConf().setAppName("Simple Application");
JavaSparkContext sc = new JavaSparkContext(conf);
sc.addFile("wyp.data");
JavaRDD<String> lines = sc.textFile(SparkFiles.get("wyp.data"));
The Scala version:
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.spark.SparkFiles

val conf = new SparkConf().setAppName("Simple Application")
val sc = new SparkContext(conf)
sc.addFile("spam.data")
val inFile = sc.textFile(SparkFiles.get("spam.data"))
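For reference, textFile accepts any URI scheme that the underlying Hadoop installation understands, and sequenceFile reads SequenceFiles. The sketch below is only an illustration; the paths, bucket name and key/value types are made up:

import org.apache.hadoop.io.Text

// Local file system, HDFS and S3 all go through the same textFile call (placeholder paths).
val localLines = sc.textFile("file:///tmp/wyp.data")
val hdfsLines  = sc.textFile("hdfs://namenode:8020/user/wyp/wyp.data")
val s3Lines    = sc.textFile("s3n://my-bucket/wyp.data")

// A SequenceFile with Text keys and values, converted to plain Scala strings.
val seq = sc.sequenceFile("hdfs://namenode:8020/user/wyp/data.seq", classOf[Text], classOf[Text])
  .map { case (k, v) => (k.toString, v.toString) }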
In practice, the data we need is often not sitting in plain text files on HDFS; it may well be stored in HBase. So how do we read data from HBase with Spark? All tests in this article were run against Hadoop 2.2.0, HBase 0.98.2 and Spark 0.9.1; with other versions the code may differ slightly. This article only covers reading data from HBase with Spark; if you need to perform heavier HBase operations, it probably will not help you. Without further ado, here is the Java code for reading HBase from Spark:
package com.iteblog.spark;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.util.Base64;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;

import scala.Serializable;
import scala.Tuple2;

import java.io.IOException;
import java.util.List;

/**
 * User: iteblog
 * Date: 14-6-27
 * Time: 5:18 PM
 * blog: http://www.iteblog.com
 *
 * Usage: bin/spark-submit --master yarn-cluster --class com.iteblog.spark.SparkFromHbase
 *   --jars /home/q/hbase/hbase-0.96.0-hadoop2/lib/htrace-core-2.01.jar,
 *   /home/q/hbase/hbase-0.96.0-hadoop2/lib/hbase-common-0.96.0-hadoop2.jar,
 *   /home/q/hbase/hbase-0.96.0-hadoop2/lib/hbase-client-0.96.0-hadoop2.jar,
 *   /home/q/hbase/hbase-0.96.0-hadoop2/lib/hbase-protocol-0.96.0-hadoop2.jar,
 *   /home/q/hbase/hbase-0.96.0-hadoop2/lib/hbase-server-0.96.0-hadoop2.jar
 *   ./spark_2.10-1.0.jar
 */
public class SparkFromHbase implements Serializable {

    /**
     * Copied from org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil:
     * serializes a Scan into the Base64 string that TableInputFormat expects.
     *
     * @param scan the scan to serialize
     * @return the Base64-encoded scan
     * @throws IOException
     */
    String convertScanToString(Scan scan) throws IOException {
        ClientProtos.Scan proto = ProtobufUtil.toScan(scan);
        return Base64.encodeBytes(proto.toByteArray());
    }

    public void start() {
        SparkConf sparkConf = new SparkConf();
        JavaSparkContext sc = new JavaSparkContext(sparkConf);

        Configuration conf = HBaseConfiguration.create();

        Scan scan = new Scan();
        //scan.setStartRow(Bytes.toBytes("195861-1035177490"));
        //scan.setStopRow(Bytes.toBytes("195861-1072173147"));
        scan.addFamily(Bytes.toBytes("cf"));
        scan.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("col_1"));

        try {
            String tableName = "wyp";
            conf.set(TableInputFormat.INPUT_TABLE, tableName);
            conf.set(TableInputFormat.SCAN, convertScanToString(scan));

            JavaPairRDD<ImmutableBytesWritable, Result> hBaseRDD = sc.newAPIHadoopRDD(conf,
                    TableInputFormat.class, ImmutableBytesWritable.class, Result.class);

            // Extract the value of cf:col_1 from every row and map it to (value, 1).
            JavaPairRDD<String, Integer> levels = hBaseRDD.mapToPair(
                    new PairFunction<Tuple2<ImmutableBytesWritable, Result>, String, Integer>() {
                        @Override
                        public Tuple2<String, Integer> call(
                                Tuple2<ImmutableBytesWritable, Result> immutableBytesWritableResultTuple2)
                                throws Exception {
                            byte[] o = immutableBytesWritableResultTuple2._2()
                                    .getValue(Bytes.toBytes("cf"), Bytes.toBytes("col_1"));
                            if (o != null) {
                                return new Tuple2<String, Integer>(new String(o), 1);
                            }
                            return null;
                        }
                    });

            // Sum the counts per distinct value.
            JavaPairRDD<String, Integer> counts = levels.reduceByKey(
                    new Function2<Integer, Integer, Integer>() {
                        @Override
                        public Integer call(Integer i1, Integer i2) {
                            return i1 + i2;
                        }
                    });

            List<Tuple2<String, Integer>> output = counts.collect();
            for (Tuple2 tuple : output) {
                System.out.println(tuple._1() + ": " + tuple._2());
            }

            sc.stop();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        new SparkFromHbase().start();
        System.exit(0);
    }
}
This code reads the col_1 column of the cf column family from the HBase table named wyp into hBaseRDD, on which we can then run the usual RDD operations, for example:
System.out.println(hBaseRDD.count());
This code requires the following dependencies in pom.xml:
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.10</artifactId>
    <version>0.9.1</version>
</dependency>
<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase</artifactId>
    <version>0.98.2-hadoop2</version>
</dependency>
<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-client</artifactId>
    <version>0.98.2-hadoop2</version>
</dependency>
<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-common</artifactId>
    <version>0.98.2-hadoop2</version>
</dependency>
<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-server</artifactId>
    <version>0.98.2-hadoop2</version>
</dependency>
The Scala version:
import org.apache.spark._
import org.apache.spark.rdd.NewHadoopRDD
import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor}
import org.apache.hadoop.hbase.client.HBaseAdmin
import org.apache.hadoop.hbase.mapreduce.TableInputFormat

object HBaseTest {
  def main(args: Array[String]) {
    val sc = new SparkContext(args(0), "HBaseTest",
      System.getenv("SPARK_HOME"), SparkContext.jarOfClass(this.getClass))
    val conf = HBaseConfiguration.create()
    conf.set(TableInputFormat.INPUT_TABLE, args(1))
    val hBaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
      classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
      classOf[org.apache.hadoop.hbase.client.Result])
    hBaseRDD.count()
    System.exit(0)
  }
}
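If you want the Scala version to do more than count rows, for example to extract the value of cf:col_1 and aggregate on it the way the Java version above does, a sketch could look like the following. It assumes the same cf:col_1 column used in the Java example and would go inside main after hBaseRDD is created; the extra SparkContext._ import is needed in Spark 0.9.x so that reduceByKey is available on a pair RDD.

import org.apache.spark.SparkContext._
import org.apache.hadoop.hbase.util.Bytes

// Pull cf:col_1 out of every Result, skip rows that do not have the column,
// then count how many times each distinct value occurs.
val counts = hBaseRDD
  .map { case (_, result) => Option(result.getValue(Bytes.toBytes("cf"), Bytes.toBytes("col_1"))) }
  .collect { case Some(bytes) => (Bytes.toString(bytes), 1) }
  .reduceByKey(_ + _)

counts.collect().foreach { case (value, cnt) => println(value + ": " + cnt) }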
For the Scala version we need to add the following dependencies to the sbt build (build.sbt):
libraryDependencies ++= Seq(
  "org.apache.spark" % "spark-core_2.10" % "0.9.1",
  "org.apache.hbase" % "hbase"           % "0.98.2-hadoop2",
  "org.apache.hbase" % "hbase-client"    % "0.98.2-hadoop2",
  "org.apache.hbase" % "hbase-common"    % "0.98.2-hadoop2",
  "org.apache.hbase" % "hbase-server"    % "0.98.2-hadoop2"
)
When testing, make sure the HBase and Hadoop environments are configured properly, otherwise the program will run into problems; in particular, the program must be able to find the hbase-site.xml configuration file.
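If the job cannot pick up hbase-site.xml from its classpath, one workaround is to point the HBase configuration at the file explicitly, or to set the essential connection properties by hand. The sketch below is only an illustration: the file path and ZooKeeper host names are placeholders for your own environment, not values from this article.

import org.apache.hadoop.fs.Path
import org.apache.hadoop.hbase.HBaseConfiguration

val conf = HBaseConfiguration.create()
// Option 1: load hbase-site.xml explicitly (placeholder path, adjust to your installation).
conf.addResource(new Path("/etc/hbase/conf/hbase-site.xml"))
// Option 2: set the essential connection properties by hand (placeholder ZooKeeper hosts).
conf.set("hbase.zookeeper.quorum", "zk1,zk2,zk3")
conf.set("hbase.zookeeper.property.clientPort", "2181")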
Original source: https://www.cnblogs.com/huanghanyu/p/13041865.html