码迷,mamicode.com
首页 > 其他好文 > 详细

干货--Hadoop自定义数据类型和自定义输入输出格式整合项目案例

时间:2016-05-04 12:03:48      阅读:537      评论:0      收藏:0      [点我收藏+]

标签:

正文开始前 ,先介绍几个概念

序列化
所谓序列化,是指将结构化对象转化为字节流,以便在网络上传输或写到磁盘进行永久存储。


反序列化
是指将字节流转回到结构化对象的逆过程


序列化在分布式数据处理的两个大领域经常出现:进程间通信和永久存储


在Hadoop中,系统中多个节点上进程间的通信是通过"远程过程调用"(remote procedure call,RPC)实现的 。RPC协议将消息序列化成二进制流后发送到远程节点,远程节点接着将二进制流反序列化为原始消息


Hadoop使用了自己写的序列化格式Writable,它格式紧凑,速度快,但是它很难用Java以外的语言进行拓展或使用,因为Writable是Hadoop的核心,大多数MapReduce程序都会为键和值使用它




简单来说,RPC协议是让程序员可以调用远程计算机进程上的代码的一套工具


打个比方  就是A通过网络调用B的某个进程方法


通信中的协议是由程序员自己规定的,比如你可以规定说当A向B发送数字1, B就打印hello hadoop, 并返回数字1给A, 如果发送数字2,B就打印hello world并发送数字2给A.  


序列化------------>写  write(DataOutput out)


反序列化-------->读   readFields(DataInput in)


首先声明:我是基于Hadoop2.6.4版本

 一.  Hadoop内置的数据类型


BooleanWritable:标准布尔型数值


ByteWritable:单字节数值


DoubleWritable:双字节数值


FloatWritable:浮点数


IntWritable:整型数


LongWritable:长整型数


Text:使用UTF8格式存储的文本


NullWritable:当<key, value>中的key或value为空时使用

 Hadoop中的数据类型都要实现Writable接口,以便用这些类型定义的数据可以被网络传输和文件存储。


 二. 用户自定义数据类型的实现


     1.继承接口Writable,实现其方法write()和readFields(), 以便该数据能被序列化后完成网络传输或文件输入/输出;


     2.如果该数据需要作为主键key使用,或需要比较数值大小时,则需要实现WritalbeComparable接口,实现其方法write(),readFields(),CompareTo() 。


     3.数据类型,必须要有一个无参的构造方法,为了方便反射,进行创建对象。    


     4.在自定义数据类型中,建议使用java的原生数据类型,最好不要使用Hadoop对原生类型进行封装的数据类型。比如 int x ;//IntWritable 和String s; //Text  等等


下面是一个自定义的数据类型  3D坐标轴 

package com.tg.type;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

public class Point3D implements WritableComparable<Point3D> {
	public float x, y, z;
	public Point3D(float fx, float fy, float fz) {
		this.x = fx;
		this.y = fy;
		this.z = fz;
	}
	public Point3D() {
		this(0.0f, 0.0f, 0.0f);
	}
	public void readFields(DataInput in) throws IOException {
		x = in.readFloat();
		y = in.readFloat();
		z = in.readFloat();
	}
	@Override
	public void write(DataOutput out) throws IOException {
		out.writeFloat(x);
		out.writeFloat(y);
		out.writeFloat(z);
	}
	public String toString() {
		return "X:"+Float.toString(x) + ", "
				+ "Y:"+Float.toString(y) + ", "
				+ "Z:"+Float.toString(z);
	}
	public float distanceFromOrigin() {
		return (float) Math.sqrt( x*x + y*y +z*z);
	}
	public int compareTo(Point3D other) {
		return Float.compare(
				distanceFromOrigin(), 
				 other.distanceFromOrigin());
	}
	public boolean equals(Object o) {
		if( !(o instanceof Point3D)) {
			return false;
		}
		Point3D other = (Point3D) o;
		return this.x == other.x && this.y == other.y && this.z == other.z;
	}
	/* 实现 hashCode() 方法很重要
	 * Hadoop的Partitioners会用到这个方法,后面再说 
	 */
	public int hashCode() {
		return Float.floatToIntBits(x)
				^ Float.floatToIntBits(y)
				^ Float.floatToIntBits(z);
	}
	
}

下面讲数据输入输出格式和自定义数据输入输出格式 ,然后把上面讲过的自定义数据类型整合进去


首先看看输入文件a.txt

技术分享

数据输入格式(InputFormat) 用于描述MapReduce作业的数据输入规范。MapReduce框架依靠数据输入格式完成输入规范检查(比如输入文件目录的检查)、对数据文件进行输入分块(也叫分片,InputSplit),以及提供从输入分块(分片)中将数据记录逐一读出,并转化为Map过程的输入键值对等功能

