标签:那伊抹微笑 hbase mapreduce 妳那伊抹微笑
(期待与你一起学习,共同进步)首先通过 HBaseConfiguration.create() 创建 Configuration 配置;mapper 需要继承 TableMapper。示例片段如下:
// Illustrative snippet: configure a read-only MapReduce job that uses an
// HBase table as its input source (mapper must extend TableMapper).
Job job = new Job(config, "ExampleRead");
job.setJarByClass(MyReadJob.class); // class that contains the mapper
Scan scan = new Scan();
scan.setCaching(500); // 1 is the default in Scan, which is bad for MapReduce jobs
scan.setCacheBlocks(false); // don't set to true for MR jobs
// set other scan attributes as needed
...
TableMapReduceUtil.initTableMapperJob(
tableName, // input HBase table name
scan, // Scan instance to control column-family and attribute selection
MyMapper.class, // mapper class
null, // mapper output key class
null, // mapper output value class
job);
job.setOutputFormatClass(NullOutputFormat.class); // because we aren't emitting anything from the mapper
boolean b = job.waitForCompletion(true);
if (!b) {
throw new IOException("error with job!");
}
public class MyMapper extends TableMapper<Text, LongWritable> {
public void map(ImmutableBytesWritable row, Result value, Context context)
throws InterruptedException, IOException {
// process data for the row from the Result instance.
package com.itdog8.cloud.hbase.mr.test;
import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
/**
* TestHBaseAsSourceMapReduceMainClass
*
* @author 那伊抹微笑
* @date 2015-07-30 18:00:21
*
*/
// Driver class demonstrating an HBase table used as a MapReduce input source.
public class TestHBaseAsSourceMapReduceMainClass {
private static final Log _log = LogFactory.getLog(TestHBaseAsSourceMapReduceMainClass.class);
private static final String JOB_NAME = "TestHBaseAsSourceMapReduce";
// Default HDFS output path; overwritten by args[0] in main().
private static String tmpPath = "/tmp/com/itdog8/yting/TestHBaseAsSourceMapReduce";
// Input table in "namespace:qualifier" form.
// NOTE(review): field name is misspelled ("Tble"); also referenced in main(), so rename both together.
private static String hbaseInputTble = "itdog8:test_1";
/**
 * Mapper that reads rows from the HBase input table.
 *
 * <p>For every input row it emits one fixed demo key/value pair; a real job
 * would derive its output from the row key and the cells in the {@code Result}.
 */
public static class ExampleSourceMapper extends TableMapper<Text, Text> {
    private Text k = new Text();
    private Text v = new Text();

    @Override
    protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {
        // Row key of the current HBase row; real business logic would start here,
        // reading cells from the Result.
        String rowkey = Bytes.toString(key.get());
        // Demo payload (kept verbatim from the original sample).
        k.set("望咩望");
        v.set("食屎啦你");
        // Fix: the original wrapped this write in catch (Exception e) { e.printStackTrace(); },
        // silently swallowing failures — including InterruptedException, whose interrupt
        // status was lost. Let the declared exceptions propagate so the task fails visibly
        // and the framework can retry it.
        context.write(k, v);
    }
}
/**
 * Entry point: wires an HBase full-table Scan into a MapReduce job whose
 * output is written as text to the HDFS path given in {@code args[0]}.
 *
 * @param args args[0] = HDFS output directory (must not already exist)
 * @throws Exception on any configuration, filesystem, or job-submission failure
 */
public static void main(String[] args) throws Exception {
    // Fix: guard against missing argument instead of failing later with
    // ArrayIndexOutOfBoundsException on args[0].
    if (args.length < 1) {
        System.err.println("Usage: TestHBaseAsSourceMapReduceMainClass <tmpOutputPath>");
        System.exit(2);
    }

    // HBase configuration: classpath defaults plus explicit ZooKeeper quorum.
    Configuration conf = HBaseConfiguration.create();
    conf.set("hbase.zookeeper.quorum", "a234-198.hadoop.com,a234-197.hadoop.com,a234-196.hadoop.com");
    conf.set("hbase.zookeeper.property.clientPort", "2181");

    // Scan tuning for a one-pass MR read: large RPC caching batch, block cache
    // disabled (a full scan would only churn it), single cell version.
    Scan scan = new Scan();
    scan.setCaching(10000);
    scan.setCacheBlocks(false);
    scan.setMaxVersions(1);

    // Disable speculative execution so rows are not processed twice.
    // NOTE(review): these are the deprecated mapred.* key names, still honored
    // as aliases on Hadoop 2+; the mapreduce.*.speculative keys are preferred.
    conf.setBoolean("mapred.map.tasks.speculative.execution", false);
    conf.setBoolean("mapred.reduce.tasks.speculative.execution", false);

    // Refuse to run if the output path already exists rather than deleting it.
    tmpPath = args[0];
    Path tmpIndexPath = new Path(tmpPath);
    FileSystem fs = FileSystem.get(conf);
    if (fs.exists(tmpIndexPath)) {
        _log.warn("The hdfs path ["+tmpPath+"] existed, please change a path.");
        return;
    }

    // Job setup. Fix: Job.getInstance(...) replaces the deprecated new Job(...) constructor.
    Job job = Job.getInstance(conf, JOB_NAME);
    job.setJarByClass(TestHBaseAsSourceMapReduceMainClass.class);
    TableMapReduceUtil.initTableMapperJob(hbaseInputTble, scan, ExampleSourceMapper.class, Text.class, Text.class, job);
    // job.setReducerClass(MyReducer.class); // plug custom reduce logic in here
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);
    job.setOutputFormatClass(TextOutputFormat.class);
    FileOutputFormat.setOutputPath(job, tmpIndexPath);

    int success = job.waitForCompletion(true) ? 0 : 1;
    System.exit(success);
}
}
版权声明:本文为博主原创文章,未经博主允许不得转载。
HBase - MapReduce - HBase 作为输入源的示例 | 那伊抹微笑
标签:那伊抹微笑 hbase mapreduce 妳那伊抹微笑
原文地址:http://blog.csdn.net/u012185296/article/details/47279419