
Bulk Loading Data into HBase


Test data:

datas

1001    lilei   17  13800001111
1002    lily    16  13800001112
1003    lucy    16  13800001113
1004    meimei  16  13800001114

Bulk import is done with MapReduce: first generate the HFile files, then load them into HBase with the completebulkload tool.

1. First, create the table in the HBase shell:

hbase> create 'student', {NAME => 'info'}
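
For completeness, the same table can also be created programmatically through the HBase 1.0 client API. Below is a minimal sketch, assuming hbase-site.xml is on the classpath; the class name CreateStudentTable is only a placeholder, not part of the original project:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;

public class CreateStudentTable {
    public static void main(String[] args) throws IOException {
        // Reads hbase-site.xml from the classpath to locate ZooKeeper and the cluster.
        Configuration conf = HBaseConfiguration.create();
        try (Connection connection = ConnectionFactory.createConnection(conf);
             Admin admin = connection.getAdmin()) {
            HTableDescriptor desc = new HTableDescriptor(TableName.valueOf("student"));
            desc.addFamily(new HColumnDescriptor("info")); // same column family as the shell command
            if (!admin.tableExists(desc.getTableName())) {
                admin.createTable(desc);
            }
        }
    }
}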

2. The Maven pom.xml dependencies are as follows:

<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-common</artifactId>
    <version>2.6.0</version>
</dependency>

<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>2.6.0</version>
</dependency>

<!-- hbase -->
<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-client</artifactId>
    <version>1.0.0</version>
</dependency>

<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-server</artifactId>
    <version>1.0.0</version>
</dependency>

 

3. The MapReduce code is as follows:

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * Generates HFiles from a tab-separated text file for bulk loading into HBase.
 * Created: 2016-03-02 16:15:57
 */
public class CreateHfileByMapReduce {

    public static class MyBulkMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, KeyValue>{
        @Override
        protected void setup( Mapper<LongWritable, Text, ImmutableBytesWritable, KeyValue>.Context context )
            throws IOException, InterruptedException {
            // No per-task initialization is needed; kept only as a placeholder.
            super.setup( context );
        }
        @Override
        protected void map( LongWritable key, Text value,
                            Context context )
            throws IOException, InterruptedException {
            String[] split = value.toString().split("\t"); // tab-separated input; adjust to the actual file format
            if (split.length == 4){
                byte[] rowkey = split[0].getBytes();
                ImmutableBytesWritable imrowkey = new ImmutableBytesWritable( rowkey );
                // Emit one KeyValue per column; the sort reducer set up by
                // configureIncrementalLoad orders them before the HFiles are written.
                context.write(imrowkey, new KeyValue(rowkey, Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes(split[1])));
                context.write(imrowkey, new KeyValue(rowkey, Bytes.toBytes("info"), Bytes.toBytes("age"), Bytes.toBytes(split[2])));
                context.write(imrowkey, new KeyValue(rowkey, Bytes.toBytes("info"), Bytes.toBytes("phone"), Bytes.toBytes(split[3])));
            }
        }
    }
    
    @SuppressWarnings( "deprecation" )
    public static void main( String[] args ) {
        // Three arguments are expected: table name, input path, HFile output path.
        if (args.length != 3){
            System.err.println("Usage: CreateHfileByMapReduce <table_name> <data_input_path> <hfile_output_path>");
            System.exit(2);
        }

        String tableName  = args[0];
        String inputPath  = args[1];
        String outputPath = args[2];
        
      /*  String tableName = "student";
        String inputPath  = "hdfs://node2:9000/datas";
        String outputPath = "hdfs://node2:9000/user/output";*/
        HTable hTable = null;
        Configuration conf = HBaseConfiguration.create();
        try {
           hTable  = new HTable(conf, tableName);
           Job job = Job.getInstance( conf, "CreateHfileByMapReduce");
           job.setJarByClass( CreateHfileByMapReduce.class );
           job.setMapperClass(MyBulkMapper.class);
           job.setInputFormatClass(org.apache.hadoop.mapreduce.lib.input.TextInputFormat.class);
           // Configures the partitioner, sort reducer and HFileOutputFormat so the
           // generated HFiles line up with the table's existing region boundaries.
           HFileOutputFormat.configureIncrementalLoad(job, hTable);
           FileInputFormat.addInputPath( job, new Path(inputPath) );
           FileOutputFormat.setOutputPath( job, new Path(outputPath) );
           System.exit( job.waitForCompletion(true)? 0: 1 );
           
        }
        catch ( Exception e ) {
            
            e.printStackTrace();
        }
        
    }
}

Note: use Maven's assembly plugin to build a fat jar, i.e. bundle the dependent zookeeper and hbase jars into the MapReduce jar. Otherwise the zookeeper and hbase configuration files and jars have to be added to Hadoop's classpath by hand. A possible plugin configuration is sketched below.
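
A minimal sketch of such an assembly-plugin section for the pom.xml shown above (an assumption, not the author's actual build file; without a finalName setting, mvn package produces <artifactId>-<version>-jar-with-dependencies.jar rather than bulk.jar):

<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-assembly-plugin</artifactId>
            <configuration>
                <descriptorRefs>
                    <!-- bundle every dependency (hbase, zookeeper, ...) into one jar -->
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
                <archive>
                    <manifest>
                        <mainClass>cn.bd.batch.mr.CreateHfileByMapReduce</mainClass>
                    </manifest>
                </archive>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>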

The final jar is bulk.jar and the main class is cn.bd.batch.mr.CreateHfileByMapReduce. Generate the HFiles, then bulk load them into HBase:
sudo -u hdfs hadoop jar <xxoo>.jar <MainClass> <table_name> <data_input_path> <hfile_output_path>
hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles <hfile_output_path> <table_name>

hadoop jar bulk.jar cn.bd.batch.mr.CreateHfileByMapReduce student /datas /user/output

hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles /user/output student
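
The load step can also be driven from Java with the same LoadIncrementalHFiles class instead of the shell command. A minimal sketch against the HBase 1.0 API; the class name BulkLoadDriver and the hard-coded table and path are placeholders mirroring the commands above:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;

public class BulkLoadDriver {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        // Same target table and HFile directory as the commands above.
        try (HTable table = new HTable(conf, "student")) {
            LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
            loader.doBulkLoad(new Path("/user/output"), table);
        }
    }
}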

Reference: http://www.cnblogs.com/mumuxinfei/p/3823367.html


Original article: http://www.cnblogs.com/zhanggl/p/5235912.html
