相关代码:
1 package com.hadoop; 2 3 import org.apache.hadoop.conf.Configuration; 4 import org.apache.hadoop.fs.Path; 5 import org.apache.hadoop.io.IntWritable; 6 import org.apache.hadoop.io.Text; 7 import org.apache.hadoop.mapreduce.Job; 8 import org.apache.hadoop.mapreduce.Mapper; 9 import org.apache.hadoop.mapreduce.Reducer; 10 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 11 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 12 13 import java.io.IOException; 14 import java.util.StringTokenizer; 15 16 public class WordCount { 17 18 19 /** 20 * Mapper接口是个泛型类型,它有4个形式参数类型,分别指定map函数的输入键、输入值、输出键和输出值的类型。 21 * WordCount为例:输入键是一个长整数偏移量,输入的值是一行文本,输出的键是单词,输出的值是单词个数(整型) 22 * Hadoop规定了自己的一套用于网络序列化的基本类型,而不直接使用Java内嵌的类型。这些类型在org.apache.hadoop.io包中。 23 * LongWritable类型相当于Java的Long类型 24 * Text类型相当于Java的String类型 25 * IntWritable类型相当于Java的Integer类型 26 27 */ 28 public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> { 29 30 private final static IntWritable one = new IntWritable(1); 31 private Text word = new Text(); 32 33 /** 34 * 35 * @param key 36 * @param value 37 * @param context 38 * @throws IOException 39 * @throws InterruptedException 40 * map( )方法的输入是一个键和一个值。首先使用StringTokenizer类将输入的Text值转换成String类型,然后使用nextToken( )方法将单词提取出来。 41 * map( )方法还提供Context实例用于输出内容的写入。将单词数据按照Text类型进行读写,因为单词作为键。将单词数据数封装为IntWritable类型。 42 */ 43 public void map(Object key, Text value, Context context) throws IOException, InterruptedException { 44 StringTokenizer itr = new StringTokenizer(value.toString()); // 有三个重载方法,这里以空白字符(“ ”,“\t”,“\n”)为分隔符分割字符串 45 while (itr.hasMoreTokens()) { // 判断是否还有分隔符 46 // set方法将String转换成Text 47 // nextToken返回当前位置到下一个分隔符位置的字符串 48 word.set(itr.nextToken()); 49 context.write(word, one); // 使用Context实例用于输出内容的写入 50 } 51 } 52 } 53 54 /** 55 * reduce函数也有四个形式参数类型用于指定输入和输出类型。reduce函数的输入类型必须匹配map函数的输出类型:即Text类型和IntWritable类型。 56 */ 57 public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> { 58 private IntWritable result = new IntWritable(); 59 60 public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { 61 int sum = 0; 62 for (IntWritable val : values) { // 遍历相同的key(单词)对应的values,并进行相加 63 sum += val.get(); 64 } 65 result.set(sum); 66 context.write(key, result); // 将统计的数目赋给每一个不同的单词 67 } 68 } 69 70 public static void main(String[] args) throws Exception { 71 /** 72 * Configuration类是作业的配置信息类,任何作用的配置信息必须通过Configuration传递, 73 * 因为通过Configuration可以实现在多个mapper和多个reducer任务之间共享信息。 74 */ 75 Configuration conf = new Configuration(); 76 Job job = Job.getInstance(conf, "word count"); //Job对象制定作业执行规范,用它来控制整个作业的运行。 77 78 /** 79 * 在Hadoop集群上运行这个作业时,要把代码打包成一个JAR包,发布在集群上。 80 * 不必明确指定JAR文件的名称,在Job对象的setJarByClass( )方法中传递一个类即可,Hadoop利用这个类查找包含它的JAR文件。 81 */ 82 job.setJarByClass(WordCount.class); 83 84 /** 85 * setMapperClass( ) 和setReducerClass( )方法指定要用的map类型和reduce类型 86 */ 87 job.setMapperClass(TokenizerMapper.class); 88 job.setReducerClass(IntSumReducer.class); 89 job.setCombinerClass(IntSumReducer.class); 90 91 92 /** 93 * setOutputKeyClass( ) 和setOutputValueClass( )方法控制reduce函数的输出类型,必须要和Reduce类产生的相匹配。 94 * 输入的类型没有设置,因为使用了默认的TextInputFormat(文本输入格式) 95 */ 96 job.setOutputKeyClass(Text.class); 97 job.setOutputValueClass(IntWritable.class); 98 99 /** 100 * FileInputFormat类的静态方法addInputPath( )来指定输入数据的路径 101 * 该路径可以是单个的文件、一个目录或符合特定文件模式的一系列文件。 102 * ‘可以多次调用addInputPath( )来实现多路径的输入。 103 */ 104 FileInputFormat.addInputPath(job, new Path(args[0])); 105 106 /** 107 * FileOutputFormat类中的静态方法setOutputPath( )来指定输出路径(只能有一个输出路径),即reduce函数输出文件的写入目录。 108 * 在运行作业前该目录不能存在,否则Hadoop会报错并拒绝运行作业。 109 * 目的:防止数据丢失,假如一个作业运行了很久才得出结果,现在被另一个作业不小心覆盖会令人崩溃。 110 */ 111 FileOutputFormat.setOutputPath(job, new Path(args[1])); 112 113 114 /** 115 * waitForCompletion( )方法提交作业并等待执行完成。该方法的唯一参数是一个标识,指示是否已生成详细输出。 116 */ 117 System.exit(job.waitForCompletion(true) ? 0 : 1); 118 } 119 }
运行结果: