关于SQL和Hadoop的实现参考这里 MapReduce编程-自连接
这里用相同的原理,使用spark实现。本人也是刚学Scala,可能写的不好,还请指正。
object SelfUion { def main(args: Array[String]) { val conf = new SparkConf().setAppName("SelfUnion") val sc = new SparkContext(conf) val cpFile = sc.textFile("cp.txt") //val strs = Array[String]("a", "b") // 1) 生成两张表 val res = cpFile.flatMap(line => { val strs: Array[String] = line.split(" "); if (strs.length == 2) Array(strs(0) + " parent_" + strs(1), strs(1) + " child_" + strs(0)) else Array[String]() }) // 2) 转化为key-value形式 .map(line => { val kv = line.split(" ") (kv(0), kv(1)) }) // 3) 列的相等匹配 .groupByKey() // 4) 解析value,得到结果 .flatMapValues(values => { val childList = new ArrayBuffer[String](); val parentList = new ArrayBuffer[String](); values.foreach( name => { if (name.startsWith("child_")) childList += name else if (name.startsWith("parent_")) parentList += name; }) for (c <- childList; p <- parentList) yield c.substring(6) + " " + p.substring(7) }).values .saveAsTextFile("selfunion_out") } }
Tom Lucy Tom Jack Jone Lucy Jone Jack Lucy Mary Lucy Ben Jack Alice Jack Jesse Terry Tom
输出为:
Terry Lucy Terry Jack Tom Mary Tom Ben Jone Mary Jone Ben Tom Alice Tom Jesse Jone Alice Jone Jesse
另外,附带上使用Scala的函数式编程实现。注意,下面是用的Scala原有的map,flatMap() 等方法,而不是RDD。
使用一个ArrayBuffer[String]带入输入文件,每一行为一个item.
def main(args: Array[String]) { val list = new ArrayBuffer[String](); list+="Tom Lucy" list += "Tom Jack" list += "Jone Lucy" list += "Jone Jack" list += "Lucy Mary" list += "Lucy Ben" list += "Jack Alice" list += "Jack Jesse" list += "Terry Tom" val list2 = list.flatMap(line => { val strs: Array[String] = line.split(" "); if (strs.length == 2) Array(strs(0) + " parent_" + strs(1), strs(1) + " child_" + strs(0)) else Array[String]() } ) //println("list2 : " + list2) val mapout = list2.map(line => line.split(" ")) //println("mapout : " + mapout) //for(item <- mapout) print(item(0) + " " + item(1) + " ; " ) // println() val groupbyout = mapout.groupBy(_(0)) //same as groupByKey, The first element of the Array val res = groupbyout.values.flatMap(values => { val childList = new ArrayBuffer[String](); val parentList = new ArrayBuffer[String](); values.foreach( names => { if (names(1).startsWith("child_")) childList += names(1) else if (names(1).startsWith("parent_")) parentList += names(1); }) for (c <- childList; p <- parentList) yield Array[String](c,p) }) for(arr <- res){ println(arr(0).substring(6) + " " + arr(1).substring(7)) } }
原文地址:http://blog.csdn.net/gladyoucame/article/details/41486955