1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66
| import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.Optional; import scala.Tuple2; import java.util.Arrays; import java.util.Map;
/**
 * Demonstrates the key-based pair-RDD operations of Spark's Java API: {@code subtractByKey},
 * {@code join}, {@code fullOuterJoin}, {@code leftOuterJoin} and {@code rightOuterJoin}.
 *
 * <p>The left input deliberately contains key {@code 3} twice, so the join results contain
 * several rows for the same key.
 */
public class JoinRDD {

    /**
     * Builds two small pair RDDs on a local Spark master, applies each key-based operation and
     * prints every resulting row to standard output.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        SparkConf sparkConf = new SparkConf().setAppName("JoinRDD").setMaster("local");
        // JavaSparkContext is Closeable; try-with-resources stops the context even if an action fails.
        try (JavaSparkContext sc = new JavaSparkContext(sparkConf)) {
            sc.setLogLevel("WARN");

            // parallelizePairs builds pair RDDs directly, avoiding raw Tuple2 construction and the
            // JavaRDD<Tuple2<..>> -> JavaPairRDD conversion step.
            JavaPairRDD<Integer, Integer> rdd = sc.parallelizePairs(
                    Arrays.<Tuple2<Integer, Integer>>asList(
                            new Tuple2<>(1, 2), new Tuple2<>(3, 4), new Tuple2<>(3, 6)));
            JavaPairRDD<Integer, Integer> other = sc.parallelizePairs(
                    Arrays.<Tuple2<Integer, Integer>>asList(new Tuple2<>(3, 10), new Tuple2<>(4, 8)));

            // Explicit result types are kept on purpose: they show what each operation yields,
            // in particular where Optional appears for a possibly-missing side.
            JavaPairRDD<Integer, Integer> subRDD = rdd.subtractByKey(other);
            JavaPairRDD<Integer, Tuple2<Integer, Integer>> joinRDD = rdd.join(other);
            JavaPairRDD<Integer, Tuple2<Optional<Integer>, Optional<Integer>>> fullOutJoinRDD =
                    rdd.fullOuterJoin(other);
            JavaPairRDD<Integer, Tuple2<Integer, Optional<Integer>>> leftOutJoinRDD =
                    rdd.leftOuterJoin(other);
            JavaPairRDD<Integer, Tuple2<Optional<Integer>, Integer>> rightOutJoinRDD =
                    rdd.rightOuterJoin(other);

            System.out.println("-------------subRDD-------------");
            // collect(), not collectAsMap(): a Map keeps only one value per key and would drop rows.
            for (Tuple2<Integer, Integer> row : subRDD.collect()) {
                System.out.println("subRDD: " + row._1() + ", " + row._2());
            }
            printJoined("joinRDD", "join", joinRDD);
            printJoined("fullOutJoinRDD", "fullOutJoinRDD", fullOutJoinRDD);
            printJoined("leftOutJoinRDD", "leftOutJoinRDD", leftOutJoinRDD);
            printJoined("rightOutJoinRDD", "rightOutJoinRDD", rightOutJoinRDD);
        }
    }

    /**
     * Prints a section header, then one line per row of a pair RDD whose values are 2-tuples.
     *
     * <p>Rows are fetched with {@code collect()} rather than {@code collectAsMap()}, because a map
     * keeps a single value per key and would silently drop rows whose key repeats.
     *
     * @param title text shown between the dashed header markers
     * @param label prefix printed at the start of every row line
     * @param pairs RDD to collect and print; it is small demo data, so collecting to the driver is fine
     * @param <K> key type
     * @param <A> type of the left element of each value tuple
     * @param <B> type of the right element of each value tuple
     */
    private static <K, A, B> void printJoined(
            String title, String label, JavaPairRDD<K, Tuple2<A, B>> pairs) {
        System.out.println("-------------" + title + "-------------");
        for (Tuple2<K, Tuple2<A, B>> row : pairs.collect()) {
            Tuple2<A, B> values = row._2();
            System.out.println(
                    label + ": " + row._1() + ", Tuple(" + values._1() + "," + values._2() + ")");
        }
    }
}
|