val graph=GraphLoader.edgeListFile(sc,"/home/spark/spark/graphx/data/followers.txt")//加载边时顶点是边上出现的点,定点默认数据是1,注意文件格式:1 2,中间是空格graphx只会读取两列分别作为源顶点和目标顶点,如:1 2 other,第三列的other直接被忽略 val users = sc.textFile("/home/spark/spark/graphx/data/users.txt").map { line=> val fields = line.split(",") (fields(0).toLong,(fields(1),fields(2)))//解析顶点数据:ID(一定转成Long型),fisrt name,full name } val myGraph=Graph.apply(users,graph.edges)//由于graph默认将顶点数据设为1,将顶点数据users和边数据graph.edges重构为新图,如果边edges中的顶点A在顶点集合users中没有,则该顶点A将会以默认值初始化,可以添加默认值 如:val defaultUsers=("first name","full name") val myGraph=Graph.apply(users,graph.edges,defaultUsers) graph.vertices.filter{case(id,(firstName,fullName)=>firstName==”BarackObama”}//针对每个顶点进行filter操作,id是顶点ID,(firstName,fullName)是顶点数据 graph.edges.filter{case(src,dst,prop)=>prop==1}//边数据的过滤,src表示源顶点ID,dst表示目标顶点ID,prop表示边上数据 graph.triplet.map(tri=>tri.srcId+” “+tri.srcAttr+” “+tri.dstId+” “+tri.dstAttr+” “+tri.attr+”\n”) //每个triplet进行map操作,srcId,srcAttr,dstId,dstAttr,attr分别表示源顶点标号ID、源顶点数据、目标顶点ID、目标顶点数据、边数据 graph.vertices.mapValues[Int]((id:VertexId,attr:(String,String))=>10)//针对VertexRDD操作 graph.vertices.saveAsTextFile(“pathToFile”) //顶点数据存入图文件,参数是一个目录,类似graph.edges.saveAsTextFile graph.mapVertices[VD2])((id:VertexId,attr:VD)=>VD2) //VD2要和图的VD匹配(VD是图的顶点数据不包括ID,ED是边的数据不包括srcID和dstID) 如:myGraph.mapVertices[(String,String)]((id,(fistName,fullName))=>(firstName.toUpperCase,fullName))//将顶点数据的firstName改为大写 graph.mapEdges(e=>e.attr+10) //针对每条边进行map操作并返回边集合,e的类型类似于(src,dst,prop)包含了源顶点ID,目标顶点ID,边数据 graph.mapTriplets(triplet=>triplet.attr+10) //针对图中每个triplet进行map操作 graph.reverse //所有边反向 graph.mask[VD2,ED2](other:Graph[VD2,ED2]):Graph[Vd,ED]//返回graph和other的交集 graph.subgraph(edge=>true,(id,prop)=>prop==1)//返回边满足条件(这里是true全部边都满足)及顶点数据为1的子图 graph.joinVertices(other:RDD[(VertexId,U))(map:(VertexId,VD,U)=>VD):Graph[VD,ED] //graph和other相交的顶点执行map函数,在graph中但是不在other中的地点保持不变 graph.outerJoinVertices[U,VD2](other:RDD[(VertexId,U))(map:(VertexId,VD,option)=>VD2):Graph[VD2,ED] //和joinVertices类似,但不同的是在graph中但不在other中的顶点也要执行map函数 graph.mapReduceTriplets[A](map:EdgeTriplet[VD,ED]=>Iterator[(VertexId,A)],reduce:(A,A)=>A):VertexRDD[A] //针对每个triplet执行map函数(发送消息,暂时只能单向传递消息,即所有的triplet执行map时要么都是想源顶点发送消息,要么都向目标顶点发送消息),并由reduce收集发送给顶点的消息 graph.inDegrees //返回图的入度,类型为VertexRDD[Int] graph.outDegrees //返回出度,(VertexID,Int) graph.Degrees //返回顶点的度 graph.collectNeighbors(edgeDirection:EdgeDirection):VertexRDD[Array[(VertexId,VD)]] //收集每个顶点的邻居顶点数据,返回的是一个数组,数组元素是邻居顶点ID和其顶点数据 EdgeDirection.Out //出边方向 EdgeDirection.In //入边方向 EdgeDirection.Either //出边或入边方向 EdgeDirection.Both //出边和入边方向 graph.pregel[A] ( initialMsg:A,//初始消息 maxIter:Int=Int.MaxValue,//最大迭代次数 activeDir:EdgeDirection=EdgeDirection.Out )//消息传递方向 ( vprog:(VertexId,VD,A)=>VD,//顶点程序 sendMsg:EdgeTriplet[VD,ED]=>Iterator[(VertexId,A)],//发送消息 mergeMsg:(A,A)=>A ):Graph[VD,ED]//汇集消息 graph.pageRank(0.0001) //计算PageRank值,针对非联通图也可以 前面我所引用的数据格式如下: users.txt: 第一列为ID,第二列为fistName,第二列为fullName 1,BarackObama,Barack Obama 2,ladygaga,Goddess of Love 3,jeresig,John Resig 4,justinbieber,Justin Bieber 6,matei_zaharia,Matei Zaharia 7,odersky,Martin Odersky 8,anonsys,xxoo followers.txt如下: 2 1 4 1 1 2 6 3 7 3 7 6 6 7 3 7
原文地址:http://blog.csdn.net/liuxuejiang158blog/article/details/37928863