标签:class package cal nbsp void 内存 factor port imp
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.broadcast.Broadcast;
import java.util.Arrays;
import java.util.List;
public class BroadcastVariable {
public static void main(String[] args) {
SparkConf conf = new SparkConf()
.setAppName("BroadcastVariable")
.setMaster("local");
JavaSparkContext sc = new JavaSparkContext(conf);
List<Integer> numbers = Arrays.asList(1,2,3,4,5);
JavaRDD<Integer> rdd = sc.parallelize(numbers);
final int factor = 3;
JavaRDD<Integer> newNumbers = rdd.map(new Function<Integer, Integer>() {
public Integer call(Integer v1) throws Exception {
return v1 * factor;
}
});
newNumbers.foreach(new VoidFunction<Integer>() {
public void call(Integer number) throws Exception {
System.out.println(number);
}
});
}
}
如上代码在Driver端定义了一个变量 factor,在函数中调用这个factor。实际的执行过程中会发生什么事呢?
假设一个节点上有100个task,那么Spark会为每个task复制一份factor变量放在内存中。
但其实我们只是在函数中读取了这个变量的值进行了计算,完全没有必要复制100份,只需要在当前的Executor中保留一份,所有的task都来读取这一份数据就足够了。
设想一下,如果要共享一个很大的变量,在每个task中都复制一份无疑会消耗巨大的网络带宽和节点内存,这是非常不合理的。
基于这种情况,我们就可以使用广播变量。
package com.rabbit;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.broadcast.Broadcast;
import java.util.Arrays;
import java.util.List;
public class BroadcastVariable {
public static void main(String[] args) {
SparkConf conf = new SparkConf()
.setAppName("BroadcastVariable")
.setMaster("local");
JavaSparkContext sc = new JavaSparkContext(conf);
List<Integer> numbers = Arrays.asList(1,2,3,4,5);
JavaRDD<Integer> rdd = sc.parallelize(numbers);
final int factor = 3;
//将factor转为广播变量
final Broadcast<Integer> broadcastFactor = sc.broadcast(factor);
JavaRDD<Integer> newNumbers = rdd.map(new Function<Integer, Integer>() {
public Integer call(Integer v1) throws Exception {
//使用广播变量时,调用 value()方法获得其内部封装的值
int factor = broadcastFactor.value();
return v1 * factor;
}
});
newNumbers.foreach(new VoidFunction<Integer>() {
public void call(Integer number) throws Exception {
System.out.println(number);
}
});
}
}
Scala 版本:
import org.apache.spark.{SparkConf, SparkContext}
object BroadcastVariable {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
.setAppName("BroadcastVariable")
.setMaster("local")
val sc = new SparkContext(conf)
val arr = Array(1,2,3,4,5)
val numbers = sc.parallelize(arr)
val factor = 3;
val broadcastFactor = sc.broadcast(factor)
val newNumbers = numbers.map(number => number * broadcastFactor.value)
newNumbers.foreach(number => println(number))
}
}
标签:class package cal nbsp void 内存 factor port imp
原文地址:https://www.cnblogs.com/rabbit624/p/10664567.html