Reference blog: https://www.cnblogs.com/yszd/p/10186556.html
```scala
package graphx

import org.apache.log4j.{Level, Logger}
import org.apache.spark.graphx.util.GraphGenerators
import org.apache.spark.sql.SparkSession

/**
 * Created by Administrator on 2019/10/22.
 */
object AggregateMessage {
  /**
   * Set the log level to WARN
   */
  Logger.getLogger("org").setLevel(Level.WARN)

  def main(args: Array[String]): Unit = {
    /**
     * Create the Spark entry point
     */
    val spark = SparkSession.builder().appName("AggregateMessage").master("local[2]").getOrCreate()
    val sc = spark.sparkContext

    /**
     * Generate a random graph with numVertices vertices in parallel. Out-degrees follow a
     * log-normal distribution with the default parameters mu = 4.0 and sigma = 1.3; the number
     * of partitions defaults to sc.defaultParallelism. Each vertex attribute is then replaced
     * by the vertex id as a Double, which the code below treats as the vertex's "age".
     */
    val graph = GraphGenerators.logNormalGraph(sc, numVertices = 100).mapVertices((id, _) => id.toDouble)
    graph.vertices.take(5).foreach(println)

    /**
     * Apply the user-defined sendMsg function to every edge triplet in the graph, then use
     * mergeMsg to combine the messages arriving at each destination vertex. A message is
     * (follower count, sum of follower ages) and is sent only when the source is older.
     */
    val olderFollowers = graph.aggregateMessages[(Int, Double)](
      triplet => {
        if (triplet.srcAttr > triplet.dstAttr) {
          triplet.sendToDst((1, triplet.srcAttr))
        }
      },
      (a, b) => (a._1 + b._1, a._2 + b._2)
    )

    /**
     * Compute the average age of each vertex's older followers
     */
    val avgAgeOfOlderFollowers = olderFollowers.mapValues((id, value) =>
      value match { case (count, totalAge) => totalAge / count })

    /**
     * Print the first five results (take(5) avoids collecting the whole RDD to the driver)
     */
    avgAgeOfOlderFollowers.take(5).foreach(println)

    spark.stop()
  }
}
```
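The comment on `logNormalGraph` lists mu = 4.0 and sigma = 1.3 as defaults. As far as I can tell from Spark's `GraphGenerators` source, these are the parameters of the normal distribution underlying the log-normal, so each vertex's out-degree is roughly floor(exp(mu + sigma * Z)) with Z ~ N(0, 1), redrawn until it is smaller than the vertex count. With mu = 4 the typical out-degree is therefore well above 4. The plain-Scala sketch below illustrates that sampling step under this assumption; `LogNormalDegrees` and `sampleOutDegree` are hypothetical names, not Spark API, and this is not Spark's exact code.

```scala
import scala.util.Random

// Sketch (assumption): out-degree = floor(exp(mu + sigma * Z)), Z ~ N(0, 1),
// redrawn while it is >= maxVal. Names here are hypothetical, not Spark API.
object LogNormalDegrees {
  def sampleOutDegree(mu: Double, sigma: Double, maxVal: Int, rand: Random): Int = {
    var x = maxVal.toDouble
    // Rejection step: keep drawing until the degree is below the vertex count.
    while (x >= maxVal) {
      x = math.exp(mu + sigma * rand.nextGaussian())
    }
    math.floor(x).toInt
  }

  def main(args: Array[String]): Unit = {
    val rand = new Random(42L)
    // mu = 4.0, sigma = 1.3 (logNormalGraph defaults); 100 vertices as in the example.
    val degrees = Seq.fill(100)(sampleOutDegree(4.0, 1.3, 100, rand))
    println(s"min=${degrees.min} max=${degrees.max} mean=${degrees.sum.toDouble / degrees.size}")
  }
}
```

The rejection loop is why every sampled degree stays below `numVertices`, while the mean degree lands far above mu itself.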
Randomly generated vertex data:
Aggregation result:
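Each entry of the aggregation result pairs a vertex id with the average age of its older followers. Vertices that received no message are absent, because `aggregateMessages` only returns vertices that got at least one message. The plain-Scala model below (no Spark required) mimics that data flow on a tiny fixed graph: sendMsg emits (target, message) pairs for each triplet, and mergeMsg reduces the messages per target. `AggregateModel`, `Triplet`, `aggregate`, and `avgOlderFollowers` are hypothetical names for illustration; this is a sequential sketch, not the distributed GraphX implementation.

```scala
// Plain-Scala model of aggregateMessages; hypothetical names, not the GraphX API.
object AggregateModel {
  // One edge plus the attributes of its endpoints, similar in spirit to EdgeTriplet.
  final case class Triplet(srcId: Long, dstId: Long, srcAttr: Double, dstAttr: Double)

  // sendMsg returns (targetVertexId, message) pairs; mergeMsg combines two
  // messages bound for the same vertex. Vertices with no messages are omitted.
  def aggregate[A](
      triplets: Seq[Triplet],
      sendMsg: Triplet => Seq[(Long, A)],
      mergeMsg: (A, A) => A): Map[Long, A] =
    triplets
      .flatMap(sendMsg)
      .groupBy(_._1)
      .map { case (vid, msgs) => vid -> msgs.map(_._2).reduce(mergeMsg) }

  // Average "age" of the followers that are older than each vertex.
  def avgOlderFollowers(triplets: Seq[Triplet]): Map[Long, Double] = {
    val olderFollowers = aggregate[(Int, Double)](
      triplets,
      t => if (t.srcAttr > t.dstAttr) Seq(t.dstId -> (1, t.srcAttr)) else Seq.empty,
      (a, b) => (a._1 + b._1, a._2 + b._2))
    olderFollowers.map { case (vid, (count, total)) => vid -> total / count }
  }

  def main(args: Array[String]): Unit = {
    // Vertex attribute = vertex id as Double, mirroring mapVertices((id, _) => id.toDouble).
    val edges = Seq((3L, 1L), (2L, 1L), (0L, 1L), (3L, 2L), (1L, 3L))
    val triplets = edges.map { case (s, d) => Triplet(s, d, s.toDouble, d.toDouble) }
    avgOlderFollowers(triplets).toSeq.sortBy(_._1).foreach(println)
  }
}
```

Running `main` prints `(1,2.5)` and `(2,3.0)`: vertex 1 has older followers 3 and 2, vertex 2 has older follower 3, and vertex 3's only follower (vertex 1) is younger, so vertex 3 receives no message and does not appear.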
Spark GraphX: Core Graph Operators in Practice [AggregateMessage]
Original article: https://www.cnblogs.com/yszd/p/11726921.html