码迷,mamicode.com
首页 > 编程语言 > 详细

mapreduce的二次排序实现方式

时间:2015-01-14 00:58:03      阅读:271      评论:0      收藏:0      [点我收藏+]

标签:

本文主要介绍下二次排序的实现方式

我们知道mapreduce是按照key来进行排序的,那么如果有有个需求就是先按照第一个字段排序,在第一个字段相等的情况下,按照第二个字段排序,这个其实就是二次排序。

下面就具体说一下二次排序的实现方式

1. 自定义一个key

为什么要自定义一个key,我们知道mapreduce中排序就是按照key来排序的,我们既然想要实现按照两个字段进行排序,默认的方式肯定不行,所以需要定义一个新的key,key里面有两个属性,也就是我们要排序的两个字段

首先,实现WritableComparable接口,因为key是可序列化并且可以比较的

其次,重载相关的方法,例如序列化、反序列化相关的方法write、readFields。重载在分区的时候要用到的hashcode方法,注意后面会说道一个partitioner类,也是用来分区的,用hashcode方法和partitioner类进行分区都是可以的,使用其中的一个即可。重载排序用的compareTo方法,这个就是真正对排序起作用的方法。

2. 分区函数类

上面定义了一个新的key,那么我现在做分发,到底按照什么样的规则进行分发是在分区函数类中定义的,这个类要继承Partitioner类,重载其中的分区方法getPartition,在main函数里给job添加上即可,例如:job.setPartitionerClass(partitioner.class)

这个类的作用跟key的hashcode方法的作用一样,所以如果在hashcode方法中写了分区的方法,这个分区类是可以省掉的

3. 比较函数类

这个类决定着key的排序规则,是一个比较器,需要继承WritableComparator类,并且重载其中的compare方法。在main函数里给job添加上即可,例如:job.setSortComparatorClass(KeyComparator.class)

这个类的作用跟自定义key的compareTo方法一样,如果在自定义的key中重载的compareTo方法,则这个类可省略。

4. 分组函数类

通过分区类,我们重新定义了key的分区规则,但是多个key不同的也可以进入到一个reducer中,所以我们需要分组函数类来定义什么样的key做为一组来执行,因为也涉及到比较,所以这个类也需要继承WritableComparator,并且重载其中的compare方法,在main函数中加入即可,例如:job.setPartitionerClass(partitioner.class);

下面是具体实现的代码

public class SecondSortTest {

	private static String input = "/dsap/rawdata/secondSortTest/result3";
	private static String output = "/dsap/rawdata/secondSortTest/result6";

	public static class Mapper1 extends Mapper<Object, Text, Pair, Text> {

		private Pair pair = new Pair();

		public void map(Object key, Text value, Context context)
				throws IOException, InterruptedException {

			String[] segs = value.toString().split("\\s+");

			pair.set(Float.parseFloat(segs[0]), Float.parseFloat(segs[1]));
			context.write(pair, new Text(segs[1]));
		}
	}

	public static class Reducer2 extends Reducer<Pair, Text, Text, Text> {

		public void reduce(Pair key, Iterable<Text> values, Context context)
				throws IOException, InterruptedException {

			context.write(new Text(key.toString()), new Text("==========="));
			for (Text text : values) {
				context.write(new Text(key.toString()), text);
			}
		}
	}

	public static void main(String[] args) throws Exception {

		Configuration conf = new Configuration();
		/** 判断输出路径是否存在,如果存在,则删除 */
		FileSystem hdfs = FileSystem.get(conf);
		Job job = new Job(conf, "secondSortTest");
		job.setJarByClass(SecondSortTest.class);

		FileInputFormat.addInputPath(job, new Path(input));
		if (hdfs.exists(new Path(output)))
			hdfs.delete(new Path(output));
		FileOutputFormat.setOutputPath(job, new Path(output));

		job.setGroupingComparatorClass(GroupingComparator.class);

		job.setNumReduceTasks(19);
		job.setMapperClass(Mapper1.class);
		job.setReducerClass(Reducer2.class);
		job.setMapOutputKeyClass(Pair.class);
		job.setMapOutputValueClass(Text.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(Text.class);
		job.waitForCompletion(true);

	}

	public static class partitioner extends Partitioner<Pair, Text> {

		@Override
		public int getPartition(Pair key, Text value, int numPartitions) {
			return Math.abs((int) (key.getFirst() * 127)) % numPartitions;
		}
	}

	static class Pair implements WritableComparable<Pair> {

		private float first;
		private float second = 0;

		@Override
		public void readFields(DataInput in) throws IOException {
			first = in.readFloat();
			second = in.readFloat();
		}

		@Override
		public void write(DataOutput out) throws IOException {
			out.writeFloat(first);
			out.writeFloat(second);
		}

		@Override
		public int hashCode() {
			return (int) (first * 127);
		}

		// 这里的代码是关键,因为对key排序时,调用的就是这个compareTo方法
		@Override
		public int compareTo(Pair o) {
			if (first != o.first) {
				return first - o.first > 0 ? 1 : -1;
			} else if (second != o.second) {
				return second - o.second > 0 ? 1 : -1;
			}
			return 0;
		}

		public void set(float left, float right) {
			first = left;
			second = right;
		}

		public float getFirst() {
			return first;
		}

		public float getSecond() {
			return second;
		}

		@Override
		public String toString() {
			return "Pair [first=" + first + ", second=" + second + "]";
		}
	}

	static class GroupingComparator implements RawComparator<Pair> {

		@Override
		public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
			return WritableComparator.compareBytes(b1, s1, Integer.SIZE / 8,
					b2, s2, Integer.SIZE / 8);
		}

		@Override
		public int compare(Pair o1, Pair o2) {
			float first1 = o1.getFirst();
			float first2 = o2.getFirst();
			return first1 - first2 > 0 ? 1 : -1;
		}
	}
}


上面的代码中注意一点,就是reduce中的key到底是什么,如果我把key直接tostring打印出来,那么这个值是排序排在最前面的那个key,如果我遍历value迭代器,并且在里面将key也打印出来,可以看到,迭代器的value里对应的key也被迭代出来了
技术分享


参考:http://www.cnblogs.com/xuxm2007/archive/2011/09/03/2165805.html

mapreduce的二次排序实现方式

标签:

原文地址:http://blog.csdn.net/nwpuwyk/article/details/42687289

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!