The Mapper class:

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class TotalSortMap extends Mapper<Text, Text, Text, IntWritable> {
    @Override
    protected void map(Text key, Text value, Context context)
            throws IOException, InterruptedException {
        // Emit the Text key as-is, with its numeric value parsed from the key text.
        context.write(key, new IntWritable(Integer.parseInt(key.toString())));
    }
}
The Reducer class:

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class TotalSortReduce extends Reducer<Text, IntWritable, IntWritable, NullWritable> {
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        // Keys arrive already sorted; write each integer with no accompanying value.
        for (IntWritable value : values)
            context.write(value, NullWritable.get());
    }
}
The entry (driver) class:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.InputSampler;
import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class TotalSort extends Configured implements Tool {

    // A key comparator that parses the two Text keys as integers and compares them numerically.
    public static class KeyComparator extends WritableComparator {
        protected KeyComparator() {
            super(Text.class, true);
        }

        @Override
        public int compare(WritableComparable writableComparable1, WritableComparable writableComparable2) {
            int num1 = Integer.parseInt(writableComparable1.toString());
            int num2 = Integer.parseInt(writableComparable2.toString());
            return num1 - num2;
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("mapreduce.totalorderpartitioner.naturalorder", "false");
        Job job = Job.getInstance(conf, "Total Sort app");
        job.setJarByClass(TotalSort.class);

        // Input is read from HDFS; the input and output paths are passed in by the launch script.
        FileInputFormat.addInputPath(job, new Path(args[0]));
        // The MapReduce result is written back to HDFS.
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        job.setInputFormatClass(KeyValueTextInputFormat.class);
        // Sort comparator: orders the keys numerically instead of lexicographically.
        job.setSortComparatorClass(KeyComparator.class);
        job.setNumReduceTasks(3); // number of reduce tasks

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(NullWritable.class);

        // Path where the partition (split-point) file is stored.
        TotalOrderPartitioner.setPartitionFile(job.getConfiguration(), new Path(args[2]));
        // Sample the input keys; 0.01 is the sampling frequency.
        InputSampler.Sampler<Text, Text> sampler = new InputSampler.RandomSampler<>(0.01, 1000, 100);
        // Write the sampled split points into the partition file.
        InputSampler.writePartitionFile(job, sampler);

        job.setMapperClass(TotalSortMap.class);
        job.setReducerClass(TotalSortReduce.class);
        // Partitioner that assigns ordered, non-overlapping key ranges to the reducers.
        job.setPartitionerClass(TotalOrderPartitioner.class);
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new TotalSort(), args);
        System.exit(exitCode);
    }
}
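The custom comparator (together with mapreduce.totalorderpartitioner.naturalorder=false) is needed because Text compares byte-wise, so for example "10" sorts before "9". A tiny standalone check illustrates the difference; the class name CompareDemo is only for illustration and is not part of the job:

import org.apache.hadoop.io.Text;

public class CompareDemo {
    public static void main(String[] args) {
        // Lexicographic (byte-wise) Text comparison: "9" sorts after "10".
        System.out.println(new Text("9").compareTo(new Text("10")) > 0);        // true
        // Numeric comparison, as done by KeyComparator: 9 sorts before 10.
        System.out.println(Integer.parseInt("9") - Integer.parseInt("10") < 0); // true
    }
}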
The test data can be generated with a simple shell script (create_data.sh) that prints 10,000 random numbers:

#!/bin/bash
for k in $(seq 1 10000)
do
    echo $RANDOM
done
sh create_data.sh > test_data.txt
hadoop fs -put test_data.txt /data/
Submit the job; the three arguments are the input file, the output directory, and the partition-file path (args[0], args[1], and args[2] in the driver):

/usr/local/src/hadoop-2.6.1/bin/hadoop jar TotalSort.jar \
    hdfs://hadoop-master:8020/data/test_data.txt \
    hdfs://hadoop-master:8020/total_sort_output \
    hdfs://hadoop-master:8020/total_sort_partitions
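To confirm the output is globally sorted (not just sorted within each reduce output), compare the boundaries of adjacent part files: the last value of part-r-00000 should be no larger than the first value of part-r-00001, and so on. The commands below assume fs.defaultFS points at hadoop-master:8020 and the default part-file naming with 3 reducers:

hadoop fs -cat /total_sort_output/part-r-00000 | tail -n 1
hadoop fs -cat /total_sort_output/part-r-00001 | head -n 1
hadoop fs -cat /total_sort_output/part-r-00001 | tail -n 1
hadoop fs -cat /total_sort_output/part-r-00002 | head -n 1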
A few pitfalls to watch out for:
job.setInputFormatClass(KeyValueTextInputFormat.class) must be called (along with the input path) before InputSampler.writePartitionFile(job, sampler), because the sampler reads keys through the job's configured input format. With the default TextInputFormat, the sampled keys would be LongWritable byte offsets rather than the Text keys that the job sorts and partitions on.
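If sampling goes wrong, the split points written to the partition file will not look like the expected keys. One way to check is to dump the partition file, which InputSampler writes as a SequenceFile keyed by the map output key class. The sketch below is a hypothetical helper (the name PrintPartitions is not part of the original code) that takes the partition-file path as its first argument:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;

public class PrintPartitions {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Path partitionFile = new Path(args[0]);
        try (SequenceFile.Reader reader =
                 new SequenceFile.Reader(conf, SequenceFile.Reader.file(partitionFile))) {
            Text key = new Text();
            NullWritable value = NullWritable.get();
            // With 3 reducers there should be 2 split points.
            while (reader.next(key, value)) {
                System.out.println(key);
            }
        }
    }
}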
Original article: "一起学Hadoop——TotalOrderPartitioner类实现全局排序" (Learning Hadoop Together: Total Order Sorting with the TotalOrderPartitioner Class), https://www.cnblogs.com/airnew/p/9595385.html