
MapReduce Implementation of the PageRank Algorithm


Tags: hadoop   mapreduce   pagerank   distributed cache   algorithms

PageRank is a measure of Web-page importance that is not easily gamed. PageRank is a function that assigns a real number to each page on the Web (or at least to the portion of the Web that has been crawled and whose link structure has been discovered). The intent is that the higher a page's PageRank, the more important the page. There is no single fixed algorithm for assigning PageRank.

I will not go through the derivation of the PageRank formula here; interested readers can consult other references. I give the formula for a page's PageRank directly:

     P(n) = a/G + (1 − a) · Σ_{m ∈ L(n)} P(m)/C(m)

where G is the total number of pages, P(n) is the PageRank of page n, C(m) is the number of links page m contains, a is the random-jump factor, and L(n) is the set of pages that contain a link to page n.
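
For example, with the five-page sample used below (a = 0.85, G = 5, every page starting at rank 0.2), page 1's only in-link comes from page 5, which has C(5) = 3 out-links, so one iteration gives:

     P(1) = 0.85/5 + 0.15 × (0.2/3) = 0.17 + 0.01 = 0.18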

Below is the MapReduce implementation of PageRank. The input file must be in the following format:

Input file pagerank.txt:

page id<TAB>initial pagerank;{ids of the pages that page n links to (out-link set)};{ids of the pages that link to page n (in-link set)};number of out-links

Note: the fields must be separated by semicolons, and the page id must be separated from the initial rank by a single whitespace character (the code splits on \s).

   1 0.2;{2,4};{5};2
   2 0.2;{3,5};{1,5};2
   3 0.2;{4};{2,5};1
   4 0.2;{5};{1,3};1
   5 0.2;{1,2,3};{2,4};3

Distributed cache file rankCache.txt:

rank<TAB>page id:pagerank value,page id:pagerank value,page id:pagerank value,...

rank 1:0.2,2:0.2,3:0.2,4:0.2,5:0.2
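
As a quick sanity check on this format, here is a minimal stand-alone sketch (the class name ParseDemo is mine, not part of the job) showing how one pagerank.txt line decomposes under the same splits the Mapper performs:

public class ParseDemo {
	public static void main(String[] args) {
		// one pagerank.txt line: pageId<TAB>rank;{out-links};{in-links};out-link count
		String line = "1\t0.2;{2,4};{5};2";
		String[] fields = line.split(";");                           // ["1\t0.2", "{2,4}", "{5}", "2"]
		String pageId = fields[0].split("\\s")[0];                   // "1"
		double rank = Double.parseDouble(fields[0].split("\\s")[1]); // 0.2
		String[] outLinks = fields[1].split("[{}]")[1].split(",");   // ["2", "4"]
		int outCount = Integer.parseInt(fields[3]);                  // 2
		System.out.println(pageId + " -> rank " + rank + ", " + outCount
				+ " out-links, first out-link: " + outLinks[0]);
	}
}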

Having described the two input files, here is the MapReduce implementation of the PageRank algorithm (set the output paths to suit your own environment):
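
To trace one iteration before reading the code, take the line 1<TAB>0.2;{2,4};{5};2: the Mapper emits ("2", "0.1") and ("4", "0.1") (0.2 spread over 2 out-links) plus the node record ("1", "{2,4};{5};2"); the Reducer for key "1" then sums the contribution arriving from page 5 (0.2/3 ≈ 0.067), applies the formula to get P(1) = 0.85/5 + 0.15 × 0.067 ≈ 0.18, and writes 1<TAB>0.18;{2,4};{5};2, which is exactly the input format for the next iteration.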

package soft.project;

import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.FileWriter;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Hashtable;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Vector;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

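/**
 * Iterative PageRank on Hadoop MapReduce. Each iteration is one job: the
 * Mapper spreads every page's current rank over its out-links and re-emits
 * the page's adjacency record; the Reducer sums the incoming contributions,
 * applies P(n) = a/G + (1-a)*sum, and rewrites the rank cache file that is
 * used to measure convergence between consecutive iterations.
 */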
public class PageRank {

	private final static String localInputPath = "/home/hadoop/test/mapReduce/pagerank.txt";
//	private final static String hdfsInputPath = "hdfs://192.168.0.1:9000/user/hadoop/pagerank";
	private final static String localOutputPath = "/home/hadoop/test/mapReduce/pagerank";
	private final static String hdfsOutputPath = "hdfs://192.168.0.1:9000/user/hadoop/pagerank";
	private final static String rankCachePath = "/home/hadoop/test/mapReduce/rankCache.txt";
	private static List<RankResult> pagerankList = new Vector<RankResult>();
	private final static double random = 0.85;      // random-jump factor a
	private final static double stopFactor = 0.001; // stop iterating once the total absolute change in PageRank between two iterations falls below this
	private final static long G = 5;                // number of pages

	private static class RankResult{
		private String order="";
		private double rank=0;
		
		@SuppressWarnings("unused")
		public RankResult() {}
		public RankResult(String order,double rank){
			this.order=order;
			this.rank=rank;
		}
	}
	
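	// Mapper: for each page n, emit P(n)/C(n) to every page n links to, then
	// re-emit n's own adjacency record so the Reducer can rebuild the full line.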
	private static class PRMapper extends
			Mapper<LongWritable, Text, Text, Text> {

		private String keyinfo = "";
		private String valueinfo = "";

		@Override
		protected void map(LongWritable key, Text value,
				Mapper<LongWritable, Text, Text, Text>.Context context)
				throws IOException, InterruptedException {
			String[] split = value.toString().split(";");
			String outLink[] = split[1].split("[{}]")[1].split(",");
			double pagerank = Double.parseDouble(split[0].split("\\s")[1]);
			double c = Double.parseDouble(split[3]);  // number of out-links
			double k = pagerank / c;                  // contribution sent to each linked page
			for (String page : outLink) {
				context.write(new Text(page), new Text(String.valueOf(k)));
			}
			writeNode(value, context);
		}

		private void writeNode(Text value,
				Mapper<LongWritable, Text, Text, Text>.Context context)
				throws IOException, InterruptedException {
			String split[] = value.toString().split("\\s");
			valueinfo = split[1].split(";", 2)[1];  // everything after the rank: "{out};{in};count"
			keyinfo = split[0];                     // page id
			context.write(new Text(keyinfo), new Text(valueinfo));
		}
	}

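	// Combiner: pre-sums the rank contributions for a key on the map side; if
	// the node record is present, it is carried along as "partialSum;record".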
	private static class PRCombiner extends Reducer<Text, Text, Text, Text> {

		@Override
		protected void reduce(Text key, Iterable<Text> value,
				Reducer<Text, Text, Text, Text>.Context context)
				throws IOException, InterruptedException {
			String v = "";
			double pagerank = 0;
			for (Text text : value) {
				String valueString = text.toString();
				if (valueString.contains("{")) {
					v = valueString;
				} else {
					pagerank += Double.parseDouble(valueString);
				}
			}
			if (v.equals("")) {
				context.write(key, new Text(String.valueOf(pagerank)));
			} else {
				String s = pagerank + ";" + v;
				context.write(key, new Text(s));
			}

		}

	}

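	// Reducer: sums all contributions for a page, applies the PageRank formula,
	// emits "pageId<TAB>newRank;record", and (via setup/cleanup) compares the
	// new ranks against the previous iteration's, read from the distributed cache.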
	private static class PRReducer extends Reducer<Text, Text, Text, Text> {

		private List<Double> rankList = new Vector<Double>((int) G);  // previous iteration's ranks; re-created per job, since each job instantiates a fresh Reducer
		private Hashtable<Integer, Double> rankMap=new Hashtable<Integer, Double>();
		
		@Override
		protected void setup(Reducer<Text, Text, Text, Text>.Context context)
				throws IOException, InterruptedException {
			Configuration conf=context.getConfiguration();
			int order=Integer.parseInt(conf.get("order"));
			System.out.println(".................... invoke setup().................");
			Path cachePath[] = DistributedCache.getLocalCacheFiles(conf);
			if (cachePath != null && cachePath.length > 0) {
				for (Path p : cachePath) {
					System.out.println("reduce cache:" + p.toString());
				}
				System.out.println("cachePath length:" + cachePath.length);
				// getJob() re-adds rankCache.txt to the shared conf on every iteration, so
				// the job with order k sees k+1 localized copies of the same file; clamp
				// the index so the order-0 job does not read cachePath[-1]
				getRankList(cachePath[Math.max(order - 1, 0)].toString(), context);
			} else {
				System.out.println("cachePath == null || cachePath's length is 0");
			}
		}
		
		@Override
		protected void reduce(Text key, Iterable<Text> value,
				Reducer<Text, Text, Text, Text>.Context context)
				throws IOException, InterruptedException {
			double pagerank = 0;
			String node = "";
			for (Text v : value) {
				String pString = v.toString();
				System.out.println("reduce key="+key.toString()+"  reduce value=" + pString);
				String split[] = pString.split(";");
				if (split.length == 1) { // bare contribution, e.g. "0.2"
					pagerank += Double.parseDouble(pString);
				} else if (!split[0].contains("{")) { // combined "sum;record", e.g. "0.2;{2,4};{1,3};2"
					pagerank += Double.parseDouble(split[0]);
					node = pString.split(";", 2)[1];
				} else if (split[0].contains("{")) { // bare node record, e.g. "{2,4};{1,3};2"
					node = pString;
				}
			}
			pagerank = random / G + (1 - random) * pagerank;
			node = pagerank + ";" + node;
			System.out.println("reduce  key=" + key.toString() + "  node_value=" + node);
			rankMap.put(Integer.parseInt(key.toString()), pagerank);  // record this page's new rank
			if (!node.equals(""))
				context.write(key, new Text(node));
		}
		
		@Override
		protected void cleanup(Reducer<Text, Text, Text, Text>.Context context)
				throws IOException, InterruptedException {
			System.out.println(".................invoke cleanup().......................");
			System.out.println("rankList.size="+rankList.size()+"  rankMap.size="+rankMap.size());
			Configuration configuration=context.getConfiguration();
			String order=configuration.get("order");
			System.out.println("order:"+order+"  invoke cleanup().............");
			if(rankList.size()==G && rankMap.size()==G){
				double gammar=0;
				int length=rankList.size();
				int orderNum=Integer.parseInt(order);
				if (orderNum > 1) {
					for (int i = 1; i <= length; i++) {
						gammar += Math.abs(rankMap.get(i) - rankList.get(i - 1));
					}
					String s = "Difference between iteration " + orderNum + " and iteration " + (orderNum - 1) + ": ";
					pagerankList.add(new RankResult(s, gammar));
				}
				flushCacheFile(rankMap);
			}
			else{
				System.out.println("rankList.size()!=G || rankMap.size()!=G "
						+ "rankList.size():"+rankList.size()+"  rankMap.size():"+rankMap.size());
			}
		}
		
		private void flushCacheFile(Hashtable<Integer, Double> rankMap){
			File file =new File(rankCachePath);
			StringBuffer stringBuffer=new StringBuffer();
			int length=rankMap.size();
			if(length==G){
				BufferedWriter writer=null;
				stringBuffer.append("rank").append("\t");
				for(int i=1;i<=G;i++){
					stringBuffer.append(i+":"+rankMap.get(i)+",");
				}
				// strip only the trailing comma (the original length()-2 also chopped the last digit of the final rank)
				String string = stringBuffer.toString().substring(0, stringBuffer.toString().length() - 1);
				System.out.println("Stringbuffer:"+string);
				try {
					writer=new BufferedWriter(new FileWriter(file, false));
					writer.write(string);
					writer.close();
				} catch (IOException e) {
					e.printStackTrace();
				}
			}else{
				System.out.println("reduce rankMap 长度不够G,不执行flushCacheFile");
			}
		}

		private void  getRankList(String  path,Reducer<Text, Text, Text, Text>.Context context) {
			FileReader reader = null;
			try {
				reader = new FileReader(new File(path));
			} catch (FileNotFoundException e) {
				e.printStackTrace();
			}
			BufferedReader in=new BufferedReader(reader);
			StringBuffer stringBuffer=new StringBuffer();
			String string="";
			try {
				while ((string = in.readLine()) != null) {
					stringBuffer.append(string);
				}
				in.close();
			} catch (IOException e) {
				e.printStackTrace();
			}
			String value=stringBuffer.toString().split("\t")[1];
			System.out.println("reduce  rankList value:"+value);
			String split[]=value.split(",");
			for(String pagerank:split)
				rankList.add(Double.parseDouble(pagerank.split(":")[1]));
		}

	}
 
	private static boolean deleteOutput(boolean isLocalFile, Configuration conf)
			throws IOException {
		if (isLocalFile) {
			File file = new File(localOutputPath);
			return deleteFile(file);
		} else {
			FileSystem hdfs = FileSystem.get(conf);
			return hdfs.delete(new Path(hdfsOutputPath), true);
		}
	}

	private static boolean deleteFile(File file) {
		if (file.isFile()) {
			return file.delete();
		} else if (file.isDirectory()) {
			String filePath = file.getAbsolutePath();
			String[] list = file.list();
			for (String subFile : list) {
				String path = filePath + "/" + subFile;
				File sonFile = new File(path);
				deleteFile(sonFile);
			}
			file.delete();
		}
		return !file.exists();
	}

	public static Job getJob(Configuration conf,String input,String output) throws IOException {

		Job job = new Job(conf, "pagerank");
		job.setJarByClass(PageRank.class);
		
		// register the rank cache; note this appends another entry to the shared conf on every iteration
		DistributedCache.addCacheFile(new Path(rankCachePath).toUri(), conf);

		job.setMapperClass(PRMapper.class);
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(Text.class);

		job.setCombinerClass(PRCombiner.class);

		job.setReducerClass(PRReducer.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(Text.class);

		FileInputFormat.addInputPath(job, new Path(input));
		FileOutputFormat.setOutputPath(job, new Path(output));

		return job;
	}
	
	public static void run(int number) throws IOException, ClassNotFoundException, InterruptedException{
		Configuration configuration = new Configuration();  // shared by all iterations: "order" is overwritten and the cache file re-added on this same conf
		deleteOutput(true, configuration);
		int i=1;
		String input="";
		String output="";
		while (i <= number) {
			System.out.println("i=" + i + "  pagerankList.length:" + pagerankList.size());
			if (i >= 3 && pagerankList.get(i - 3).rank <= stopFactor) {
				System.out.println("********pagerankList.get(" + (i - 3) + ").rank=" + pagerankList.get(i - 3).rank
						+ " <= " + stopFactor + ", termination condition met, stopping iteration**************");
				break;
			}
			if (i == 1) {
				input = localInputPath;
				output = localOutputPath + "/trash";  // iteration 0 is a throwaway warm-up pass; its output is discarded
				System.out.println("*******************MapReduce iteration 0***************************************");
				configuration.set("order", String.valueOf(0));
				Job job = getJob(configuration, input, output);
				job.waitForCompletion(true);
			} else {
				input = output;
			}
			output=localOutputPath+"/"+i;
			System.out.println("*******************第"+i+"次MapReduce***************************************");
			configuration.set("order",String.valueOf(i));       //位置很重要,切记一定要放在这里!!!
			Job job=getJob(configuration,input, output);
			job.waitForCompletion(true);
			i++;
		}
	}

	public static void printGap() {
		Iterator<RankResult> iterator = pagerankList.iterator();
		int i = 1;
		while (iterator.hasNext()) {
			RankResult rankResult = iterator.next();
			System.out.print(rankResult.order + rankResult.rank + "    ");
			if (i % 3 == 0)
				System.out.println();
			i++;
		}
	}
	public static void main(String[] args) throws IOException,
			ClassNotFoundException, InterruptedException {
		int n = 10;
		long start = System.currentTimeMillis();
		PageRank.run(n);
		PageRank.printGap();
		long end = System.currentTimeMillis();
		System.out.println("\n" + n + " iterations took: " + (end - start) / 60000 + " min "
				+ ((end - start) % 60000) / 1000 + " s " + (end - start) % 1000 + " ms");
	}
}
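
To try it out, package the class and launch it with the standard hadoop runner (the jar name below is hypothetical; the hard-coded paths above assume local mode):

    hadoop jar pagerank.jar soft.project.PageRank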

Original post: http://blog.csdn.net/weiweiyixiaocsdn/article/details/45825623
