码迷,mamicode.com
首页 > 其他好文 > 详细

tf-idf hadoop map reduce

时间:2014-11-28 21:15:21      阅读:475      评论:0      收藏:0      [点我收藏+]

标签:blog   io   ar   os   sp   java   for   文件   on   

package com.jumei.robot.mapreduce.tfidf;

import java.io.IOException;
import java.util.Collection;
import java.util.Comparator;
import java.util.Map.Entry;
import java.util.Set;
import java.util.StringTokenizer;
import java.util.TreeMap;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

import com.jumei.robot.common.beans.Word;
import com.jumei.robot.preprocess.IFilterStopwordService;
import com.jumei.robot.preprocess.IWordSegService;

/**
 * <pre>
 * TF-IDF 算法MapReduce实现
 * 分3job
 * job 1: 统计文档中单词在该文档中出现的次数(n)及该文档所有单词的总数(N)
 * job 2: 统计单词所包含的文档数(d),根据所有文档的总数(D),计算tf-idf值
 * job 3: 对job2进行排序,输出tf-idf值最大的前top N个词
 * 数学公式:
 * tf = n / N
 * idf = Math.log(D / d); 
 * tf-idf = tf * idf 
 * </pre>
 * @author deyin
 *
 */
public class TfIdfMapReduce {

  private static Configuration conf;

  /**
   * Driver entry point: runs the three TF-IDF jobs one after another.
   *
   * <p>Expected arguments:
   * {@code <hdfs input folder> <hdfs output folder> <number of documents> [topN]}.
   * {@code topN} is optional and defaults to 100, the same fallback the third
   * job's tasks use when the setting is absent from the configuration.
   *
   * <p>Each job writes into its own sub-folder of the output folder
   * ({@code job1_output}, {@code job2_output}, {@code job3_output}); existing
   * sub-folders are deleted first. The chain stops as soon as one job fails.
   *
   * @param args command line arguments as described above
   * @throws IllegalStateException if one of the jobs reports failure
   * @throws Exception if argument parsing, file system access or job submission fails
   */
  public static void main(String[] args) throws Exception {
    conf = new Configuration();
    if (args.length < 3) {
      System.err.println("arguments invalid, usage: hadoop jar tfidf.jar com.jumei.robot.mapreduce.tfidf.TfIdfMapReduce <hdfs input folder> <hdfs output folder> <number of documents> [topN]");
      return;
    }

    String pathin = args[0];
    String pathout = args[1];
    int nrOfDocuments = Integer.parseInt(args[2]);
    // Previously args[3] was read unconditionally although only three arguments
    // were checked, so a three-argument call died with ArrayIndexOutOfBoundsException.
    int topN = args.length > 3 ? Integer.parseInt(args[3]) : 100;

    // The document count doubles as the reducer count of job 1 and as D in the
    // idf formula, so it has to be strictly positive.
    if (nrOfDocuments < 1) {
      System.err.println("arguments invalid, <number of documents> must be >= 1 but was " + nrOfDocuments);
      return;
    }

    System.out.println("==========================================");
    System.out.println("pathin: " + pathin);
    System.out.println("pathout: " + pathout);
    System.out.println("nrOfDocuments: " + nrOfDocuments);
    System.out.println("topN: " + topN);
    System.out.println("==========================================");

    FileSystem fs = FileSystem.get(conf);

    if (!fs.exists(new Path(pathout))) {
      fs.mkdirs(new Path(pathout));
    }

    Path firstJobOut  = new Path(pathout, "job1_output");
    Path secondJobOut = new Path(pathout, "job2_output");
    Path thirdJobOut = new Path(pathout, "job3_output");
    // Remove output left over from a previous run; a job refuses to start
    // when its output folder already exists.
    fs.delete(firstJobOut, true);
    fs.delete(secondJobOut, true);
    fs.delete(thirdJobOut, true);

    // Run job 1
    if (runFirstJob(new Path(pathin), firstJobOut, nrOfDocuments) != 0) {
      throw new IllegalStateException("tfidf_first_job failed, aborting");
    }

    // Run job 2: the output of job 1 is the input of job 2
    if (runSecondJob(firstJobOut, secondJobOut, nrOfDocuments) != 0) {
      throw new IllegalStateException("tfidf_second_job failed, aborting");
    }

    // Run job 3: the output of job 2 is the input of job 3
    if (runThirdJob(secondJobOut, thirdJobOut, topN) != 0) {
      throw new IllegalStateException("tfidf_third_job failed, aborting");
    }

  }
  
