A collection of common MapReduce programming examples: the classic WordCount.
Input data:

Contents of file1.csv:
hello world

Contents of file2.csv:
hello hadoop

Output:
hadoop 1
hello 2
world 1
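To see how the pairs flow through the job: each map call emits a <word, 1> pair for every token in its line, the shuffle phase groups the pairs by key, and the reducer sums each group. For the two sample files the intermediate data looks like this:

map output (file1.csv): <hello, 1>, <world, 1>
map output (file2.csv): <hello, 1>, <hadoop, 1>
after shuffle/sort:     <hadoop, [1]>, <hello, [1, 1]>, <world, [1]>
reduce output:          hadoop 1, hello 2, world 1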
Implementation and source-code walkthrough:
package com.hadoop.kwang;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCount {

    /**
     * Mapper class.
     *
     * Object and Text are the input <key, value> types;
     * Text and IntWritable are the output <key, value> types.
     */
    public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {

        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        @Override
        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {

            // Read one line of text and split it into tokens
            StringTokenizer itr = new StringTokenizer(value.toString());

            // Iterate over the tokens and emit each word
            while (itr.hasMoreTokens()) {
                word.set(itr.nextToken());

                // Every output pair has the form <"word", 1>
                context.write(word, one);
            }
        }
    }

    /**
     * Reducer class.
     */
    public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        private IntWritable result = new IntWritable();

        @Override
        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            // Count the occurrences of this word
            int sum = 0;

            // values is the collection of values for one key, i.e. <key, value-list>,
            // e.g. <hello, <1, 1>>
            for (IntWritable val : values) {
                // Accumulate all the values
                sum += val.get();
            }
            result.set(sum);
            context.write(key, result);
        }
    }

    public static void main(String[] args) throws Exception {

        Configuration conf = new Configuration();

        // Configure the input and output paths
        String input = "hdfs://0.0.0.0:xxx/hadoop/wordcount/input/";
        String output = "hdfs://0.0.0.0:xxx/hadoop/wordcount/output/";

        Job job = Job.getInstance(conf, "word count");  // new Job(conf, ...) is deprecated
        job.setJarByClass(WordCount.class);
        job.setMapperClass(TokenizerMapper.class);      // set the Mapper class for the job
        job.setCombinerClass(IntSumReducer.class);      // set the Combiner class for the job
        job.setReducerClass(IntSumReducer.class);       // set the Reducer class for the job

        job.setOutputKeyClass(Text.class);              // set the output key type
        job.setOutputValueClass(IntWritable.class);     // set the output value type

        FileInputFormat.addInputPath(job, new Path(input));     // set the data input path
        FileOutputFormat.setOutputPath(job, new Path(output));  // set the data output path

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
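Note that the same IntSumReducer is reused as the combiner: summing is associative and commutative, so pre-aggregating <hello, <1, 1>> into <hello, 2> on the map side is safe and reduces shuffle traffic. To check the result after the job finishes, here is a minimal sketch that reads the reducer output back through the HDFS Java API. It assumes the default single reducer, whose output lands in a file named part-r-00000 under the output directory; ReadOutput is just an illustrative class name, and the path (including the "xxx" port placeholder) must match your cluster:

import java.io.BufferedReader;
import java.io.InputStreamReader;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class ReadOutput {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Same output directory as the job above; "xxx" stands for your NameNode port
        Path result = new Path("hdfs://0.0.0.0:xxx/hadoop/wordcount/output/part-r-00000");

        FileSystem fs = FileSystem.get(result.toUri(), conf);
        try (BufferedReader reader = new BufferedReader(new InputStreamReader(fs.open(result)))) {
            String line;
            // Each line is "word<TAB>count", e.g. "hello\t2"
            while ((line = reader.readLine()) != null) {
                System.out.println(line);
            }
        }
    }
}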
Original post: https://www.cnblogs.com/walker-/p/9669631.html