指的是让所有的输出结果都是有序的,最简单的方法就是只用一个reduce任务,但是这样处理大型文件时效率极低,失去了并行架构的意义。所以可以采用分组排序的方法来实现全局排序,例如现在要实现按键的全局排序,可以将键按照取值范围分为n个分组,例如按温度分为<-10℃、-10℃~0℃、0℃~10℃、>10℃这4个分组。实现partitioner类,创建4个分区,将温度按照取值范围分到4个分区中,每个分区各自进行排序,然后将4个分区的结果按分区顺序合并成一个,即是一个全局有序的输出。
类名称 |
采样方式 |
构造方法 |
效率 |
SplitSampler(int numSamples, int maxSplitsSampled) |
对输入分片均匀采样,每个分片取前n个。 |
采样总数,用于采样的分片数 |
最高 |
RandomSampler(double freq, int numSamples, int maxSplitsSampled) |
遍历所有数据,随机采样 |
采样频率,采样总数,用于采样的分片数 |
最低 |
IntervalSampler(double freq, int maxSplitsSampled) |
固定间隔采样对有序的数据十分适用 |
采样频率,用于采样的分片数 |
中 |
/**
 * Command-line tool (a {@code Tool} built on {@code Configured}) that samples keys
 * from a job's input and writes them to the partition file path configured for
 * {@code TotalOrderPartitioner}.
 *
 * @param <K> key type produced by the input format
 * @param <V> value type produced by the input format
 */