  /**
   * Configures and runs the first job ("tfidf_first_job"), which reads plain
   * text lines from {@code pathin} and writes text key/value pairs to
   * {@code pathout}.
   *
   * @param pathin input folder of the job
   * @param pathout output folder of the job
   * @param reduceTaskSize number of reduce tasks to request
   * @return 0 when the job completed successfully, -1 otherwise
   * @throws Exception if the job cannot be created, submitted or awaited
   */
  private static int runFirstJob(Path pathin, Path pathout, final int reduceTaskSize) throws Exception {
    final String name = "tfidf_first_job";
    System.out.println("==================" + name + "=======================");

    final Job firstJob = new Job(conf, name);
    firstJob.setJarByClass(TfIdfMapReduce.class);

    // Where the data comes from and where it goes.
    FileInputFormat.addInputPath(firstJob, pathin);
    FileOutputFormat.setOutputPath(firstJob, pathout);
    firstJob.setInputFormatClass(TextInputFormat.class);
    firstJob.setOutputFormatClass(TextOutputFormat.class);

    // Every key and value travelling through the job is plain text.
    firstJob.setMapOutputKeyClass(Text.class);
    firstJob.setMapOutputValueClass(Text.class);
    firstJob.setOutputKeyClass(Text.class);
    firstJob.setOutputValueClass(Text.class);

    // Processing stages.
    firstJob.setMapperClass(FirstMapReduce.Mapper.class);
    firstJob.setCombinerClass(FirstMapReduce.Combiner.class);
    firstJob.setReducerClass(FirstMapReduce.Reducer.class);

    // Custom partitioner together with the requested number of reduce tasks.
    firstJob.setPartitionerClass(FirstMapReduce.Partitioner.class);
    firstJob.setNumReduceTasks(reduceTaskSize);

    if (firstJob.waitForCompletion(true)) {
      return 0;
    }
    return -1;
  }

  /**
   * Configures and runs the second job ("tfidf_second_job"), which reads
   * tab-separated key/value lines from {@code pathin} and writes text
   * key/value pairs to {@code pathout}.
   *
   * @param pathin input folder of the job
   * @param pathout output folder of the job
   * @param nrOfDocuments total number of documents, stored in the job
   *        configuration under the key "nrOfDocuments"
   * @return 0 when the job completed successfully, -1 otherwise
   * @throws Exception if the job cannot be created, submitted or awaited
   */
  private static int runSecondJob(Path pathin, Path pathout, final int nrOfDocuments) throws Exception {
    final String name = "tfidf_second_job";
    System.out.println("==================" + name + "=======================");

    // Stored in the configuration before the Job is built from it, so that
    // tasks can read the value through their context.
    conf.setInt("nrOfDocuments", nrOfDocuments);
    final Job secondJob = new Job(conf, name);
    secondJob.setJarByClass(TfIdfMapReduce.class);

    // Where the data comes from and where it goes.
    FileInputFormat.addInputPath(secondJob, pathin);
    FileOutputFormat.setOutputPath(secondJob, pathout);
    secondJob.setInputFormatClass(KeyValueTextInputFormat.class);
    secondJob.setOutputFormatClass(TextOutputFormat.class);

    // Every key and value travelling through the job is plain text.
    secondJob.setMapOutputKeyClass(Text.class);
    secondJob.setMapOutputValueClass(Text.class);
    secondJob.setOutputKeyClass(Text.class);
    secondJob.setOutputValueClass(Text.class);

    // Processing stages.
    secondJob.setMapperClass(SecondMapReduce.Mapper.class);
    secondJob.setCombinerClass(SecondMapReduce.Combiner.class);
    secondJob.setReducerClass(SecondMapReduce.Reducer.class);

    if (secondJob.waitForCompletion(true)) {
      return 0;
    }
    return -1;
  }
  
