package com.twq.javaapi.java7;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.VoidFunction;
import scala.Tuple2;
import java.io.Serializable;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Iterator;
import java.util.concurrent.TimeUnit;
/**
* Created by tangweiqun on 2017/9/16.
*/
public class BaseActionApiTest {
public static void main(String[] args) {
SparkConf conf = new SparkConf().setAppName("appName").setMaster("local");
JavaSparkContext sc = new JavaSparkContext(conf);
JavaRDD<Integer> listRDD = sc.parallelize(Arrays.asList(1, 2, 4, 3, 3, 6), 2);
// Result: [1, 2, 4, 3, 3, 6]
System.out.println("collect = " + listRDD.collect());
// Result: [1, 2]
System.out.println("take(2) = " + listRDD.take(2));
// Result: [6, 4]
System.out.println("top(2) = " + listRDD.top(2));
// Result: 1
System.out.println("first = " + listRDD.first());
// Result: 1
System.out.println("min(asc) = " + listRDD.min(new AscComparator()));
// Result: 6 (the "minimum" under a descending comparator is the largest element)
System.out.println("min(desc) = " + listRDD.min(new DescComparator()));
// Result: 6
System.out.println("max(asc) = " + listRDD.max(new AscComparator()));
// Result: 1
System.out.println("max(desc) = " + listRDD.max(new DescComparator()));
// Result: [1, 2]
System.out.println("takeOrdered(2) = " + listRDD.takeOrdered(2));
// Result: [1, 2]
System.out.println("takeOrdered(2, asc) = " + listRDD.takeOrdered(2, new AscComparator()));
// Result: [6, 4]
System.out.println("takeOrdered(2, desc) = " + listRDD.takeOrdered(2, new DescComparator()));
listRDD.foreach(new VoidFunction<Integer>() {
@Override
public void call(Integer element) throws Exception {
// This performs poorly: the expensive getInitNumber call runs once for every element
// Prefer foreachPartition over foreach for this kind of work
Integer initNumber = getInitNumber("foreach");
System.out.println((element + initNumber) + "=========");
}
});
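// Note: foreach runs on the executors, so in cluster mode these printlns end
// up in the executor logs; with master "local" they appear in this console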
listRDD.foreachPartition(new VoidFunction<Iterator<Integer>>() {
@Override
public void call(Iterator<Integer> integerIterator) throws Exception {
// Functionally the same as foreach, except the function is applied to each partition instead of each record
// Use it when an expensive operation only needs to run once per partition
// e.g. opening a database connection: connect once per partition rather than once per record
Integer initNumber = getInitNumber("foreachPartition");
while (integerIterator.hasNext()) {
System.out.println((integerIterator.next() + initNumber) + "=========");
}
}
});
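// A hypothetical sketch of the connect-once-per-partition pattern described
// above; jdbcUrl and the numbers table are made-up placeholders, so the block
// is left commented out (it needs a reachable database to run):
/*
listRDD.foreachPartition(new VoidFunction<Iterator<Integer>>() {
@Override
public void call(Iterator<Integer> it) throws Exception {
// one JDBC connection per partition, not per record
java.sql.Connection conn = java.sql.DriverManager.getConnection(jdbcUrl);
java.sql.PreparedStatement ps = conn.prepareStatement("INSERT INTO numbers(value) VALUES (?)");
while (it.hasNext()) {
ps.setInt(1, it.next());
ps.executeUpdate();
}
ps.close();
conn.close();
}
});
*/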
Integer reduceResult = listRDD.reduce(new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer ele1, Integer ele2) throws Exception {
return ele1 + ele2;
}
});
// Result: 19
System.out.println("reduceResult = " + reduceResult);
Integer treeReduceResult = listRDD.treeReduce(new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer integer, Integer integer2) throws Exception {
return integer + integer2;
}
}, 3);
// Result: 19
System.out.println("treeReduceResult = " + treeReduceResult);
// Similar to reduce, except the initial value 0 is added when computing each partition, and again when the per-partition results are combined at the end
Integer foldResult = listRDD.fold(0, new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer integer, Integer integer2) throws Exception {
return integer + integer2;
}
});
// Result: 19
System.out.println("foldResult = " + foldResult);
// First supply an initial value of the result type we want back
// Then, within each partition, apply the first function (acc, value) => (acc._1 + value, acc._2 + 1) to every element
// Finally combine the per-partition results with the second function (acc1, acc2) => (acc1._1 + acc2._1, acc1._2 + acc2._2)
Tuple2<Integer, Integer> aggregateResult = listRDD.aggregate(new Tuple2<Integer, Integer>(0, 0),
new Function2<Tuple2<Integer, Integer>, Integer, Tuple2<Integer, Integer>>() {
@Override
public Tuple2<Integer, Integer> call(Tuple2<Integer, Integer> acc, Integer integer) throws Exception {
return new Tuple2<>(acc._1() + integer, acc._2() + 1);
}
}, new Function2<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>, Tuple2<Integer, Integer>>() {
@Override
public Tuple2<Integer, Integer> call(Tuple2<Integer, Integer> acc1, Tuple2<Integer, Integer> acc2) throws Exception {
return new Tuple2<>(acc1._1() + acc2._1(), acc1._2() + acc2._2());
}
});
// Result: (19,6), i.e. (sum of elements, element count)
System.out.println("aggregateResult = " + aggregateResult);
Tuple2<Integer, Integer> treeAggregateResult = listRDD.treeAggregate(new Tuple2<Integer, Integer>(0, 0),
new Function2<Tuple2<Integer, Integer>, Integer, Tuple2<Integer, Integer>>() {
@Override
public Tuple2<Integer, Integer> call(Tuple2<Integer, Integer> acc, Integer integer) throws Exception {
return new Tuple2<>(acc._1() + integer, acc._2() + 1);
}
}, new Function2<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>, Tuple2<Integer, Integer>>() {
@Override
public Tuple2<Integer, Integer> call(Tuple2<Integer, Integer> acc1, Tuple2<Integer, Integer> acc2) throws Exception {
return new Tuple2<>(acc1._1() + acc2._1(), acc1._2() + acc2._2());
}
});
// Result: (19,6)
System.out.println("treeAggregateResult = " + treeAggregateResult);
}
public static Integer getInitNumber(String source) {
System.out.println("get init number from " + source + ", may be take much time........");
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
return 1;
}
private static class AscComparator implements Comparator<Integer>, Serializable {
@Override
public int compare(Integer o1, Integer o2) {
return Integer.compare(o1, o2);
}
}
private static class DescComparator implements Comparator<Integer>, Serializable {
@Override
public int compare(Integer o1, Integer o2) {
return Integer.compare(o2, o1);
}
}
}
For the detailed principles behind the reduce, treeReduce, fold, aggregate, and treeAggregate APIs, see the companion article on Spark Core RDD API internals.
Spark 2.x from shallow to deep, part 6: RDD Java API in detail (2)
Original article: http://7639240.blog.51cto.com/7629240/1966172