
Implementing Matrix Multiplication with MapReduce



I ran into this little programming exercise while studying the PageRank algorithm. I have always prided myself on being able to turn any principle into code (given enough time). The principle of matrix multiplication is simple; pretty much every science or engineering student who has taken linear algebra (or a related course) knows it. But I had never thought about carrying out matrix multiplication as a parallel computation.

I won't rehash matrix basics here. Write the product as P = M * N; then each element of P is

    P(i,j) = Σ_k M(i,k) * N(k,j)

Put simply: each row of the left matrix M is multiplied element by element against each column of the right matrix N, and the products are summed.
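
A quick worked example with made-up numbers, using the same 2x2 and 2x3 shapes that the code below hard-codes:

    M = | 1 2 |        N = | 5 6  7 |
        | 3 4 |            | 8 9 10 |

    P(0,0) = 1*5 + 2*8 = 21,  P(0,1) = 1*6 + 2*9 = 24,  P(0,2) = 1*7 + 2*10 = 27
    P(1,0) = 3*5 + 4*8 = 47,  P(1,1) = 3*6 + 4*9 = 54,  P(1,2) = 3*7 + 4*10 = 61

    so P = | 21 24 27 |
           | 47 54 61 |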

When MapReduce comes up, most people think of map as the phase that organizes the data (here, the matrix elements) and hands it to reduce for the actual computation. That sounds simple enough, but getting from nothing to a working program is not easy. The crux is the design of the key; a good key design is half the battle.

Here is the design process in detail.

[Figure: the <k,v> pairs emitted by the map phase; each value is prefixed with M: or N:]

You have probably noticed the M: and N: tags inside the <k,v> pairs. To be clear, they are not strictly necessary, but tagging the values this way makes the code much easier to maintain and debug (see the chapter on single-table joins in Lu Jiaheng's Hadoop in Action [2]), because it is always obvious which matrix an element came from. What really matters is the number after M: or N:. In M:y, y is the element's column index; in N:x, x is the element's row index. With that in place the idea should be clear: map emits these <k,v> pairs, and reduce receives <k, list(v)>. The line System.out.print("tuple==" + MN[0] + ":" + MN[1] + "\t") in the source code prints out exactly what map produced. list(v) looks like the following.

[Figure: the list(v) received by reduce for one output key]

That wraps up the map phase; on to reduce. This is where the x/y index mentioned above pays off: it acts as an alignment marker. Within one list, multiply the values whose markers match, then add the products; the sum is the corresponding element of the result. A small win, isn't it?
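
To make the key design concrete, here is a sketch of the pairs for a single output element, using the made-up M and N from the example above. The value format follows the code given later: "M:1,2" means column 1 of M, value 2, and "N:0,5" means row 0 of N, value 5.

    For P(0,0), i.e. key "0,0", map emits:

      from M's row 0 ("1 2"):    <"0,0", "M:0,1">  <"0,0", "M:1,2">
      from N's row 0 ("5 6 7"):  <"0,0", "N:0,5">
      from N's row 1 ("8 9 10"): <"0,0", "N:1,8">

    reduce then sees key "0,0" with list(v) = { M:0,1  M:1,2  N:0,5  N:1,8 } (order not guaranteed),
    matches the markers (M:0 with N:0, M:1 with N:1), and computes 1*5 + 2*8 = 21 = P(0,0).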

Now a brief look at the implementation of the MR program. The MapReduce skeleton is fairly simple; most people start out with the wordcount example, and below is the skeleton extracted from the wordcount program that ships with Hadoop.

public class wordCount {

    public static class mulMapper extends Mapper<Object, Text, Text, Text> {

        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            // ...
        }
    }

    public static class mulReducer extends Reducer<Text, Text, Text, Text> {

        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            // ...
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println("Usage: wordcount <in> <out>");
            System.exit(2);
        }
        // handle a pre-existing output directory
        FileSystem fs = FileSystem.get(URI.create(otherArgs[1]), conf);
        Path inpath = new Path(otherArgs[0]);
        Path outpath = new Path(otherArgs[1]);

        Job job = new Job(conf, "word count");
        job.setJarByClass(wordCount.class);
        job.setMapperClass(mulMapper.class);
        // job.setCombinerClass(mulReducer.class);
        job.setReducerClass(mulReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, inpath);
        FileOutputFormat.setOutputPath(job, outpath);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

 

I won't go through the coding in detail; you can implement it from your own understanding. If that proves difficult, the complete MR program is given after the references.

 

Good luck!

 

 

References

[1] Huang Yihua, Miao Kaixiang. 深入理解大数据——大数据处理与编程实践 (Understanding Big Data: Big Data Processing and Programming Practice). Beijing: China Machine Press, 2014.

[2] Lu Jiaheng. Hadoop实战 (Hadoop in Action). Beijing: China Machine Press, 2012.

 


Here is the complete MR code.

 

package org.apache.test;

import java.io.IOException;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class Multi {

    public static class mulMapper extends Mapper<Object, Text, Text, Text> {
        private Text map_key = new Text();
        private Text map_value = new Text();
        // row index of the current input line; assumes each matrix file is a
        // single split whose lines are read in order
        private int line1 = 0;

        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            int rowM = 2;  // number of rows of the left matrix M
            int colM = 2;  // number of columns of the left matrix M
            int rowN = 2;  // number of rows of the right matrix N
            int colN = 3;  // number of columns of the right matrix N
            // decide which matrix this line belongs to from the file name
            FileSplit filesplit = (FileSplit) context.getInputSplit();
            String fileName = filesplit.getPath().getName();
            if (fileName.contains("n")) {
                // this line is row line1 of N; element N(line1, i) contributes
                // to every P(j, i) and is tagged with its row index line1
                String[] line = value.toString().split(" ");
                for (int j = 0; j < rowM; j++) {
                    for (int i = 0; i < colN; i++) {
                        int val = Integer.parseInt(line[i]);
                        map_key.set(j + "," + i);
                        map_value.set("N" + ":" + line1 + "," + val);
                        System.out.println("N:" + map_key + "\t" + map_value);
                        context.write(map_key, map_value);
                    }
                }
            } else if (fileName.contains("m")) {
                // this line is row line1 of M; element M(line1, j) contributes
                // to every P(line1, i) and is tagged with its column index j
                String[] line = value.toString().split(" ");
                for (int i = 0; i < colN; i++) {
                    for (int j = 0; j < colM; j++) {
                        int val = Integer.parseInt(line[j]);
                        map_key.set(line1 + "," + i);
                        map_value.set("M" + ":" + j + "," + val);
                        System.out.println("M:" + map_key + "\t" + map_value);
                        context.write(map_key, map_value);
                    }
                }
            }
            line1++;
        }
    }

    public static class mulReducer extends Reducer<Text, Text, Text, Text> {

        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            // array length equals colM (= rowN), hard-coded to 2 here
            final int[] M = new int[2];
            final int[] N = new int[2];
            int sum = 0;
            System.out.println("keys output in ascending order:");
            for (Text val : values) {
                String[] MN = val.toString().split(":");
                String[] except_value = MN[1].split(",");
                System.out.print("tuple==" + MN[0] + ":" + MN[1] + "\t");
                // place each value at the slot given by its alignment marker
                if (MN[0].equals("M"))
                    M[Integer.parseInt(except_value[0])] = Integer.parseInt(except_value[1]);
                else
                    N[Integer.parseInt(except_value[0])] = Integer.parseInt(except_value[1]);
            }
            // multiply matching slots and sum: P(i,j) = sum_k M(i,k) * N(k,j)
            for (int i = 0; i < M.length; i++) {
                sum += M[i] * N[i];
            }
            System.out.print("sum=" + sum);
            System.out.println();
            // context.write(key, new Text(Integer.toString(sum)));
            context.write(new Text(), new Text("\t" + Integer.toString(sum)));
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println("Usage: matrix <in> <out>");
            System.exit(2);
        }
        // handle a pre-existing output directory
        FileSystem fs = FileSystem.get(URI.create(otherArgs[1]), conf);
        Path inpath = new Path(otherArgs[0]);
        Path outpath = new Path(otherArgs[1]);
        if (fs.exists(outpath)) {
            System.out.println(outpath);
            System.out.println("output directory already exists; deleting and recreating it");
            fs.delete(outpath, true);
        }
        Job job = new Job(conf, "multiplication");
        job.setJarByClass(Multi.class);
        job.setMapperClass(mulMapper.class);
        // job.setCombinerClass(mulReducer.class);
        job.setReducerClass(mulReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, inpath);
        FileOutputFormat.setOutputPath(job, outpath);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
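
For completeness, here is a minimal sketch of how the job might be run. It assumes the two matrices live in files literally named m and n (the mapper only checks whether the file name contains "m" or "n"), one matrix row per line with space-separated integers; the jar name and HDFS paths are placeholders.

    $ cat m
    1 2
    3 4
    $ cat n
    5 6 7
    8 9 10
    $ hadoop fs -mkdir /user/test/matrix-in
    $ hadoop fs -put m n /user/test/matrix-in
    $ hadoop jar multi.jar org.apache.test.Multi /user/test/matrix-in /user/test/matrix-out

Note that, as written, the reducer emits an empty key and only the sum, so the output contains the values of P but not their (row, column) positions; the commented-out context.write(key, ...) line would keep the position as well. Also, the matrix dimensions are hard-coded in the mapper (2x2 times 2x3), and the reducer's array length must equal colM (= rowN), so different sizes require editing the code.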

 

 

 


Original article: http://blog.csdn.net/yxb3158/article/details/45083871