  /**
   * Configures and runs the third job ("tfidf_third_job"), which selects the
   * {@code topN} records with the highest tf-idf value.
   *
   * <p>The job is forced to a single reduce task: the reducer keeps the best
   * {@code topN} records it has seen, so the result is only a global top-N
   * when one reducer sees the candidates of every map task. Without pinning
   * the count, a cluster-side default above 1 would yield several partial lists.
   *
   * @param pathin input folder of the job (tab-separated key/value lines)
   * @param pathout output folder of the job
   * @param topN number of records to keep; stored in the job configuration
   *        under the key "topN"
   * @return 0 when the job completed successfully, -1 otherwise
   * @throws Exception if the job cannot be created, submitted or awaited
   */
  private static int runThirdJob(Path pathin, Path pathout, final int topN) throws Exception {
    String jobName = "tfidf_third_job";
    System.out.println("==================" + jobName + "=======================");

    // Both settings must be in the configuration before the Job is built from it.
    conf.setInt("topN", topN);
    // Base name of the file the reducer writes its result to. This is the
    // value the former Path(...).getName() round trip produced.
    conf.set("topN_out", String.valueOf(topN));
    Job job = new Job(conf, jobName);

    job.setJarByClass(TfIdfMapReduce.class);

    job.setMapperClass(ThirdMapReduce.Mapper.class);
    job.setReducerClass(ThirdMapReduce.Reducer.class);
    // A global top-N requires exactly one reducer, whatever the cluster default is.
    job.setNumReduceTasks(1);

    job.setInputFormatClass(KeyValueTextInputFormat.class);
    job.setOutputFormatClass(TextOutputFormat.class);

    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(Text.class);

    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);

    // Named output the reducer writes the final list to.
    MultipleOutputs.addNamedOutput(job, "top" + topN, TextOutputFormat.class, Text.class, Text.class);

    FileInputFormat.addInputPath(job, pathin);
    FileOutputFormat.setOutputPath(job, pathout);

