标签:ast 过程 代码 pac int 北京 creat -- print
1.规律
如果JoinAPI之前被调用的RDD API是宽依赖(存在shuffle), 而且两个join的RDD的分区数量一致,join结果的rdd分区数量也一样,这个时候join api是窄依赖
除此之外的,rdd 的join api是宽依赖
2.测试程序
1 package com.ibeifeng.senior.join 2 3 import org.apache.spark.{SparkConf, SparkContext} 4 5 /** 6 * RDD数据Join相关API讲解 7 * Created by ibf on 02/09. 8 */ 9 object RDDJoin2 { 10 def main(args: Array[String]): Unit = { 11 val conf = new SparkConf() 12 .setMaster("local[*]") 13 .setAppName("RDD-Join") 14 val sc = SparkContext.getOrCreate(conf) 15 16 // ==================具体代码====================== 17 // 模拟数据产生, 添加map、reduceByKey、mapPartitions等api的主要功能是给rdd1和rdd2中添加一个分区器(表示当前rdd是存在shuffle过程的) 18 val rdd1 = sc.parallelize(Array( 19 (1, "张三1"), 20 (1, "张三2"), 21 (2, "李四"), 22 (3, "王五"), 23 (4, "Tom"), 24 (5, "Gerry"), 25 (6, "莉莉") 26 ), 1).map(x => (x, null)).reduceByKey((x,y) => x, 1).mapPartitions( 27 iter => iter.map(tuple => tuple._1), 28 true // 使用上一个RDD的分区器,false表示不使用, 设置为None 29 ) 30 31 val rdd2 = sc.parallelize(Array( 32 (1, "上海"), 33 (2, "北京1"), 34 (2, "北京2"), 35 (3, "南京"), 36 (4, "纽约"), 37 (6, "深圳"), 38 (7, "香港") 39 ), 1).map(x => (x, null)).reduceByKey((x,y) => x, 1).mapPartitions( 40 iter => iter.map(tuple => tuple._1), 41 true // 使用上一个RDD的分区器,false表示不使用, 设置为None 42 ) 43 44 // 调用RDD API实现内连接 45 val joinResultRDD = rdd1.join(rdd2).map { 46 case (id, (name, address)) => { 47 (id, name, address) 48 } 49 } 50 println("----------------") 51 joinResultRDD.foreachPartition(iter => { 52 iter.foreach(println) 53 }) 54 55 // 休眠为了看4040页面 56 Thread.sleep(1000000) 57 } 58 }
标签:ast 过程 代码 pac int 北京 creat -- print
原文地址:http://www.cnblogs.com/juncaoit/p/6528146.html