11 13 15 17 19 21 23 25 27 29
10 12 14 16 18 20 22 24 26 28 30
1 2 3 4 5 6 7 8 9 10
1 1
2 2
3 3
4 4
5 5
6 6
7 7
8 8
9 9
10 10
11 10
……
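With the requirement in hand, the problems follow, and they have to be solved one by one. MapReduce does have its own sorting mechanism, and we will not throw it away, but we cannot rely on it alone. MapReduce sorts by key: if the key is an int type, records are sorted by numeric value; if the key is a String type, records are sorted in dictionary order. To guarantee global order here, we need to define our own Partition class that acts as a global filter, so that the data handed to the reduce side after map is ordered across partitions. Concretely, the quotient of the maximum input value divided by the number of partitions is used as the boundary increment, so the partition boundaries are 1x, 2x, up to (numPartitions-1)x this quotient. This guarantees that the data is globally ordered after partitioning. Then, for each <key, value-list> received on the Reduce side, the input key is written as the output value once per element of value-list, and the output key is a global variable that tracks the current rank.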
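The boundary arithmetic described above can be tried out without a cluster. The following is a minimal sketch in plain Java with no Hadoop dependency; the class and method names (`RangePartitionSketch`, `bound`, `partitionFor`) are hypothetical, and it assumes the same "quotient plus one" range width and the same maximum value 65223 that the article's Partition class uses. Sending out-of-range keys to partition 0 is an assumption of this sketch.

```java
/** Sketch of the range-partition rule; names are illustrative, not from Hadoop. */
final class RangePartitionSketch {

    // Width of each key range: floor(maxValue / numPartitions) + 1.
    static int bound(int maxValue, int numPartitions) {
        return maxValue / numPartitions + 1;
    }

    // Keys in [bound * p, bound * (p + 1)) go to partition p.
    // Keys outside the expected range fall back to partition 0 (assumption).
    static int partitionFor(int key, int maxValue, int numPartitions) {
        int b = bound(maxValue, numPartitions);
        int p = key / b;
        if (key < 0 || p >= numPartitions) {
            return 0;
        }
        return p;
    }

    public static void main(String[] args) {
        int max = 65223;
        int parts = 3;
        System.out.println("bound = " + bound(max, parts));
        int[] samples = {0, 21741, 21742, 43484, 65223};
        for (int key : samples) {
            System.out.println(key + " -> partition " + partitionFor(key, max, parts));
        }
    }
}
```

Because every key in partition p is smaller than every key in partition p+1, concatenating the sorted reducer outputs in partition order yields a globally sorted sequence.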
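import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class Sort {

    // map converts the value of each input line to an IntWritable and emits it as the output key
    public static class Map extends Mapper<Object, Text, IntWritable, IntWritable> {
        private static IntWritable data = new IntWritable();

        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            System.out.println("line:" + line);
            try {
                data.set(Integer.parseInt(line));
            } catch (NumberFormatException e) {
                // a line that is not a number (e.g. a blank line) falls back to 1000
                data.set(1000);
            }
            System.out.println("Map key:" + data.toString());
            context.write(data, new IntWritable(1));
        }
    }

    // reduce copies the input key to the output value and emits it once per
    // element of value-list; the static linenum holds the rank of the key.
    // Note: each reduce task has its own counter, so the rank is only
    // global when the job runs with a single reduce task.
    public static class Reduce
            extends Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {
        private static IntWritable linenum = new IntWritable(1);

        public void reduce(IntWritable key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            for (IntWritable val : values) {
                context.write(linenum, key);
                System.out.println("Reduce key:" + linenum + "\tReduce value:" + key);
                linenum = new IntWritable(linenum.get() + 1);
            }
        }
    }

    // Custom Partition function: from the maximum input value and the number of
    // partitions in the MapReduce framework it derives the boundaries that split
    // the input by size, then returns the partition ID that matches the key.
    public static class Partition extends Partitioner<IntWritable, IntWritable> {
        @Override
        public int getPartition(IntWritable key, IntWritable value, int numPartitions) {
            int maxNumber = 65223;
            int bound = maxNumber / numPartitions + 1;
            int keynumber = key.get();
            System.out.println("numPartitions:" + numPartitions);
            // Partition i-1 covers keys in [bound*(i-1), bound*i). The loop must run
            // from 1 to numPartitions; starting at 0 returns -1 for negative keys
            // and never matches the last range.
            for (int i = 1; i <= numPartitions; i++) {
                if (keynumber < bound * i && keynumber >= bound * (i - 1)) {
                    return i - 1;
                }
            }
            return 0; // keys outside the expected range fall back to partition 0
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println("Usage: Sort <in> <out>");
            System.exit(2);
        }
        Job job = new Job(conf, "Sort");
        job.setJarByClass(Sort.class);
        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);
        job.setPartitionerClass(Partition.class);
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}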
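To see why a duplicated input value gets two consecutive ranks, the reducer's counting step can be simulated in plain Java. This is only a sketch under stated assumptions: `RankSketch` and `rank` are hypothetical names, a `TreeMap` stands in for the framework's sort-and-group step, and a single reducer is assumed so that one counter sees every key.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Simulates the rank-assigning reducer on an in-memory array of keys. */
final class RankSketch {

    // Returns "rank value" rows in ascending key order.
    static List<String> rank(int[] keys) {
        // Stand-in for the shuffle: keys sorted ascending, value = occurrence count.
        Map<Integer, Integer> grouped = new TreeMap<>();
        for (int k : keys) {
            grouped.merge(k, 1, Integer::sum);
        }
        List<String> rows = new ArrayList<>();
        int linenum = 1; // plays the role of the reducer's static counter
        for (Map.Entry<Integer, Integer> e : grouped.entrySet()) {
            // Emit the key once per element of its value-list.
            for (int n = 0; n < e.getValue(); n++) {
                rows.add(linenum + " " + e.getKey());
                linenum++;
            }
        }
        return rows;
    }

    public static void main(String[] args) {
        for (String row : rank(new int[] {11, 10, 9, 10})) {
            System.out.println(row);
        }
    }
}
```

In the sample run the value 10 appears in two input files, which is why ranks 10 and 11 both carry the value 10 in the job output.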
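1. Take care when creating your own test data. For example, sortfile1.txt contains 10 lines of data; if a line break leaves an empty 11th line, the map phase throws a number-format exception, which is why the code wraps the parse in try/catch.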
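3. The fallback in the code should be "return 0" (the original note places it in Reduce, but the only such statement in the listing is at the end of getPartition in the Partition class). The "bible" 《hadoop 实战2》 writes return -1 there, which proved wrong in practice.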
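As an alternative to substituting a placeholder value for unparsable lines, blank or non-numeric lines can simply be skipped. The sketch below shows that idea in plain Java; it is not the article's approach, and `LineParsingSketch`, `parseKey` and `parseAll` are hypothetical names. In a real mapper the equivalent would be to return from `map` without calling `context.write`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.OptionalInt;

/** Parses input lines into integer keys, skipping blank or malformed lines. */
final class LineParsingSketch {

    // Returns the parsed key, or an empty result if the line is not a number.
    static OptionalInt parseKey(String line) {
        String trimmed = line.trim();
        if (trimmed.isEmpty()) {
            return OptionalInt.empty(); // e.g. the empty last line of a test file
        }
        try {
            return OptionalInt.of(Integer.parseInt(trimmed));
        } catch (NumberFormatException e) {
            return OptionalInt.empty();
        }
    }

    // Keeps only the lines that parse; a mapper would emit one record per kept key.
    static List<Integer> parseAll(String[] lines) {
        List<Integer> keys = new ArrayList<>();
        for (String line : lines) {
            OptionalInt key = parseKey(line);
            if (key.isPresent()) {
                keys.add(key.getAsInt());
            }
        }
        return keys;
    }

    public static void main(String[] args) {
        System.out.println(parseAll(new String[] {"28", "30", ""}));
    }
}
```

Skipping keeps stray values out of the sorted output, at the cost of silently dropping bad input; whether that is acceptable depends on the data.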
15/01/28 21:19:28 INFO jvm.JvmMetrics: Initializing JVM Metrics with processName=JobTracker, sessionId=
15/01/28 21:19:28 WARN mapred.JobClient: No job jar file set. User classes may not be found. See JobConf(Class) or JobConf#setJar(String).
15/01/28 21:19:28 INFO input.FileInputFormat: Total input paths to process : 3
15/01/28 21:19:29 INFO mapred.JobClient: Running job: job_local_0001
15/01/28 21:19:29 INFO input.FileInputFormat: Total input paths to process : 3
15/01/28 21:19:29 INFO mapred.MapTask: io.sort.mb = 100
15/01/28 21:19:29 INFO mapred.MapTask: data buffer = 79691776/99614720
15/01/28 21:19:29 INFO mapred.MapTask: record buffer = 262144/327680
line:11
15/01/28 21:19:29 INFO mapred.MapTask: Starting flush of map output
Map key:11 numPartitions:1
line:13 Map key:13 numPartitions:1
line:15 Map key:15 numPartitions:1
line:17 Map key:17 numPartitions:1
line:19 Map key:19 numPartitions:1
line:21 Map key:21 numPartitions:1
line:23 Map key:23 numPartitions:1
line:25 Map key:25 numPartitions:1
line:27 Map key:27 numPartitions:1
line:29 Map key:29 numPartitions:1
15/01/28 21:19:29 INFO mapred.MapTask: Finished spill 0
15/01/28 21:19:29 INFO mapred.TaskRunner: Task:attempt_local_0001_m_000000_0 is done. And is in the process of commiting
15/01/28 21:19:29 INFO mapred.LocalJobRunner:
15/01/28 21:19:29 INFO mapred.TaskRunner: Task 'attempt_local_0001_m_000000_0' done.
15/01/28 21:19:29 INFO mapred.MapTask: io.sort.mb = 100
15/01/28 21:19:29 INFO mapred.MapTask: data buffer = 79691776/99614720
15/01/28 21:19:29 INFO mapred.MapTask: record buffer = 262144/327680
line:10 Map key:10 numPartitions:1
line:12 Map key:12 numPartitions:1
line:14 Map key:14 numPartitions:1
line:16 Map key:16 numPartitions:1
line:18 Map key:18 numPartitions:1
line:20 Map key:20 numPartitions:1
line:22 Map key:22 numPartitions:1
line:24 Map key:24 numPartitions:1
line:26 Map key:26 numPartitions:1
line:28 Map key:28 numPartitions:1
line:30 Map key:30 numPartitions:1
15/01/28 21:19:29 INFO mapred.MapTask: Starting flush of map output
15/01/28 21:19:29 INFO mapred.MapTask: Finished spill 0
15/01/28 21:19:29 INFO mapred.TaskRunner: Task:attempt_local_0001_m_000001_0 is done. And is in the process of commiting
15/01/28 21:19:29 INFO mapred.LocalJobRunner:
15/01/28 21:19:29 INFO mapred.TaskRunner: Task 'attempt_local_0001_m_000001_0' done.
15/01/28 21:19:29 INFO mapred.MapTask: io.sort.mb = 100
15/01/28 21:19:30 INFO mapred.JobClient: map 100% reduce 0%
15/01/28 21:19:30 INFO mapred.MapTask: data buffer = 79691776/99614720
15/01/28 21:19:30 INFO mapred.MapTask: record buffer = 262144/327680
line:1 Map key:1 numPartitions:1
line:2 Map key:2 numPartitions:1
line:3 Map key:3 numPartitions:1
line:4 Map key:4 numPartitions:1
line:5 Map key:5 numPartitions:1
line:6 Map key:6 numPartitions:1
line:7 Map key:7 numPartitions:1
line:8 Map key:8 numPartitions:1
line:9 Map key:9 numPartitions:1
line:10 Map key:10 numPartitions:1
15/01/28 21:19:30 INFO mapred.MapTask: Starting flush of map output
15/01/28 21:19:30 INFO mapred.MapTask: Finished spill 0
15/01/28 21:19:30 INFO mapred.TaskRunner: Task:attempt_local_0001_m_000002_0 is done. And is in the process of commiting
15/01/28 21:19:30 INFO mapred.LocalJobRunner:
15/01/28 21:19:30 INFO mapred.TaskRunner: Task 'attempt_local_0001_m_000002_0' done.
15/01/28 21:19:30 INFO mapred.LocalJobRunner:
15/01/28 21:19:30 INFO mapred.Merger: Merging 3 sorted segments
15/01/28 21:19:30 INFO mapred.Merger: Down to the last merge-pass, with 3 segments left of total size: 316 bytes
15/01/28 21:19:30 INFO mapred.LocalJobRunner:
Reduce key:1 Reduce value:1
Reduce key:2 Reduce value:2
Reduce key:3 Reduce value:3
Reduce key:4 Reduce value:4
Reduce key:5 Reduce value:5
Reduce key:6 Reduce value:6
Reduce key:7 Reduce value:7
Reduce key:8 Reduce value:8
Reduce key:9 Reduce value:9
Reduce key:10 Reduce value:10
Reduce key:11 Reduce value:10
Reduce key:12 Reduce value:11
Reduce key:13 Reduce value:12
Reduce key:14 Reduce value:13
Reduce key:15 Reduce value:14
Reduce key:16 Reduce value:15
Reduce key:17 Reduce value:16
Reduce key:18 Reduce value:17
Reduce key:19 Reduce value:18
Reduce key:20 Reduce value:19
Reduce key:21 Reduce value:20
Reduce key:22 Reduce value:21
Reduce key:23 Reduce value:22
Reduce key:24 Reduce value:23
Reduce key:25 Reduce value:24
Reduce key:26 Reduce value:25
Reduce key:27 Reduce value:26
Reduce key:28 Reduce value:27
Reduce key:29 Reduce value:28
Reduce key:30 Reduce value:29
Reduce key:31 Reduce value:30
15/01/28 21:19:30 INFO mapred.TaskRunner: Task:attempt_local_0001_r_000000_0 is done. And is in the process of commiting
15/01/28 21:19:30 INFO mapred.LocalJobRunner:
15/01/28 21:19:30 INFO mapred.TaskRunner: Task attempt_local_0001_r_000000_0 is allowed to commit now
15/01/28 21:19:30 INFO output.FileOutputCommitter: Saved output of task 'attempt_local_0001_r_000000_0' to hdfs://hadoop:9000/usr/hadoop/output3
15/01/28 21:19:30 INFO mapred.LocalJobRunner: reduce > reduce
15/01/28 21:19:30 INFO mapred.TaskRunner: Task 'attempt_local_0001_r_000000_0' done.
15/01/28 21:19:31 INFO mapred.JobClient: map 100% reduce 100%
15/01/28 21:19:31 INFO mapred.JobClient: Job complete: job_local_0001
15/01/28 21:19:31 INFO mapred.JobClient: Counters: 14
15/01/28 21:19:31 INFO mapred.JobClient: FileSystemCounters
15/01/28 21:19:31 INFO mapred.JobClient: FILE_BYTES_READ=67220
15/01/28 21:19:31 INFO mapred.JobClient: HDFS_BYTES_READ=261
15/01/28 21:19:31 INFO mapred.JobClient: FILE_BYTES_WRITTEN=138115
15/01/28 21:19:31 INFO mapred.JobClient: HDFS_BYTES_WRITTEN=168
15/01/28 21:19:31 INFO mapred.JobClient: Map-Reduce Framework
15/01/28 21:19:31 INFO mapred.JobClient: Reduce input groups=30
15/01/28 21:19:31 INFO mapred.JobClient: Combine output records=0
15/01/28 21:19:31 INFO mapred.JobClient: Map input records=31
15/01/28 21:19:31 INFO mapred.JobClient: Reduce shuffle bytes=0
15/01/28 21:19:31 INFO mapred.JobClient: Reduce output records=31
15/01/28 21:19:31 INFO mapred.JobClient: Spilled Records=62
15/01/28 21:19:31 INFO mapred.JobClient: Map output bytes=248
15/01/28 21:19:31 INFO mapred.JobClient: Combine input records=0
15/01/28 21:19:31 INFO mapred.JobClient: Map output records=31
15/01/28 21:19:31 INFO mapred.JobClient: Reduce input records=31
1 1
2 2
3 3
4 4
5 5
6 6
7 7
8 8
9 9
10 10
11 10
12 11
13 12
14 13
15 14
16 15
17 16
18 17
19 18
20 19
21 20
22 21
23 22
24 23
25 24
26 25
27 26
28 27
29 28
30 29
31 30
大儿子 爸爸
小儿子 爸爸
大女儿 爸爸
小女儿 爸爸
爸爸 爷爷
爸爸 二大爷
爸爸 三大爷
二女儿 妈妈
二儿子 妈妈
妈妈 爷爷
妈妈 二大爷
妈妈 三大爷
grandchild grandparent
二女儿 爷爷
二女儿 二大爷
二女儿 三大爷
二儿子 爷爷
二儿子 二大爷
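In MapReduce, joining one table with another, or a table with itself, cannot be done the way traditional SQL does it, where a single left join or right join produces the final table. This scenario requires a table join in which the left table and the right table are the same table, and the join condition is that the parent column of the left table equals the child column of the right table. The whole process is a self-join.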
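The self-join idea can be sketched in memory before looking at the MapReduce version. The example below is a plain-Java illustration under stated assumptions: `SelfJoinSketch` and `join` are hypothetical names, the sample names `son`, `daughter`, `dad` and `grandpa` are made up, and two maps stand in for the left and right copies of the table that the shuffle would group by key.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** In-memory child-parent self-join producing grandchild-grandparent rows. */
final class SelfJoinSketch {

    // Each input row is {child, parent}; each output row is "grandchild grandparent".
    static List<String> join(String[][] rows) {
        // Left table: keyed by parent, holds that person's children.
        Map<String, List<String>> childrenOf = new LinkedHashMap<>();
        // Right table: keyed by child, holds that person's parents.
        Map<String, List<String>> parentsOf = new LinkedHashMap<>();
        for (String[] row : rows) {
            childrenOf.computeIfAbsent(row[1], k -> new ArrayList<>()).add(row[0]);
            parentsOf.computeIfAbsent(row[0], k -> new ArrayList<>()).add(row[1]);
        }
        List<String> result = new ArrayList<>();
        for (Map.Entry<String, List<String>> e : childrenOf.entrySet()) {
            // The join key is a person who is both somebody's parent and somebody's child.
            List<String> grandparents = parentsOf.get(e.getKey());
            if (grandparents == null) {
                continue;
            }
            // Cartesian product of that person's children and parents.
            for (String grandchild : e.getValue()) {
                for (String grandparent : grandparents) {
                    result.add(grandchild + " " + grandparent);
                }
            }
        }
        return result;
    }

    public static void main(String[] args) {
        String[][] rows = {{"son", "dad"}, {"daughter", "dad"}, {"dad", "grandpa"}};
        for (String line : join(rows)) {
            System.out.println(line);
        }
    }
}
```

In the MapReduce job the two maps are replaced by emitting every record twice, once keyed by parent and once keyed by child, so that the reducer for a given person receives both that person's children and that person's parents.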
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class STjoin {
    public static int time = 0;

    // map splits each input line into child and parent, then emits the record once in
    // forward order as the right table and once in reverse order as the left table.
    // The output value must carry a flag that tells the two tables apart.
    public static class Map extends Mapper<Object, Text, Text, Text> {
        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            int i = line.indexOf(' ');
            if (i < 0) {
                return; // skip lines without a space separator, e.g. blank lines
            }
            String[] values = {line.substring(0, i), line.substring(i + 1)};
            if (values[0].compareTo("child") != 0) { // skip a header row
                String childname = values[0];
                String parentname = values[1];
                String relationtype = "1"; // flag distinguishing left and right table
                // left table: keyed by parent
                context.write(new Text(values[1]),
                        new Text(relationtype + "+" + childname + "+" + parentname));
                System.out.println("左表 Map key:" + new Text(values[1]) + "\tvalue:"
                        + (relationtype + "+" + childname + "+" + parentname));
                // right table: keyed by child
                relationtype = "2";
                context.write(new Text(values[0]),
                        new Text(relationtype + "+" + childname + "+" + parentname));
                System.out.println("右表 Map key:" + new Text(values[0]) + "\tvalue:"
                        + (relationtype + "+" + childname + "+" + parentname));
            }
        }
    }

    public static class Reduce extends Reducer<Text, Text, Text, Text> {
        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            if (time == 0) { // write the header row once
                context.write(new Text("grandchild"), new Text("grandparent"));
                time++;
            }
            // lists instead of fixed String[10] arrays, so more than 10 relatives do not overflow
            List<String> grandchild = new ArrayList<String>();
            List<String> grandparent = new ArrayList<String>();
            for (Text val : values) {
                String record = val.toString();
                if (record.length() == 0) {
                    continue;
                }
                // record layout: <flag>+<child>+<parent>
                char relationtype = record.charAt(0);
                int sep = record.indexOf('+', 2);
                String childname = record.substring(2, sep);   // child of this value
                String parentname = record.substring(sep + 1); // parent of this value
                if (relationtype == '1') {
                    // left table: the child becomes a grandchild
                    grandchild.add(childname);
                } else {
                    // right table: the parent becomes a grandparent
                    grandparent.add(parentname);
                }
            }
            // Cartesian product of the grandchild and grandparent lists
            for (String gc : grandchild) {
                for (String gp : grandparent) {
                    context.write(new Text(gc), new Text(gp)); // output result
                    System.out.println("Reduce 孙子:" + gc + "\t 爷爷:" + gp);
                }
            }
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println("Usage: STjoin <in> <out>");
            System.exit(2);
        }
        Job job = new Job(conf, "single table join");
        job.setJarByClass(STjoin.class);
        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
15/01/28 22:06:28 INFO jvm.JvmMetrics: Initializing JVM Metrics with processName=JobTracker, sessionId=
15/01/28 22:06:28 WARN mapred.JobClient: No job jar file set. User classes may not be found. See JobConf(Class) or JobConf#setJar(String).
15/01/28 22:06:28 INFO input.FileInputFormat: Total input paths to process : 2
15/01/28 22:06:28 INFO mapred.JobClient: Running job: job_local_0001
15/01/28 22:06:28 INFO input.FileInputFormat: Total input paths to process : 2
15/01/28 22:06:28 INFO mapred.MapTask: io.sort.mb = 100
15/01/28 22:06:28 INFO mapred.MapTask: data buffer = 79691776/99614720
15/01/28 22:06:28 INFO mapred.MapTask: record buffer = 262144/327680
左表 Map key:爸爸 value:1+大儿子+爸爸
右表 Map key:大儿子 value:2+大儿子+爸爸
左表 Map key:爸爸 value:1+小儿子+爸爸
右表 Map key:小儿子 value:2+小儿子+爸爸
左表 Map key:爸爸 value:1+大女儿+爸爸
右表 Map key:大女儿 value:2+大女儿+爸爸
左表 Map key:爸爸 value:1+小女儿+爸爸
右表 Map key:小女儿 value:2+小女儿+爸爸
左表 Map key:爷爷 value:1+爸爸+爷爷
右表 Map key:爸爸 value:2+爸爸+爷爷
左表 Map key:二大爷 value:1+爸爸+二大爷
右表 Map key:爸爸 value:2+爸爸+二大爷
左表 Map key:三大爷 value:1+爸爸+三大爷
右表 Map key:爸爸 value:2+爸爸+三大爷
15/01/28 22:06:28 INFO mapred.MapTask: Starting flush of map output
15/01/28 22:06:28 INFO mapred.MapTask: Finished spill 0
15/01/28 22:06:28 INFO mapred.TaskRunner: Task:attempt_local_0001_m_000000_0 is done. And is in the process of commiting
15/01/28 22:06:28 INFO mapred.LocalJobRunner:
15/01/28 22:06:28 INFO mapred.TaskRunner: Task 'attempt_local_0001_m_000000_0' done.
15/01/28 22:06:28 INFO mapred.MapTask: io.sort.mb = 100
15/01/28 22:06:28 INFO mapred.MapTask: data buffer = 79691776/99614720
15/01/28 22:06:28 INFO mapred.MapTask: record buffer = 262144/327680
左表 Map key:妈妈 value:1+二女儿+妈妈
右表 Map key:二女儿 value:2+二女儿+妈妈
左表 Map key:妈妈 value:1+二儿子+妈妈
右表 Map key:二儿子 value:2+二儿子+妈妈
左表 Map key:爷爷 value:1+妈妈+爷爷
右表 Map key:妈妈 value:2+妈妈+爷爷
左表 Map key:二大爷 value:1+妈妈+二大爷
右表 Map key:妈妈 value:2+妈妈+二大爷
左表 Map key:三大爷 value:1+妈妈+三大爷
右表 Map key:妈妈 value:2+妈妈+三大爷
15/01/28 22:06:28 INFO mapred.MapTask: Starting flush of map output
15/01/28 22:06:28 INFO mapred.MapTask: Finished spill 0
15/01/28 22:06:28 INFO mapred.TaskRunner: Task:attempt_local_0001_m_000001_0 is done. And is in the process of commiting
15/01/28 22:06:28 INFO mapred.LocalJobRunner:
15/01/28 22:06:28 INFO mapred.TaskRunner: Task 'attempt_local_0001_m_000001_0' done.
15/01/28 22:06:28 INFO mapred.LocalJobRunner:
15/01/28 22:06:28 INFO mapred.Merger: Merging 2 sorted segments
15/01/28 22:06:28 INFO mapred.Merger: Down to the last merge-pass, with 2 segments left of total size: 697 bytes
15/01/28 22:06:28 INFO mapred.LocalJobRunner:
Reduce 孙子:二女儿 爷爷:爷爷
Reduce 孙子:二女儿 爷爷:二大爷
Reduce 孙子:二女儿 爷爷:三大爷
Reduce 孙子:二儿子 爷爷:爷爷
Reduce 孙子:二儿子 爷爷:二大爷
Reduce 孙子:二儿子 爷爷:三大爷
Reduce 孙子:大儿子 爷爷:爷爷
Reduce 孙子:大儿子 爷爷:二大爷
Reduce 孙子:大儿子 爷爷:三大爷
Reduce 孙子:小儿子 爷爷:爷爷
Reduce 孙子:小儿子 爷爷:二大爷
Reduce 孙子:小儿子 爷爷:三大爷
Reduce 孙子:大女儿 爷爷:爷爷
Reduce 孙子:大女儿 爷爷:二大爷
Reduce 孙子:大女儿 爷爷:三大爷
Reduce 孙子:小女儿 爷爷:爷爷
Reduce 孙子:小女儿 爷爷:二大爷
Reduce 孙子:小女儿 爷爷:三大爷
15/01/28 22:06:28 INFO mapred.TaskRunner: Task:attempt_local_0001_r_000000_0 is done. And is in the process of commiting
15/01/28 22:06:28 INFO mapred.LocalJobRunner:
15/01/28 22:06:28 INFO mapred.TaskRunner: Task attempt_local_0001_r_000000_0 is allowed to commit now
15/01/28 22:06:28 INFO output.FileOutputCommitter: Saved output of task 'attempt_local_0001_r_000000_0' to hdfs://hadoop:9000/usr/hadoop/output4
15/01/28 22:06:28 INFO mapred.LocalJobRunner: reduce > reduce
15/01/28 22:06:28 INFO mapred.TaskRunner: Task 'attempt_local_0001_r_000000_0' done.
15/01/28 22:06:29 INFO mapred.JobClient: map 100% reduce 100%
15/01/28 22:06:29 INFO mapred.JobClient: Job complete: job_local_0001
15/01/28 22:06:29 INFO mapred.JobClient: Counters: 14
15/01/28 22:06:29 INFO mapred.JobClient: FileSystemCounters
15/01/28 22:06:29 INFO mapred.JobClient: FILE_BYTES_READ=50580
15/01/28 22:06:29 INFO mapred.JobClient: HDFS_BYTES_READ=515
15/01/28 22:06:29 INFO mapred.JobClient: FILE_BYTES_WRITTEN=103312
15/01/28 22:06:29 INFO mapred.JobClient: HDFS_BYTES_WRITTEN=369
15/01/28 22:06:29 INFO mapred.JobClient: Map-Reduce Framework
15/01/28 22:06:29 INFO mapred.JobClient: Reduce input groups=12
15/01/28 22:06:29 INFO mapred.JobClient: Combine output records=0
15/01/28 22:06:29 INFO mapred.JobClient: Map input records=12
15/01/28 22:06:29 INFO mapred.JobClient: Reduce shuffle bytes=0
15/01/28 22:06:29 INFO mapred.JobClient: Reduce output records=19
15/01/28 22:06:29 INFO mapred.JobClient: Spilled Records=48
15/01/28 22:06:29 INFO mapred.JobClient: Map output bytes=645
15/01/28 22:06:29 INFO mapred.JobClient: Combine input records=0
15/01/28 22:06:29 INFO mapred.JobClient: Map output records=24
15/01/28 22:06:29 INFO mapred.JobClient: Reduce input records=24
grandchild grandparent
二女儿 爷爷
二女儿 二大爷
二女儿 三大爷
二儿子 爷爷
二儿子 二大爷
二儿子 三大爷
大儿子 爷爷
大儿子 二大爷
大儿子 三大爷
小儿子 爷爷
小儿子 二大爷
小儿子 三大爷
大女儿 爷爷
大女儿 二大爷
大女儿 三大爷
小女儿 爷爷
小女儿 二大爷
小女儿 三大爷