Hadoop Single-Table and Multi-Table Joins

Single-table join (self-join): the mapper emits each input pair twice, once in its original order tagged "1" and once reversed tagged "2"; at every reduce key the two tag groups are then cross-joined, linking any two records that meet at that key (the classic grandchild/grandparent lookup).
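
A quick trace of how the tagging works, using the a->b, b->c relation from the class comment below: the mapper turns the two input lines "a b" and "b c" into

    a    b:1
    b    a:2
    b    c:1
    c    b:2

At the reduce key b, the tag-1 group is [c] and the tag-2 group is [a], so the reducer emits the single joined pair (c, a). Keys whose value list contains only one of the two tags (here a and c) contribute nothing to the output.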

package org.bigdata.util;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.bigdata.util.WordCountMapReduce.TextDescComparator;

/**
 * Single-table self-join
 * 
 * @author wwhhf
 * 
 */
public class SingleJoinMapReduce {

    /**
     * a->b b->c a->c
     * 
     * @author wwhhf
     * 
     */
    public static class SingleJoinMapper extends
            Mapper<LongWritable, Text, Text, Text> {

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // each input line holds one relation: two fields separated by a space
            String terms[] = value.toString().split(" ");
            // original order: key = first field, value = second field, tagged "1"
            context.write(new Text(terms[0]), new Text(terms[1] + ":1"));
            // reversed order: key = second field, value = first field, tagged "2"
            context.write(new Text(terms[1]), new Text(terms[0] + ":2"));
        }

    }

    public static class SingleJoinReducer extends
            Reducer<Text, Text, Text, Text> {

        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
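            // tag "1": the key links to this value; tag "2": this value links to the key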
            List<String> lefts = new ArrayList<>();
            List<String> rights = new ArrayList<>();
            for (Text value : values) {
                String terms[] = value.toString().split(":");
                if ("1".equals(terms[1])) {
                    lefts.add(terms[0]);
                } else {
                    rights.add(terms[0]);
                }
            }
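            // cross-join the two groups: every (left, right) pair is connected through the current key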
            for (String left : lefts) {
                for (String right : rights) {
                    context.write(new Text(left), new Text(right));
                }
            }
        }

    }

    public static void main(String[] args) {
        try {
            Configuration cfg = HadoopCfg.getConfiguration();
            Job job = Job.getInstance(cfg);
            job.setJobName("SingleJoin");
            job.setJarByClass(SingleJoinMapReduce.class);

            // mapper
            job.setMapperClass(SingleJoinMapper.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Text.class);

            // reducer
            job.setReducerClass(SingleJoinReducer.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);

            job.setSortComparatorClass(TextDescComparator.class);

            FileInputFormat.addInputPath(job, new Path("/single"));
            FileOutputFormat.setOutputPath(job, new Path("/single_out/"));

            System.exit(job.waitForCompletion(true) ? 0 : 1);

        } catch (IllegalStateException | IllegalArgumentException
                | ClassNotFoundException | IOException | InterruptedException e) {
            e.printStackTrace();
        }
    }

}
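
The driver above relies on two helpers that are not shown in this post: HadoopCfg.getConfiguration() and WordCountMapReduce.TextDescComparator (carried over from an earlier word-count example). A minimal sketch of what they might look like, assuming HadoopCfg simply loads the default cluster configuration and TextDescComparator sorts Text keys in descending order:

package org.bigdata.util;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

/** Hypothetical helper: returns a Configuration built from the classpath resources. */
public class HadoopCfg {
    public static Configuration getConfiguration() {
        // core-site.xml / hdfs-site.xml are picked up from the classpath;
        // add cfg.addResource(...) calls here if they live elsewhere.
        return new Configuration();
    }
}

/**
 * Hypothetical comparator (a public static nested class of WordCountMapReduce
 * in the original code): sorts the Text map-output keys in descending order.
 */
class TextDescComparator extends WritableComparator {
    protected TextDescComparator() {
        super(Text.class, true);
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        return -((Text) a).compareTo((Text) b);
    }
}

Any comparator that keeps equal keys adjacent is enough for the reducer to see all values of a key together, so the join also runs correctly if the setSortComparatorClass(...) line is dropped; the descending order only affects how the output keys are sorted.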

Multi-table join: two tables that share a numeric key are joined reduce-side. The mapper inspects the input file name to tell the tables apart (file names starting with "b" are table 2, everything else is table 1), tags each value with "1" or "2" accordingly, and the reducer pairs every table-1 record with every table-2 record that carries the same key.
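
As a hypothetical illustration of the input format the code below assumes (the "1 Beijing" sample comes from the comments in the mapper): the address table, in files whose names start with "b", holds a numeric id followed by a name, while the factory table (any other file) holds a name followed by the numeric id of its address:

    Beijing Red Star 1          (factory table)
    1 Beijing                   (address table)

Joined on the id 1, these two lines would produce the output record "Beijing Red Star", a tab, then "Beijing" (TextOutputFormat separates key and value with a tab).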

package org.bigdata.util;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * Multi-table join (reduce-side)
 * 
 * @author wwhhf
 * 
 */
public class MultiJoinMapReduce {

    public static class MultiJoinMapper extends
            Mapper<LongWritable, Text, Text, Text> {

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {

            FileSplit fileSplit = (FileSplit) context.getInputSplit();
            String fileName = fileSplit.getPath().getName();

            String record = value.toString();
            if (fileName.startsWith("b")) {
                // table 2
                // 1 Beijing
                Pattern pattern = Pattern
                        .compile("((\\d+)\\s([\\w\\W\\s\\S]+))");
                Matcher matcher = pattern.matcher(record);
                String ckey = null;
                String cvalue = null;
                while (matcher.find()) {
                    ckey = matcher.group(2);
                    cvalue = matcher.group(3);
                }
                context.write(new Text(ckey), new Text(cvalue + ":2"));
            } else {
                // table 1
                // 1 Beijing
                Pattern pattern = Pattern
                        .compile("(([\\w\\W\\s\\S]+)\\s(\\d+))");
                Matcher matcher = pattern.matcher(record);
                String ckey = null;
                String cvalue = null;
                while (matcher.find()) {
                    cvalue = matcher.group(2);
                    ckey = matcher.group(3);
                }
                context.write(new Text(ckey), new Text(cvalue + ":1"));
            }
        }
    }

    public static class MultiJoinReducer extends
            Reducer<Text, Text, Text, Text> {

        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
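            // tag "1": record from table 1 (factory file); tag "2": record from table 2 ("b"-prefixed address file)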
            List<String> lefts = new ArrayList<>();
            List<String> rights = new ArrayList<>();
            for (Text value : values) {
                String terms[] = value.toString().split(":");
                if ("1".equals(terms[1])) {
                    lefts.add(terms[0]);
                } else {
                    rights.add(terms[0]);
                }
            }
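            // cross-join: pair every table-1 record with every table-2 record that shares this key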
            for (String left : lefts) {
                for (String right : rights) {
                    context.write(new Text(left), new Text(right));
                }
            }
        }

    }

    public static void main(String[] args) {
        try {
            Configuration cfg = HadoopCfg.getConfiguration();
            Job job = Job.getInstance(cfg);
            job.setJobName("MultiJoin");
            job.setJarByClass(MultiJoinMapReduce.class);

            // mapper
            job.setMapperClass(MultiJoinMapper.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Text.class);

            // reducer
            job.setReducerClass(MultiJoinReducer.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);

            FileInputFormat.addInputPath(job, new Path("/multi"));
            FileOutputFormat.setOutputPath(job, new Path("/multi_out/"));

            System.exit(job.waitForCompletion(true) ? 0 : 1);

        } catch (IllegalStateException | IllegalArgumentException
                | ClassNotFoundException | IOException | InterruptedException e) {
            e.printStackTrace();
        }
    }

}
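
Both drivers hard-code their HDFS paths (/single, /multi). A common alternative, sketched below as an illustration only (the class name MultiJoinDriver is made up here), is to take the paths from the command line via Hadoop's Tool/ToolRunner interface:

package org.bigdata.util;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * Hypothetical variant of the multi-table driver that reads the input and
 * output paths from the command line instead of hard-coding them.
 */
public class MultiJoinDriver extends Configured implements Tool {

    @Override
    public int run(String[] args) throws Exception {
        Job job = Job.getInstance(getConf());
        job.setJobName("MultiJoin");
        job.setJarByClass(MultiJoinMapReduce.class);

        job.setMapperClass(MultiJoinMapReduce.MultiJoinMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        job.setReducerClass(MultiJoinMapReduce.MultiJoinReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        // args[0] = input directory, args[1] = output directory
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new Configuration(), new MultiJoinDriver(), args));
    }
}

It would then be launched with something like: hadoop jar join.jar org.bigdata.util.MultiJoinDriver /multi /multi_out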

Original article: http://blog.csdn.net/qq_17612199/article/details/51344839
