// Tags:
package org.bigdata.util;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Mapper.Context;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.Reducer;
import org.bigdata.util.WordCountMapReduce.TextDescComparator;
import org.bigdata.util.WordCountMapReduce.WordCountCombiner;
import org.bigdata.util.WordCountMapReduce.WordCountMapper;
import org.bigdata.util.WordCountMapReduce.WordCountReducer;
/**
 * Single-table self join implemented as a MapReduce job.
 *
 * <p>Each input line is a space-separated pair. The mapper emits every pair
 * twice (once keyed by each column) so that the reducer can join records that
 * share a middle element.
 *
 * @author wwhhf
 */
public class SingleJoinMapReduce {

    /** Separates the payload from its tag inside a map output value. */
    private static final char SEPARATOR = ':';

    /** Tag for values emitted in input order (key = first column). */
    private static final String FORWARD_TAG = "1";

    /** Tag for values emitted in swapped order (key = second column). */
    private static final String REVERSED_TAG = "2";

    /** Input path used when no command-line argument is supplied. */
    private static final String DEFAULT_INPUT = "/single";

    /** Output path used when no command-line argument is supplied. */
    private static final String DEFAULT_OUTPUT = "/single_out/";

    /**
     * Emits each input pair under both of its columns.
     *
     * <p>For an input line {@code "a b"} this writes {@code (a, "b:1")} and
     * {@code (b, "a:2")}.
     *
     * @author wwhhf
     */
    public static class SingleJoinMapper extends
            Mapper<LongWritable, Text, Text, Text> {
        /**
         * Splits the line on a single space and writes the forward and the
         * reversed tagged pair.
         *
         * @param key     input key supplied by the input format (unused)
         * @param value   one line of input text
         * @param context sink for the tagged pairs
         * @throws IOException          if writing to the context fails
         * @throws InterruptedException if writing to the context is interrupted
         */
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] terms = value.toString().split(" ");
            if (terms.length < 2) {
                // Blank or malformed line: there is no pair to join, and
                // indexing terms[1] would fail the whole task.
                return;
            }
            // Original order.
            context.write(new Text(terms[0]),
                    new Text(terms[1] + SEPARATOR + FORWARD_TAG));
            // Swapped order.
            context.write(new Text(terms[1]),
                    new Text(terms[0] + SEPARATOR + REVERSED_TAG));
        }
    }

    /**
     * Joins the forward and reversed values that share a key.
     *
     * <p>For input records {@code a b} and {@code b c}, key {@code b} receives
     * {@code "c:1"} and {@code "a:2"}, and the reducer writes the pair
     * {@code (c, a)}: the forward value is the output key and the reversed
     * value is the output value.
     */
    public static class SingleJoinReducer extends
            Reducer<Text, Text, Text, Text> {
        /**
         * Partitions the values by tag and writes the cross product of the
         * two groups.
         *
         * @param key     the shared join element
         * @param values  tagged values produced by {@link SingleJoinMapper}
         * @param context sink for the joined pairs
         * @throws IOException          if writing to the context fails
         * @throws InterruptedException if writing to the context is interrupted
         */
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            List<String> lefts = new ArrayList<>();
            List<String> rights = new ArrayList<>();
            for (Text value : values) {
                String tagged = value.toString();
                // The tag is always appended last, so split at the LAST
                // separator; the payload itself may contain ':'.
                int sep = tagged.lastIndexOf(SEPARATOR);
                if (sep < 0) {
                    // Untagged value: cannot be classified, skip it.
                    continue;
                }
                String name = tagged.substring(0, sep);
                String tag = tagged.substring(sep + 1);
                if (FORWARD_TAG.equals(tag)) {
                    lefts.add(name);
                } else {
                    rights.add(name);
                }
            }
            for (String left : lefts) {
                for (String right : rights) {
                    context.write(new Text(left), new Text(right));
                }
            }
        }
    }

    /**
     * Configures and runs the job, then terminates the JVM with status 0 on
     * success and 1 on job failure or on any exception during setup.
     *
     * @param args optional; {@code args[0]} overrides the input path
     *             (default {@code /single}) and {@code args[1]} overrides the
     *             output path (default {@code /single_out/})
     */
    public static void main(String[] args) {
        String inputPath = args != null && args.length > 0 ? args[0]
                : DEFAULT_INPUT;
        String outputPath = args != null && args.length > 1 ? args[1]
                : DEFAULT_OUTPUT;
        // Assume failure until the job reports success, so that an exception
        // never results in a zero exit status.
        int exitCode = 1;
        try {
            Configuration cfg = HadoopCfg.getConfiguration();
            Job job = Job.getInstance(cfg);
            job.setJobName("SingleJoin");
            job.setJarByClass(SingleJoinMapReduce.class);
            // mapper
            job.setMapperClass(SingleJoinMapper.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Text.class);
            // reducer
            job.setReducerClass(SingleJoinReducer.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);
            // NOTE(review): comparator is defined in WordCountMapReduce;
            // presumably orders keys descending - confirm.
            job.setSortComparatorClass(TextDescComparator.class);
            FileInputFormat.addInputPath(job, new Path(inputPath));
            FileOutputFormat.setOutputPath(job, new Path(outputPath));
            exitCode = job.waitForCompletion(true) ? 0 : 1;
        } catch (InterruptedException e) {
            // Restore the interrupt flag before reporting the failure.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } catch (IllegalStateException | IllegalArgumentException
                | ClassNotFoundException | IOException e) {
            e.printStackTrace();
        }
        System.exit(exitCode);
    }
}
package org.bigdata.util;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/**
 * 多表关联
 * 
 * @author wwhhf
 * 
 */