Hadoop提供了丰富的内置数据输入格式。最常用的数据输入格式包括:TextInputFormat和KeyValueInputFormat
TextInputFormat是系统默认的数据输入格式,可以将文本文件分块并逐行读入以便Map节点进行处理。读入一行时,所产生的主键Key就是当前行在整个文本文件中的字节偏移位置,而value就是该行的内容,它是系统默认的输入格式,当用户程序不设置任何数据输入格式时,系统自动使用这个数据输入格式。
比如如下文件内容
hello tanggao
hello hadoop
第一行的偏移量为0
第二行偏移量为13

KeyValueTextInputFormat是另一个常用的数据输入格式,可将一个按照<key,value>格式逐行存放的文本文件逐行读出,并自动解析生成相应的key和value

比如
姓名    汤高
年龄    20
则解析出来的
第一行键Key为姓名  值value为汤高
第二行键key为年龄 值value为20
注意和TextInputFormat不同,TextInputFormat是偏移量做键,整行内容做值

对于一个数据输入格式,都需要一个对应的RecordReader。RecordReader。主要用于将一个文件中的数据记录分拆成具体的键值对,传送给Map过程作为键值对输入参数。每一个数据输入格式都有一个默认的RecordReader。TextInputFormat的默认RecordReader是LineRecordReader,而KeyValueTextInputFormat的默认RecordReader是KeyValueLineRecordReader
当然肯定还有很多数据输入格式和对应的默认RecordReader 这里就不接受了,有需要的可以去官网看看



数据输出格式(OutputFormat)用于描述MapReduce作业的数据输出规范。MapReduce框架依靠数据输出格式完成输出规范检查(蔽日检查输出目录是否存在),以及提供作业结果数据输出等功能

Hadoop提供了丰富的内置数据输出格式。最常用的数据输出格式是TextOutputFormat,也是系统默认的数据输出格式,可以将计算结果以 key+\t+value的形式逐行输出到文本文件中。

与数据输入格式中的RecordReader类似,数据输出格式也提供一个对应的RecordWriter,以便系统明确输出结果写入到文件中的具体格式。
TextOutputFormat的默认RecordWriter是LineRecordWriter,其实际操作是将结果数据以key+\t+value的形式输出到文本文件中。

当然同样肯定还有很多数据输出格式和对应的默认RecordWriter


对于自定义数据输入格式 可以参考已有的数据输入格式,继承自它即可,只要重写GetRecordReader方法得到一个自己写的RecordReader即可


我的是仿造KeyValueTextInputFormat和它的KeyValueLineRecordReader来自定义自己的输入格式的,所以我都是自己复制了上面两个类的源码然后进行自己的改写



package com.my.input;

import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.io.compress.SplittableCompressionCodec;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;


public class myInputFormat extends FileInputFormat<Text,Text> {
	//用来压缩的
	@Override
	  protected boolean isSplitable(JobContext context, Path file) {
	    final CompressionCodec codec =
	      new CompressionCodecFactory(context.getConfiguration()).getCodec(file);
	    if (null == codec) {
	      return true;
	    }
	    return codec instanceof SplittableCompressionCodec;
	  }
	
	@Override
	public RecordReader<Text, Text> createRecordReader(InputSplit genericSplit, TaskAttemptContext context)
			throws IOException, InterruptedException {
		context.setStatus(genericSplit.toString());
		return new MyRecordReader(context.getConfiguration());
	}

}


他的RecordReader

package com.my.input;

import java.io.IOException;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;
@InterfaceAudience.Public
@InterfaceStability.Stable
public class MyRecordReader extends RecordReader<Text, Text> {

	 public static final String KEY_VALUE_SEPERATOR = 
			    "mapreduce.input.mylinerecordreader.key.value.separator";
			  
			  private final LineRecordReader lineRecordReader;
			  //源码是根据\t分割  我改为了我自己的需求为=号分割
			  private byte separator = (byte) '=';

			  private Text innerValue;

			  private Text key;
			  
			  private Text value;
			  
			  public Class<Text> getKeyClass() { return Text.class; }
			  
			  public MyRecordReader(Configuration conf)
			    throws IOException {
			    
			    lineRecordReader = new LineRecordReader();
			    String sepStr = conf.get(KEY_VALUE_SEPERATOR, "=");
			    this.separator = (byte) sepStr.charAt(0);
			  }

			  public void initialize(InputSplit genericSplit,
			      TaskAttemptContext context) throws IOException {
			    lineRecordReader.initialize(genericSplit, context);
			   
				
			  }
			  
			  public static int findSeparator(byte[] utf, int start, int length, 
			      byte sep) {
			    for (int i = start; i < (start + length); i++) {
			      if (utf[i] == sep) {
			        return i;
			      }
			    }
			    return -1;
			  }