public class InputSampler<K, V> extends Configured implements Tool {
// Shared logger; the nested RandomSampler also logs through it.
private static final Log LOG = LogFactory.getLog(InputSampler.class);
/**
 * Prints the command-line usage of the sampler tool, followed by the default
 * sampler settings, to standard output.
 *
 * @return always {@code -1}, so callers can return it directly as a failure code
 */
static int printUsage() {
System.out.println("sampler -r <reduces>\n [-inFormat <input format class>]\n [-keyClass <map input & output key class>]\n [-splitRandom <double pcnt> <numSamples> <maxsplits> | // Sample from random splits at random (general)\n -splitSample <numSamples> <maxsplits> | // Sample from first records in splits (random data)\n -splitInterval <double pcnt> <maxsplits>] // Sample from splits at intervals (sorted data)");
System.out.println("Default sampler: -splitRandom 0.1 10000 10");
return -1;
public InputSampler(Configuration conf) {
/**
 * Samples the job's input with the given sampler and writes split-point keys to the
 * partition file configured for {@code TotalOrderPartitioner}.
 *
 * <p>The samples are sorted with the job's sort comparator; then, for
 * {@code i = 1 .. numPartitions - 1}, a key is taken at an evenly spaced position in
 * the sorted array and appended to a SequenceFile with a {@code NullWritable} value.
 *
 * <p>NOTE(review): closing braces are missing from this listing and no
 * {@code writer.close()} is visible — compare against the real source before relying
 * on the exact statement nesting shown here.
 *
 * @param job     job whose input format, reducer count, sort comparator and map output
 *                key class are read
 * @param sampler strategy used to draw the sample keys
 * @throws IOException            declared by the signature
 * @throws ClassNotFoundException declared by the signature; presumably raised when the
 *                                input format class cannot be resolved — confirm
 * @throws InterruptedException   declared by the signature
 */
public static <K, V> void writePartitionFile(Job job, InputSampler.Sampler<K, V> sampler) throws IOException, ClassNotFoundException, InterruptedException {
Configuration conf = job.getConfiguration();
// Instantiate the job's input format reflectively so the sampler can read from it.
InputFormat inf = (InputFormat)ReflectionUtils.newInstance(job.getInputFormatClass(), conf);
// One partition per reduce task.
int numPartitions = job.getNumReduceTasks();
K[] samples = (Object[])sampler.getSample(inf, job);
LOG.info("Using " + samples.length + " samples");
// Sort the samples in the same order the job uses to sort its keys.
RawComparator<K> comparator = job.getSortComparator();
Arrays.sort(samples, comparator);
Path dst = new Path(TotalOrderPartitioner.getPartitionFile(conf));
FileSystem fs = dst.getFileSystem(conf);
// Remove an existing partition file (non-recursive delete) before writing a new one.
if (fs.exists(dst)) {
fs.delete(dst, false);
Writer writer = SequenceFile.createWriter(fs, conf, dst, job.getMapOutputKeyClass(), NullWritable.class);
NullWritable nullValue = NullWritable.get();
// Distance, in sample positions, between consecutive split points.
float stepSize = (float)samples.length / (float)numPartitions;
// Index of the most recently written sample; -1 means none written yet.
int last = -1;
// numPartitions partitions need numPartitions - 1 split points, hence i starts at 1.
for(int i = 1; i < numPartitions; ++i) {
int k;
// Start at the rounded i-th step; while k has not moved past the previous split point
// and the key at k equals the previously written key, advance k.
for(k = Math.round(stepSize * (float)i); last >= k && comparator.compare(samples[last], samples[k]) == 0; ++k) {
// NOTE(review): presumably the append happens after the skip loop, not inside it — the
// brace that would show this is missing from the listing.
writer.append(samples[k], nullValue);
last = k;
/**
 * Tool entry point: parses the command-line options, chooses a sampler, and writes the
 * partition file via {@code writePartitionFile}.
 *
 * <p>Recognized options are {@code -r}, {@code -inFormat}, {@code -keyClass},
 * {@code -splitSample}, {@code -splitRandom} and {@code -splitInterval}. After option
 * parsing, the last entry of {@code otherArgs} is used as the partition file path and
 * the entries before it as input paths.
 *
 * <p>NOTE(review): this listing has lost lines — the bodies of the {@code -r},
 * {@code -inFormat} and {@code -keyClass} branches, the code that fills
 * {@code otherArgs}, any advancing of {@code i} between parsed values, and all closing
 * braces are not visible.
 *
 * @param args command-line arguments
 * @return 0 on success, or the value of {@code printUsage()} (-1) on a usage error
 * @throws Exception declared by the signature
 */
public int run(String[] args) throws Exception {
Job job = new Job(this.getConf());
// Arguments that are not recognized options (input paths followed by the output path).
ArrayList<String> otherArgs = new ArrayList();
InputSampler.Sampler<K, V> sampler = null;
for(int i = 0; i < args.length; ++i) {
try {
if ("-r".equals(args[i])) {
} else if ("-inFormat".equals(args[i])) {
} else if ("-keyClass".equals(args[i])) {
} else if ("-splitSample".equals(args[i])) {
// NOTE(review): both values are parsed from args[i] as listed; the usage text expects
// two separate values (<numSamples> <maxsplits>), so an index advance is presumably
// missing here — confirm.
int numSamples = Integer.parseInt(args[i]);
int maxSplits = Integer.parseInt(args[i]);
// A non-positive split limit is replaced by Integer.MAX_VALUE.
if (0 >= maxSplits) {
maxSplits = 2147483647;
sampler = new InputSampler.SplitSampler(numSamples, maxSplits);
} else {
int maxSplits;
double pcnt;
if ("-splitRandom".equals(args[i])) {
pcnt = Double.parseDouble(args[i]);
// NOTE(review): maxSplits is assigned and then declared again on the next line, which
// would not compile; one of the two is presumably numSamples — confirm.
maxSplits = Integer.parseInt(args[i]);
int maxSplits = Integer.parseInt(args[i]);
if (0 >= maxSplits) {
maxSplits = 2147483647;
// NOTE(review): maxSplits is passed as both the second and third argument; the
// constructor signature is (freq, numSamples, maxSplitsSampled) — confirm.
sampler = new InputSampler.RandomSampler(pcnt, maxSplits, maxSplits);
} else if ("-splitInterval".equals(args[i])) {
pcnt = Double.parseDouble(args[i]);
maxSplits = Integer.parseInt(args[i]);
if (0 >= maxSplits) {
maxSplits = 2147483647;
sampler = new InputSampler.IntervalSampler(pcnt, maxSplits);
} else {
// A value that fails numeric parsing is reported and aborts with the usage text.
} catch (NumberFormatException var10) {
System.out.println("ERROR: Integer expected instead of " + args[i]);
return printUsage();
// Running off the end of args is reported against the preceding argument.
} catch (ArrayIndexOutOfBoundsException var11) {
System.out.println("ERROR: Required parameter missing from " + args[i - 1]);
return printUsage();
// More than one reduce task is required; with a single reducer there is nothing to partition.
if (job.getNumReduceTasks() <= 1) {
System.err.println("Sampler requires more than one reducer");
return printUsage();
// At least one input path plus the output path must remain.
} else if (otherArgs.size() < 2) {
System.out.println("ERROR: Wrong number of parameters: ");
return printUsage();
} else {
// No sampler option given: fall back to the default announced by printUsage().
if (null == sampler) {
sampler = new InputSampler.RandomSampler(0.1D, 10000, 10);
// The last remaining argument is the partition file to write; it is removed from the list.
Path outf = new Path((String)otherArgs.remove(otherArgs.size() - 1));
TotalOrderPartitioner.setPartitionFile(this.getConf(), outf);
// Every remaining argument is an input path.
Iterator i$ = otherArgs.iterator();
while(i$.hasNext()) {
String s = (String)i$.next();
FileInputFormat.addInputPath(job, new Path(s));
writePartitionFile(job, (InputSampler.Sampler)sampler);
return 0;
/**
 * Program entry point: creates an {@code InputSampler} with a fresh
 * {@code Configuration} and runs it through {@code ToolRunner}.
 *
 * <p>NOTE(review): what is done with {@code res} is not visible in this listing.
 *
 * @param args command-line arguments forwarded to {@code ToolRunner.run}
 * @throws Exception declared by the signature
 */
public static void main(String[] args) throws Exception {
InputSampler<?, ?> sampler = new InputSampler(new Configuration());
int res = ToolRunner.run(sampler, args);
/**
 * Strategy for drawing a sample of keys from a job's input.
 *
 * @param <K> key type produced by the input format
 * @param <V> value type produced by the input format
 */
public interface Sampler<K, V> {
/**
 * Returns the sampled keys.
 *
 * @param var1 input format used to obtain the splits and record readers
 * @param var2 job being sampled
 * @return the sampled keys
 * @throws IOException          declared by the signature
 * @throws InterruptedException declared by the signature
 */
K[] getSample(InputFormat<K, V> var1, Job var2) throws IOException, InterruptedException;
/**
 * Sampler that reads keys in order from the start of each sampled split.
 *
 * <p>NOTE(review): closing braces, the increment of {@code records}, and the body of
 * the quota check in {@code getSample} are missing from this listing.
 */
public static class SplitSampler<K, V> implements InputSampler.Sampler<K, V> {
protected final int numSamples;
protected final int maxSplitsSampled;
/**
 * Creates a sampler with no limit on the number of splits (Integer.MAX_VALUE).
 *
 * @param numSamples total number of samples to collect
 */
public SplitSampler(int numSamples) {
this(numSamples, 2147483647);
/**
 * @param numSamples       total number of samples to collect
 * @param maxSplitsSampled maximum number of splits to read from
 */
public SplitSampler(int numSamples, int maxSplitsSampled) {
this.numSamples = numSamples;// total number of samples
this.maxSplitsSampled = maxSplitsSampled;// number of splits to sample; capped at the actual split count in getSample
/**
 * Collects keys from the first {@code splitsToSample} splits of the input.
 *
 * @param inf input format supplying the splits and record readers
 * @param job job being sampled
 * @return the collected keys
 * @throws IOException          declared by the signature
 * @throws InterruptedException declared by the signature
 */
public K[] getSample(InputFormat<K, V> inf, Job job) throws IOException, InterruptedException {
List<InputSplit> splits = inf.getSplits(job);
ArrayList<K> samples = new ArrayList(this.numSamples);
// Never try to sample more splits than actually exist.
int splitsToSample = Math.min(this.maxSplitsSampled, splits.size());
// Integer division. NOTE(review): throws ArithmeticException when splitsToSample is 0
// (no splits, or maxSplitsSampled of 0).
int samplesPerSplit = this.numSamples / splitsToSample;
long records = 0L;
for(int i = 0; i < splitsToSample; ++i) {
TaskAttemptContext samplingContext = new TaskAttemptContextImpl(job.getConfiguration(), new TaskAttemptID());
RecordReader<K, V> reader = inf.createRecordReader((InputSplit)splits.get(i), samplingContext);
reader.initialize((InputSplit)splits.get(i), samplingContext);
while(reader.nextKeyValue()) {
// Store a copy of the key rather than the reader's own key object.
// presumably because the reader reuses that object between records — confirm
samples.add(ReflectionUtils.copy(job.getConfiguration(), reader.getCurrentKey(), (Object)null));
// Quota check: true once records has reached the cumulative quota for splits 0..i.
if ((long)((i + 1) * samplesPerSplit) <= records) {
return (Object[])samples.toArray();
/**
 * Sampler that keeps a key whenever the ratio of kept to seen records is below the
 * configured frequency.
 *
 * <p>NOTE(review): closing braces and the increments of {@code records} and
 * {@code kept} are missing from this listing.
 */
public static class IntervalSampler<K, V> implements InputSampler.Sampler<K, V> {
protected final double freq;
protected final int maxSplitsSampled;
/**
 * Creates a sampler with no limit on the number of splits (Integer.MAX_VALUE).
 *
 * @param freq sampling frequency
 */
public IntervalSampler(double freq) {
this(freq, 2147483647);
/**
 * @param freq             sampling frequency
 * @param maxSplitsSampled maximum number of splits to read from
 */
public IntervalSampler(double freq, int maxSplitsSampled) {
this.freq = freq;// sampling frequency
this.maxSplitsSampled = maxSplitsSampled;// number of splits used for sampling
/**
 * Collects keys from the first {@code splitsToSample} splits of the input.
 *
 * @param inf input format supplying the splits and record readers
 * @param job job being sampled
 * @return the collected keys
 * @throws IOException          declared by the signature
 * @throws InterruptedException declared by the signature
 */
public K[] getSample(InputFormat<K, V> inf, Job job) throws IOException, InterruptedException {
List<InputSplit> splits = inf.getSplits(job);
ArrayList<K> samples = new ArrayList();
// Never try to sample more splits than actually exist.
int splitsToSample = Math.min(this.maxSplitsSampled, splits.size());
long records = 0L;// number of records traversed
long kept = 0L;// number of records kept as samples
for(int i = 0; i < splitsToSample; ++i) {
TaskAttemptContext samplingContext = new TaskAttemptContextImpl(job.getConfiguration(), new TaskAttemptID());
RecordReader<K, V> reader = inf.createRecordReader((InputSplit)splits.get(i), samplingContext);
reader.initialize((InputSplit)splits.get(i), samplingContext);
while(reader.nextKeyValue()) {
// Keep this key only while the kept/seen ratio is still below the target frequency.
if ((double)kept / (double)records < this.freq) {
samples.add(ReflectionUtils.copy(job.getConfiguration(), reader.getCurrentKey(), (Object)null));
return (Object[])samples.toArray();
/**
 * Sampler that visits the splits in shuffled order and accepts each key with
 * probability {@code freq}, holding at most {@code numSamples} keys.
 *
 * <p>NOTE(review): closing braces are missing from this listing, so the exact nesting
 * of the statements in {@code getSample} cannot be read off it.
 */
public static class RandomSampler<K, V> implements InputSampler.Sampler<K, V> {
// Not final: getSample lowers it as sampling proceeds.
protected double freq;
protected final int numSamples;
protected final int maxSplitsSampled;
/**
 * Creates a sampler with no limit on the number of splits (Integer.MAX_VALUE).
 *
 * @param freq       probability with which a key is accepted
 * @param numSamples total number of samples to hold
 */
public RandomSampler(double freq, int numSamples) {
this(freq, numSamples, 2147483647);
/**
 * @param freq             probability with which a key is accepted
 * @param numSamples       total number of samples to hold
 * @param maxSplitsSampled maximum number of splits to read from
 */
public RandomSampler(double freq, int numSamples, int maxSplitsSampled) {
this.freq = freq;// sampling frequency
this.numSamples = numSamples;// total number of samples
this.maxSplitsSampled = maxSplitsSampled;// number of splits used for sampling
/**
 * Shuffles the splits, then reads them and randomly accepts keys.
 *
 * @param inf input format supplying the splits and record readers
 * @param job job being sampled
 * @return the collected keys
 * @throws IOException          declared by the signature
 * @throws InterruptedException declared by the signature
 */
public K[] getSample(InputFormat<K, V> inf, Job job) throws IOException, InterruptedException {
List<InputSplit> splits = inf.getSplits(job);
ArrayList<K> samples = new ArrayList(this.numSamples);// pre-size the list that holds the samples
int splitsToSample = Math.min(this.maxSplitsSampled, splits.size());// number of splits to sample
Random r = new Random();// random number generator
long seed = r.nextLong();// draw a random seed value (only logged in this listing)
InputSampler.LOG.debug("seed: " + seed);
int i;// shuffle the splits: swap split i with a randomly chosen split j
for(i = 0; i < splits.size(); ++i) {
InputSplit tmp = (InputSplit)splits.get(i);
int j = r.nextInt(splits.size());
splits.set(i, splits.get(j));
splits.set(j, tmp);
// Read the first splitsToSample splits unconditionally, then keep reading further
// splits only while fewer than numSamples keys have been collected.
for(i = 0; i < splitsToSample || i < splits.size() && samples.size() < this.numSamples; ++i) {
TaskAttemptContext samplingContext = new TaskAttemptContextImpl(job.getConfiguration(), new TaskAttemptID());
RecordReader<K, V> reader = inf.createRecordReader((InputSplit)splits.get(i), samplingContext);
reader.initialize((InputSplit)splits.get(i), samplingContext);
while(reader.nextKeyValue()) {
// Accept this key with probability freq.
if (r.nextDouble() <= this.freq) {
// Room left: append a copy of the key.
if (samples.size() < this.numSamples) {
samples.add(ReflectionUtils.copy(job.getConfiguration(), reader.getCurrentKey(), (Object)null));
} else {
// Sample list is full: pick a random slot to overwrite.
int ind = r.nextInt(this.numSamples);
// NOTE(review): Random.nextInt(n) returns a value in [0, n), so this test is always true.
if (ind != this.numSamples) {
samples.set(ind, ReflectionUtils.copy(job.getConfiguration(), reader.getCurrentKey(), (Object)null));
// freq is scaled by (numSamples - 1) / numSamples, so the acceptance test
// r.nextDouble() <= this.freq passes less often afterwards.
// NOTE(review): with the braces missing it is not visible which branch this update belongs to — confirm.
this.freq *= (double)(this.numSamples - 1) / (double)this.numSamples;
return (Object[])samples.toArray();
(1) TotalOrderPartitioner
public class TotalOrderPartitioner<K extends WritableComparable<?>, V> extends Partitioner<K, V> implements Configurable {
(2) 随机采样全局排序实例
温度区间 |
<5.6 |
[5.6,13.9) |
[13.9,22.0) |
>=22.0 |
package Temperature;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.InputSampler;
import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import java.io.IOException;
import java.net.URI;
public class SortTempetatureTotalOrder extends Configured implements Tool {
public int run(String[] args) throws Exception
Job job = JobBuilder.parseInputAndOutput(this, getConf(), args);
if (job == null) {
return -1;
SequenceFileOutputFormat.setCompressOutput(job, true);
SequenceFileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
SequenceFileOutputFormat.setOutputCompressionType(job, SequenceFile.CompressionType.BLOCK);
InputSampler.Sampler<IntWritable, Text> sampler = new InputSampler.RandomSampler<IntWritable, Text>(
0.1, 10000, 10);
Configuration conf =job.getConfiguration();
String partitionFile=TotalOrderPartitioner.getPartitionFile(conf);
URI partitionUri=new URI(partitionFile);
return job.waitForCompletion(true)? 0:1;
public static class JobBuilder {
public static Job parseInputAndOutput(Tool tool, Configuration conf, String[] args) throws IOException {
if (args.length != 2) {
return null;
Job job = null;
try {
job = new Job(conf, tool.getClass().getName());
} catch (IOException e) {
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
return job;
public static void main(String[] args) throws Exception {
int exitCode = ToolRunner.run(
new SortTempetatureTotalOrder(), args);
执行任务的hadoop命令如下,其中-D mapreduce.job.reduces=4指定使用4个reduce任务,input/ncdc/all-seq是输入路径,output-totalsort是输出路径(程序只接受输入路径和输出路径这两个参数)
%hadoop jar Hadoop-example.jar SortTempetatureTotalOrder -D mapreduce.job.reduces=4 input/ncdc/all-seq output-totalsort
