Tags: hadoop mapreduce pagerank distributed cache algorithm
PageRank is a measure of web page importance that is hard to game. It is a function that assigns a real number to every page on the Web (or at least to the portion of the Web that has been crawled and whose link structure has been discovered). The intent is that the higher a page's PageRank, the more important it is. There is no single fixed algorithm for assigning PageRank values.
I will not walk through the derivation of the PageRank algorithm here; interested readers can look it up themselves. I will give the formula for the PageRank of a page directly:
P(n) = a/G + (1 - a) * Σ_{m ∈ L(n)} P(m)/C(m)
where G is the total number of pages, P(n) is the PageRank of page n, C(m) is the number of links contained on page m (its out-degree), a is the random-jump factor, and L(n) is the set of pages that contain a link to page n.
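As a quick worked example using the sample pagerank.txt below (so a = 0.85 and G = 5, matching the constants in the code): page 1 is linked to only by page 5, which has 3 out-links, so after one iteration
P(1) = 0.85/5 + (1 - 0.85) * (0.2/3) = 0.17 + 0.01 = 0.18.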
Below is the MapReduce implementation of PageRank. The input file must follow this format:
Input file pagerank.txt:
page id <whitespace> initial PageRank value;{for page n, the set of page ids that n's links point to (the out-link set)};{for page n, the set of page ids of pages that link to n (the in-link set)};number of out-links
Note: the fields must be separated by semicolons.
1 0.2;{2,4};{5};2
2 0.2;{3,5};{1,5};2
3 0.2;{4};{2,5};1
4 0.2;{5};{1,3};1
5 0.2;{1,2,3};{2,4};3
Distributed cache file rankCache.txt:
rank<TAB>page id:page PageRank value,page id:page PageRank value,page id:page PageRank value,...
rank 1:0.2,2:0.2,3:0.2,4:0.2,5:0.2
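To make the two formats concrete, here is a minimal, standalone sketch (not part of the MapReduce job; the class name and sample strings are my own) that parses one record from pagerank.txt and the single line of rankCache.txt with the same split() calls the Mapper and Reducer below use:

// FormatDemo.java - illustrative only; mirrors the parsing done in PRMapper and PRReducer.
public class FormatDemo {
    public static void main(String[] args) {
        // one record from pagerank.txt: "pageId<TAB>rank;{out-links};{in-links};outLinkCount"
        String record = "1\t0.2;{2,4};{5};2";
        String[] fields = record.split(";");
        String pageId = fields[0].split("\\s")[0];                    // "1"
        double rank = Double.parseDouble(fields[0].split("\\s")[1]);  // 0.2
        String[] outLinks = fields[1].split("[{}]")[1].split(",");    // ["2", "4"]
        int outLinkCount = Integer.parseInt(fields[3]);               // 2
        System.out.println(pageId + " " + rank + " " + outLinks.length + " " + outLinkCount);

        // the single line of rankCache.txt: "rank<TAB>id:rank,id:rank,..."
        String cacheLine = "rank\t1:0.2,2:0.2,3:0.2,4:0.2,5:0.2";
        for (String pair : cacheLine.split("\t")[1].split(",")) {
            System.out.println("page " + pair.split(":")[0] + " -> " + pair.split(":")[1]);
        }
    }
}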
Having introduced the two input files, here is the MapReduce implementation of the PageRank algorithm (set the output paths to suit your own environment). In each iteration the Mapper emits, for every out-link of a page, that page's contribution P(m)/C(m), and also passes the page's own link-structure record through; the Combiner merges contributions with that record; the Reducer applies the formula above to compute the new PageRank, writes the updated record, and rewrites rankCache.txt (read back through the distributed cache) so the next iteration can measure how much the values changed.
package soft.project;

import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.FileWriter;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Hashtable;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Vector;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class PageRank {

    private final static String localInputPath = "/home/hadoop/test/mapReduce/pagerank.txt";
    // private final static String hdfsInputPath = "hdfs:/192.168.0.1:9000/user/hadoop/pagerank";
    private final static String localOutputPath = "/home/hadoop/test/mapReduce/pagerank";
    private final static String hdfsOutputPath = "hdfs:/192.168.0.1:9000/user/hadoop/pagerank";
    private final static String rankCachePath = "/home/hadoop/test/mapReduce/rankCache.txt";

    private static List<RankResult> pagerankList = new Vector<RankResult>();

    private final static double random = 0.85;      // random jump factor a
    private final static double stopFactor = 0.001; // stop iterating when the sum of absolute per-page PageRank
                                                     // differences between two consecutive iterations is below this
    private final static long G = 5;                 // total number of pages

    private static class RankResult {
        private String order = "";
        private double rank = 0;

        @SuppressWarnings("unused")
        public RankResult() {}

        public RankResult(String order, double rank) {
            this.order = order;
            this.rank = rank;
        }
    }

    private static class PRMapper extends Mapper<LongWritable, Text, Text, Text> {

        private String keyinfo = "";
        private String valueinfo = "";

        @Override
        protected void map(LongWritable key, Text value,
                Mapper<LongWritable, Text, Text, Text>.Context context)
                throws IOException, InterruptedException {
            String[] split = value.toString().split(";");
            String outLink[] = split[1].split("[{}]")[1].split(",");
            double pagerank = Double.parseDouble(split[0].split("\\s")[1]);
            double c = Double.parseDouble(split[3]);
            double k = pagerank / c;
            // emit this page's contribution P(m)/C(m) to each of its out-links
            for (String page : outLink) {
                context.write(new Text(page), new Text(String.valueOf(k)));
            }
            writeNode(value, context);
        }

        // pass the page's own link-structure record through so the reducer can rebuild it
        private void writeNode(Text value,
                Mapper<LongWritable, Text, Text, Text>.Context context)
                throws IOException, InterruptedException {
            String split[] = value.toString().split("\\s");
            valueinfo = split[1].split(";", 2)[1];
            keyinfo = split[0];
            context.write(new Text(keyinfo), new Text(valueinfo));
        }
    }

    private static class PRCombiner extends Reducer<Text, Text, Text, Text> {
        @Override
        protected void reduce(Text key, Iterable<Text> value,
                Reducer<Text, Text, Text, Text>.Context context)
                throws IOException, InterruptedException {
            String v = "";
            double pagerank = 0;
            for (Text text : value) {
                String valueString = text.toString();
                if (valueString.contains("{")) {
                    v = valueString;                              // the link-structure record
                } else {
                    pagerank += Double.parseDouble(valueString);  // partial contribution sum
                }
            }
            if (v.equals("")) {
                context.write(key, new Text(String.valueOf(pagerank)));
            } else {
                String s = pagerank + ";" + v;
                context.write(key, new Text(s));
            }
        }
    }

    private static class PRReducer extends Reducer<Text, Text, Text, Text> {

        // is a fresh rankList and rankMap created for every job?
        private List<Double> rankList = new Vector<Double>((int) G);
        private Hashtable<Integer, Double> rankMap = new Hashtable<Integer, Double>();

        @Override
        protected void setup(Reducer<Text, Text, Text, Text>.Context context)
                throws IOException, InterruptedException {
            Configuration conf = context.getConfiguration();
            int order = Integer.parseInt(conf.get("order"));
            System.out.println(".................... invoke setup().................");
            Path cachePath[] = DistributedCache.getLocalCacheFiles(conf);
            if (cachePath != null && cachePath.length > 0) {
                for (Path p : cachePath) {
                    System.out.println("reduce cache:" + p.toString());
                }
                System.out.println("cachePath length:" + cachePath.length);
                getRankList(cachePath[order - 1].toString(), context);
            } else {
                System.out.println("cachePath == null || cachePath's length is 0");
            }
        }

        @Override
        protected void reduce(Text key, Iterable<Text> value,
                Reducer<Text, Text, Text, Text>.Context context)
                throws IOException, InterruptedException {
            double pagerank = 0;
            String node = "";
            for (Text v : value) {
                String pString = v.toString();
                System.out.println("reduce key=" + key.toString() + " reduce value=" + pString);
                String split[] = pString.split(";");
                if (split.length == 1) {
                    // pString looks like "0.2": a pure contribution
                    pagerank += Double.parseDouble(pString);
                } else if (!split[0].contains("{")) {
                    // pString looks like "0.2;{2,4};{1,3};2": contribution merged with the node record
                    pagerank += Double.parseDouble(split[0]);
                    node = pString.split(";", 2)[1];
                } else if (split[0].contains("{")) {
                    // pString looks like "{2,4};{1,3};2": the bare node record
                    node = pString;
                }
            }
            // P(n) = a/G + (1 - a) * sum(P(m)/C(m))
            pagerank = random / G + (1 - random) * pagerank;
            node = pagerank + ";" + node;
            System.out.println("reduce key=" + key.toString() + " node_value=" + node);
            rankMap.put(Integer.parseInt(key.toString()), pagerank); // record every node's new PageRank
            if (!node.equals(""))
                context.write(key, new Text(node));
        }

        @Override
        protected void cleanup(Reducer<Text, Text, Text, Text>.Context context)
                throws IOException, InterruptedException {
            System.out.println(".................invoke cleanup().......................");
            System.out.println("rankList.size=" + rankList.size() + " rankMap.size=" + rankMap.size());
            Configuration configuration = context.getConfiguration();
            String order = configuration.get("order");
            System.out.println("order:" + order + " invoke cleanup().............");
            if (rankList.size() == G && rankMap.size() == G) {
                double gammar = 0;
                int length = rankList.size();
                int orderNum = Integer.parseInt(order);
                if (orderNum > 1) {
                    // sum of absolute differences between this iteration and the previous one
                    for (int i = 1; i <= length; i++) {
                        gammar += Math.abs(rankMap.get(i) - rankList.get(i - 1));
                    }
                    String s = "Difference between iteration " + orderNum + " and iteration " + (orderNum - 1) + ": ";
                    pagerankList.add(new RankResult(s, gammar));
                }
                flushCacheFile(rankMap);
            } else {
                System.out.println("rankList.size()!=G || rankMap.size()!=G "
                        + "rankList.size():" + rankList.size() + " rankMap.size():" + rankMap.size());
            }
        }

        // overwrite rankCache.txt with this iteration's PageRank values
        private void flushCacheFile(Hashtable<Integer, Double> rankMap) {
            File file = new File(rankCachePath);
            StringBuffer stringBuffer = new StringBuffer();
            int length = rankMap.size();
            if (length == G) {
                BufferedWriter writer = null;
                stringBuffer.append("rank").append("\t");
                for (int i = 1; i <= G; i++) {
                    stringBuffer.append(i + ":" + rankMap.get(i) + ",");
                }
                // strip the trailing comma
                String string = stringBuffer.toString().substring(0, stringBuffer.toString().length() - 1);
                System.out.println("Stringbuffer:" + string);
                try {
                    writer = new BufferedWriter(new FileWriter(file, false));
                    writer.write(string);
                    writer.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            } else {
                System.out.println("reduce rankMap has fewer than G entries, skipping flushCacheFile");
            }
        }

        // read the distributed-cache file and load the previous iteration's PageRank values
        private void getRankList(String path, Reducer<Text, Text, Text, Text>.Context context) {
            FileReader reader = null;
            try {
                reader = new FileReader(new File(path));
            } catch (FileNotFoundException e) {
                e.printStackTrace();
            }
            BufferedReader in = new BufferedReader(reader);
            StringBuffer stringBuffer = new StringBuffer();
            String string = "";
            try {
                while ((string = in.readLine()) != null) {
                    stringBuffer.append(string);
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
            String value = stringBuffer.toString().split("\t")[1];
            System.out.println("reduce rankList value:" + value);
            String split[] = value.split(",");
            for (String pagerank : split)
                rankList.add(Double.parseDouble(pagerank.split(":")[1]));
        }
    }

    private static boolean deleteOutput(boolean isLocalFile, Configuration conf) throws IOException {
        if (isLocalFile) {
            File file = new File(localOutputPath);
            return deleteFile(file);
        } else if (!isLocalFile) {
            FileSystem hdfs = FileSystem.get(conf);
            boolean isDelete = hdfs.delete(new Path(hdfsOutputPath), true);
            return isDelete;
        } else
            return false;
    }

    private static boolean deleteFile(File file) {
        if (file.isFile()) {
            return file.delete();
        } else if (file.isDirectory()) {
            String filePath = file.getAbsolutePath();
            String[] list = file.list();
            for (String subFile : list) {
                String path = filePath + "/" + subFile;
                File sonFile = new File(path);
                deleteFile(sonFile);
            }
            file.delete();
        }
        return file.exists() ? false : true;
    }

    public static Job getJob(Configuration conf, String input, String output) throws IOException {
        Job job = new Job(conf, "pagerank");
        job.setJarByClass(PageRank.class);
        DistributedCache.addCacheFile(new Path(rankCachePath).toUri(), conf);
        job.setMapperClass(PRMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setCombinerClass(PRCombiner.class);
        job.setReducerClass(PRReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, new Path(input));
        FileOutputFormat.setOutputPath(job, new Path(output));
        return job;
    }

    public static void run(int number) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration configuration = new Configuration(); // questionable? all jobs share one Configuration
        deleteOutput(true, configuration);
        int i = 1;
        String input = "";
        String output = "";
        while (i <= number) {
            System.out.println("i=" + i + " pagerankList.length:" + pagerankList.size());
            if (i >= 3 && pagerankList.get(i - 3).rank <= stopFactor) {
                System.out.println("********pagerankList.get(" + (i - 3) + ").rank="
                        + pagerankList.get(i - 3).rank + "<=" + stopFactor
                        + " convergence criterion met, stopping iteration**************************");
                break;
            }
            if (i == 1) {
                input = localInputPath;
                output = localOutputPath + "/trash";
                System.out.println("*******************MapReduce pass 0***************************************");
                configuration.set("order", String.valueOf(0));
                Job job = getJob(configuration, input, output);
                job.waitForCompletion(true);
            } else {
                input = output;
            }
            output = localOutputPath + "/" + i;
            System.out.println("*******************MapReduce pass " + i + "***************************************");
            configuration.set("order", String.valueOf(i)); // position matters: this must be set here, before getJob!
            Job job = getJob(configuration, input, output);
            job.waitForCompletion(true);
            i++;
        }
    }

    public static void printGap() {
        int num = pagerankList.size();
        Iterator<RankResult> iterator = pagerankList.iterator();
        int i = 1;
        while (iterator.hasNext()) {
            RankResult rankResult = iterator.next();
            System.out.print(rankResult.order + rankResult.rank + " ");
            if (i % 3 == 0)
                System.out.println();
            i++;
        }
    }

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        int n = 10;
        long start = System.currentTimeMillis();
        PageRank.run(n);
        PageRank.printGap();
        long end = System.currentTimeMillis();
        System.out.println("\n" + n + " iterations took: " + (end - start) / 60000 + " min "
                + ((end - start) % 60000) / 1000 + " s " + (end - start) % 1000 + " ms");
    }
}
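Assuming the jobs run successfully, each iteration i writes its result under localOutputPath + "/" + i, one record per page in the same format as the input file. With the sample data and the worked formula above, page 1's record after the first iteration should come out roughly as
1	0.18;{2,4};{5};2
(the exact decimals depend on floating-point rounding). At the same time rankCache.txt is overwritten with the new values, so the next job can compute the per-page differences that printGap() prints at the end.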
Original post: http://blog.csdn.net/weiweiyixiaocsdn/article/details/45825623