			  public static void setKeyValue(Text key, Text value, byte[] line,
			      int lineLen, int pos) {
			    if (pos == -1) {
			      key.set(line, 0, lineLen);
			      value.set("");
			    } else {
			      key.set(line, 0, pos);
			      value.set(line, pos + 1, lineLen - pos - 1);
			    }
			  }
			  /** Read key/value pair in a line. */
			  public synchronized boolean nextKeyValue()
			    throws IOException {
			    byte[] line = null;
			    int lineLen = -1;
			    if (lineRecordReader.nextKeyValue()) {
			      innerValue = lineRecordReader.getCurrentValue();
			      line = innerValue.getBytes();
			      lineLen = innerValue.getLength();
			    } else {
			      return false;
			    }
			    if (line == null)
			      return false;
			    if (key == null) {
			      key = new Text();
			    }
			    if (value == null) {
			      value = new Text();
			    }
			    int pos = findSeparator(line, 0, lineLen, this.separator);
			    setKeyValue(key, value, line, lineLen, pos);
			    return true;
			  }
			  
			  public Text getCurrentKey() {
			    return key;
			  }

			  public Text getCurrentValue() {
			    return value;
			  }

			  public float getProgress() throws IOException {
			    return lineRecordReader.getProgress();
			  }
			  
			  public synchronized void close() throws IOException { 
			    lineRecordReader.close();
			  }
}


对于自定义数据输出格式 可以参考已有的数据输出格式,继承自它即可,只要重写GetRecordWriter方法得到一个自己写的RecordWriter即可
一种实现:我们一般继承自FileOutputFormat来改写

因为FileOutputFormat已经帮我们实现了许多通用的功能

package com.my.input;

import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UnsupportedEncodingException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.ReflectionUtils;

public class MyOutputFormat<K, V> extends FileOutputFormat<K, V> {
	public static String SEPERATOR = "mapreduce.output.textoutputformat.separator";

	protected static class MyLineRecordWriter<K, V> extends RecordWriter<K, V> {
		private static final String utf8 = "UTF-8";
		private static final byte[] newline;

		static {
			try {
				newline = "\n".getBytes(utf8);
			} catch (UnsupportedEncodingException uee) {
				throw new IllegalArgumentException("can't find " + utf8 + " encoding");
			}
		}

		protected DataOutputStream out;
		private final byte[] keyValueSeparator;

		public MyLineRecordWriter(DataOutputStream out, String keyValueSeparator) {
			this.out = out;
			try {
				this.keyValueSeparator = keyValueSeparator.getBytes(utf8);
			} catch (UnsupportedEncodingException uee) {
				throw new IllegalArgumentException("can't find " + utf8 + " encoding");
			}
		}
		//改写了源码  把\t改为了=========>
		public MyLineRecordWriter(DataOutputStream out) {
			this(out, "=========>");
		}

		/**
		 * Write the object to the byte stream, handling Text as a special case.
		 * 
		 * @param o
		 *            the object to print
		 * @throws IOException
		 *             if the write throws, we pass it on
		 */
		private void writeObject(Object o) throws IOException {
			if (o instanceof Text) {
				Text to = (Text) o;
				out.write(to.getBytes(), 0, to.getLength());
			} else {
				out.write(o.toString().getBytes(utf8));
			}
		}

		public synchronized void write(K key, V value) throws IOException {

			boolean nullKey = key == null || key instanceof NullWritable;
			boolean nullValue = value == null || value instanceof NullWritable;
			if (nullKey && nullValue) {
				return;
			}
			if (!nullKey) {
				writeObject(key);
			}
			if (!(nullKey || nullValue)) {
				out.write(keyValueSeparator);
			}
			if (!nullValue) {
				writeObject(value);
			}
			out.write(newline);
		}

		public synchronized void close(TaskAttemptContext context) throws IOException {
			out.close();
		}
	}

	public RecordWriter<K, V> getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException {
		Configuration conf = job.getConfiguration();
		boolean isCompressed = getCompressOutput(job);
		String keyValueSeparator = conf.get(SEPERATOR, "=========>");
		CompressionCodec codec = null;
		String extension = "";
		if (isCompressed) {
			Class<? extends CompressionCodec> codecClass = getOutputCompressorClass(job, GzipCodec.class);
			codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, conf);
			extension = codec.getDefaultExtension();
		}
		Path file = getDefaultWorkFile(job, extension);
		FileSystem fs = file.getFileSystem(conf);
		if (!isCompressed) {
			FSDataOutputStream fileOut = fs.create(file, false);
			return new MyLineRecordWriter<K, V>(fileOut, keyValueSeparator);
		} else {
			FSDataOutputStream fileOut = fs.create(file, false);
			return new MyLineRecordWriter<K, V>(new DataOutputStream(codec.createOutputStream(fileOut)),
					keyValueSeparator);
		}
	}
}

