
MapReduce Sorting with Examples

Date: 2015-06-16 16:44:10

Tags: sort   mapreduce

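Sorting in MapReduce comes in four kinds:
Ordinary sort
Partial sort
Total (global) sort
Secondary sort (e.g., with two columns of data, when values in the first column are equal, the rows must also be ordered by the second column)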

Ordinary sort

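Ordinary sorting is the sorting MapReduce does on its own: map output keys are always sorted during the shuffle.
Text keys can be sorted too, but they are compared byte by byte (lexicographically), so numbers stored as Text do not sort numerically ("6546" sorts before "787"). For numeric order, use IntWritable, LongWritable, or another type that implements WritableComparable with a numeric comparison.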
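A quick way to see the difference is to sort the same digit strings both ways. The sketch below is plain Java rather than Hadoop: it assumes `String.compareTo` is a fair stand-in for `Text`'s byte-wise comparison (true for ASCII digits) and that `Long` ordering matches `LongWritable`. The class name is hypothetical; the numbers come from the input file used later in this article.

```java
import java.util.Arrays;

// Plain-Java stand-in: Text keys compare byte by byte, which for ASCII digit
// strings gives the same order as String.compareTo; LongWritable keys compare
// by numeric value, like Long.compare.
public class TextVsLongOrder {

    // Order the values the way Text keys would be ordered (lexicographically)
    static String[] sortAsText(String[] values) {
        String[] copy = values.clone();
        Arrays.sort(copy);
        return copy;
    }

    // Order the values the way LongWritable keys would be ordered (numerically)
    static long[] sortAsLong(String[] values) {
        long[] nums = Arrays.stream(values).mapToLong(Long::parseLong).toArray();
        Arrays.sort(nums);
        return nums;
    }

    public static void main(String[] args) {
        String[] input = {"43", "6546", "65", "787", "879", "98"};
        System.out.println(Arrays.toString(sortAsText(input))); // string order
        System.out.println(Arrays.toString(sortAsLong(input))); // numeric order
    }
}
```

Only the second line printed by `main` is the order a numeric sort should produce; the first puts 6546 before 787 and 98 last.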

Partial sort

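By default, MapReduce sorts by key while processing map and reduce (during the shuffle). If a total order is not required, the results can be written out as they are; each output file then contains records sorted by key, but there is no ordering across files.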
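To illustrate partial sorting, the plain-Java simulation below routes each key to a reducer with the formula used by Hadoop's default `HashPartitioner`, `(hashCode & Integer.MAX_VALUE) % numReduceTasks`, then sorts each reducer's keys separately. `Long.hashCode` stands in for `LongWritable`'s hash; for small non-negative keys both are just the value itself. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Plain-Java simulation of partial sorting: hash-partition the keys,
// then sort each reducer's share independently.
public class PartialSort {

    static List<List<Long>> partialSort(long[] keys, int numReducers) {
        List<List<Long>> parts = new ArrayList<>();
        for (int i = 0; i < numReducers; i++) parts.add(new ArrayList<>());
        for (long k : keys) {
            // Same formula as HashPartitioner: (hashCode & Integer.MAX_VALUE) % numReduceTasks
            int p = (Long.hashCode(k) & Integer.MAX_VALUE) % numReducers;
            parts.get(p).add(k);
        }
        // Each reducer sees its own keys in sorted order (the shuffle sort)
        for (List<Long> part : parts) Collections.sort(part);
        return parts;
    }

    public static void main(String[] args) {
        long[] keys = {43, 6546, 65, 787, 879, 98, 2, 7, 23, 54};
        List<List<Long>> parts = partialSort(keys, 3);
        for (int i = 0; i < parts.size(); i++) {
            System.out.println("part-r-0000" + i + ": " + parts.get(i));
        }
    }
}
```

Each list is sorted on its own, but reading the files one after another gives 54, 879, 6546, 7, 43, ..., which is not globally sorted: exactly the partial-sort situation.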

Total sort

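Out of the box, Hadoop does not produce a globally sorted result when a job has more than one reducer, yet global sorting is a very common requirement in large-scale data processing. The most straightforward way to sort a large amount of data with Hadoop is to feed all of the file content to the map phase, have the mappers pass records through unchanged to a single reducer, and let Hadoop's shuffle sort all the data before the reducer writes it out directly. (A single reducer, however, is a bottleneck: it does not suit large-scale data and is inefficient.)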

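To sort large-scale data globally, the main idea is to split the data into ranges. For example, when sorting integers, values in [0,10000] go to partition 0, values in (10000,20000] go to partition 1, and so on.
If the data is evenly distributed, each partition holds roughly the same amount of data, which is the ideal case. In practice, however, data is often unevenly distributed (data skew), so partitioning on fixed ranges no longer works well. This is where a sampler helps.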
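The range idea can be sketched in plain Java. The bounds [0, 10000] and (10000, 20000] come from the text; sending everything above 20000 to a third partition (and anything below 0 to partition 0) is an assumption added so that every key has a home. The class name is hypothetical. Concatenating the sorted partitions in order gives a globally sorted result, and the second data set shows how skewed input piles into one partition.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Plain-Java simulation of range partitioning with fixed bounds:
// key <= 10000 -> partition 0, 10000 < key <= 20000 -> partition 1,
// key > 20000 -> partition 2 (the third range is an assumption).
public class RangePartitionSort {

    static int rangePartition(long key) {
        if (key <= 10000) return 0;
        if (key <= 20000) return 1;
        return 2;
    }

    // Route keys by range, then sort each partition (the reducer-side sort)
    static List<List<Long>> sortByRange(long[] keys) {
        List<List<Long>> parts = new ArrayList<>();
        for (int i = 0; i < 3; i++) parts.add(new ArrayList<>());
        for (long k : keys) parts.get(rangePartition(k)).add(k);
        for (List<Long> part : parts) Collections.sort(part);
        return parts;
    }

    // Reading partitions 0, 1, 2 in order yields one globally sorted list
    static List<Long> concat(List<List<Long>> parts) {
        List<Long> all = new ArrayList<>();
        for (List<Long> part : parts) all.addAll(part);
        return all;
    }

    public static void main(String[] args) {
        long[] even = {15000, 3, 25000, 9999, 12000, 30000};
        long[] skewed = {1, 5, 42, 700, 8000, 9999, 10000, 20001};
        List<List<Long>> a = sortByRange(even);
        List<List<Long>> b = sortByRange(skewed);
        System.out.println("even:   " + a + " -> " + concat(a));
        System.out.println("skewed: " + b + " -> " + concat(b));
    }
}
```

With the even data each partition gets two keys; with the skewed data seven of the eight keys land in partition 0 and partition 1 is empty, so one reducer does almost all the work.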
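Example: the job below sorts numbers globally with three reducers and hand-picked ranges (key <= 100, 100 < key < 1000, key >= 1000):
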
package Sort;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;



public class Demo {

    private final static String INPUT_PATH = "hdfs://liguodong:8020/input";
    private final static String OUTPUT_PATH = "hdfs://liguodong:8020/output";

    public static class MyMapper extends Mapper<LongWritable, Text, LongWritable, NullWritable>{

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {

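            // Skip blank lines, which would otherwise make Long.parseLong throw
            String line = value.toString().trim();
            if (line.isEmpty()) {
                return;
            }
            // Use the first number on the line as the key; the shuffle sorts it numerically
            String[] values = line.split("\\s+");
            context.write(new LongWritable(Long.parseLong(values[0])), NullWritable.get());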

        }

    }

    public static class MyReducer extends Reducer<LongWritable, NullWritable, LongWritable, NullWritable>{

        @Override
        protected void reduce(LongWritable key, Iterable<NullWritable> values,
                Context context)
                throws IOException, InterruptedException {
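            // Emit the key once per occurrence so duplicate numbers are kept
            for (NullWritable ignored : values) {
                context.write(key, NullWritable.get());
            }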
        }

    }

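    // Hand-picked ranges: key <= 100 -> 0, 100 < key < 1000 -> 1, key >= 1000 -> 2.
    // Every index is taken modulo numPartitions so it stays valid with fewer reducers.
    public static class MyPartitioner extends Partitioner<LongWritable, NullWritable>{

        @Override
        public int getPartition(LongWritable key, NullWritable value,
                int numPartitions) {
            long k = key.get();
            if (k <= 100) {
                return 0 % numPartitions;
            }
            if (k < 1000) {
                return 1 % numPartitions;
            }
            return 2 % numPartitions;
        }
    }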



    public static void main(String[] args) throws 
    ClassNotFoundException, IOException, InterruptedException, URISyntaxException {
        Configuration conf = new Configuration();
        final FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH),conf);
        if(fileSystem.exists(new Path(OUTPUT_PATH)))
        {
            fileSystem.delete(new Path(OUTPUT_PATH),true);
        }

        Job job = Job.getInstance(conf, "shuffle sort"); 

        job.setJarByClass(Demo.class);

        FileInputFormat.addInputPath(job, new Path(INPUT_PATH));  

        job.setMapperClass(MyMapper.class);
        job.setMapOutputKeyClass(LongWritable.class);
        job.setMapOutputValueClass(NullWritable.class);

        job.setPartitionerClass(MyPartitioner.class);

        job.setReducerClass(MyReducer.class);
        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(NullWritable.class);
        job.setNumReduceTasks(3);

        FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

}
[root@liguodong file]# vi sortsum
[root@liguodong file]# hdfs dfs -put sortsum /input
[root@liguodong file]# hdfs dfs -cat /input
43
6546
65
787
879
98
......


Run the jar:
[root@liguodong file]# yarn jar numsort.jar

Check the results:
[root@liguodong file]# hdfs dfs -ls /output/
Found 4 items
-rw-r--r--   1 root supergroup          0 2015-06-16 10:55 /output/_SUCCESS
-rw-r--r--   1 root supergroup         28 2015-06-16 10:55 /output/part-r-00000
-rw-r--r--   1 root supergroup         20 2015-06-16 10:55 /output/part-r-00001
-rw-r--r--   1 root supergroup         15 2015-06-16 10:55 /output/part-r-00002
[root@liguodong file]# hdfs dfs -cat  /output/part-r-00000
2
7
23
34
43
54
65
76
87
98
[root@liguodong file]# hdfs dfs -cat  /output/part-r-00001
543
567
675
787
879
[root@liguodong file]# hdfs dfs -cat  /output/part-r-00002
5423
6546
6554

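Drawback of the program above: the partitions are chosen by hand, so the data can be spread very unevenly across them, which easily leads to data skew.
To address this, Hadoop provides the Sampler interface (Hadoop's samplers), which returns a set of sample keys from the input, and the TotalOrderPartitioner class, which can be used to implement a total sort.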
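How a sample becomes partition boundaries can be sketched in plain Java. The sketch is only in the spirit of `InputSampler.writePartitionFile` (sort the sample, then take evenly spaced keys as `numPartitions - 1` split points); Hadoop's version also steps over duplicate keys and writes the result to a file, which this simplified, hypothetical class does not.

```java
import java.util.Arrays;

// Plain-Java sketch: derive numPartitions - 1 split points from a key sample
// by sorting it and picking evenly spaced keys (duplicates are not skipped).
public class SplitPoints {

    static long[] splitPoints(long[] sample, int numPartitions) {
        if (sample.length == 0 || numPartitions < 1) {
            throw new IllegalArgumentException("need a non-empty sample and numPartitions >= 1");
        }
        long[] sorted = sample.clone();
        Arrays.sort(sorted);
        long[] splits = new long[numPartitions - 1];
        float step = sorted.length / (float) numPartitions;
        for (int i = 1; i < numPartitions; i++) {
            // Index of the i-th evenly spaced sample, clamped to the array bounds
            int k = Math.min(Math.round(step * i), sorted.length - 1);
            splits[i - 1] = sorted[k];
        }
        return splits;
    }

    public static void main(String[] args) {
        // A sample of keys taken from the input used in this article
        long[] sample = {5423, 2, 787, 43, 98, 6546, 65, 543, 879};
        System.out.println(Arrays.toString(splitPoints(sample, 3)));
    }
}
```

Unlike the hand-picked bounds 100 and 1000 in the example above, these boundaries (98 and 879 for this sample) adapt to the data: each of the three ranges holds about a third of the sampled keys.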

Hadoop 2.2.0 source (abridged; class bodies omitted):

package org.apache.hadoop.mapreduce.lib.partition;
/**
 * Utility for collecting samples and writing a partition file for
 * {@link TotalOrderPartitioner}.
 */
public class InputSampler<K,V> extends Configured implements Tool  {
  /**
   * Interface to sample using an 
   * {@link org.apache.hadoop.mapreduce.InputFormat}.
   */
  public interface Sampler<K,V> {
    /**
     * For a given job, collect and return a subset of the keys from the
     * input data.
     */
    K[] getSample(InputFormat<K,V> inf, Job job) 
    throws IOException, InterruptedException;
  }

  /**
   * Samples the first n records from s splits.
   * Inexpensive way to sample random data.
   */
  public static class SplitSampler<K,V> implements Sampler<K,V> {}

  /**
   * Sample from random points in the input.
   * General-purpose sampler. Takes numSamples / maxSplitsSampled inputs from
   * each split.
   */
  public static class RandomSampler<K,V> implements Sampler<K,V> {}

  /**
   * Sample from s splits at regular intervals.
   * Useful for sorted data.
   */
  public static class IntervalSampler<K,V> implements Sampler<K,V> {}

}
package org.apache.hadoop.mapreduce.lib.partition;
/**
 * Partitioner effecting a total order by reading split points from
 * an externally generated source.
 */
public class TotalOrderPartitioner<K extends WritableComparable<?>,V>
    extends Partitioner<K,V> implements Configurable {

}

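TotalOrderPartitioner can read its split points from the partition file that a sampler produces through InputSampler (the "utility for collecting samples and writing a partition file" in the Javadoc above).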
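Once the split points are known, each key's partition is found by binary search. The plain-Java sketch below mirrors the `Arrays.binarySearch` approach that TotalOrderPartitioner uses for keys that are not BinaryComparable, where the partition is the number of split points that are <= the key; it is a simplified stand-in with a hypothetical class name, not Hadoop's code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java sketch of a total-order partition lookup over sorted split points.
public class TotalOrderLookup {

    static int getPartition(long[] splitPoints, long key) {
        int pos = Arrays.binarySearch(splitPoints, key) + 1;
        // Found at index i: pos = i + 1, so a key equal to a split point goes up.
        // Not found: binarySearch returns -(insertionPoint) - 1, so pos = -insertionPoint.
        return pos < 0 ? -pos : pos;
    }

    public static void main(String[] args) {
        long[] splits = {98, 879}; // e.g. produced by sampling; gives 3 partitions
        long[] keys = {43, 6546, 65, 787, 879, 98, 2, 543};
        List<List<Long>> parts = new ArrayList<>();
        for (int i = 0; i <= splits.length; i++) parts.add(new ArrayList<>());
        for (long k : keys) parts.get(getPartition(splits, k)).add(k);
        for (List<Long> p : parts) p.sort(null); // per-reducer sort, natural order
        System.out.println(parts);
    }
}
```

Because every key in partition i is smaller than every key in partition i + 1, concatenating the reducer outputs in order yields a total sort.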

Secondary sort

Example:

key1 1
key2 2
key3 3
key2 1
key1 3

Intermediate map output, keyed by the composite key <natural key, value> and sorted:

<key1,1>  1
<key1,3>  3
<key2,1>  1
<key2,2>  2
<key3,3>  3

Sorted result:

key1 1
key1 3
key2 1
key2 2
key3 3

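1. By default, MapReduce sorts records by key.
2. Main idea:
Write a custom Partitioner that partitions on the natural key (the first field), so that all records sharing a first field reach the same reducer; this gives the first level of ordering.
See:
http://blog.csdn.net/scgaliguodong123_/article/details/46489357
Implement a WritableComparator with your own comparison logic so that keys are also ordered by the second field; this gives the second level of ordering. The official example below additionally sets a grouping comparator on the first field so that reduce() is called once per first value.
See:
http://blog.csdn.net/scgaliguodong123_/article/details/46010947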
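Before looking at Hadoop's implementation, the two ideas above (order by a composite key, then group by its first field) can be simulated in plain Java. The class and method names are hypothetical, a single reducer is assumed so partitioning is not modeled, and `----` stands in for the separator line the official example prints. The input is the same as the `sort` file in the run at the end of this article.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

// Plain-Java simulation of secondary sort on "first second" lines:
// sort by the composite key (first, then second), then group by first.
public class SecondarySortSim {

    static List<String> secondarySort(List<String> lines) {
        List<int[]> pairs = new ArrayList<>();
        for (String line : lines) {
            // Parse "first second"; the pair acts as the composite key
            String[] f = line.trim().split("\\s+");
            pairs.add(new int[] {Integer.parseInt(f[0]), Integer.parseInt(f[1])});
        }
        // Composite-key order: by first field, then by second field
        pairs.sort(Comparator.<int[]>comparingInt(p -> p[0]).thenComparingInt(p -> p[1]));

        List<String> out = new ArrayList<>();
        for (int i = 0; i < pairs.size(); i++) {
            int[] p = pairs.get(i);
            // Grouping on the first field: a new group (one reduce() call)
            // starts whenever the first field changes
            if (i == 0 || pairs.get(i - 1)[0] != p[0]) {
                out.add("----");
            }
            out.add(p[0] + "\t" + p[1]);
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> input = Arrays.asList("1 1", "2 2", "3 3", "2 1", "1 3");
        secondarySort(input).forEach(System.out::println);
    }
}
```

In Hadoop, the sort step corresponds to the key's comparator, and the grouping step to the grouping comparator set with `job.setGroupingComparatorClass`.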

The official example shipped with Hadoop:

package org.apache.hadoop.examples;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.util.GenericOptionsParser;

/**
 * This is an example Hadoop Map/Reduce application.
 * It reads the text input files that must contain two integers per line.
 * The output is sorted by the first and second number and grouped on the 
 * first number.
 *
 * To run: bin/hadoop jar build/hadoop-examples.jar secondarysort
 *            <i>in-dir</i> <i>out-dir</i> 
 */
public class SecondarySort {

  /**
   * Define a pair of integers that are writable.
   * They are serialized in a byte comparable format.
   */
  public static class IntPair 
                      implements WritableComparable<IntPair> {
    private int first = 0;
    private int second = 0;

    /**
     * Set the left and right values.
     */
    public void set(int left, int right) {
      first = left;
      second = right;
    }
    public int getFirst() {
      return first;
    }
    public int getSecond() {
      return second;
    }
    /**
     * Read the two integers. 
     * Encoded as: MIN_VALUE -> 0, 0 -> -MIN_VALUE, MAX_VALUE-> -1
     */
    @Override
    public void readFields(DataInput in) throws IOException {
      first = in.readInt() + Integer.MIN_VALUE;
      second = in.readInt() + Integer.MIN_VALUE;
    }
    @Override
    public void write(DataOutput out) throws IOException {
      out.writeInt(first - Integer.MIN_VALUE);
      out.writeInt(second - Integer.MIN_VALUE);
    }
    @Override
    public int hashCode() {
      return first * 157 + second;
    }
    @Override
    public boolean equals(Object right) {
      if (right instanceof IntPair) {
        IntPair r = (IntPair) right;
        return r.first == first && r.second == second;
      } else {
        return false;
      }
    }
    /** A Comparator that compares serialized IntPair. */ 
    public static class Comparator extends WritableComparator {
      public Comparator() {
        super(IntPair.class);
      }

      public int compare(byte[] b1, int s1, int l1,
                         byte[] b2, int s2, int l2) {
        return compareBytes(b1, s1, l1, b2, s2, l2);
      }
    }

    static {                                        // register this comparator
      WritableComparator.define(IntPair.class, new Comparator());
    }

    @Override
    public int compareTo(IntPair o) {
      if (first != o.first) {
        return first < o.first ? -1 : 1;
      } else if (second != o.second) {
        return second < o.second ? -1 : 1;
      } else {
        return 0;
      }
    }
  }

  /**
   * Partition based on the first part of the pair.
   */
  public static class FirstPartitioner extends Partitioner<IntPair,IntWritable>{
    @Override
    public int getPartition(IntPair key, IntWritable value, 
                            int numPartitions) {
      return Math.abs(key.getFirst() * 127) % numPartitions;
    }
  }

  /**
   * Compare only the first part of the pair, so that reduce is called once
   * for each value of the first part.
   */
  public static class FirstGroupingComparator 
                implements RawComparator<IntPair> {
    @Override
    public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
      return WritableComparator.compareBytes(b1, s1, Integer.SIZE/8, 
                                             b2, s2, Integer.SIZE/8);
    }

    @Override
    public int compare(IntPair o1, IntPair o2) {
      int l = o1.getFirst();
      int r = o2.getFirst();
      return l == r ? 0 : (l < r ? -1 : 1);
    }
  }

  /**
   * Read two integers from each line and generate a key, value pair
   * as ((left, right), right).
   */
  public static class MapClass 
         extends Mapper<LongWritable, Text, IntPair, IntWritable> {

    private final IntPair key = new IntPair();
    private final IntWritable value = new IntWritable();

    @Override
    public void map(LongWritable inKey, Text inValue, 
                    Context context) throws IOException, InterruptedException {
      StringTokenizer itr = new StringTokenizer(inValue.toString());
      int left = 0;
      int right = 0;
      if (itr.hasMoreTokens()) {
        left = Integer.parseInt(itr.nextToken());
        if (itr.hasMoreTokens()) {
          right = Integer.parseInt(itr.nextToken());
        }
        key.set(left, right);
        value.set(right);
        context.write(key, value);
      }
    }
  }

  /**
   * A reducer that writes a separator line for each group (one distinct first
   * number), followed by one (first, second) line per value in that group.
   */
  public static class Reduce 
         extends Reducer<IntPair, IntWritable, Text, IntWritable> {
    private static final Text SEPARATOR = 
      new Text("------------------------------------------------");
    private final Text first = new Text();

    @Override
    public void reduce(IntPair key, Iterable<IntWritable> values,
                       Context context
                       ) throws IOException, InterruptedException {
      context.write(SEPARATOR, null);
      first.set(Integer.toString(key.getFirst()));
      for(IntWritable value: values) {
        context.write(first, value);
      }
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    if (otherArgs.length != 2) {
      System.err.println("Usage: secondarysort <in> <out>");
      System.exit(2);
    }
    Job job = new Job(conf, "secondary sort");
    job.setJarByClass(SecondarySort.class);
    job.setMapperClass(MapClass.class);
    job.setReducerClass(Reduce.class);

    // group and partition by the first int in the pair
    job.setPartitionerClass(FirstPartitioner.class);
    job.setGroupingComparatorClass(FirstGroupingComparator.class);

    // the map output is IntPair, IntWritable
    job.setMapOutputKeyClass(IntPair.class);
    job.setMapOutputValueClass(IntWritable.class);

    // the reduce output is Text, IntWritable
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);

    FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
    FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }

}
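Why does `IntPair.write` store `first - Integer.MIN_VALUE` instead of `first`? Its `Comparator` sorts the raw serialized bytes with `compareBytes`, which treats bytes as unsigned; subtracting `Integer.MIN_VALUE` flips the sign bit so that negative numbers still sort before positive ones. The plain-Java check below (class name hypothetical, `ByteBuffer` standing in for `DataOutput.writeInt`, both big-endian) confirms that byte order matches int order for a few edge values.

```java
import java.nio.ByteBuffer;

// Plain-Java check of the IntPair encoding: after flipping the sign bit,
// unsigned big-endian byte order equals signed int order.
public class SignFlipEncoding {

    // Same transformation as IntPair.write: out.writeInt(x - Integer.MIN_VALUE)
    static byte[] encode(int x) {
        return ByteBuffer.allocate(4).putInt(x - Integer.MIN_VALUE).array();
    }

    // Lexicographic comparison of unsigned bytes, like WritableComparator.compareBytes
    static int compareBytes(byte[] a, byte[] b) {
        for (int i = 0; i < Math.min(a.length, b.length); i++) {
            int x = a[i] & 0xff;
            int y = b[i] & 0xff;
            if (x != y) return x - y;
        }
        return a.length - b.length;
    }

    public static void main(String[] args) {
        int[] values = {Integer.MIN_VALUE, -5, -1, 0, 1, 42, Integer.MAX_VALUE};
        for (int a : values) {
            for (int b : values) {
                int expected = Integer.signum(Integer.compare(a, b));
                int actual = Integer.signum(compareBytes(encode(a), encode(b)));
                if (expected != actual) throw new AssertionError(a + " vs " + b);
            }
        }
        System.out.println("byte order matches int order");
    }
}
```

The same property lets `FirstGroupingComparator` compare only the first `Integer.SIZE/8` = 4 bytes: they hold the encoded first field. Without the flip, -1 would serialize as `FF FF FF FF` and sort after 0.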

Run results:

[root@liguodong mapreduce]# hdfs dfs -put sort hdfs://liguodong:8020/input           
[root@liguodong mapreduce]# hdfs dfs -cat hdfs://liguodong:8020/input
1 1
2 2
3 3
2 1
1 3

[root@liguodong mapreduce]# yarn jar hadoop-mapreduce-examples-2.6.0.jar secondarysort /input /output
......

[root@liguodong mapreduce]# hdfs dfs -cat /output/p*
------------------------------------------------
1       1
1       3
------------------------------------------------
2       1
2       2
------------------------------------------------
3       3


Original article: http://blog.csdn.net/scgaliguodong123_/article/details/46514673
