HBase - MapReduce - Example of Using HBase as a Sink | 那伊抹微笑
(Looking forward to learning with you and improving together)

package com.itdog8.cloud.hbase.mr.test;
import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.io.Writable;
/**
* TestHBaseAsSinkMapReduceMainClass
*
* @author 那伊抹微笑
* @date 2015-07-31 10:52:21
*
*/
public class TestHBaseAsSinkMapReduceMainClass extends Configured implements Tool {
private static final Log _log = LogFactory.getLog(TestHBaseAsSinkMapReduceMainClass.class);
public static class ExampleSinkMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {
private String rowkey = null;
private byte[] family = null;
private byte[] qualifier = null;
private byte[] val = null;
private long ts = System.currentTimeMillis();
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
try {
// Parse the input line and assign rowkey, family, qualifier, and val here.
// They are left null in this skeleton, so the Put below would throw a
// NullPointerException until this logic is filled in.
// rowkey = ...;
// family = ...;
// qualifier = ...;
// val = ...;
// put
Put put = new Put(Bytes.toBytes(rowkey), ts);
put.add(family, qualifier, val);
context.write(new ImmutableBytesWritable(Bytes.toBytes(rowkey)), put);
} catch (Exception e) {
_log.error("Failed to build Put for input line: " + value, e);
}
}
}
@Override
public int run(String[] args) throws Exception {
if (args.length != 3) {
_log.error("Usage: 3 parameters needed!\nhadoop jar hbase-build-import-1.0.0.jar <inputPath> <tableName> <columns>");
return 1;
}
String inputPath = args[0];
String tableName = args[1];
String columns = args[2]; // reserved for the mapper's parsing logic; unused in this skeleton
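// Example invocation (paths and names here are hypothetical):
//   hadoop jar hbase-build-import-1.0.0.jar /user/itdog8/input test_table info:value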
// hbase configuration
Configuration conf = HBaseConfiguration.create();
conf.set("hbase.zookeeper.quorum", "a234-198.hadoop.com,a234-197.hadoop.com,a234-196.hadoop.com");
conf.set("hbase.zookeeper.property.clientPort", "2181");
// Job
Job job = Job.getInstance(conf, "Import from file " + inputPath + " into table " + tableName);
job.setJarByClass(TestHBaseAsSinkMapReduceMainClass.class);
job.setMapperClass(ExampleSinkMapper.class);
job.setOutputFormatClass(TableOutputFormat.class);
job.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, tableName);
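// Note: TableOutputFormat writes into an existing table; the target table
// (with the required column family) must be created in HBase beforehand.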
job.setOutputKeyClass(ImmutableBytesWritable.class);
job.setOutputValueClass(Writable.class);
job.setNumReduceTasks(0);
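// Zero reducers makes this a map-only job: every Put emitted by the mapper
// is handed straight to TableOutputFormat and written to the table.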
// add input path
FileInputFormat.addInputPath(job, new Path(inputPath));
return job.waitForCompletion(true) ? 0 : 1;
}
public static void main(String[] args) throws IOException {
Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
int retVal = 1;
try {
retVal = ToolRunner.run(conf, new TestHBaseAsSinkMapReduceMainClass(), otherArgs);
} catch (Exception e) {
e.printStackTrace();
}
System.exit(retVal);
}
}
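For reference, here is one way the placeholder parsing in ExampleSinkMapper.map() could be filled in. This is only a sketch, not part of the original post: it assumes each input line is tab-separated ("rowkey<TAB>value"), and that the <columns> argument of the form "family:qualifier" was passed to the mapper through the job configuration under a made-up key, "test.columns".

public static class TsvSinkMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {
    private byte[] family;
    private byte[] qualifier;

    @Override
    protected void setup(Context context) {
        // "test.columns" is a hypothetical key; the driver would set it from
        // the <columns> argument, e.g. conf.set("test.columns", columns).
        String[] col = context.getConfiguration().get("test.columns", "info:value").split(":");
        family = Bytes.toBytes(col[0]);
        qualifier = Bytes.toBytes(col[1]);
    }

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Assumed input format: rowkey<TAB>value
        String[] fields = value.toString().split("\t");
        if (fields.length < 2) {
            return; // skip malformed lines
        }
        byte[] rowkey = Bytes.toBytes(fields[0]);
        Put put = new Put(rowkey);
        put.add(family, qualifier, Bytes.toBytes(fields[1]));
        context.write(new ImmutableBytesWritable(rowkey), put);
    }
}

Resolving the column spec once in setup() keeps the per-record work in map() to a simple split and a single Put.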
Copyright notice: This article is the blogger's original work and may not be reproduced without the blogger's permission.
Original article: http://blog.csdn.net/u012185296/article/details/47279533