package com.twq.javaapi.java7;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.VoidFunction;
import scala.Tuple2;

import java.io.Serializable;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Iterator;
import java.util.concurrent.TimeUnit;

/**
 * Created by tangweiqun on 2017/9/16.
 */
public class BaseActionApiTest {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("appName").setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);

        JavaRDD<Integer> listRDD = sc.parallelize(Arrays.asList(1, 2, 4, 3, 3, 6), 2);

        // Result: [1, 2, 4, 3, 3, 6]
        System.out.println("collect = " + listRDD.collect());
        // Result: [1, 2]
        System.out.println("take(2) = " + listRDD.take(2));
        // Result: [6, 4]
        System.out.println("top(2) = " + listRDD.top(2));
        // Result: 1
        System.out.println("first = " + listRDD.first());

        // Result: 1
        System.out.println("min = " + listRDD.min(new AscComparator()));
        // Result: 6
        System.out.println("min = " + listRDD.min(new DescComparator()));
        // Result: 6
        System.out.println("max = " + listRDD.max(new AscComparator()));
        // Result: 1
        System.out.println("max = " + listRDD.max(new DescComparator()));

        // Result: [1, 2]
        System.out.println("takeOrdered(2) = " + listRDD.takeOrdered(2));
        // Result: [1, 2]
        System.out.println("takeOrdered(2) = " + listRDD.takeOrdered(2, new AscComparator()));
        // Result: [6, 4]
        System.out.println("takeOrdered(2) = " + listRDD.takeOrdered(2, new DescComparator()));

        listRDD.foreach(new VoidFunction<Integer>() {
            @Override
            public void call(Integer element) throws Exception {
                // This performs badly: the time-consuming getInitNumber is called
                // once for every single element.
                // Prefer foreachPartition over foreach in cases like this.
                Integer initNumber = getInitNumber("foreach");
                System.out.println((element + initNumber) + "=========");
            }
        });

        listRDD.foreachPartition(new VoidFunction<Iterator<Integer>>() {
            @Override
            public void call(Iterator<Integer> integerIterator) throws Exception {
                // Same effect as the foreach API, except the function is applied
                // to each partition rather than to each record. Use this when an
                // expensive operation only needs to run once per partition.
                // A typical expensive operation is opening a database connection:
                // instead of connecting for every record, each partition connects
                // just once.
                Integer initNumber = getInitNumber("foreachPartition");
                while (integerIterator.hasNext()) {
                    System.out.println((integerIterator.next() + initNumber) + "=========");
                }
            }
        });

        Integer reduceResult = listRDD.reduce(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer ele1, Integer ele2) throws Exception {
                return ele1 + ele2;
            }
        });
        // Result: 19
        System.out.println("reduceResult = " + reduceResult);

        Integer treeReduceResult = listRDD.treeReduce(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer integer, Integer integer2) throws Exception {
                return integer + integer2;
            }
        }, 3); // 3 is the depth of the aggregation tree
        // Result: 19
        System.out.println("treeReduceResult = " + treeReduceResult);

        // Similar to reduce, except that each partition starts folding from the
        // initial value 0, and the per-partition results are then combined
        // together with that initial value once more.
        Integer foldResult = listRDD.fold(0, new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer integer, Integer integer2) throws Exception {
                return integer + integer2;
            }
        });
        // Result: 19
        System.out.println("foldResult = " + foldResult);

        // First supply an initial value of the result type we want.
        // Then, inside each partition, fold every element into the accumulator
        // with the first function: (acc, value) -> (acc._1 + value, acc._2 + 1).
        // Finally, merge the per-partition results with the second function:
        // (acc1, acc2) -> (acc1._1 + acc2._1, acc1._2 + acc2._2).
        Tuple2<Integer, Integer> aggregateResult = listRDD.aggregate(new Tuple2<Integer, Integer>(0, 0),
                new Function2<Tuple2<Integer, Integer>, Integer, Tuple2<Integer, Integer>>() {
                    @Override
                    public Tuple2<Integer, Integer> call(Tuple2<Integer, Integer> acc, Integer integer) throws Exception {
                        return new Tuple2<>(acc._1 + integer, acc._2 + 1);
                    }
                }, new Function2<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>, Tuple2<Integer, Integer>>() {
                    @Override
                    public Tuple2<Integer, Integer> call(Tuple2<Integer, Integer> acc1, Tuple2<Integer, Integer> acc2) throws Exception {
                        return new Tuple2<>(acc1._1 + acc2._1, acc1._2 + acc2._2);
                    }
                });
        // Result: (19,6)
        System.out.println("aggregateResult = " + aggregateResult);

        Tuple2<Integer, Integer> treeAggregateResult = listRDD.treeAggregate(new Tuple2<Integer, Integer>(0, 0),
                new Function2<Tuple2<Integer, Integer>, Integer, Tuple2<Integer, Integer>>() {
                    @Override
                    public Tuple2<Integer, Integer> call(Tuple2<Integer, Integer> acc, Integer integer) throws Exception {
                        return new Tuple2<>(acc._1 + integer, acc._2 + 1);
                    }
                }, new Function2<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>, Tuple2<Integer, Integer>>() {
                    @Override
                    public Tuple2<Integer, Integer> call(Tuple2<Integer, Integer> acc1, Tuple2<Integer, Integer> acc2) throws Exception {
                        return new Tuple2<>(acc1._1 + acc2._1, acc1._2 + acc2._2);
                    }
                });
        // Result: (19,6)
        System.out.println("treeAggregateResult = " + treeAggregateResult);

        sc.stop();
    }

    public static Integer getInitNumber(String source) {
        System.out.println("get init number from " + source + ", may take much time........");
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return 1;
    }

    private static class AscComparator implements Comparator<Integer>, Serializable {
        @Override
        public int compare(Integer o1, Integer o2) {
            return o1 - o2;
        }
    }

    private static class DescComparator implements Comparator<Integer>, Serializable {
        @Override
        public int compare(Integer o1, Integer o2) {
            return o2 - o1;
        }
    }
}
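The per-partition setup comment in foreachPartition above is exactly the pattern used for database writes. Here is a rough sketch of that pattern, assuming the same listRDD as above; the JDBC URL and the nums table are my illustrative assumptions, not part of the original post:

listRDD.foreachPartition(new VoidFunction<Iterator<Integer>>() {
    @Override
    public void call(Iterator<Integer> it) throws Exception {
        // Hypothetical JDBC target: open one connection per partition,
        // not one per record.
        java.sql.Connection conn =
                java.sql.DriverManager.getConnection("jdbc:h2:mem:demo");
        try {
            java.sql.PreparedStatement ps =
                    conn.prepareStatement("INSERT INTO nums VALUES (?)");
            while (it.hasNext()) {
                ps.setInt(1, it.next()); // reuse the statement for every record
                ps.executeUpdate();
            }
            ps.close();
        } finally {
            conn.close(); // closed once per partition
        }
    }
});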
For the detailed principles behind reduce, treeReduce, fold, aggregate, treeAggregate and the other APIs, see the Spark Core RDD API principles writeup.
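One of those principles is worth calling out here: fold's initial value must be the identity element of the combine function, because Spark applies it once per partition and once more when merging the partition results. A quick sketch of what happens otherwise (my example, dropped into the main method above with the same 2-partition listRDD):

// With initial value 1 and 2 partitions: each partition adds 1, and the
// final merge adds 1 again, so the result is 19 + 2 + 1 = 22, not 20.
Integer skewed = listRDD.fold(1, new Function2<Integer, Integer, Integer>() {
    @Override
    public Integer call(Integer a, Integer b) throws Exception {
        return a + b;
    }
});
System.out.println("fold(1, +) = " + skewed); // prints 22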
Spark 2.x from the basics to the depths series, part 6: RDD Java API explained (2)
Original article: http://7639240.blog.51cto.com/7629240/1966172