标签:des style blog class code java
package com.mr.test; import java.io.IOException; import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat; import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader; import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit; public class CombineSmallfileInputFormat extends CombineFileInputFormat<LongWritable, BytesWritable> { @Override public RecordReader<LongWritable, BytesWritable> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException { CombineFileSplit combineFileSplit = (CombineFileSplit) split; CombineFileRecordReader<LongWritable, BytesWritable> recordReader = new CombineFileRecordReader<LongWritable, BytesWritable>(combineFileSplit, context, CombineSmallfileRecordReader.class); try { recordReader.initialize(combineFileSplit, context); } catch (InterruptedException e) { new RuntimeException("Error to initialize CombineSmallfileRecordReader."); } return recordReader; } }
package com.mr.test;
import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;
/**
 * Reads one chunk (one small file's block) of a {@link CombineFileSplit} line by line,
 * delegating to a {@link LineRecordReader}. Keys are the line offsets produced by the
 * delegate; values are the raw bytes of each line.
 *
 * <p>The (CombineFileSplit, TaskAttemptContext, Integer) constructor is the shape that
 * CombineSmallfileInputFormat's CombineFileRecordReader uses to create one reader per chunk
 * (presumably via reflection — confirm against the Hadoop version in use).
 */
public class CombineSmallfileRecordReader extends RecordReader<LongWritable, BytesWritable> {
    private CombineFileSplit combineFileSplit;
    /** Delegate that performs the actual line splitting for this reader's chunk. */
    private LineRecordReader lineRecordReader = new LineRecordReader();
    private Path[] paths;
    /** Number of chunks (paths) in the combined split. */
    private int totalLength;
    /** Index of the chunk this reader handles within the combined split. */
    private int currentIndex;
    private LongWritable currentKey;
    private BytesWritable currentValue = new BytesWritable();

    /**
     * @param combineFileSplit the combined split containing this reader's chunk
     * @param context          the task attempt context (unused here)
     * @param index            index of the chunk to read within {@code combineFileSplit}
     */
    public CombineSmallfileRecordReader(CombineFileSplit combineFileSplit, TaskAttemptContext context, Integer index)
            throws IOException {
        super();
        this.combineFileSplit = combineFileSplit;
        this.currentIndex = index; // index of the small-file block to process within the CombineFileSplit
    }

