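Sorting in MapReduce can be divided into four kinds:
Normal sort
Partial sort
Total (global) sort
Secondary sort (for example, with two columns of data, rows whose first column is equal must be ordered by the second column)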
Normal sort: sorting is built into MapReduce itself.
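Text keys are a poor choice for sorting numbers. Text does implement WritableComparable, but it compares keys byte by byte (lexicographically), so "100" sorts before "99". Numeric types such as IntWritable and LongWritable also implement WritableComparable and sort by numeric value.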
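To make the difference concrete, here is a small plain-Java analogy with no Hadoop dependency: java.lang.String stands in for Text (for ASCII digits, String.compareTo orders keys the same way as Text's byte-by-byte comparison) and Long stands in for LongWritable. The class name TextVsNumericOrder is hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical demo: String stands in for Text, Long for LongWritable.
final class TextVsNumericOrder {
    // Lexicographic order, the way Text keys are compared.
    static List<String> sortAsText(List<String> keys) {
        List<String> copy = new ArrayList<>(keys);
        Collections.sort(copy);
        return copy;
    }

    // Numeric order, the way LongWritable keys are compared.
    static List<Long> sortAsLong(List<String> keys) {
        List<Long> copy = new ArrayList<>();
        for (String k : keys) {
            copy.add(Long.parseLong(k));
        }
        Collections.sort(copy);
        return copy;
    }

    public static void main(String[] args) {
        List<String> keys = Arrays.asList("98", "6546", "43", "787", "100");
        System.out.println(sortAsText(keys)); // [100, 43, 6546, 787, 98]
        System.out.println(sortAsLong(keys)); // [43, 98, 100, 787, 6546]
    }
}
```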
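Map and reduce processing sorts by key by default. If a total order is not required, the results can be written out directly, and each output file then contains its own records sorted by key.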
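The following plain-Java sketch imitates this behaviour under simplifying assumptions: java.lang.Long stands in for LongWritable (for small non-negative values both hash to the value itself), keys are routed with the formula used by Hadoop's default HashPartitioner, (hashCode() & Integer.MAX_VALUE) % numReduceTasks, and each partition is then sorted. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical simulation of "sorted within each reducer, not across reducers".
final class PartialSortDemo {
    // Same formula as Hadoop's default HashPartitioner.
    static int hashPartition(Long key, int numPartitions) {
        return (key.hashCode() & Integer.MAX_VALUE) % numPartitions;
    }

    // Route every key to a partition, then sort each partition (what the shuffle does per reducer).
    static List<List<Long>> shuffleAndSort(List<Long> keys, int numPartitions) {
        List<List<Long>> parts = new ArrayList<>();
        for (int i = 0; i < numPartitions; i++) {
            parts.add(new ArrayList<>());
        }
        for (Long k : keys) {
            parts.get(hashPartition(k, numPartitions)).add(k);
        }
        for (List<Long> p : parts) {
            Collections.sort(p);
        }
        return parts;
    }

    // Like reading part-r-00000, part-r-00001, ... one after another.
    static List<Long> concat(List<List<Long>> parts) {
        List<Long> all = new ArrayList<>();
        for (List<Long> p : parts) {
            all.addAll(p);
        }
        return all;
    }

    static boolean isSorted(List<Long> xs) {
        for (int i = 1; i < xs.size(); i++) {
            if (xs.get(i - 1) > xs.get(i)) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        List<List<Long>> parts = shuffleAndSort(Arrays.asList(43L, 6546L, 65L, 787L, 879L, 98L), 3);
        System.out.println(parts);                   // [[879, 6546], [43, 787], [65, 98]]
        System.out.println(isSorted(concat(parts))); // false
    }
}
```

Every partition comes out sorted, but partition 0 ends with 6546 while partition 1 starts with 43, so reading the part files in order does not give a total order.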
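By default Hadoop does not produce a globally sorted output, yet a global sort is a very common requirement in large-scale data processing. The most direct way to sort a large amount of data with Hadoop is to hand all of the file contents to map, let map pass everything through unchanged to a single reduce, and rely on Hadoop's own shuffle to sort all the data, which the reduce then writes out directly. (A single reduce does not suit large data sets well and is inefficient.)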
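To sort large data sets globally, the main idea is to split the data into ranges. For example, when sorting integers, keys in [0,10000] go to partition 0, keys in (10000,20000] go to partition 1, and so on. Because every key in partition i is smaller than every key in partition i+1, concatenating the sorted outputs of the partitions in order gives a globally sorted result.
If the data is evenly distributed, each partition holds roughly the same amount of data, which is the ideal case. In practice, however, data is often unevenly distributed and data skew appears, so a fixed range split no longer works well. This is where a sampler helps.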
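A minimal sketch of the fixed-range idea, assuming ranges of equal width (10000, as in the example above) and putting every key above the last boundary into the last partition. It counts how many keys land in each partition for evenly spread keys and for a hypothetical skewed data set. All names are illustrative.

```java
import java.util.Arrays;

// Hypothetical illustration of fixed-width range partitioning; plain Java, no Hadoop.
final class RangePartitionDemo {
    // (.., width] -> 0, (width, 2*width] -> 1, ...; keys past the last range go to the last partition.
    static int rangePartition(long key, long width, int numPartitions) {
        if (key <= width) {
            return 0;
        }
        long p = (key - 1) / width;
        return (int) Math.min(p, numPartitions - 1);
    }

    static int[] countPerPartition(long[] keys, long width, int numPartitions) {
        int[] counts = new int[numPartitions];
        for (long k : keys) {
            counts[rangePartition(k, width, numPartitions)]++;
        }
        return counts;
    }

    // Keys 1..30000, evenly spread over three ranges of width 10000.
    static long[] uniformKeys() {
        long[] keys = new long[30000];
        for (int i = 0; i < keys.length; i++) {
            keys[i] = i + 1;
        }
        return keys;
    }

    // Hypothetical skew: 27000 keys inside [1, 10000], 3000 keys in (20000, 30000].
    static long[] skewedKeys() {
        long[] keys = new long[30000];
        for (int i = 0; i < keys.length; i++) {
            long n = i + 1;
            keys[i] = n <= 27000 ? (n % 10000) + 1 : n;
        }
        return keys;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(countPerPartition(uniformKeys(), 10000, 3))); // [10000, 10000, 10000]
        System.out.println(Arrays.toString(countPerPartition(skewedKeys(), 10000, 3)));  // [27000, 0, 3000]
    }
}
```

With the skewed keys one reducer receives 27000 records while another receives none, which is exactly the imbalance that sampling is meant to fix.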
The program below hard-codes three ranges (keys up to 100, 101 to 999, and 1000 or more), one per reducer:

package Sort;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class Demo {
    private final static String INPUT_PATH = "hdfs://liguodong:8020/input";
    private final static String OUTPUT_PATH = "hdfs://liguodong:8020/output";

    // Emits the first whitespace-separated number on each line as a LongWritable key,
    // so the shuffle sorts the keys numerically.
    public static class MyMapper extends Mapper<LongWritable, Text, LongWritable, NullWritable> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString().trim();
            if (line.isEmpty()) {
                return; // skip blank lines instead of failing in parseLong
            }
            String[] values = line.split("\\s+");
            context.write(new LongWritable(Long.parseLong(values[0])), NullWritable.get());
        }
    }

    // Keys reach each reducer already sorted; each distinct key is written once.
    public static class MyReducer extends Reducer<LongWritable, NullWritable, LongWritable, NullWritable> {
        @Override
        protected void reduce(LongWritable key, Iterable<NullWritable> values, Context context)
                throws IOException, InterruptedException {
            context.write(key, NullWritable.get());
        }
    }

    // Hand-coded ranges: <= 100 -> partition 0, 101..999 -> partition 1, >= 1000 -> partition 2.
    public static class MyPartitioner extends Partitioner<LongWritable, NullWritable> {
        @Override
        public int getPartition(LongWritable key, NullWritable value, int numPartitions) {
            long k = key.get();
            if (k <= 100) {
                return 0 % numPartitions;
            }
            if (k < 1000) {
                return 1 % numPartitions;
            }
            // "% numPartitions" keeps the index valid if fewer than 3 reducers are configured
            return 2 % numPartitions;
        }
    }

    public static void main(String[] args) throws
            ClassNotFoundException, IOException, InterruptedException, URISyntaxException {
        Configuration conf = new Configuration();
        // Delete the output directory if it already exists; otherwise the job refuses to start.
        final FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH), conf);
        if (fileSystem.exists(new Path(OUTPUT_PATH))) {
            fileSystem.delete(new Path(OUTPUT_PATH), true);
        }
        Job job = Job.getInstance(conf, "shuffle sort");
        job.setJarByClass(Demo.class);
        FileInputFormat.addInputPath(job, new Path(INPUT_PATH));
        job.setMapperClass(MyMapper.class);
        job.setMapOutputKeyClass(LongWritable.class);
        job.setMapOutputValueClass(NullWritable.class);
        job.setPartitionerClass(MyPartitioner.class);
        job.setReducerClass(MyReducer.class);
        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(NullWritable.class);
        // One reducer per range: each part file is sorted, and the files are ordered by range.
        job.setNumReduceTasks(3);
        FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
[root@liguodong file]# vi sortsum
[root@liguodong file]# hdfs dfs -put sortsum /input
[root@liguodong file]# hdfs dfs -cat /input
43
6546
65
787
879
98
......
Run the jar:
[root@liguodong file]# yarn jar numsort.jar
Check the results:
[root@liguodong file]# hdfs dfs -ls /output/
Found 4 items
-rw-r--r-- 1 root supergroup 0 2015-06-16 10:55 /output/_SUCCESS
-rw-r--r-- 1 root supergroup 28 2015-06-16 10:55 /output/part-r-00000
-rw-r--r-- 1 root supergroup 20 2015-06-16 10:55 /output/part-r-00001
-rw-r--r-- 1 root supergroup 15 2015-06-16 10:55 /output/part-r-00002
[root@liguodong file]# hdfs dfs -cat /output/part-r-00000
2
7
23
34
43
54
65
76
87
98
[root@liguodong file]# hdfs dfs -cat /output/part-r-00001
543
567
675
787
879
[root@liguodong file]# hdfs dfs -cat /output/part-r-00002
5423
6546
6554
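The weakness of the program above is that the partitions are defined by hand, so the data can end up very unevenly spread across them, which easily causes data skew.
For this reason Hadoop provides the Sampler interface, which returns a set of sample keys; this is Hadoop's sampler. Hadoop also provides the TotalOrderPartitioner class, which can be used to implement a total (global) sort.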
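In a real job the pieces are wired together roughly like this: create a sampler such as new InputSampler.RandomSampler<>(freq, numSamples, maxSplitsSampled), call TotalOrderPartitioner.setPartitionFile(job.getConfiguration(), path) to choose where the split points go, call InputSampler.writePartitionFile(job, sampler) to write them, and set job.setPartitionerClass(TotalOrderPartitioner.class). The core step is turning a sample into split points. The sketch below is an independent plain-Java illustration of that step, not Hadoop's code: it sorts the sample and takes numPartitions - 1 evenly spaced keys. The names and the skewed test data are hypothetical.

```java
import java.util.Arrays;

// Hypothetical plain-Java illustration of choosing split points from a sample.
final class SplitPointDemo {
    // Sort the sample and take numPartitions - 1 evenly spaced keys as split points.
    static long[] splitPoints(long[] sample, int numPartitions) {
        if (numPartitions < 1 || sample.length < numPartitions) {
            throw new IllegalArgumentException("need at least numPartitions samples");
        }
        long[] s = sample.clone();
        Arrays.sort(s);
        long[] splits = new long[numPartitions - 1];
        double step = (double) s.length / numPartitions;
        for (int i = 1; i < numPartitions; i++) {
            splits[i - 1] = s[(int) Math.round(step * i)];
        }
        return splits;
    }

    // Partition index = number of split points <= key (a key equal to a split point goes up).
    static int[] countPerPartition(long[] keys, long[] splits) {
        int[] counts = new int[splits.length + 1];
        for (long k : keys) {
            int p = 0;
            while (p < splits.length && k >= splits[p]) {
                p++;
            }
            counts[p]++;
        }
        return counts;
    }

    public static void main(String[] args) {
        // Skewed data: 9000 keys in 1..9000 and 1000 keys in 20001..21000.
        long[] keys = new long[10000];
        for (int i = 0; i < keys.length; i++) {
            keys[i] = i < 9000 ? i + 1 : 20001 + (i - 9000);
        }
        // Take every 10th key as the sample.
        long[] sample = new long[keys.length / 10];
        for (int i = 0; i < sample.length; i++) {
            sample[i] = keys[i * 10];
        }
        long[] splits = splitPoints(sample, 3);
        System.out.println(Arrays.toString(splits));                          // [3331, 6671]
        System.out.println(Arrays.toString(countPerPartition(keys, splits))); // [3330, 3340, 3330]
    }
}
```

With the fixed ranges [0,10000], (10000,20000] and above 20000, the same keys would split 9000 / 0 / 1000; the sampled split points give roughly a third each. Hadoop's writePartitionFile uses a similar evenly spaced scheme and additionally steps past duplicate samples, which this sketch does not.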
Hadoop 2.2.0 source (abridged):
package org.apache.hadoop.mapreduce.lib.partition;

/**
 * Utility for collecting samples and writing a partition file for
 * {@link TotalOrderPartitioner}.
 */
public class InputSampler<K,V> extends Configured implements Tool {

    /**
     * Interface to sample using an
     * {@link org.apache.hadoop.mapreduce.InputFormat}.
     */
    public interface Sampler<K,V> {
        /**
         * For a given job, collect and return a subset of the keys from the
         * input data.
         */
        K[] getSample(InputFormat<K,V> inf, Job job)
                throws IOException, InterruptedException;
    }

    /**
     * Samples the first n records from s splits.
     * Inexpensive way to sample random data.
     */
    public static class SplitSampler<K,V> implements Sampler<K,V> {}

    /**
     * Sample from random points in the input.
     * General-purpose sampler. Takes numSamples / maxSplitsSampled inputs from
     * each split.
     */
    public static class RandomSampler<K,V> implements Sampler<K,V> {}

    /**
     * Sample from s splits at regular intervals.
     * Useful for sorted data.
     */
    public static class IntervalSampler<K,V> implements Sampler<K,V> {}
}
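The Javadoc above says SplitSampler takes the first n records of each split, while IntervalSampler samples at regular intervals and is "useful for sorted data". The sketch below, with hypothetical names and a single in-memory "split" of sorted keys 1..100, shows why: taking the first records only sees the smallest keys, while a fixed interval covers the whole range. It illustrates the two ideas only; Hadoop's samplers read keys through the job's InputFormat and are configured differently (IntervalSampler takes a sampling frequency, for example).

```java
import java.util.Arrays;

// Hypothetical illustration of two sampling strategies on one sorted split.
final class SamplingStrategyDemo {
    // First-n idea (like SplitSampler): cheap, but on sorted input it sees only the smallest keys.
    static long[] firstN(long[] split, int n) {
        return Arrays.copyOf(split, Math.min(n, split.length));
    }

    // Fixed-interval idea (like IntervalSampler): every k-th record (k >= 1), spanning the split.
    static long[] everyKth(long[] split, int k) {
        int size = (split.length + k - 1) / k;
        long[] out = new long[size];
        for (int i = 0; i < size; i++) {
            out[i] = split[i * k];
        }
        return out;
    }

    // A sorted split holding the keys 1..n.
    static long[] sortedSplit(int n) {
        long[] s = new long[n];
        for (int i = 0; i < n; i++) {
            s[i] = i + 1;
        }
        return s;
    }

    public static void main(String[] args) {
        long[] split = sortedSplit(100);
        System.out.println(Arrays.toString(firstN(split, 10)));   // [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
        System.out.println(Arrays.toString(everyKth(split, 10))); // [1, 11, 21, 31, 41, 51, 61, 71, 81, 91]
    }
}
```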
package org.apache.hadoop.mapreduce.lib.partition;

/**
 * Partitioner effecting a total order by reading split points from
 * an externally generated source.
 */
public class TotalOrderPartitioner<K extends WritableComparable<?>,V>
        extends Partitioner<K,V> implements Configurable {
}
This partitioner implementation can use the file produced by the sampler (the partition file holding the split points).
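For key types such as LongWritable, TotalOrderPartitioner roughly works by loading the sorted split points from the partition file and binary-searching each key among them; a key equal to a split point goes to the higher partition. (For BinaryComparable keys such as Text it can build a trie instead.) The plain-Java sketch below mirrors only that lookup; the class name is hypothetical.

```java
import java.util.Arrays;

// Hypothetical plain-Java version of the binary-search lookup; not Hadoop's class.
final class TotalOrderLookupDemo {
    // splitPoints: sorted, numPartitions - 1 entries.
    // A key below splitPoints[0] -> 0; a key equal to splitPoints[i] -> i + 1.
    static int getPartition(long key, long[] splitPoints) {
        // binarySearch returns the index if found, otherwise -(insertionPoint) - 1.
        int pos = Arrays.binarySearch(splitPoints, key) + 1;
        return pos < 0 ? -pos : pos;
    }

    public static void main(String[] args) {
        long[] splits = {101, 1000}; // same ranges as MyPartitioner: <= 100, 101..999, >= 1000
        for (long key : new long[]{43, 100, 101, 787, 999, 1000, 6546}) {
            System.out.println(key + " -> partition " + getPartition(key, splits));
        }
    }
}
```

Because equal keys go up, the split points 101 and 1000 (not 100 and 1000) reproduce the hand-written ranges of MyPartitioner.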
Secondary sort example (input, two columns):
key1 1
key2 2
key3 3
key2 1
key1 3
Intermediate result (composite key <first, second>, then value):
<key1,1> 1
<key1,3> 3
<key2,1> 1
<key2,2> 2
<key3,3> 3
Sorted result:
key1 1
key1 3
key2 1
key2 2
key3 3
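The ordering above is just a comparison on a composite key: compare the first column, and only when it is equal compare the second. A plain-Java sketch with a hypothetical Pair class (in Hadoop this role is played by a WritableComparable key such as IntPair in the official example):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

// Hypothetical plain-Java illustration of composite-key ordering.
final class CompositeKeySortDemo {
    // Composite key: the natural key plus the column that must also be ordered.
    static final class Pair {
        final String first;
        final int second;

        Pair(String first, int second) {
            this.first = first;
            this.second = second;
        }

        @Override
        public String toString() {
            return first + " " + second;
        }
    }

    // Compare the first column; only when it is equal, compare the second.
    static final Comparator<Pair> COMPOSITE =
            Comparator.comparing((Pair p) -> p.first).thenComparingInt(p -> p.second);

    static List<String> sortRecords(List<Pair> records) {
        List<Pair> copy = new ArrayList<>(records);
        copy.sort(COMPOSITE);
        List<String> lines = new ArrayList<>();
        for (Pair p : copy) {
            lines.add(p.toString());
        }
        return lines;
    }

    // Same five records as the example input.
    static List<Pair> exampleInput() {
        return Arrays.asList(new Pair("key1", 1), new Pair("key2", 2), new Pair("key3", 3),
                new Pair("key2", 1), new Pair("key1", 3));
    }

    public static void main(String[] args) {
        for (String line : sortRecords(exampleInput())) {
            System.out.println(line);
        }
    }
}
```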
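1. MapReduce sorts by key by default.
2. Main idea:
Write a custom Partitioner that partitions on the key (in the official example below, on its first field only), so that records with the same first field reach the same reducer; this gives the first sort.
For reference:
http://blog.csdn.net/scgaliguodong123_/article/details/46489357
Implement a WritableComparator with your own comparison logic (first field, then second field) to sort the keys a second time. A grouping comparator on the first field then makes reduce() run once per first-field value.
For reference:
http://blog.csdn.net/scgaliguodong123_/article/details/46010947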
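A plain-Java sketch of the whole flow, with hypothetical names: records are (first, second) int pairs, partitioned on the first field only (Math.floorMod is used here instead of FirstPartitioner's Math.abs(first * 127) formula so the result is never negative), each partition is sorted by (first, second), and consecutive records with the same first field are grouped the way a grouping comparator groups them into one reduce() call.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical simulation of partition -> sort -> group for secondary sort; plain Java.
final class SecondarySortFlowDemo {
    // Partition on the first field only, so equal first fields meet in one reducer.
    static int partition(int[] rec, int numPartitions) {
        return Math.floorMod(rec[0], numPartitions);
    }

    // Returns, per partition, the reduce() calls: first field -> its second fields in order.
    static List<Map<Integer, List<Integer>>> run(List<int[]> records, int numPartitions) {
        List<List<int[]>> parts = new ArrayList<>();
        for (int i = 0; i < numPartitions; i++) {
            parts.add(new ArrayList<>());
        }
        for (int[] r : records) {
            parts.get(partition(r, numPartitions)).add(r);
        }
        // Sort comparator: first field, then second field.
        Comparator<int[]> sortOrder =
                Comparator.<int[]>comparingInt(r -> r[0]).thenComparingInt(r -> r[1]);
        List<Map<Integer, List<Integer>>> result = new ArrayList<>();
        for (List<int[]> part : parts) {
            part.sort(sortOrder);
            // Grouping on the first field: after sorting, equal first fields are adjacent,
            // so each map entry corresponds to one reduce() call.
            Map<Integer, List<Integer>> groups = new LinkedHashMap<>();
            for (int[] r : part) {
                groups.computeIfAbsent(r[0], k -> new ArrayList<>()).add(r[1]);
            }
            result.add(groups);
        }
        return result;
    }

    // Input lines 1 1, 2 2, 3 3, 2 1, 1 3 (the data used in the official example's run).
    static List<int[]> exampleInput() {
        return Arrays.asList(new int[]{1, 1}, new int[]{2, 2}, new int[]{3, 3},
                new int[]{2, 1}, new int[]{1, 3});
    }

    public static void main(String[] args) {
        System.out.println(run(exampleInput(), 1)); // [{1=[1, 3], 2=[1, 2], 3=[3]}]
    }
}
```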
The official example shipped with Hadoop:
package org.apache.hadoop.examples;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.util.GenericOptionsParser;

/**
 * This is an example Hadoop Map/Reduce application.
 * It reads the text input files that must contain two integers per line.
 * The output is sorted by the first and second number and grouped on the
 * first number.
 *
 * To run: bin/hadoop jar build/hadoop-examples.jar secondarysort
 *            <i>in-dir</i> <i>out-dir</i>
 */
public class SecondarySort {

    /**
     * Define a pair of integers that are writable.
     * They are serialized in a byte comparable format.
     */
    public static class IntPair
            implements WritableComparable<IntPair> {
        private int first = 0;
        private int second = 0;

        /**
         * Set the left and right values.
         */
        public void set(int left, int right) {
            first = left;
            second = right;
        }

        public int getFirst() {
            return first;
        }

        public int getSecond() {
            return second;
        }

        /**
         * Read the two integers.
         * Encoded as: MIN_VALUE -> 0, 0 -> -MIN_VALUE, MAX_VALUE-> -1
         */
        @Override
        public void readFields(DataInput in) throws IOException {
            first = in.readInt() + Integer.MIN_VALUE;
            second = in.readInt() + Integer.MIN_VALUE;
        }

        @Override
        public void write(DataOutput out) throws IOException {
            out.writeInt(first - Integer.MIN_VALUE);
            out.writeInt(second - Integer.MIN_VALUE);
        }

        @Override
        public int hashCode() {
            return first * 157 + second;
        }

        @Override
        public boolean equals(Object right) {
            if (right instanceof IntPair) {
                IntPair r = (IntPair) right;
                return r.first == first && r.second == second;
            } else {
                return false;
            }
        }

        /** A Comparator that compares serialized IntPair. */
        public static class Comparator extends WritableComparator {
            public Comparator() {
                super(IntPair.class);
            }

            public int compare(byte[] b1, int s1, int l1,
                               byte[] b2, int s2, int l2) {
                return compareBytes(b1, s1, l1, b2, s2, l2);
            }
        }

        static {                                        // register this comparator
            WritableComparator.define(IntPair.class, new Comparator());
        }

        @Override
        public int compareTo(IntPair o) {
            if (first != o.first) {
                return first < o.first ? -1 : 1;
            } else if (second != o.second) {
                return second < o.second ? -1 : 1;
            } else {
                return 0;
            }
        }
    }
    /**
     * Partition based on the first part of the pair.
     */
    public static class FirstPartitioner extends Partitioner<IntPair, IntWritable> {
        @Override
        public int getPartition(IntPair key, IntWritable value,
                                int numPartitions) {
            return Math.abs(key.getFirst() * 127) % numPartitions;
        }
    }

    /**
     * Compare only the first part of the pair, so that reduce is called once
     * for each value of the first part.
     */
    public static class FirstGroupingComparator
            implements RawComparator<IntPair> {
        @Override
        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
            return WritableComparator.compareBytes(b1, s1, Integer.SIZE / 8,
                                                   b2, s2, Integer.SIZE / 8);
        }

        @Override
        public int compare(IntPair o1, IntPair o2) {
            int l = o1.getFirst();
            int r = o2.getFirst();
            return l == r ? 0 : (l < r ? -1 : 1);
        }
    }
    /**
     * Read two integers from each line and generate a key, value pair
     * as ((left, right), right).
     */
    public static class MapClass
            extends Mapper<LongWritable, Text, IntPair, IntWritable> {

        private final IntPair key = new IntPair();
        private final IntWritable value = new IntWritable();

        @Override
        public void map(LongWritable inKey, Text inValue,
                        Context context) throws IOException, InterruptedException {
            StringTokenizer itr = new StringTokenizer(inValue.toString());
            int left = 0;
            int right = 0;
            if (itr.hasMoreTokens()) {
                left = Integer.parseInt(itr.nextToken());
                if (itr.hasMoreTokens()) {
                    right = Integer.parseInt(itr.nextToken());
                }
                key.set(left, right);
                value.set(right);
                context.write(key, value);
            }
        }
    }
    /**
     * A reducer that writes a separator line, then one (first, value) line
     * for every value in the group.
     */
    public static class Reduce
            extends Reducer<IntPair, IntWritable, Text, IntWritable> {
        private static final Text SEPARATOR =
                new Text("------------------------------------------------");
        private final Text first = new Text();

        @Override
        public void reduce(IntPair key, Iterable<IntWritable> values,
                           Context context
                           ) throws IOException, InterruptedException {
            context.write(SEPARATOR, null);
            first.set(Integer.toString(key.getFirst()));
            for (IntWritable value : values) {
                context.write(first, value);
            }
        }
    }
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println("Usage: secondarysort <in> <out>");
            System.exit(2);
        }
        Job job = new Job(conf, "secondary sort");
        job.setJarByClass(SecondarySort.class);
        job.setMapperClass(MapClass.class);
        job.setReducerClass(Reduce.class);

        // group and partition by the first int in the pair
        job.setPartitionerClass(FirstPartitioner.class);
        job.setGroupingComparatorClass(FirstGroupingComparator.class);

        // the map output is IntPair, IntWritable
        job.setMapOutputKeyClass(IntPair.class);
        job.setMapOutputValueClass(IntWritable.class);

        // the reduce output is Text, IntWritable
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
Run results:
[root@liguodong mapreduce]# hdfs dfs -put sort hdfs://liguodong:8020/input
[root@liguodong mapreduce]# hdfs dfs -cat hdfs://liguodong:8020/input
1 1
2 2
3 3
2 1
1 3
[root@liguodong mapreduce]# yarn jar hadoop-mapreduce-examples-2.6.0.jar secondarysort /input /output
......
[root@liguodong mapreduce]# hdfs dfs -cat /output/p*
------------------------------------------------
1 1
1 3
------------------------------------------------
2 1
2 2
------------------------------------------------
3 3
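In the official example, IntPair.write stores each int as x - Integer.MIN_VALUE, which flips the sign bit. Comparing the serialized bytes as unsigned values (as WritableComparator.compareBytes does, and as IntPair.Comparator and FirstGroupingComparator rely on) then gives the same order as comparing the ints; FirstGroupingComparator looks only at the first Integer.SIZE / 8 = 4 bytes, the encoded first field. A plain-Java sketch of the encoding, with hypothetical names; ByteBuffer is big-endian by default, like DataOutput.writeInt.

```java
import java.nio.ByteBuffer;

// Hypothetical demo of IntPair's byte-comparable int encoding; plain Java, no Hadoop.
final class SignFlipEncodingDemo {
    // Same transform as IntPair.write: x - Integer.MIN_VALUE flips the sign bit.
    static byte[] encode(int x) {
        return ByteBuffer.allocate(4).putInt(x - Integer.MIN_VALUE).array();
    }

    // Plain two's-complement bytes, for contrast.
    static byte[] encodeRaw(int x) {
        return ByteBuffer.allocate(4).putInt(x).array();
    }

    // Lexicographic comparison of bytes treated as unsigned values.
    static int compareUnsigned(byte[] a, byte[] b) {
        int n = Math.min(a.length, b.length);
        for (int i = 0; i < n; i++) {
            int d = (a[i] & 0xff) - (b[i] & 0xff);
            if (d != 0) {
                return d;
            }
        }
        return a.length - b.length;
    }

    // True if comparing the encoded bytes agrees with comparing the ints themselves.
    static boolean agrees(int a, int b) {
        return Integer.signum(compareUnsigned(encode(a), encode(b)))
                == Integer.signum(Integer.compare(a, b));
    }

    public static void main(String[] args) {
        int[] xs = {Integer.MIN_VALUE, -1, 0, 1, Integer.MAX_VALUE};
        for (int a : xs) {
            for (int b : xs) {
                if (!agrees(a, b)) {
                    throw new AssertionError(a + " vs " + b);
                }
            }
        }
        System.out.println("encoded byte order matches int order");
    }
}
```

Without the sign flip, -1 would be stored as 0xFFFFFFFF and sort after 1 in a raw byte comparison.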
Original post: http://blog.csdn.net/scgaliguodong123_/article/details/46514673