标签:那伊抹微笑
package com.itdog8.cloud.hbase.mr.test; import java.io.IOException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.mapreduce.TableOutputFormat; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.util.GenericOptionsParser; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; import com.sun.corba.se.spi.ior.Writeable; /** * TestHBaseAsSinkMapReduceMainClass * * @author 那伊抹微笑 * @date 2015-07-31 10:52:21 * */ public class TestHBaseAsSinkMapReduceMainClass extends Configured implements Tool { private static final Log _log = LogFactory.getLog(TestHBaseAsSinkMapReduceMainClass.class); public static class ExampleSinkMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> { private String rowkey = null; private byte[] family = null; private byte[] qualifier = null; private byte[] val = null; private long ts = System.currentTimeMillis(); @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { try { // logical processing // rowkey = ""; // family = null; // qualifier = null; // val = null; // put Put put = new Put(Bytes.toBytes(rowkey), ts); put.add(family, qualifier, val); context.write(new ImmutableBytesWritable(Bytes.toBytes(rowkey)), put); } catch (Exception e) { e.printStackTrace(); } } } @Override public int run(String[] args) throws Exception { if(args.length != 3) { _log.info("Usage: 3 parameters needed!\nhadoop jar hbase-build-import-1.0.0.jar <inputPath><tableName><columns>"); System.exit(1); } String inputPath = args[0]; String tableName = args[1]; String columns = args[2]; // hbase configuration Configuration conf = HBaseConfiguration.create(); conf.set("hbase.zookeeper.quorum", "a234-198.hadoop.com,a234-197.hadoop.com,a234-196.hadoop.com"); conf.set("hbase.zookeeper.property.clientPort", "2181"); // Job Job job = new Job(conf, "Import from file " + inputPath + " into table " + tableName); job.setJarByClass(TestHBaseAsSinkMapReduceMainClass.class); job.setMapperClass(ExampleSinkMapper.class); job.setOutputFormatClass(TableOutputFormat.class); job.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, tableName); job.setOutputKeyClass(ImmutableBytesWritable.class); job.setOutputValueClass(Writeable.class); job.setNumReduceTasks(0); // add input path FileInputFormat.addInputPath(job, new Path(inputPath)); return job.waitForCompletion(true) ? 0 : 1; } public static void main(String[] args) throws IOException { Configuration conf = new Configuration(); String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs(); int retVal = 1; try { retVal = ToolRunner.run(conf, new TestHBaseAsSinkMapReduceMainClass(), otherArgs); } catch (Exception e) { e.printStackTrace(); } System.exit(retVal); } }
版权声明:本文为博主原创文章,未经博主允许不得转载。
HBase - MapReduce - HBase 作为输出源的示例 | 那伊抹微笑
标签:那伊抹微笑
原文地址:http://blog.csdn.net/u012185296/article/details/47279533