public class MultiJoinMapReduce {
    public static class MultiJoinMapper extends
            Mapper<LongWritable, Text, Text, Text> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            FileSplit fileSplit = (FileSplit) context.getInputSplit();
            String fileName = fileSplit.getPath().getName();
            String record = value.toString();
            if (fileName.startsWith("b")) {
                // table 2
                // 1 Beijing
                Pattern pattern = Pattern
                        .compile("((\\d+)\\s([\\w\\W\\s\\S]+))");
                Matcher matcher = pattern.matcher(record);
                String ckey = null;
                String cvalue = null;
                while (matcher.find()) {
                    ckey = matcher.group(2);
                    cvalue = matcher.group(3);
                }
                context.write(new Text(ckey), new Text(cvalue + ":2"));
            } else {
                // table 1
                // 1 Beijing
                Pattern pattern = Pattern
                        .compile("(([\\w\\W\\s\\S]+)\\s(\\d+))");
                Matcher matcher = pattern.matcher(record);
                String ckey = null;
                String cvalue = null;
                while (matcher.find()) {
                    cvalue = matcher.group(2);
                    ckey = matcher.group(3);
                }
                context.write(new Text(ckey), new Text(cvalue + ":1"));
            }
        }
    }
    public static class MultiJoinReducer extends
            Reducer<Text, Text, Text, Text> {
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            List<String> lefts = new ArrayList<>();
            List<String> rights = new ArrayList<>();
            for (Text value : values) {
                String terms[] = value.toString().split(":");
                if ("1".equals(terms[1])) {
                    lefts.add(terms[0]);
                } else {
                    rights.add(terms[0]);
                }
            }
            for (String left : lefts) {
                for (String right : rights) {
                    context.write(new Text(left), new Text(right));
                }
            }
        }
    }
    public static void main(String[] args) {
        try {
            Configuration cfg = HadoopCfg.getConfiguration();
            Job job = Job.getInstance(cfg);
            job.setJobName("MultiJoin");
            job.setJarByClass(MultiJoinMapReduce.class);
            // mapper
            job.setMapperClass(MultiJoinMapper.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Text.class);
            // reducer
            job.setReducerClass(MultiJoinReducer.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);
            FileInputFormat.addInputPath(job, new Path("/multi"));
            FileOutputFormat.setOutputPath(job, new Path("/multi_out/"));
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        } catch (IllegalStateException | IllegalArgumentException
                | ClassNotFoundException | IOException | InterruptedException e) {
            e.printStackTrace();
        }
    }
}
// Tags:
// Original article: http://blog.csdn.net/qq_17612199/article/details/51344839