    boolean success = job.waitForCompletion(true);
    return success ? 0 : -1;
  }

  /**
   * Job 1: counts, for every document, how often each term occurs (n) and how
   * many terms the document contains in total (N).
   *
   * <p>Data flow:
   * <pre>
   * Mapper      (line offset, line)      -> (filename;term, 1) and once per task (filename;!, terms emitted)
   * Combiner    (filename;x, counts)     -> (filename;x, sum of counts)
   * Partitioner routes every key of one file to the same reduce task
   * Reducer     (filename;x, counts)     -> (term;filename, n;N)
   * </pre>
   *
   * <p>The combiner only pre-aggregates and keeps the record format unchanged,
   * because Hadoop may apply a combiner zero, one or several times. All logic
   * that changes the record format lives in the reducer.
   */
  static class FirstMapReduce {

    // NOTE: both constants must stay compile-time constants. They are inlined
    // at their use sites, so reading them from the partitioner, combiner or
    // reducer does not trigger the static initializer below.

    /** Separator between the two halves of a composite key and of a composite value. */
    static final String SEPARATOR = ";";

    /**
     * Pseudo term under which a map task reports the number of terms it emitted.
     * Real terms are trimmed and never equal to this marker, so within one file
     * the marker key sorts before every term key.
     */
    static final String TOTAL_MARKER = "!";

    // Word segmentation service
    static IWordSegService wordSegService;

    // Stop word filtering service
    static IFilterStopwordService filterStopwordService;

    static {
      init();
    }

    /** Looks up the segmentation and stop word services from the Spring context. */
    static void init() {
      ApplicationContext ctx = new ClassPathXmlApplicationContext("classpath*:spring/robot-preprocess.xml");
      wordSegService = (IWordSegService) ctx.getBean("wordSegService");
      filterStopwordService = (IFilterStopwordService) ctx.getBean("filterStopwordService");
    }

    /** Splits every input line into terms and emits one record per term occurrence. */
    static class Mapper extends org.apache.hadoop.mapreduce.Mapper<LongWritable, Text, Text, Text> {

      static final Text one = new Text("1");

      String filename = "";

      long totalWordCount = 0; // number of terms this task emitted

      @Override
      protected void setup(Context context) throws IOException, InterruptedException {
        System.out.println("=================" + context.getJobName() + " map================");
      }

      @Override
      protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Input  (line offset, line of the document)
        // Output (filename;term, 1)
        String line = value.toString();
        if (line.trim().isEmpty() || line.startsWith("#")) { // ignore empty or comment line
          return;
        }
        FileSplit split = (FileSplit) context.getInputSplit();
        filename = split.getPath().toString();
        // Segment the line into words
        Collection<Word> words = wordSegService.segment(line);
        // Drop stop words
        filterStopwordService.filter(words);
        for (Word word : words) {
          String term = normalizeTerm(word.getName());
          if (term == null) {
            continue; // would corrupt the composite key, see normalizeTerm
          }
          context.write(new Text(filename + SEPARATOR + term), one);
          ++totalWordCount;
        } // end for
      } // end map

      @Override
      protected void cleanup(Context context) throws IOException, InterruptedException {
        // Nothing was emitted (for instance a split without usable lines): a
        // total of zero carries no information and the filename may be unset.
        if (totalWordCount == 0) {
          return;
        }
        // Report how many terms this task emitted for the file. A file read by
        // several tasks produces several such records; they are summed downstream.
        context.write(new Text(filename + SEPARATOR + TOTAL_MARKER), new Text("" + totalWordCount));
      }

      /**
       * Returns the term in the form used inside composite keys, or
       * {@code null} when it cannot be represented there: a missing or blank
       * term, a term containing the key separator, or a term equal to the
       * total marker.
       */
      private static String normalizeTerm(String raw) {
        if (raw == null) {
          return null;
        }
        String term = raw.trim();
        if (term.isEmpty() || term.indexOf(SEPARATOR) >= 0 || term.equals(TOTAL_MARKER)) {
          return null;
        }
        return term;
      }

    } // end class Mapper

    /** Sends every key of one file to the same reduce task. */
    static class Partitioner extends org.apache.hadoop.mapreduce.Partitioner<Text, Text> {
      @Override
      public int getPartition(Text key, Text value, int numPartitions) {
        // partition by filename, i.e. by everything in front of the last separator
        String composite = key.toString();
        int sep = composite.lastIndexOf(SEPARATOR);
        String filename = sep < 0 ? composite : composite.substring(0, sep);
        int hashCode = new Text(filename).hashCode();
        // The remainder lies strictly between -numPartitions and numPartitions,
        // so taking the absolute value cannot overflow.
        return Math.abs((hashCode * 127) % numPartitions);
      }
    } // end class Partitioner

    /** Pre-aggregates counts per key without touching the record format. */
    static class Combiner extends org.apache.hadoop.mapreduce.Reducer<Text, Text, Text, Text> {

      @Override
      protected void setup(Context context) throws IOException, InterruptedException {
        System.out.println("=================" + context.getJobName() + " combiner================");
      }

      @Override
      protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        // Input  (filename;x, count)  where x is a term or the total marker
        // Output (filename;x, sum of counts)
        long sum = 0;
        for (Text value : values) {
          sum += Long.parseLong(value.toString());
        }
        context.write(key, new Text(Long.toString(sum)));
      } // end reduce
    } // end class Combiner

    /** Produces the final per-document counts of every term. */
    static class Reducer extends org.apache.hadoop.mapreduce.Reducer<Text, Text, Text, Text> {

      // Total number of terms of the file whose keys are currently being reduced.
      long totalWordCount = 0;

      @Override
      protected void setup(Context context)
          throws IOException, InterruptedException {
        System.out.println("=================" + context.getJobName() + " reducer================");
      }

      @Override
      protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        // Input  (filename;x, count)  where x is a term or the total marker
        // Output (term;filename, n;N)
        long sum = 0;
        for (Text value : values) {
          sum += Long.parseLong(value.toString());
        }

        String composite = key.toString();
        int sep = composite.lastIndexOf(SEPARATOR);
        if (sep < 0) {
          throw new IOException("Malformed key, expected filename" + SEPARATOR + "term: " + composite);
        }
        String filename = composite.substring(0, sep);
        String term = composite.substring(sep + 1);

        if (TOTAL_MARKER.equals(term)) {
          // The marker key arrives before the term keys of its file, so the
          // total is known by the time the first term is written.
          totalWordCount = sum;
          System.out.println("File " + filename + " total word count " + totalWordCount);
          return;
        }

        String outputKey = term + SEPARATOR + filename;
        String outputValue = sum + SEPARATOR + totalWordCount;
        context.write(new Text(outputKey), new Text(outputValue));
      } // end reduce
    } // end class Reducer
  } // end class FirstMapReduce
  
  /**
   * Job 2: counts in how many documents each word occurs (d) and computes the
   * tf-idf value of every (word, document) pair.
   *
   * <p>Data flow:
   * <pre>
   * Mapper   (word;filename, n;N)      -> (word, filename;n;N)
   * Combiner (word, filename;n;N)      -> unchanged
   * Reducer  (word, [filename;n;N ...]) -> (word;filename, tf-idf) for every document
   * </pre>
   *
   * <p>Records are grouped by word alone. Grouping by the whole
   * {@code word;filename;n;N} string puts every record into its own group, so
   * the document frequency would always be 1.
   */
  static class SecondMapReduce {

    /** Re-keys every record of job 1 by its word. */
    static class Mapper extends org.apache.hadoop.mapreduce.Mapper<Text, Text, Text, Text> {

      @Override
      protected void setup(Context context)
          throws IOException, InterruptedException {
        System.out.println("=================" + context.getJobName() + " map================");
      }

      @Override
      protected void map(Text key, Text value, Context context) throws IOException, InterruptedException {
        // Input  (word;filename, n;N)
        // Output (word, filename;n;N)
        String composite = key.toString();
        int sep = composite.indexOf(';');
        if (sep < 0) {
          throw new IOException("Malformed key, expected word;filename: " + composite);
        }
        String word = composite.substring(0, sep);
        String filename = composite.substring(sep + 1);
        context.write(new Text(word), new Text(filename + ";" + value.toString()));
      }
    } // end class Mapper

    /**
     * Pass-through combiner. The document frequency of a word is only known
     * once the records of all map tasks are together, and a combiner may run
     * zero, one or several times, so it must not change the record format.
     * The class is kept because the job driver registers it.
     */
    static class Combiner extends org.apache.hadoop.mapreduce.Reducer<Text, Text, Text, Text> {

      @Override
      protected void setup(Context context) throws IOException, InterruptedException {
        System.out.println("=================" + context.getJobName() + " combiner================");
      }

      @Override
      protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        // Input and output (word, filename;n;N)
        for (Text value : values) {
          context.write(key, value);
        }
      } // end reduce
    } // end class Combiner

    /** Computes tf-idf for every document a word occurs in. */
    static class Reducer extends org.apache.hadoop.mapreduce.Reducer<Text, Text, Text, Text> {

      int D = 1; // total number of documents

      @Override
      protected void setup(Context context) throws IOException, InterruptedException {
        D = context.getConfiguration().getInt("nrOfDocuments", 0);
        if (D < 1) {
          throw new IOException("Configuration value nrOfDocuments must be >= 1 but was " + D);
        }
        System.out.println("=================" + context.getJobName() + " reducer================");
      }

      @Override
      protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        // Input  (word, [filename;n;N ...])
        // Output (word;filename, tf-idf)
        String word = key.toString();

        // filename -> "n;N". All records of the word are needed before d is
        // known, so they are buffered; one entry per document at most.
        // The value object is reused by the framework, hence the string copies.
        TreeMap<String, String> countsByFile = new TreeMap<String, String>();
        for (Text value : values) {
          String record = value.toString();
          // Split from the right: the last two fields are n and N.
          int nSep = record.lastIndexOf(';');
          int fileSep = nSep < 0 ? -1 : record.lastIndexOf(';', nSep - 1);
          if (fileSep < 0) {
            throw new IOException("Malformed value, expected filename;n;N: " + record);
          }
          countsByFile.put(record.substring(0, fileSep), record.substring(fileSep + 1));
        }

        int d = countsByFile.size(); // number of documents containing the word
        double idf = Math.log(D / 1.0d / d);

        for (Entry<String, String> entry : countsByFile.entrySet()) {
          String counts = entry.getValue();
          int sep = counts.indexOf(';');
          long n = Long.parseLong(counts.substring(0, sep));  // occurrences of the word in the document
          long N = Long.parseLong(counts.substring(sep + 1)); // total number of words in the document
          double tf = n / 1.0d / N;
          double tfidf = tf * idf;
          String outputKey = word + ";" + entry.getKey();
          String outputValue = "" + tfidf;
          context.write(new Text(outputKey), new Text(outputValue));
        }
      } // end reduce
    } // end class Reducer
  } // end class SecondMapReduce
  
  /**
   * Job 3: selects the topN records with the highest tf-idf value.
   *
   * <p>Every map task keeps only its own best topN records and emits them when
   * it finishes. The reducer applies the same selection to the candidates it
   * receives and writes the survivors to the named output "top" + topN.
   *
   * <p>The candidate maps are instance fields, so every task object starts
   * empty. Static maps would be shared by all tasks running in the same JVM,
   * mixing the candidates of unrelated tasks.
   */
  static class ThirdMapReduce {

    /**
     * A (key, tf-idf) record. The natural order is descending by value and,
     * for equal values, descending by key, so the first element of a sorted
     * collection is the best record and the last one the first to be dropped.
     */
    static class Pair implements Comparable<Pair>{
      final String key;
      final Double value;
      public Pair(String key, Double value) {
        this.key = key;
        this.value = value;
      }

      public int compareTo(Pair o) {
        int value = o.value.compareTo(this.value);
        if(value != 0) {
          return value;
        }
        return o.key.compareTo(this.key);
      }

      @Override
      public String toString() {
        return key;
      }
    }

    /** Keeps the best topN records of one map task and emits them at the end. */
    static class Mapper extends org.apache.hadoop.mapreduce.Mapper<Text, Text, Text, Text> {

      // Best records seen by this task, best first (natural order of Pair).
      private final TreeMap<Pair, String> treemap = new TreeMap<Pair, String>();

      int topN;

      @Override
      protected void setup(Context context) throws IOException, InterruptedException {
        topN = context.getConfiguration().getInt("topN", 100); // default 100
        System.out.println("=================" + context.getJobName() + " map================");
      }

      @Override
      protected void map(Text key, Text value, Context context) throws IOException, InterruptedException {
        // Input  (word;filename, tf-idf)
        treemap.put(new Pair(key.toString(), Double.parseDouble(value.toString())), value.toString());
        if(treemap.size() > topN) {
          // one record too many: drop the worst one
          treemap.remove(treemap.lastKey());
        }
      } // end map

      @Override
      protected void cleanup(Context context) throws IOException, InterruptedException {
        // Output (word;filename, tf-idf) for the surviving records
        for (Entry<Pair, String> entry : treemap.entrySet()) {
          String outputKey = entry.getKey().toString();
          String outputValue = entry.getValue();
          context.write(new Text(outputKey), new Text(outputValue));
        }
      }
    } // end class Mapper

    /** Merges the candidates of the map tasks and writes the final top N. */
    static class Reducer extends org.apache.hadoop.mapreduce.Reducer<Text, Text, Text, Text> {

      int topN;

      // Best records seen by this task, best first (natural order of Pair).
      private final TreeMap<Pair, String> treemap = new TreeMap<Pair, String>();

      @Override
      protected void setup(Context context) throws IOException, InterruptedException {
        topN = context.getConfiguration().getInt("topN", 100); // default 100
        System.out.println("=================" + context.getJobName() + " reduce================");
      }

      @Override
      protected void reduce(Text key, Iterable<Text> values,Context context) throws IOException, InterruptedException {
        // Input (word;filename, tf-idf); only the first value of a key is used
        Text value = values.iterator().next();
        treemap.put(new Pair(key.toString(), Double.parseDouble(value.toString())), value.toString());
        if(treemap.size() > topN) {
          // one record too many: drop the worst one
          treemap.remove(treemap.lastKey());
        }
      } // end reduce

      @Override
      protected void cleanup(Context context) throws IOException, InterruptedException {
        // Write the top N records to the named output
        String path = context.getConfiguration().get("topN_out");
        MultipleOutputs<Text, Text> output = new MultipleOutputs<Text, Text>(context);
        try {
          System.out.println("================TF-IDF top " + topN + "==================");
          for (Entry<Pair, String> entry : treemap.entrySet()) {
            String key = entry.getKey().toString();
            String value = entry.getValue();
            // Text objects match the key/value classes the named output was registered with.
            output.write("top" + topN, new Text(key), new Text(value), path);
            System.out.println("<" + key + ", " + value  + ">");
          }
        } finally {
          // Always close, otherwise the records written so far may never reach the file.
          output.close();
        }
      }
    } // end class Reducer
  } // end class ThirdMapReduce
  
}

 

tf-idf hadoop map reduce

标签:blog   io   ar   os   sp   java   for   文件   on   

原文地址:http://www.cnblogs.com/deyinx/p/4129281.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!