码迷,mamicode.com
首页 > 其他好文 > 详细

mapreduce 高级案例倒排索引

时间:2018-04-11 10:43:48      阅读:175      评论:0      收藏:0      [点我收藏+]

标签:大数据   hadoop   mapreduce   倒排索引   

  • 理解【倒排索引】的功能
  • 熟悉mapreduce 中的combine 功能
  • 根据需求编码实现【倒排索引】的功能,旨在理解mapreduce 的功能。

一:理解【倒排索引】的功能

1.1 倒排索引:

    由于不是根据文档来确定文档所包含的内容,而是进行相反的操作,因而称为倒排索引
    简单来说根据单词,返回它在哪个文件中出现过,而且频率是多少的结果。例如:就像百度里的搜索,你输入一个关键字,那么百度引擎就迅速的在它的服务器里找到有该关键字的文件,并根据频率和其他一些策略(如页面点击投票率)等来给你返回结果

二:熟悉mapreduce 中的combine 功能

2.1 mapreduce的combine 功能

   1 Map过程:Map过程首先分析输入的<key,value>对,得到索引中需要的信息:单词,文档URI 和词频。key:单词和URI.value:出现同样单词的次数。

2 Combine过程:经过map方法处理后,Combine过程将key值相同的value值累加,得到一个单词在文档中的词频。

3 Reduce过程:经过上述的俩个过程后,Reduce过程只需要将相同的key值的value值组合成倒排引索文件的格式即可,其余的事情直接交给MapReduce框架进行处理

技术分享图片

三:根据需求编码实现【倒排索引】的功能,旨在理解mapreduce 的功能。

3.1 Java的编程代码

InvertedIndexMapReduce.java

package org.apache.hadoop.studyhadoop.index;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Mapper.Context;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * 
 * @author zhangyy
 * 
 */
public class InvertedIndexMapReduce extends Configured implements Tool {
    // step 1 : mapper
    /**
     * public class Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
     */
    public static class WordCountMapper extends //
            Mapper<LongWritable, Text, Text, Text> {

        private Text mapOutputKey = new Text();
        private Text mapOutputValue = new Text("1");

        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {

            // split1
            String[] lines = value.toString().split("##");
            // get url
            String url = lines[0];

            // split2
            String[] strs = lines[1].split(" ");

            for (String str : strs) {
                mapOutputKey.set(str + "," + url);
                context.write(mapOutputKey, mapOutputValue);
            }

        }
    }

    // set combiner class
    public static class InvertedIndexCombiner extends //
            Reducer<Text, Text, Text, Text> {

        private Text CombinerOutputKey = new Text();
        private Text CombinerOutputValue = new Text();

        @Override
        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {

            // split
            String[] strs = key.toString().split(",");

            // set key
            CombinerOutputKey.set(strs[0] + "\n");

            // set value
            int sum = 0;
            for (Text value : values) {
                sum += Integer.valueOf(value.toString());
            }

            CombinerOutputValue.set(strs[1] + ":" + sum);

            context.write(CombinerOutputKey, CombinerOutputValue);

        }
    }

    // step 2 : reducer
    public static class WordCountReducer extends //
            Reducer<Text, Text, Text, Text> {

        private Text outputValue = new Text();

        @Override
        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            // TODO

            String result = new String();

            for (Text value : values) {
                result += value.toString() + "\t";
            }

            outputValue.set(result);

            context.write(key, outputValue);
        }
    }

    // step 3 : job

    public int run(String[] args) throws Exception {

        // 1 : get configuration
        Configuration configuration = super.getConf();

        // 2 : create job
        Job job = Job.getInstance(//
                configuration,//
                this.getClass().getSimpleName());
        job.setJarByClass(InvertedIndexMapReduce.class);

        // job.setNumReduceTasks(tasks);

        // 3 : set job
        // input --> map --> reduce --> output
        // 3.1 : input
        Path inPath = new Path(args[0]);
        FileInputFormat.addInputPath(job, inPath);

        // 3.2 : mapper
        job.setMapperClass(WordCountMapper.class);
        // TODO
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        // ====================shuffle==========================
        // 1: partition
        // job.setPartitionerClass(cls);
        // 2: sort
        // job.setSortComparatorClass(cls);
        // 3: combine
        job.setCombinerClass(InvertedIndexCombiner.class);
        // 4: compress
        // set by configuration
        // 5 : group
        // job.setGroupingComparatorClass(cls);

        // ====================shuffle==========================

        // 3.3 : reducer
        job.setReducerClass(WordCountReducer.class);
        // TODO
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        // 3.4 : output
        Path outPath = new Path(args[1]);
        FileOutputFormat.setOutputPath(job, outPath);

        // 4 : submit job
        boolean isSuccess = job.waitForCompletion(true);
        return isSuccess ? 0 : 1;

    }

    public static void main(String[] args) throws Exception {

        args = new String[] {
                "hdfs://namenode01.hadoop.com:8020/input/index.txt",
                "hdfs://namenode01.hadoop.com:8020/outputindex/" 
                };

        // get configuration
        Configuration configuration = new Configuration();

        // configuration.set(name, value);

        // run job
        int status = ToolRunner.run(//
                configuration,//
                new InvertedIndexMapReduce(),//
                args);

        // exit program
        System.exit(status);
    }

}

3.2 运行案例测试

上传文件:
hdfs dfs -put index.txt /input 

代码运行结果:

技术分享图片
技术分享图片

输出结果:

技术分享图片

mapreduce 高级案例倒排索引

标签:大数据   hadoop   mapreduce   倒排索引   

原文地址:http://blog.51cto.com/flyfish225/2096764

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!