码迷,mamicode.com
首页 > 其他好文 > 详细

TopN案例

时间:2019-04-06 20:24:55      阅读:159      评论:0      收藏:0      [点我收藏+]

标签:数组越界   output   main   word   数组   stat   cep   key   dex   

准备三份数据

t1 2067
t2 2055
t3 2055
t4 1200
t5 2367
t6 255
t7 2555
t8 12100
t9 20647
t10 245
t11 205
t12 100
t111 1067
t112 2155
t113 2065
t114 1290
t115 237
t116 25
t117 15
t118 1
t119 10647
t110 2995
t111 2057
t112 10044
t211 67
t212 55
t213 65
t214 90
t215 37
t216 425
t217 155
t218 189
t219 1047
t210 295
t211 27
t212 144

定义Mapper类

package com.hadoop.TopN;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;
import java.util.TreeMap;

public class TopMapper extends Mapper<Object, Text, NullWritable, Text> {
    private TreeMap<Integer, Text> map = new TreeMap<>();

    @Override
    protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
        String[] words = value.toString().split(" ");
        String number = words[1];
        map.put(Integer.parseInt(number), new Text(value)); //此处必须new Text,不然数组越界,大坑!
        if (map.size() > 10) {
            map.remove(map.firstKey());
        }
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        for (Text text : map.values()) {
            context.write(NullWritable.get(),text);
        }
    }
}

定义Reducer类

package com.hadoop.TopN;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;
import java.util.TreeMap;

public class TopReducer extends Reducer<NullWritable, Text, NullWritable, Text> {
    private TreeMap<Integer, Text> map = new TreeMap<>();

    @Override
    protected void reduce(NullWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        for (Text value : values) {
            String[] strs = value.toString().split(" ");
            map.put(Integer.parseInt(strs[1]),new Text(value));
            if (map.size() >10){
                map.remove(map.firstKey());
            }
        }
        for (Text text:map.values()){
            context.write(NullWritable.get(),text);
        }
    }
}

编写Driver类

package com.hadoop.TopN;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class TopDriver {
    public static void main(String[] args) throws Exception{
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(TopDriver.class);
        job.setMapperClass(TopMapper.class);
        job.setReducerClass(TopReducer.class);
        job.setNumReduceTasks(1);   //重点
        job.setMapOutputKeyClass(NullWritable.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.setInputPaths(job,new Path("input"));
        FileOutputFormat.setOutputPath(job,new Path("output/topn"));
        job.waitForCompletion(true);
    }
}

输出结果part-r-00000

t113 2065
t1 2067
t112 2155
t5 2367
t7 2555
t110 2995
t112 10044
t119 10647
t8 12100
t9 20647

TopN案例

标签:数组越界   output   main   word   数组   stat   cep   key   dex   

原文地址:https://www.cnblogs.com/JZTX123/p/10662729.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!