码迷,mamicode.com
首页 > 其他好文 > 详细

MapReduce实现等值连接,左外连接,右外连接,全外连接

时间:2015-12-15 14:06:44      阅读:167      评论:0      收藏:0      [点我收藏+]

标签:

#测试数据:

# more user.txt(用户ID,用户名)

[java] view plaincopy
1   lavimer  
2   liaozhongmin  
3   liaozemin  
#more post.txt(用户ID,帖子ID,标题)
[java] view plaincopy
1   1   java  
1   2   c  
2   3   hadoop  
4   4   hive  
5   5   hbase  
5   6   pig  
5   7   flume  

#等值连接结果如下:
[java] view plaincopy
1   lavimer 1   1   java  
1   lavimer 1   2   c  
2   liaozhongmin    2   3   hadoop  

#左外连接结果如下:
[java] view plaincopy
1   lavimer 1   1   java  
1   lavimer 1   2   c  
2   liaozhongmin    2   3   hadoop  
3   liaozemin   NULL  

#右外连接结果如下:
[java] view plaincopy
1   lavimer 1   1   java  
1   lavimer 1   2   c  
2   liaozhongmin    2   3   hadoop  
NULL    4   4   hive  
NULL    5   5   hbase  
NULL    5   6   pig  
NULL    5   7   flume  

#全外连接结果如下:
[java] view plaincopy
1   lavimer 1   1   java  
1   lavimer 1   2   c  
2   liaozhongmin    2   3   hadoop  
3   liaozemin   NULL  
NULL    4   4   hive  
NULL    5   5   hbase  
NULL    5   6   pig  
NULL    5   7   flume  

实现代码如下:
[java] view plaincopy
/** 
 *  
 * @author 廖钟民 
 * time : 2015年1月30日下午1:23:36 
 * @version 
 */  
public class UserPostJoin {  
    // 定义输入路径  
    private static final String INPUT_PATH1 = "hdfs://liaozhongmin:9000/user_post_join/user.txt";  
    private static final String INPUT_PATH2 = "hdfs://liaozhongmin:9000/user_post_join/post.txt";  
    // 定义输出路径  
    private static final String OUT_PATH = "hdfs://liaozhongmin:9000/out";  
  
    public static void main(String[] args) {  
  
        try {  
            // 创建配置信息  
            Configuration conf = new Configuration();  
  
            // 创建文件系统  
            FileSystem fileSystem = FileSystem.get(new URI(OUT_PATH), conf);  
            // 如果输出目录存在,我们就删除  
            if (fileSystem.exists(new Path(OUT_PATH))) {  
                fileSystem.delete(new Path(OUT_PATH), true);  
            }  
  
            // 创建任务  
            Job job = new Job(conf, UserPostJoin.class.getName());  
  
            // 设置连接类型  
            job.getConfiguration().set("joinType", "allOuter");  
            // 设置多路径输入  
            MultipleInputs.addInputPath(job, new Path(INPUT_PATH1), TextInputFormat.class, UserMapper.class);  
            MultipleInputs.addInputPath(job, new Path(INPUT_PATH2), TextInputFormat.class, PostMapper.class);  
  
            //1.2 设置自定义Mapper类和设置map函数输出数据的key和value的类型  
            job.setMapOutputKeyClass(Text.class);  
            job.setMapOutputValueClass(UserPost.class);  
  
            //1.3 设置分区和reduce数量(reduce的数量,和分区的数量对应,因为分区为一个,所以reduce的数量也是一个)  
            job.setPartitionerClass(HashPartitioner.class);  
            job.setNumReduceTasks(1);  
  
            //1.4 排序  
            //1.5 归约  
            //2.1 Shuffle把数据从Map端拷贝到Reduce端。  
            //2.2 指定Reducer类和输出key和value的类型  
            job.setReducerClass(UserPostReducer.class);  
            job.setOutputKeyClass(Text.class);  
            job.setOutputValueClass(Text.class);  
  
            //2.3 指定输出的路径和设置输出的格式化类  
            FileOutputFormat.setOutputPath(job, new Path(OUT_PATH));  
            job.setOutputFormatClass(TextOutputFormat.class);  
  
            // 提交作业 退出  
            System.exit(job.waitForCompletion(true) ? 0 : 1);  
  
        } catch (Exception e) {  
            e.printStackTrace();  
        }  
    }  
  
