码迷,mamicode.com
首页 > 其他好文 > 详细

也来看看hadoop的WordCount

时间:2016-05-13 11:33:19      阅读:143      评论:0      收藏:0      [点我收藏+]

标签:

其实这个例子都是书上的,我也只是拿过来理解学习下。

WordCount是Hadoop中的Hello, world,这是我听得最多的一个表述。

下面是WordCount.java的源码

package org.apache.hadoop.examples;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class WordCount {

	/* 这个类实现Mapper接口中的map方法,
	 * 输入参数中的value是文本文件中的一行,
	 * 利用StringTokenizer将这个字符串拆成单词,
	 * 然后将输出结果<单词,1>
	 * 写入到org.apache.hadoop.mapred.OutputCollector中
	 */

  public static class TokenizerMapper 
       extends Mapper<Object, Text, Text, IntWritable>{
    
    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();
      
	/* 代码中LongWritable, IntWritable, Text
	 * 均是Hadoop中实现的用于封装Java数据类型的类,
	 * 这些类都能够被串行化从而便于在分布式环境中进行数据交换,
	 * 可以将它们分别视为long, int, String的替代品
	 */
	  
    public void map(Object key, Text value, Context context
                    ) throws IOException, InterruptedException {
      StringTokenizer itr = new StringTokenizer(value.toString());
	  // 将每一行变为字符串, 并进行分析, 最后变为一个Iterator
      while (itr.hasMoreTokens()) {
        word.set(itr.nextToken());
        context.write(word, one);
      }
    }
  }
  
  /* 这个类实现Reducer接口中的reduce方法, 输入参数中的key, values是由Map任务输出的
   * 中间结果, values是一个Iterator
   */
  public static class IntSumReducer 
       extends Reducer<Text,IntWritable,Text,IntWritable> {
    private IntWritable result = new IntWritable();

    public void reduce(Text key, Iterable<IntWritable> values, 
                       Context context
                       ) throws IOException, InterruptedException {
      int sum = 0;
	  /* 遍历这个Iterator, 就可以得到属于同一个key的所有的values.
	   * 此处, key是一个单词, values是词频
	   */
	  // 依次获得每个词的词频
      for (IntWritable val : values) {
        sum += val.get();
      }
      result.set(sum);
      context.write(key, result);
    }
  }

  public static void main(String[] args) throws Exception {
	// 这个配置从何而来, 往哪里去呢
    Configuration conf = new Configuration();
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    if (otherArgs.length != 2) {
      System.err.println("Usage: wordcount <in> <out>");
      System.exit(2);
    }
	// 新建一个job, 并读取配置文件conf, 不知道是不是读取安装目录下的配置文件
    Job job = new Job(conf, "word count");
	// 下面这几行都是设置编译好的类
    job.setJarByClass(WordCount.class);
	// 实现map函数, 完成输入的<key, value>对到中间结果的映射
    job.setMapperClass(TokenizerMapper.class);
	// 实现combine函数, 将中间结果的重复key进行合并
    job.setCombinerClass(IntSumReducer.class);
	// 实现reduce函数, 对中间结果进行合并, 形成最终结果
    job.setReducerClass(IntSumReducer.class);
	// 输出的最终结果中key的类型
    job.setOutputKeyClass(Text.class);
	// 输出的最终结果中value的类型
    job.setOutputValueClass(IntWritable.class);
	// 设定job的输入目录, job运行时会处理输入目录下的所有文件
    FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
	// 设定job的输出目录, job的最终结果会写入输出目录下
    FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}

由于要编译执行,用的是hadoop命令,就来看看hadoop吧

HADOOP_HOME/bin/hadoop
有这样三段话

# part 1
# 设置java命令的路径所在
JAVA=$JAVA_HOME/bin/java

# part 2
# 假如我们在hadoop后面接的是jar, 则会进行一系列设置
elif [ "$COMMAND" = "jar" ] ; then
  CLASS=org.apache.hadoop.util.RunJar
  HADOOP_OPTS="$HADOOP_OPTS $HADOOP_CLIENT_OPTS"

# 后面这一大段是处在文件的最后
# 主要分成安全模式和非安全模式下
# 安全模式下会执行一些设置, 才会运行
# 非安全模式下直接运行
# 说到底最后还是都是在java虚拟机上运行的

# part 3
# Check to see if we should start a secure datanode
if [ "$starting_secure_dn" = "true" ]; then
  if [ "$HADOOP_PID_DIR" = "" ]; then
    HADOOP_SECURE_DN_PID="/tmp/hadoop_secure_dn.pid"
  else
    HADOOP_SECURE_DN_PID="$HADOOP_PID_DIR/hadoop_secure_dn.pid"
  fi

  if [[ $JSVC_HOME ]]; then
    JSVC="$JSVC_HOME/jsvc"
  else
    if [ "$JAVA_PLATFORM" = "Linux-amd64-64" ]; then
      JSVC_ARCH="amd64"
    else
      JSVC_ARCH="i386"
    fi
    JSVC="$HADOOP_HOME/libexec/jsvc.${JSVC_ARCH}"
  fi

  if [[ ! $JSVC_OUTFILE ]]; then
    JSVC_OUTFILE="$HADOOP_LOG_DIR/jsvc.out"
  fi

  if [[ ! $JSVC_ERRFILE ]]; then
    JSVC_ERRFILE="$HADOOP_LOG_DIR/jsvc.err"
  fi

  exec "$JSVC" -Dproc_$COMMAND -outfile "$JSVC_OUTFILE"                -errfile "$JSVC_ERRFILE"                -pidfile "$HADOOP_SECURE_DN_PID"                -nodetach                -user "$HADOOP_SECURE_DN_USER"                -cp "$CLASSPATH"                $JAVA_HEAP_MAX $HADOOP_OPTS                org.apache.hadoop.hdfs.server.datanode.SecureDataNodeStarter "$@"
else
  # run it
  exec "$JAVA" -Dproc_$COMMAND $JAVA_HEAP_MAX $HADOOP_OPTS -classpath "$CLASSPATH" $CLASS "$@"
fi

最后这个java执行的命令好长一串啊, 不过分析下觉得
前面的-Dproc_$COMMAND $JAVA_HEAP_MAX $HADOOP_OPTS感觉是对java虚拟机的设置
后面的-classpath则是执行需要那些jar包
"$CLASSPATH" 是根据命令设置的一些jar所在地
$CLASS 如果针对于hadoop运行MapReduce这里的就是org.apache.hadoop.util.RunJar这个jar包

总的过程包括有:
Map类的实现;
Reduce类的实现;
Job的创建以及设置;
运行Job。

也来看看hadoop的WordCount

标签:

原文地址:http://www.cnblogs.com/tuhooo/p/5486933.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!