Before getting into the main content, a few concepts. When defining a custom data type, it is best to use Java's native types for the fields rather than Hadoop's wrapper types, for example int x; // instead of IntWritable, and String s; // instead of Text.
Below is a custom data type representing a point in a 3D coordinate system.
package com.tg.type;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

public class Point3D implements WritableComparable<Point3D> {
    public float x, y, z;

    public Point3D(float fx, float fy, float fz) {
        this.x = fx;
        this.y = fy;
        this.z = fz;
    }

    public Point3D() {
        this(0.0f, 0.0f, 0.0f);
    }

    public void readFields(DataInput in) throws IOException {
        x = in.readFloat();
        y = in.readFloat();
        z = in.readFloat();
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeFloat(x);
        out.writeFloat(y);
        out.writeFloat(z);
    }

    public String toString() {
        return "X:" + Float.toString(x) + ", "
             + "Y:" + Float.toString(y) + ", "
             + "Z:" + Float.toString(z);
    }

    public float distanceFromOrigin() {
        return (float) Math.sqrt(x * x + y * y + z * z);
    }

    public int compareTo(Point3D other) {
        return Float.compare(distanceFromOrigin(), other.distanceFromOrigin());
    }

    public boolean equals(Object o) {
        if (!(o instanceof Point3D)) {
            return false;
        }
        Point3D other = (Point3D) o;
        return this.x == other.x && this.y == other.y && this.z == other.z;
    }

    /* Implementing hashCode() matters:
     * Hadoop's Partitioners rely on it (more on that later). */
    public int hashCode() {
        return Float.floatToIntBits(x)
             ^ Float.floatToIntBits(y)
             ^ Float.floatToIntBits(z);
    }
}
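The comment in hashCode() above says Hadoop's Partitioners use it. As a quick illustration, the default HashPartitioner picks the reduce task for a key from the key's hashCode(), essentially as in the sketch below (HashPartitionSketch is only an illustrative name mirroring the built-in behavior; it is not part of this project). If Point3D were ever used as a map output key, a well-spread hashCode() would keep the keys evenly distributed across reducers.

import org.apache.hadoop.mapreduce.Partitioner;

// Sketch of how the default HashPartitioner routes a key to a reduce task.
public class HashPartitionSketch<K, V> extends Partitioner<K, V> {
    @Override
    public int getPartition(K key, V value, int numReduceTasks) {
        // Mask off the sign bit, then take the remainder by the reducer count.
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }
}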
First, let's look at the input file a.txt.
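The contents of a.txt are not reproduced in this text version. Judging from the comments in the driver's map method further below (keys such as One, two, three; values such as X:1.0, Y:2.0, Z:3.0; and the '=' separator used by the custom RecordReader), each line presumably looks something like the following, with one key=value pair per line; the exact keys and coordinates here are assumptions:

One=X:1.0, Y:2.0, Z:3.0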
The data input format (InputFormat) describes the input specification of a MapReduce job. The framework relies on the input format to check that specification (for example, validating the input directories), to split the input files into chunks (also called splits, InputSplit), and to read the records out of each split one by one and turn them into the key/value pairs fed into the Map phase.
Hadoop ships with a rich set of built-in input formats; the most commonly used are TextInputFormat and KeyValueTextInputFormat.
TextInputFormat is the system default. It splits text files and reads them line by line for the Map tasks. For each line read, the key is the byte offset of that line within the file and the value is the content of the line. Since it is the default, it is used automatically whenever the user program does not set an input format.
For example, given a file with the following content:
hello tanggao
hello hadoop
The offset of the first line is 0, and the offset of the second line is 14 ("hello tanggao" is 13 bytes, plus one byte for the newline).
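If you want to verify that offset yourself, here is a tiny standalone sketch (OffsetDemo is just an illustrative name, and it assumes Unix-style '\n' line endings):

public class OffsetDemo {
    public static void main(String[] args) {
        // The first line starts at byte 0; the second line starts right after
        // the bytes of "hello tanggao" plus the newline character.
        String firstLine = "hello tanggao\n";
        System.out.println("offset of second line = " + firstLine.getBytes().length); // prints 14
    }
}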
KeyValueTextInputFormat is another commonly used input format. It reads a text file whose lines are stored as <key,value> pairs (key and value separated by a tab by default) and parses each line into the corresponding key and value.
For example, a file containing (key and value separated by a tab):

name	Tang Gao
age	20

is parsed so that the first line's key is "name" with value "Tang Gao", and the second line's key is "age" with value "20".
Note how this differs from TextInputFormat, which uses the byte offset as the key and the whole line as the value.
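As a side note, here is a minimal driver sketch of how KeyValueTextInputFormat and its separator are typically configured (KvInputConfigSketch is an illustrative name; KEY_VALUE_SEPERATOR is the constant defined on KeyValueLineRecordReader in the Hadoop 2.x mapreduce API, and the space separator is just an example):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.KeyValueLineRecordReader;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;

public class KvInputConfigSketch {
    public static Job configure(Configuration conf) throws Exception {
        // KeyValueTextInputFormat splits each line at the first separator byte;
        // the default separator is '\t', overridden here to a single space.
        conf.set(KeyValueLineRecordReader.KEY_VALUE_SEPERATOR, " ");
        Job job = Job.getInstance(conf, "kv-input-sketch");
        // With this input format, both the map input key and value are Text.
        job.setInputFormatClass(KeyValueTextInputFormat.class);
        return job;
    }
}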
Every data input format needs a corresponding RecordReader, which breaks the data in a file into individual records and hands them to the Map phase as its key/value input. Each input format has a default RecordReader: TextInputFormat's is LineRecordReader, and KeyValueTextInputFormat's is KeyValueLineRecordReader.
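For intuition, here is a simplified sketch (not the framework's actual code) of how a map task drives a RecordReader over one split; it is why nextKeyValue(), getCurrentKey() and getCurrentValue() are the methods that matter when you write your own:

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

public class RecordReaderLoopSketch {
    // Simplified sketch of how a map task consumes a RecordReader for one split.
    static void run(RecordReader<Text, Text> reader, InputSplit split,
                    TaskAttemptContext context) throws Exception {
        reader.initialize(split, context);
        while (reader.nextKeyValue()) {
            Text key = reader.getCurrentKey();
            Text value = reader.getCurrentValue();
            // ... the framework passes each key/value pair to Mapper.map() here ...
        }
        reader.close();
    }
}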
There are, of course, many more input formats, each with its own default RecordReader; they are not covered here, but the official documentation has the full list.
The data output format (OutputFormat) describes the output specification of a MapReduce job. The framework relies on the output format to check that specification (for example, verifying whether the output directory already exists) and to write out the job's result data.
Hadoop also ships with a rich set of built-in output formats. The most commonly used, and the system default, is TextOutputFormat, which writes the results to a text file line by line in the form key + \t + value.
Analogous to the RecordReader on the input side, an output format provides a corresponding RecordWriter that defines exactly how the results are written to the output file.
TextOutputFormat's default RecordWriter is LineRecordWriter, which writes each result record to the text file as key + \t + value.
Likewise, there are many more output formats with their own default RecordWriters.
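One practical detail worth noting: TextOutputFormat's separator is configurable, so if all you need is a different key/value delimiter you can set it in the job configuration rather than writing a whole new output format. A minimal sketch (SeparatorConfigSketch is an illustrative name; the configuration key is the same one the code later in this post reads):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class SeparatorConfigSketch {
    public static Job configure(Configuration conf) throws Exception {
        // TextOutputFormat writes key + separator + value per line; the
        // separator defaults to '\t' and can be overridden in the configuration.
        conf.set("mapreduce.output.textoutputformat.separator", "=========>");
        Job job = Job.getInstance(conf, "separator-sketch");
        job.setOutputFormatClass(TextOutputFormat.class);
        return job;
    }
}

The custom output formats later in this post read the same property but change the built-in fallback to "=========>", so that separator is used even when nothing is configured.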
To build a custom data input format, take an existing one as a starting point and extend it; essentially you only need to override createRecordReader so that it returns your own RecordReader.
Mine is modeled on KeyValueTextInputFormat and its KeyValueLineRecordReader, so I copied the source of those two classes and rewrote the parts I needed.
package com.my.input;

import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.io.compress.SplittableCompressionCodec;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class myInputFormat extends FileInputFormat<Text, Text> {

    // Decide whether a file may be split: uncompressed files always can be,
    // compressed files only if the codec supports splitting.
    @Override
    protected boolean isSplitable(JobContext context, Path file) {
        final CompressionCodec codec =
            new CompressionCodecFactory(context.getConfiguration()).getCodec(file);
        if (null == codec) {
            return true;
        }
        return codec instanceof SplittableCompressionCodec;
    }

    @Override
    public RecordReader<Text, Text> createRecordReader(InputSplit genericSplit,
            TaskAttemptContext context) throws IOException, InterruptedException {
        context.setStatus(genericSplit.toString());
        return new MyRecordReader(context.getConfiguration());
    }
}
Its RecordReader:
package com.my.input;

import java.io.IOException;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;

@InterfaceAudience.Public
@InterfaceStability.Stable
public class MyRecordReader extends RecordReader<Text, Text> {

    public static final String KEY_VALUE_SEPERATOR =
        "mapreduce.input.mylinerecordreader.key.value.separator";

    private final LineRecordReader lineRecordReader;
    // The original source splits on '\t'; changed here to '=' for my own needs.
    private byte separator = (byte) '=';
    private Text innerValue;
    private Text key;
    private Text value;

    public Class<Text> getKeyClass() {
        return Text.class;
    }

    public MyRecordReader(Configuration conf) throws IOException {
        lineRecordReader = new LineRecordReader();
        String sepStr = conf.get(KEY_VALUE_SEPERATOR, "=");
        this.separator = (byte) sepStr.charAt(0);
    }

    public void initialize(InputSplit genericSplit, TaskAttemptContext context)
            throws IOException {
        lineRecordReader.initialize(genericSplit, context);
    }

    public static int findSeparator(byte[] utf, int start, int length, byte sep) {
        for (int i = start; i < (start + length); i++) {
            if (utf[i] == sep) {
                return i;
            }
        }
        return -1;
    }

    public static void setKeyValue(Text key, Text value, byte[] line,
            int lineLen, int pos) {
        if (pos == -1) {
            key.set(line, 0, lineLen);
            value.set("");
        } else {
            key.set(line, 0, pos);
            value.set(line, pos + 1, lineLen - pos - 1);
        }
    }

    /** Read one key/value pair from a line. */
    public synchronized boolean nextKeyValue() throws IOException {
        byte[] line = null;
        int lineLen = -1;
        if (lineRecordReader.nextKeyValue()) {
            innerValue = lineRecordReader.getCurrentValue();
            line = innerValue.getBytes();
            lineLen = innerValue.getLength();
        } else {
            return false;
        }
        if (line == null)
            return false;
        if (key == null) {
            key = new Text();
        }
        if (value == null) {
            value = new Text();
        }
        int pos = findSeparator(line, 0, lineLen, this.separator);
        setKeyValue(key, value, line, lineLen, pos);
        return true;
    }

    public Text getCurrentKey() {
        return key;
    }

    public Text getCurrentValue() {
        return value;
    }

    public float getProgress() throws IOException {
        return lineRecordReader.getProgress();
    }

    public synchronized void close() throws IOException {
        lineRecordReader.close();
    }
}
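Since MyRecordReader looks up its separator under the configuration key it defines and falls back to '=', a driver could override it without touching the class. A small illustrative sketch (SeparatorOverrideSketch and the ':' separator are not part of the original project; the driver later in this post simply relies on the '=' default):

import org.apache.hadoop.conf.Configuration;

import com.my.input.MyRecordReader;

public class SeparatorOverrideSketch {
    public static Configuration withColonSeparator() {
        Configuration conf = new Configuration();
        // MyRecordReader uses the first character of this property as the
        // key/value separator byte; it defaults to '=' when the key is unset.
        conf.set(MyRecordReader.KEY_VALUE_SEPERATOR, ":");
        return conf;
    }
}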
For a custom data output format the approach is the same: extend an existing one. Here I extend FileOutputFormat, because FileOutputFormat already implements much of the common machinery (output-directory checking, work files, compression helpers), so only getRecordWriter needs to be overridden to return a custom RecordWriter. The class below is modeled on TextOutputFormat and its LineRecordWriter, with the key/value separator changed from "\t" to "=========>".
package com.my.input;

import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UnsupportedEncodingException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.ReflectionUtils;

public class MyOutputFormat<K, V> extends FileOutputFormat<K, V> {

    public static String SEPERATOR = "mapreduce.output.textoutputformat.separator";

    protected static class MyLineRecordWriter<K, V> extends RecordWriter<K, V> {
        private static final String utf8 = "UTF-8";
        private static final byte[] newline;
        static {
            try {
                newline = "\n".getBytes(utf8);
            } catch (UnsupportedEncodingException uee) {
                throw new IllegalArgumentException("can't find " + utf8 + " encoding");
            }
        }

        protected DataOutputStream out;
        private final byte[] keyValueSeparator;

        public MyLineRecordWriter(DataOutputStream out, String keyValueSeparator) {
            this.out = out;
            try {
                this.keyValueSeparator = keyValueSeparator.getBytes(utf8);
            } catch (UnsupportedEncodingException uee) {
                throw new IllegalArgumentException("can't find " + utf8 + " encoding");
            }
        }

        // Changed from the original source: the default separator is
        // "=========>" instead of "\t".
        public MyLineRecordWriter(DataOutputStream out) {
            this(out, "=========>");
        }

        /**
         * Write the object to the byte stream, handling Text as a special case.
         *
         * @param o the object to print
         * @throws IOException if the write throws, we pass it on
         */
        private void writeObject(Object o) throws IOException {
            if (o instanceof Text) {
                Text to = (Text) o;
                out.write(to.getBytes(), 0, to.getLength());
            } else {
                out.write(o.toString().getBytes(utf8));
            }
        }

        public synchronized void write(K key, V value) throws IOException {
            boolean nullKey = key == null || key instanceof NullWritable;
            boolean nullValue = value == null || value instanceof NullWritable;
            if (nullKey && nullValue) {
                return;
            }
            if (!nullKey) {
                writeObject(key);
            }
            if (!(nullKey || nullValue)) {
                out.write(keyValueSeparator);
            }
            if (!nullValue) {
                writeObject(value);
            }
            out.write(newline);
        }

        public synchronized void close(TaskAttemptContext context) throws IOException {
            out.close();
        }
    }

    public RecordWriter<K, V> getRecordWriter(TaskAttemptContext job)
            throws IOException, InterruptedException {
        Configuration conf = job.getConfiguration();
        boolean isCompressed = getCompressOutput(job);
        String keyValueSeparator = conf.get(SEPERATOR, "=========>");
        CompressionCodec codec = null;
        String extension = "";
        if (isCompressed) {
            Class<? extends CompressionCodec> codecClass =
                getOutputCompressorClass(job, GzipCodec.class);
            codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, conf);
            extension = codec.getDefaultExtension();
        }
        Path file = getDefaultWorkFile(job, extension);
        FileSystem fs = file.getFileSystem(conf);
        if (!isCompressed) {
            FSDataOutputStream fileOut = fs.create(file, false);
            return new MyLineRecordWriter<K, V>(fileOut, keyValueSeparator);
        } else {
            FSDataOutputStream fileOut = fs.create(file, false);
            return new MyLineRecordWriter<K, V>(
                new DataOutputStream(codec.createOutputStream(fileOut)), keyValueSeparator);
        }
    }
}
A second, simpler variant extends TextOutputFormat directly and reuses its LineRecordWriter, only swapping in the new default separator:

package com.my.input;

import java.io.DataOutputStream;
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.ReflectionUtils;

public class MyOutputFormat2<K, V> extends TextOutputFormat<K, V> {

    @Override
    public RecordWriter<K, V> getRecordWriter(TaskAttemptContext job)
            throws IOException, InterruptedException {
        Configuration conf = job.getConfiguration();
        boolean isCompressed = getCompressOutput(job);
        // Changed from the original source: the default separator is
        // "=========>" instead of "\t".
        String keyValueSeparator = conf.get(SEPERATOR, "=========>");
        CompressionCodec codec = null;
        String extension = "";
        if (isCompressed) {
            Class<? extends CompressionCodec> codecClass =
                getOutputCompressorClass(job, GzipCodec.class);
            codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, conf);
            extension = codec.getDefaultExtension();
        }
        Path file = getDefaultWorkFile(job, extension);
        FileSystem fs = file.getFileSystem(conf);
        if (!isCompressed) {
            FSDataOutputStream fileOut = fs.create(file, false);
            return new LineRecordWriter<K, V>(fileOut, keyValueSeparator);
        } else {
            FSDataOutputStream fileOut = fs.create(file, false);
            return new LineRecordWriter<K, V>(
                new DataOutputStream(codec.createOutputStream(fileOut)), keyValueSeparator);
        }
    }
}
Test code (the driver):
package com.my.input;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

import com.tg.type.Point3D;

public class Point3DDriver {

    /**
     * @author 汤高
     * Point3D is the custom data type, used here as the map output value type.
     */

    // Map phase
    static int count = 0;

    public static class MyMapper extends Mapper<Text, Text, Text, Point3D> {
        @Override
        protected void map(Text key, Text value,
                Mapper<Text, Text, Text, Point3D>.Context context)
                throws IOException, InterruptedException {
            count++;
            // The key produced by the custom input format is e.g. "One", "two", "three".
            // The value looks like "X:1.0, Y:2.0, Z:3.0".
            // Split the value on commas and fill the parts into a Point3D.
            String[] vs = value.toString().split(",");
            Point3D p = new Point3D(
                Float.parseFloat(vs[0].split(":")[1]),
                Float.parseFloat(vs[1].split(":")[1]),
                Float.parseFloat(vs[2].split(":")[1]));
            // Emit the custom data type.
            context.write(new Text(key), p);
            System.out.println("map calls so far ==========> " + count);
        }
    }

    // Reduce phase
    public static class MyReducer extends Reducer<Text, Point3D, Text, Point3D> {
        @Override
        protected void reduce(Text key, Iterable<Point3D> values,
                Reducer<Text, Point3D, Text, Point3D>.Context context)
                throws IOException, InterruptedException {
            for (Point3D value : values) {
                context.write(key, value);
            }
        }
    }

    public static void main(String[] args) {
        try {
            Configuration conf = new Configuration();
            String[] paths = new GenericOptionsParser(conf, args).getRemainingArgs();
            if (paths.length < 2) {
                throw new RuntimeException("usage <input> <output>");
            }
            Job job = Job.getInstance(conf, "Point3DDriver");
            job.setJarByClass(Point3DDriver.class);
            job.setMapperClass(MyMapper.class);
            job.setReducerClass(MyReducer.class);
            job.setInputFormatClass(myInputFormat.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Point3D.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Point3D.class);
            job.setOutputFormatClass(MyOutputFormat.class);
            // job.setOutputFormatClass(MyOutputFormat2.class);
            FileInputFormat.addInputPaths(job, paths[0]);
            // Write the final result to a fresh output directory.
            FileOutputFormat.setOutputPath(job, new Path(paths[1] + System.currentTimeMillis()));
            System.exit(job.waitForCompletion(true) ? 0 : 1); // run the job
        } catch (IOException e) {
            e.printStackTrace();
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
If you are not yet comfortable writing the Map and Reduce functions themselves, see my previous post, which walks through MapReduce programming:
MapReduce工作原理详解 (How MapReduce Works, in Detail)
Result (each output line has the form key=========>value, as written by MyOutputFormat):
Writing all this up takes real effort; if you repost it, please credit the source: http://blog.csdn.net/tanggao1314/article/details/51305852