Broadcast Variables
package bigdata

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Main {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("readMysql").setMaster("local[2]")
    val sparkContext = new SparkContext(sparkConf)
    val rdd: RDD[Int] = sparkContext.parallelize(List(1, 2, 3, 4, 5, 6), 2)
    val i: Int = 3
    // Ship the value to every executor once instead of once per task.
    val broadcast = sparkContext.broadcast(i)
    // Tasks read the shared value through broadcast.value.
    rdd.map(_ + broadcast.value).foreach(println)
    sparkContext.stop()
  }
}
Releasing a broadcast variable:
A broadcast variable keeps occupying executor memory as long as it is cached; when it is no longer needed, it can be removed with the unpersist method:
broadcast.unpersist()
After that, if a task uses the broadcast variable again, the data is simply fetched again from the driver.
You can also destroy the broadcast variable for good with the destroy method; after it is called, any task that touches the broadcast variable again will throw an exception:
broadcast.destroy()
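For completeness, a minimal sketch of the whole lifecycle, reusing the sparkContext and rdd from the example above (the data variable is illustrative):

val data = sparkContext.broadcast(Map(1 -> "a", 2 -> "b"))
rdd.map(x => data.value.getOrElse(x, "?")).collect()
// unpersist only drops the cached copies on the executors;
// the next job that reads data.value re-fetches it from the driver.
data.unpersist()
rdd.map(x => data.value.getOrElse(x, "?")).collect()
// destroy releases all state, including on the driver; any later
// access to data.value throws a SparkException.
data.destroy()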
For datasets small enough to broadcast, broadcast variables can help a job avoid a Shuffle entirely and keep computation as local as possible; Spark's map-side (broadcast) join is implemented with broadcast variables.
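To illustrate, here is a minimal broadcast (map-side) join sketch; the cityById lookup table and the orders RDD are made-up example data:

// Broadcast the small side of the join to every executor.
val cityById: Map[Int, String] = Map(1 -> "Beijing", 2 -> "Shanghai")
val cityBc = sparkContext.broadcast(cityById)
val orders: RDD[(Int, Double)] = sparkContext.parallelize(List((1, 9.9), (2, 19.9), (1, 4.5)), 2)
// Each task joins against its local copy of the table, so no shuffle occurs.
val joined: RDD[(String, Double)] = orders.map { case (cityId, amount) =>
  (cityBc.value.getOrElse(cityId, "unknown"), amount)
}
joined.foreach(println)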
Accumulators
Unlike broadcast variables, which are read-only, an accumulator is a shared variable that supports only add operations: tasks can add to it, but only the driver can read its value.
package bigdata

import org.apache.spark.rdd.RDD
import org.apache.spark.util.LongAccumulator
import org.apache.spark.{SparkConf, SparkContext}

object Main {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("readMysql").setMaster("local[2]")
    val sparkContext = new SparkContext(sparkConf)
    val rdd: RDD[Int] = sparkContext.parallelize(List(1, 2, 3, 4, 5, 6), 2)
    // Built-in long accumulator, registered under the name "acc".
    val aAccumulator: LongAccumulator = sparkContext.longAccumulator("acc")
    rdd.map(x => {
      aAccumulator.add(x)  // side effect: accumulate each element
      x + 1
    }).foreach(print)
    print(aAccumulator)
    sparkContext.stop()
  }
}
Output (with two partitions, the print order interleaves and is not deterministic):
567234
LongAccumulator(id: 0, name: Some(acc), value: 21)
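One caveat: Spark guarantees exactly-once application of accumulator updates only inside actions. The update inside map above can be re-applied if a task is retried or the stage is recomputed, inflating the result. A minimal sketch of the safer action-side pattern (sumAcc is an illustrative name):

val sumAcc = sparkContext.longAccumulator("sum")
// foreach is an action, so each element is counted exactly once.
rdd.foreach(x => sumAcc.add(x))
println(sumAcc.value)  // 21 for List(1, 2, 3, 4, 5, 6)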
Custom accumulator:
package bigdata

import org.apache.spark.util.AccumulatorV2

case class SumAandB(A: Long, B: Long)

// Accumulates two counters at once: the IN and OUT types are both SumAandB.
class MyAccumulator extends AccumulatorV2[SumAandB, SumAandB] {
  private var a = 0L
  private var b = 0L

  // Whether this accumulator is still in its initial (zero) state.
  override def isZero: Boolean = a == 0 && b == 0

  // Create a task-local copy; Spark ships one copy to each task.
  override def copy(): AccumulatorV2[SumAandB, SumAandB] = {
    val accumulator = new MyAccumulator
    accumulator.a = this.a
    accumulator.b = this.b
    accumulator
  }

  // Reset this accumulator to the zero state.
  override def reset(): Unit = { a = 0; b = 0 }

  // Add one input value into this accumulator.
  override def add(v: SumAandB): Unit = {
    a += v.A
    b += v.B
  }

  // Merge another task's copy into this one (called on the driver).
  override def merge(other: AccumulatorV2[SumAandB, SumAandB]): Unit = {
    other match {
      case e: MyAccumulator =>
        a += e.a
        b += e.b
      case _ =>
    }
  }

  // The current accumulated value.
  override def value: SumAandB = SumAandB(a, b)
}
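Spark drives these methods itself: each task works on a copy(), fills it with add, and the driver combines the per-task copies with merge. The contract can be checked locally without a cluster; a quick illustrative sketch:

val driverAcc = new MyAccumulator
val task1 = driverAcc.copy()  // what Spark would ship to task 1
val task2 = driverAcc.copy()  // what Spark would ship to task 2
task1.add(SumAandB(1, 2))
task2.add(SumAandB(3, 4))
driverAcc.merge(task1)
driverAcc.merge(task2)
println(driverAcc.value)  // SumAandB(4,6)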
Using the custom accumulator:
package bigdata

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Main {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("readMysql").setMaster("local[2]")
    val sparkContext = new SparkContext(sparkConf)
    // Custom accumulators must be registered before they are used.
    val myAccumulator = new MyAccumulator
    sparkContext.register(myAccumulator, "myAccumulator")
    val rdd: RDD[SumAandB] = sparkContext.parallelize(List(SumAandB(1, 2), SumAandB(0, 2), SumAandB(3, 2)), 2)
    rdd.map(s => {
      myAccumulator.add(s)
      s
    }).foreach(println)
    print(myAccumulator)  // merged value: SumAandB(4,6)
    sparkContext.stop()
  }
}
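A typical use for a two-counter accumulator like this is tracking two quantities in one pass, for example valid and invalid records. A hedged sketch, where the lines RDD and the digits-only validity rule are illustrative assumptions:

val lines: RDD[String] = sparkContext.parallelize(List("1", "2", "x", "3"), 2)
val qualityAcc = new MyAccumulator
sparkContext.register(qualityAcc, "recordQuality")
lines.foreach { line =>
  // A = parseable records, B = unparseable records.
  if (line.nonEmpty && line.forall(_.isDigit)) qualityAcc.add(SumAandB(1, 0))
  else qualityAcc.add(SumAandB(0, 1))
}
println(qualityAcc.value)  // SumAandB(3,1)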
Original post: https://www.cnblogs.com/ls-oyang/p/13338707.html