    /** 
     * 自定义Mapper类用于处理来自user.txt文件的数据 
     * @author 廖钟民 
     * time : 2015年1月30日下午1:22:12 
     * @version 
     */  
    public static class UserMapper extends Mapper<LongWritable, Text, Text, UserPost> {  
        @Override  
        protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, UserPost>.Context context) throws IOException, InterruptedException {  
            // 对字符串进行切分  
            String[] arr = value.toString().split("\t");  
            // 创建UserId  
            Text userId = new Text(arr[0]);  
            // 把结果写出去  
            context.write(userId, new UserPost("U", value.toString()));  
        }  
    }  
    /** 
     * 自定义Mapper类用于处理来自post.txt文件的数据 
     * @author 廖钟民 
     * time : 2015年1月30日下午1:22:16 
     * @version 
     */  
    public static class PostMapper extends Mapper<LongWritable, Text, Text, UserPost> {  
        @Override  
        protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, UserPost>.Context context) throws IOException, InterruptedException {  
            // 对数据进行切分  
            String[] arr = value.toString().split("\t");  
            // 创建用户ID  
            Text userId = new Text(arr[0]);  
            context.write(userId, new UserPost("P", value.toString()));  
  
        }  
    }  
    /** 
     * 自定义Reducer类用于处理不同Mapper类的输出 
     * @author 廖钟民 
     * time : 2015年1月30日下午1:23:05 
     * @version 
     */  
    public static class UserPostReducer extends Reducer<Text, UserPost, Text, Text> {  
        // 定义List集合用于存放用户  
        private List<Text> users = new ArrayList<Text>();  
        private List<Text> posts = new ArrayList<Text>();  
  
        // 定义连接类型  
        private String joinType;  
  
        @Override  
        protected void setup(Reducer<Text, UserPost, Text, Text>.Context context) throws IOException, InterruptedException {  
  
            this.joinType = context.getConfiguration().get("joinType");  
  
            System.out.println(joinType);  
  
        }  
        /** 
         * 经过Shuffle后数据会分组,每一组数据都会调用一次reduce()函数 
         *第一组数据: 
         *1 lavimer 
         *1 1   java 
         *1 2   c 
         * 
         *第二组数据: 
         *2 3   hadoop 
         *2 liaozhongmin 
         * 
         *第三组数据: 
         *3 liaozemin 
         * 
         *第四组数据: 
         *4 4   hive 
         * 
         *第五组数据: 
         *5 5   hbase 
         *5 6   pig 
         *5 7   flume 
         * 
         *每一组数据都会调用一次reduce()函数,我们以第一组数据为例进行讲解: 
         * 
         *进入reduce函数后,<1,lavimer>会被添加到users集合中  
         *<1 1   java>和<1  2   c>会被添加到posts集合中 
         * 
         *然后是判断当前操作是什么类型的连接,我们以等值连接为例: 
         *遍历两个集合得到的数据为: 
         *【1    lavimer    1         1    java】 
         *【1    lavimer    1         2    c】 
         * 
         *这是第一组数据的执行轨迹,其他依次类推就可以得到相关的操作 
         */  
        protected void reduce(Text key, Iterable<UserPost> values, Reducer<Text, UserPost, Text, Text>.Context context) throws IOException,  
                InterruptedException {  
            // 清空集合  
            users.clear();  
            posts.clear();  
  
            // 迭代values集合把当前穿进来的某个组中的数据分别添加到对应的集合中  
            for (UserPost val : values) {  
                System.out.println("实际值:" + key + "===>" + values.toString());  
                if (val.getType().equals("U")) {  
                    users.add(new Text(val.getData()));  
                } else {  
                    posts.add(new Text(val.getData()));  
                }  
            }  
  
            // 根据joinType关键字做对应的连接操作  
            if (joinType.equals("innerJoin")) {// 内连接  
                if (users.size() > 0 && posts.size() > 0) {  
  
                    for (Text user : users) {  
                        for (Text post : posts) {  
                            context.write(new Text(user), new Text(post));  
                        }  
                    }  
                }  
            } else if (joinType.equals("leftOuter")) {// 左外连接  
  
                for (Text user : users) {  
                    if (posts.size() > 0) {  
                        for (Text post : posts) {  
                            context.write(new Text(user), new Text(post));  
                        }  
                    } else {  
                        context.write(new Text(user), createEmptyPost());  
                    }  
                }  
  
            } else if (joinType.equals("rightOuter")) {// 右外连接  
                for (Text post : posts) {  
                    if (users.size() > 0) {  
                        for (Text user : users) {  
                            context.write(new Text(user), new Text(post));  
                        }  
                    } else {  
                        context.write(createEmptyUser(), post);  
                    }  
                }  
            } else if (joinType.equals("allOuter")) {// 全外连接  
                if (users.size() > 0) {  
                    for (Text user : users) {  
                        if (posts.size() > 0) {  
                            for (Text post : posts) {  
                                context.write(new Text(user), new Text(post));  
                            }  
                        } else {  
                            context.write(new Text(user), createEmptyPost());  
                        }  
                    }  
                } else {  
                    for (Text post : posts) {  
                        if (users.size() > 0) {  
                            for (Text user : users) {  
                                context.write(new Text(user), new Text(post));  
                            }  
                        } else {  
                            context.write(createEmptyUser(), post);  
                        }  
                    }  
                }  
            }  
  
        }  
  
        /** 
         * 用户为空时用制表符代替 
         *  
         * @return 
         */  
        private Text createEmptyUser() {  
            return new Text("NULL");  
        }  
  
        /** 
         * 帖子为空时用制表符代替 
         *  
         * @return 
         */  
        private Text createEmptyPost() {  
            return new Text("NULL");  
        }  
    }  
}  
/** 
 * 自定义实体类封装两个表的数据 
 * @author 廖钟民 
 * time : 2015年1月30日下午1:23:50 
 * @version 
 */  
class UserPost implements Writable {  
  
    // 类型(U表示用户,P表示帖子)  
    private String type;  
    private String data;  
  
    public UserPost() {  
    }  
  
    public UserPost(String type, String data) {  
        this.type = type;  
        this.data = data;  
    }  
  
    public String getType() {  
        return type;  
    }  
  
    public void setType(String type) {  
        this.type = type;  
    }  
  
    public String getData() {  
        return data;  
    }  
  
    public void setData(String data) {  
        this.data = data;  
    }  
  
    public void write(DataOutput out) throws IOException {  
        out.writeUTF(this.type);  
        out.writeUTF(this.data);  
  
    }  
  
    public void readFields(DataInput in) throws IOException {  
        this.type = in.readUTF();  
        this.data = in.readUTF();  
    }  
  
}  

 

MapReduce实现等值连接,左外连接,右外连接,全外连接

标签:

原文地址:http://www.cnblogs.com/thinkpad/p/5047807.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!