第二种实现:FileOutputFormat的一个重要子类就是TextOutputFormat,我们也可以继承它,然后重写getRecordWriter方法即可


package com.my.input;
import java.io.DataOutputStream;
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.ReflectionUtils;

public class MyOutputFormat2<K, V> extends TextOutputFormat<K, V> {
	
	@Override
	public RecordWriter<K, V> getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException {
		Configuration conf = job.getConfiguration();
		boolean isCompressed = getCompressOutput(job);
		//改写了源码  把\t改为了=========>
		String keyValueSeparator = conf.get(SEPERATOR, "=========>");
		CompressionCodec codec = null;
		String extension = "";
		if (isCompressed) {
			Class<? extends CompressionCodec> codecClass = getOutputCompressorClass(job, GzipCodec.class);
			codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, conf);
			extension = codec.getDefaultExtension();
		}
		Path file = getDefaultWorkFile(job, extension);
		FileSystem fs = file.getFileSystem(conf);
		if (!isCompressed) {
			FSDataOutputStream fileOut = fs.create(file, false);
			return new LineRecordWriter<K, V>(fileOut, keyValueSeparator);
		} else {
			FSDataOutputStream fileOut = fs.create(file, false);
			return new LineRecordWriter<K, V>(new DataOutputStream(codec.createOutputStream(fileOut)),
					keyValueSeparator);
		}
	}
}

最后就可以用来测试了

技术分享


测试代码

package com.my.input;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

import com.tg.type.Point3D;




public class Point3DDriver {

	/**
	 * 
	 * @author 汤高 
	 * Point3D为自定义数据类型  把它作为map的输出类型
	 *        
	 */
	// Map过程
	static int count=0;
	public static class MyMapper extends Mapper<Text, Text, Text, Point3D> {
		/***
		 * 
		 */
		@Override
		protected void map(Text key, Text value, Mapper<Text, Text, Text, Point3D>.Context context)
				throws IOException, InterruptedException {
			count++;
			//这里得到的键是自定义输入格式输出的内容  本例是 One 、two、three
			//这里得到的值是X:1.0, Y:2.0, Z:3.0 等
			//根据都好截取值里面的内容 分别设置到自定义数据类型Point3D里面去
			String[] vs = value.toString().split(",");
			Point3D p = new Point3D(Float.parseFloat(vs[0].split(":")[1]), Float.parseFloat(vs[1].split(":")[1]), Float.parseFloat(vs[2].split(":")[1])	);
				// 写出去 把自定义数据类型输出去
			
				context.write(new Text(key), p);
				System.out.println("几个map==========>"+count);
		}
	}
	//Reduce过程
	public static class MyReducer extends Reducer<Text, Point3D, Text, Point3D>{
		
		protected void reduce(Text key, Point3D value,
				Reducer<Text, Point3D, Text, Point3D>.Context context) throws IOException, InterruptedException {
			
			context.write(key, value);
		}
		
	}

	public static void main(String[] args) {

		try {
			Configuration conf = new Configuration();
			String[] paths = new GenericOptionsParser(conf, args).getRemainingArgs();
			if (paths.length < 2) {
				throw new RuntimeException("usage <input> <output>");
			}

			Job job = Job.getInstance(conf, "Point3DDriver");
			job.setJarByClass(Point3DDriver.class);

			job.setMapperClass(MyMapper.class);
			job.setReducerClass(MyReducer.class);
			job.setInputFormatClass(myInputFormat.class);
			
			job.setMapOutputKeyClass(Text.class);
			job.setMapOutputValueClass(Point3D.class);
			job.setOutputFormatClass(MyOutputFormat.class);
			//job.setOutputFormatClass(MyOutputFormat2.class);
			FileInputFormat.addInputPaths(job, paths[0]);
			FileOutputFormat.setOutputPath(job, new Path(paths[1] + System.currentTimeMillis()));// 整合好结果后输出的位置
			System.exit(job.waitForCompletion(true) ? 0 : 1);// 执行job
			

		} catch (IOException e) {
			e.printStackTrace();
		} catch (ClassNotFoundException e) {
			e.printStackTrace();
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	}

}

对于编写Map函数和Reduce函数不熟悉的朋友,可以参看我上篇博客 里面讲解了如何实现MapReduce编程  

MapReduce工作原理详解   
结果:

技术分享



码字不易,转载请指明出自  http://blog.csdn.net/tanggao1314/article/details/51305852


干货--Hadoop自定义数据类型和自定义输入输出格式整合项目案例

标签:

原文地址:http://blog.csdn.net/tanggao1314/article/details/51305852

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!