HBase MapReduce Programming
For Hadoop and HBase installation, see: http://blog.csdn.net/mapengbo521521/article/details/41777721
For creating HBase tables and inserting data, see: http://blog.csdn.net/mapengbo521521/article/details/43917119
For HBase MapReduce background, see: http://wenku.baidu.com/link?url=w5WwJHqI2KWOx_xQcIrP0Q2GYo0s3t6SWDz_plT-D2WiTBrU6hn_6CGwCD6XbbQ72EPeRTmIxpDJ-6Ju5LFyHh4P27eLyVXJ5xwBdmqYVR_
The HBase table blog2 stores blog articles: the author column family holds the author's nickname, and the article column family holds a comma-separated list of tags. We want to turn this into a blog3 table that, for each technology tag, lists the nicknames of all authors who wrote about it. A sketch of the input, the output, and the map/shuffle/reduce flow is shown below.
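The original post showed the two tables as screenshots; they are not reproduced here, so the following made-up rows (illustrative assumptions only, matching the column families used in the code below) sketch the transformation:

    blog2 (input)
        row key    author:nickname    article:tag
        rk0001     Tom                hadoop,hbase
        rk0002     Jack               hadoop,hive

    blog3 (output)
        row key    person:nicknames
        hadoop     Tom,Jack
        hbase      Tom
        hive       Jack

The map phase emits one (tag, nickname) pair per tag, e.g. (hadoop, Tom), (hbase, Tom), (hadoop, Jack), (hive, Jack); the shuffle groups the pairs by tag, and the reduce phase concatenates the nicknames for each tag and writes one row per tag into blog3.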
The full code is as follows (note that the blog3 table must be created first if it does not already exist):
package org.apache.hadoop.hbase;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.Job;

// Note: HBaseConfiguration, KeyValue, HTableDescriptor and HColumnDescriptor live in
// the org.apache.hadoop.hbase package, so no extra imports are needed for them here.
public class HbaseMapReduce {

    // Mapper: for each row of blog2, read author:nickname and article:tag,
    // then emit one (tag, nickname) pair per tag.
    public static class Mapper extends
            TableMapper<ImmutableBytesWritable, ImmutableBytesWritable> {

        public Mapper() {
        }

        @Override
        public void map(ImmutableBytesWritable row, Result values, Context context)
                throws IOException {
            ImmutableBytesWritable value = null;
            String[] tags = null;
            for (KeyValue kv : values.list()) {
                if ("author".equals(Bytes.toString(kv.getFamily()))
                        && "nickname".equals(Bytes.toString(kv.getQualifier()))) {
                    value = new ImmutableBytesWritable(kv.getValue());
                }
                if ("article".equals(Bytes.toString(kv.getFamily()))
                        && "tag".equals(Bytes.toString(kv.getQualifier()))) {
                    tags = Bytes.toString(kv.getValue()).split(",");
                }
            }
            // Skip rows that are missing either column, otherwise tags.length
            // below would throw a NullPointerException.
            if (value == null || tags == null) {
                return;
            }
            for (int i = 0; i < tags.length; i++) {
                ImmutableBytesWritable key = new ImmutableBytesWritable(
                        Bytes.toBytes(tags[i].toLowerCase()));
                try {
                    context.write(key, value);
                } catch (InterruptedException e) {
                    throw new IOException(e);
                }
            }
        }
    }

    // Reducer: concatenate all nicknames that arrive for one tag and write
    // them into blog3 as person:nicknames, with the tag as the row key.
    public static class Reducer extends
            TableReducer<ImmutableBytesWritable, ImmutableBytesWritable, ImmutableBytesWritable> {

        @Override
        public void reduce(ImmutableBytesWritable key,
                Iterable<ImmutableBytesWritable> values, Context context)
                throws IOException, InterruptedException {
            String friends = "";
            for (ImmutableBytesWritable val : values) {
                friends += (friends.length() > 0 ? "," : "") + Bytes.toString(val.get());
            }
            Put put = new Put(key.get());
            put.add(Bytes.toBytes("person"), Bytes.toBytes("nicknames"),
                    Bytes.toBytes(friends));
            context.write(key, put);
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf = HBaseConfiguration.create(conf);
        Job job = new Job(conf, "HBase_FindFriend");
        job.setJarByClass(HbaseMapReduce.class);

        // Only scan the two columns the mapper actually needs.
        Scan scan = new Scan();
        scan.addColumn(Bytes.toBytes("author"), Bytes.toBytes("nickname"));
        scan.addColumn(Bytes.toBytes("article"), Bytes.toBytes("tag"));

        TableMapReduceUtil.initTableMapperJob("blog2", scan,
                HbaseMapReduce.Mapper.class, ImmutableBytesWritable.class,
                ImmutableBytesWritable.class, job);

        // Create the output table blog3 (with column family person) if it
        // does not exist yet.
        String[] family = { "person" };
        creatTable("blog3", family, conf);

        TableMapReduceUtil.initTableReducerJob("blog3",
                HbaseMapReduce.Reducer.class, job);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

    public static void creatTable(String tableName, String[] family,
            Configuration conf) throws Exception {
        HBaseAdmin admin = new HBaseAdmin(conf);
        HTableDescriptor desc = new HTableDescriptor(tableName);
        for (int i = 0; i < family.length; i++) {
            desc.addFamily(new HColumnDescriptor(family[i]));
        }
        if (admin.tableExists(tableName)) {
            // Reuse the table if it is already there instead of aborting.
            System.out.println("table exists!");
        } else {
            admin.createTable(desc);
            System.out.println("create table success!");
        }
        admin.close();
    }
}
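After the job finishes, the result can be checked with a quick scan of blog3. This is only a minimal sketch, assuming the same HBase 0.94-era client API as the code above plus the additional imports org.apache.hadoop.hbase.client.HTable and org.apache.hadoop.hbase.client.ResultScanner:

HTable table = new HTable(HBaseConfiguration.create(), "blog3");
ResultScanner scanner = table.getScanner(new Scan());
for (Result r : scanner) {
    // The row key is the tag; person:nicknames holds the concatenated nicknames.
    System.out.println(Bytes.toString(r.getRow()) + " -> "
            + Bytes.toString(r.getValue(Bytes.toBytes("person"), Bytes.toBytes("nicknames"))));
}
scanner.close();
table.close();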
If HBase is the input and HDFS is the output, only the mapper side of the job is wired to HBase:

Configuration conf = HBaseConfiguration.create();
Job job = new Job(conf, "job name");
job.setJarByClass(test.class);
Scan scan = new Scan();
// inputTable, mapper.class, the Writable types and the output path are placeholders.
TableMapReduceUtil.initTableMapperJob(inputTable, scan, mapper.class,
        Writable.class, Writable.class, job);
job.setOutputKeyClass(Writable.class);
job.setOutputValueClass(Writable.class);
FileOutputFormat.setOutputPath(job, outputPath);
job.waitForCompletion(true);
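To make that skeleton concrete, here is a minimal sketch of a map-only job that dumps the author:nickname column of blog2 into text files on HDFS. The output directory /tmp/blog2_dump and the class names are illustrative assumptions, not part of the original post:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class HbaseToHdfs {

    // Emit one "<row key> TAB <nickname>" line per blog2 row.
    public static class DumpMapper extends TableMapper<Text, Text> {
        @Override
        protected void map(ImmutableBytesWritable row, Result values, Context context)
                throws IOException, InterruptedException {
            byte[] nickname = values.getValue(Bytes.toBytes("author"), Bytes.toBytes("nickname"));
            if (nickname != null) {
                context.write(new Text(Bytes.toString(values.getRow())),
                        new Text(Bytes.toString(nickname)));
            }
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        Job job = new Job(conf, "blog2-to-hdfs");
        job.setJarByClass(HbaseToHdfs.class);

        Scan scan = new Scan();
        scan.addColumn(Bytes.toBytes("author"), Bytes.toBytes("nickname"));
        TableMapReduceUtil.initTableMapperJob("blog2", scan, DumpMapper.class,
                Text.class, Text.class, job);

        job.setNumReduceTasks(0);            // map-only: mapper output goes straight to HDFS
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileOutputFormat.setOutputPath(job, new Path("/tmp/blog2_dump"));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}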
If HDFS is the input and HBase is the output, only the reducer side is wired to HBase:

Configuration conf = HBaseConfiguration.create();
Job job = new Job(conf, "job name");
job.setJarByClass(test.class);
// mapper.class, reducer.class, the input path and tableName are placeholders.
job.setMapperClass(mapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(LongWritable.class);
FileInputFormat.setInputPaths(job, path);
TableMapReduceUtil.initTableReducerJob(tableName, reducer.class, job);
job.waitForCompletion(true);
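A concrete version of this second skeleton, again as a minimal sketch: a word count whose totals are written into an HBase table. The input path /tmp/wordcount_input, the table name wordcount, and the column family cf are illustrative assumptions; the table must already exist with that family:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class HdfsToHbase {

    // Split each input line into words and emit (word, 1).
    public static class WordMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
        private static final LongWritable ONE = new LongWritable(1);

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            for (String word : value.toString().split("\\s+")) {
                if (!word.isEmpty()) {
                    context.write(new Text(word), ONE);
                }
            }
        }
    }

    // Sum the counts for each word and write them as a Put into the table,
    // with the word as the row key and the total stored in cf:count.
    public static class CountReducer extends
            TableReducer<Text, LongWritable, ImmutableBytesWritable> {
        @Override
        protected void reduce(Text key, Iterable<LongWritable> values, Context context)
                throws IOException, InterruptedException {
            long sum = 0;
            for (LongWritable v : values) {
                sum += v.get();
            }
            Put put = new Put(Bytes.toBytes(key.toString()));
            put.add(Bytes.toBytes("cf"), Bytes.toBytes("count"),
                    Bytes.toBytes(String.valueOf(sum)));
            context.write(new ImmutableBytesWritable(Bytes.toBytes(key.toString())), put);
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        Job job = new Job(conf, "hdfs-to-hbase");
        job.setJarByClass(HdfsToHbase.class);

        job.setMapperClass(WordMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);
        FileInputFormat.setInputPaths(job, new Path("/tmp/wordcount_input"));

        TableMapReduceUtil.initTableReducerJob("wordcount", CountReducer.class, job);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}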
Original post: http://blog.csdn.net/mapengbo521521/article/details/43924891