标签:var 版本 else 代码实现 builder .sql pager blank style
闲来无事,整理一下算法。今天整理一下PageRank。
网上搜了搜感觉这篇文章还不错
http://www.cnblogs.com/fengfenggirl/p/pagerank-introduction.html
本文对这篇文章进行修改加工,加入了一些自己的思想,后面代码实现用了Spark而不是原文的MR。
PageRank作用是给出网页的重要性,它的思想是这样的:
根据“民主投票”来确定的重要性,越重要的话语权越大,不断的投票获得新的重要值,直到收敛。(能力有限就不推导证明收敛了)
PageRank让网页通过链接去投票。如果网页A有三个链接B、C、D,其重要性为PRa,那么BCD这轮迭代分别从A获得了1/3 PRa的投票值。一个网页把所有获得的投票值累加起来,就是新的网页重要值。
还有一种解释就是概率的解释。我们最开始认为所有网页被访问的概率都是一样的,假设有M个网页,那么每个网页被访问的概率是1/M。一个网页如果有N个链接,那么从这个网页到一个被链接的网页的概率就是1/M*1/N。
但是迭代过程中有一个问题,就是没有出度的节点,最后会使每个网页被访问的概率都是零,用上面的说法解释就是,我进入了这个网页后我没有概率去访问任何网站。另一个问题就是出度wei1,自己链接自己,这样迭代下去会变成其他网页概率为零,只有自己是1,也就是进入这个网站后只能访问自己了。
模拟下人浏览网站,用户不一定非要点击页面中的链接进入下一个页面,还可以通过随机输入网址获得下一页。所以为了解决上诉两个问题,加入了一个?概率。
其中M为网页链接关系矩阵,V为旧的PageRank值(就是重要性值),e为初始值。
下面是spark实现的pagerank,根据官网example代码修改
import org.apache.spark.sql.SparkSession /* 我们的数据是这种格式的 url 链接的url 中间用空格隔开,每行两个url **/ object SparkPageRank { def main(args: Array[String]) { if (args.length < 1) { System.err.println("Usage: SparkPageRank <file> <iter>") System.exit(1) } showWarning() //spark 2.0版本 val spark = SparkSession .builder .appName("SparkPageRank") .getOrCreate() // 循环次数 val iters = if (args.length > 1) args(1).toInt else 10 val lines = spark.read.textFile(args(0)).rdd // 生成(url,[a,b,c...]) val links = lines.map{ s => val parts = s.split("\\s+") (parts(0), parts(1)) }.distinct().groupByKey().cache()//以后还用先存起来 // 生成rank表 (url,rank)初始值都为1.0,也就是e=[1,1,1,..]T var ranks = links.mapValues(v => 1.0) for (i <- 1 to iters) { val contribs = links.join(ranks).values.flatMap{ case (urls, rank) => val size = urls.size urls.map(url => (url, rank / size)) } //(url1,r1),(url1,r2),(url2,r3).... ranks =contribs.reduceByKey(_+_).mapValues(0.15 + 0.85 * _ ) } val output = ranks.collect() output.foreach(tup => println(tup._1 + " has rank: " + tup._2 + ".")) spark.stop() } }
标签:var 版本 else 代码实现 builder .sql pager blank style
原文地址:http://www.cnblogs.com/matianchi/p/7010773.html