    /**
     * Wraps chunk {@code currentIndex} of the combined split in a {@link FileSplit} and
     * initializes the line reader on it. Also records the chunk's file name in the job
     * configuration under {@code map.input.file.name}.
     */
    @Override
    public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
        this.combineFileSplit = (CombineFileSplit) split;
        // LineRecordReader only understands FileSplit, so build one for this chunk.
        FileSplit fileSplit = new FileSplit(combineFileSplit.getPath(currentIndex),
                combineFileSplit.getOffset(currentIndex), combineFileSplit.getLength(currentIndex),
                combineFileSplit.getLocations());
        lineRecordReader.initialize(fileSplit, context);
        this.paths = combineFileSplit.getPaths();
        totalLength = paths.length;
        // BulkImportData's mapper reads this key to derive the row-key suffix.
        context.getConfiguration().set("map.input.file.name", combineFileSplit.getPath(currentIndex).getName());
    }

    /** Returns the current line's key as produced by the underlying LineRecordReader. */
    @Override
    public LongWritable getCurrentKey() throws IOException, InterruptedException {
        currentKey = lineRecordReader.getCurrentKey();
        return currentKey;
    }

    /**
     * Returns the current line as bytes. The line's UTF-8 bytes are copied directly from the
     * {@link Text} (only the first {@code getLength()} bytes are valid), instead of decoding to
     * a String and re-encoding with the platform default charset.
     */
    @Override
    public BytesWritable getCurrentValue() throws IOException, InterruptedException {
        Text line = lineRecordReader.getCurrentValue();
        // A fresh instance per record so its backing array never holds stale bytes from a longer
        // previous line (consumers that read getBytes() without getLength() rely on this).
        currentValue = new BytesWritable();
        currentValue.set(line.getBytes(), 0, line.getLength());
        return currentValue;
    }

    /**
     * Scratch demo of BytesWritable buffer handling: it prints the whole backing array returned
     * by getBytes(), so padding beyond getLength() shows up in the output.
     */
    public static void main(String args[]){
        BytesWritable cv = new BytesWritable();
        String str1 = "1234567";
        String str2 = "123450";
        cv.set(str1.getBytes(), 0, str1.getBytes().length);
        System.out.println(new String(cv.getBytes()));
        cv.setCapacity(0);
        cv.set(str2.getBytes(), 0, str2.getBytes().length);
        System.out.println(new String(cv.getBytes()));
    }

    /** Advances the line reader, or returns false if the chunk index is out of range. */
    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        if (currentIndex >= 0 && currentIndex < totalLength) {
            return lineRecordReader.nextKeyValue();
        }
        return false;
    }

    /**
     * Returns progress through this reader's own chunk, in [0, 1]. The previous
     * {@code currentIndex / totalLength} was a constant for the reader's lifetime and did not
     * reflect how much of the chunk had been read.
     */
    @Override
    public float getProgress() throws IOException {
        return lineRecordReader.getProgress();
    }

    /** Closes the underlying line reader. */
    @Override
    public void close() throws IOException {
        lineRecordReader.close();
    }
}
package com.mr.test; import java.io.IOException; import java.util.Iterator; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; import org.apache.hadoop.hbase.mapreduce.TableReducer; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.util.GenericOptionsParser; public class BulkImportData { public static class TokenizerMapper extends Mapper<Object, BytesWritable, Text, Text> { public Text _key = new Text(); public Text _value = new Text(); public void map(Object key, BytesWritable value, Context context) throws IOException, InterruptedException { _value.set(value.getBytes()); String tmp = _value.toString().trim(); System.out.println(tmp); tmp = tmp.replace("\\x00", ""); _value.set(tmp); String filename = context.getConfiguration().get("map.input.file.name"); String[] splits = _value.toString().split(","); if(splits.length==3){ filename = filename.replace("mv_", ""); filename = filename.replace(".txt", ""); _key.set(splits[0]+"_"+filename); context.write(_key, _value); } } } public static class IntSumReducer extends TableReducer<Text, Text, ImmutableBytesWritable> { public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException { Iterator<Text> itr = values.iterator(); while(itr.hasNext()){ Text t = itr.next(); String[] strs = t.toString().split(","); if(strs.length!=3)continue; Put put = new 
Put(key.getBytes()); put.add(Bytes.toBytes("content"), Bytes.toBytes("score"), Bytes.toBytes(strs[1].trim())); put.add(Bytes.toBytes("content"), Bytes.toBytes("date"), Bytes.toBytes(strs[2].trim())); context.write(new ImmutableBytesWritable(key.getBytes()), put); } } } public static void main(String[] args) throws Exception { String tablename = "ntf_data"; Configuration conf = HBaseConfiguration.create(); HBaseAdmin admin = new HBaseAdmin(conf); if (admin.tableExists(tablename)) { admin.disableTable(tablename); admin.deleteTable(tablename); } HTableDescriptor htd = new HTableDescriptor(tablename); HColumnDescriptor hcd = new HColumnDescriptor("content"); htd.addFamily(hcd); admin.createTable(htd); String[] otherArgs = new GenericOptionsParser(conf, args) .getRemainingArgs(); if (otherArgs.length != 1) { System.err .println("Usage: wordcount <in> <out>" + otherArgs.length); System.exit(2); } Job job = new Job(conf, "h"); job.setMapperClass(TokenizerMapper.class); job.setJarByClass(BulkImportData.class); job.setInputFormatClass(CombineSmallfileInputFormat.class); job.setNumReduceTasks(5); FileInputFormat.addInputPath(job, new Path(otherArgs[0])); TableMapReduceUtil.initTableReducerJob(tablename, IntSumReducer.class, job); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); System.exit(job.waitForCompletion(true) ? 0 : 1); } }
利用CombineFileInputFormat把ntf_data导入到HBase里，布布扣，bubuko.com
利用CombineFileInputFormat把ntf_data导入到HBase里
标签:des style blog class code java
原文地址:http://blog.csdn.net/xiewenbo/article/details/25637931