标签:
其实这个例子都是书上的,我也只是拿过来理解学习下。
WordCount是Hadoop中的Hello, world,这是我听得最多的一个表述。
下面是WordCount.java的源码
package org.apache.hadoop.examples; import java.io.IOException; import java.util.StringTokenizer; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.GenericOptionsParser; public class WordCount { /* 这个类实现Mapper接口中的map方法, * 输入参数中的value是文本文件中的一行, * 利用StringTokenizer将这个字符串拆成单词, * 然后将输出结果<单词,1> * 写入到org.apache.hadoop.mapred.OutputCollector中 */ public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable>{ private final static IntWritable one = new IntWritable(1); private Text word = new Text(); /* 代码中LongWritable, IntWritable, Text * 均是Hadoop中实现的用于封装Java数据类型的类, * 这些类都能够被串行化从而便于在分布式环境中进行数据交换, * 可以将它们分别视为long, int, String的替代品 */ public void map(Object key, Text value, Context context ) throws IOException, InterruptedException { StringTokenizer itr = new StringTokenizer(value.toString()); // 将每一行变为字符串, 并进行分析, 最后变为一个Iterator while (itr.hasMoreTokens()) { word.set(itr.nextToken()); context.write(word, one); } } } /* 这个类实现Reducer接口中的reduce方法, 输入参数中的key, values是由Map任务输出的 * 中间结果, values是一个Iterator */ public static class IntSumReducer extends Reducer<Text,IntWritable,Text,IntWritable> { private IntWritable result = new IntWritable(); public void reduce(Text key, Iterable<IntWritable> values, Context context ) throws IOException, InterruptedException { int sum = 0; /* 遍历这个Iterator, 就可以得到属于同一个key的所有的values. * 此处, key是一个单词, values是词频 */ // 依次获得每个词的词频 for (IntWritable val : values) { sum += val.get(); } result.set(sum); context.write(key, result); } } public static void main(String[] args) throws Exception { // 这个配置从何而来, 往哪里去呢 Configuration conf = new Configuration(); String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs(); if (otherArgs.length != 2) { System.err.println("Usage: wordcount <in> <out>"); System.exit(2); } // 新建一个job, 并读取配置文件conf, 不知道是不是读取安装目录下的配置文件 Job job = new Job(conf, "word count"); // 下面这几行都是设置编译好的类 job.setJarByClass(WordCount.class); // 实现map函数, 完成输入的<key, value>对到中间结果的映射 job.setMapperClass(TokenizerMapper.class); // 实现combine函数, 将中间结果的重复key进行合并 job.setCombinerClass(IntSumReducer.class); // 实现reduce函数, 对中间结果进行合并, 形成最终结果 job.setReducerClass(IntSumReducer.class); // 输出的最终结果中key的类型 job.setOutputKeyClass(Text.class); // 输出的最终结果中value的类型 job.setOutputValueClass(IntWritable.class); // 设定job的输入目录, job运行时会处理输入目录下的所有文件 FileInputFormat.addInputPath(job, new Path(otherArgs[0])); // 设定job的输出目录, job的最终结果会写入输出目录下 FileOutputFormat.setOutputPath(job, new Path(otherArgs[1])); System.exit(job.waitForCompletion(true) ? 0 : 1); } }
由于要编译执行,用的是hadoop命令,就来看看hadoop吧
HADOOP_HOME/bin/hadoop
有这样三段话
# part 1 # 设置java命令的路径所在 JAVA=$JAVA_HOME/bin/java # part 2 # 假如我们在hadoop后面接的是jar, 则会进行一系列设置 elif [ "$COMMAND" = "jar" ] ; then CLASS=org.apache.hadoop.util.RunJar HADOOP_OPTS="$HADOOP_OPTS $HADOOP_CLIENT_OPTS" # 后面这一大段是处在文件的最后 # 主要分成安全模式和非安全模式下 # 安全模式下会执行一些设置, 才会运行 # 非安全模式下直接运行 # 说到底最后还是都是在java虚拟机上运行的 # part 3 # Check to see if we should start a secure datanode if [ "$starting_secure_dn" = "true" ]; then if [ "$HADOOP_PID_DIR" = "" ]; then HADOOP_SECURE_DN_PID="/tmp/hadoop_secure_dn.pid" else HADOOP_SECURE_DN_PID="$HADOOP_PID_DIR/hadoop_secure_dn.pid" fi if [[ $JSVC_HOME ]]; then JSVC="$JSVC_HOME/jsvc" else if [ "$JAVA_PLATFORM" = "Linux-amd64-64" ]; then JSVC_ARCH="amd64" else JSVC_ARCH="i386" fi JSVC="$HADOOP_HOME/libexec/jsvc.${JSVC_ARCH}" fi if [[ ! $JSVC_OUTFILE ]]; then JSVC_OUTFILE="$HADOOP_LOG_DIR/jsvc.out" fi if [[ ! $JSVC_ERRFILE ]]; then JSVC_ERRFILE="$HADOOP_LOG_DIR/jsvc.err" fi exec "$JSVC" -Dproc_$COMMAND -outfile "$JSVC_OUTFILE" -errfile "$JSVC_ERRFILE" -pidfile "$HADOOP_SECURE_DN_PID" -nodetach -user "$HADOOP_SECURE_DN_USER" -cp "$CLASSPATH" $JAVA_HEAP_MAX $HADOOP_OPTS org.apache.hadoop.hdfs.server.datanode.SecureDataNodeStarter "$@" else # run it exec "$JAVA" -Dproc_$COMMAND $JAVA_HEAP_MAX $HADOOP_OPTS -classpath "$CLASSPATH" $CLASS "$@" fi
最后这个java执行的命令好长一串啊, 不过分析下觉得
前面的-Dproc_$COMMAND $JAVA_HEAP_MAX $HADOOP_OPTS感觉是对java虚拟机的设置
后面的-classpath则是执行需要那些jar包
"$CLASSPATH" 是根据命令设置的一些jar所在地
$CLASS 如果针对于hadoop运行MapReduce这里的就是org.apache.hadoop.util.RunJar这个jar包
总的过程包括有:
Map类的实现;
Reduce类的实现;
Job的创建以及设置;
运行Job。
标签:
原文地址:http://www.cnblogs.com/tuhooo/p/5486933.html