码迷,mamicode.com
首页 > 其他好文 > 详细

spark 广播变量 累加器

时间:2020-07-19 11:44:16      阅读:62      评论:0      收藏:0      [点我收藏+]

标签:pre   取数   amp   只读   使用   import   override   main   rdd   

广播变量

object Main { def main(args: Array[String]): Unit = { val sparkConf = new SparkConf().setAppName("readMysql").setMaster("local[2]") val sparkContext = new SparkContext(sparkConf) val rdd: RDD[Int] = sparkContext.parallelize(List(1, 2, 3, 4, 5, 6), 2) val i: Int =3 val broadcast = sparkContext.broadcast(i) rdd.map(_+broadcast.value).foreach(println) sparkContext.stop() } }

  持久化广播变量:

广播变量会持续占用内存,当我们不需要的时候,可以用 unpersist 算子将其移除

    broadcast.unpersist()

  这时,如果计算任务又用到广播变量,那么就会重新拉取数据

你还可以使用 destroy 方法彻底销毁广播变量,调用该方法后,如果计算任务中又用到广播变量,则会抛出异常

    broadcast.destroy()

  广播变量在一定数据量范围内可以有效地使作业避免 Shuffle,使计算尽可能本地运行,Spark 的 Map 端连接操作就是用广播变量实现的。

 

累加器
与广播变量只读不同,累加器是一种只能进行增加操作的共享变量

package bigdata

import org.apache.spark.rdd.RDD
import org.apache.spark.util.LongAccumulator
import org.apache.spark.{SparkConf, SparkContext}

object Main {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("readMysql").setMaster("local[2]")
    val sparkContext = new SparkContext(sparkConf)
    val rdd: RDD[Int] = sparkContext.parallelize(List(1, 2, 3, 4, 5, 6), 2)
    val aAccumulator: LongAccumulator = sparkContext.longAccumulator("acc")
    rdd.map(x=>{
      aAccumulator.add(x)
      x+1
    }).foreach(print)
    print(aAccumulator)
    sparkContext.stop()
  }
}

  输出:

567234

LongAccumulator(id: 0, name: Some(acc), value: 21)

 

自定义累加器:

package bigdata

import org.apache.spark.util.AccumulatorV2

case class SumAandB(A:Long,B:Long)
class MyAccumulator extends  AccumulatorV2[SumAandB,SumAandB]{
  private var a=0L
  private var b=0L

  override def isZero: Boolean = {
    a==0 && b==0
  }

  override def copy(): AccumulatorV2[SumAandB, SumAandB] = {
    val accumulator = new MyAccumulator
    accumulator.a=this.a
    accumulator.b=this.b
    accumulator
  }

  override def reset(): Unit = {a=0;b=0
  }

  override def add(v: SumAandB): Unit = {
    a=a+v.A
    b=b+v.B
  }

  override def merge(other: AccumulatorV2[SumAandB, SumAandB]): Unit = {
    other match{
      case e:MyAccumulator=>{
        a+=e.a
        b+=e.b
      }
      case _ =>
    }}

  override def value: SumAandB = SumAandB(a,b)
}

  

使用自定义累加器:

package bigdata

import org.apache.spark.rdd.RDD
import org.apache.spark.util.LongAccumulator
import org.apache.spark.{SparkConf, SparkContext}

object Main {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("readMysql").setMaster("local[2]")
    val sparkContext = new SparkContext(sparkConf)
    val myAccumulator = new MyAccumulator
    sparkContext.register(myAccumulator,"myAccumulator")

    val rdd: RDD[SumAandB] = sparkContext.parallelize(List(SumAandB(1, 2), SumAandB(0, 2), SumAandB(3, 2)), 2)
    rdd.map(s=>{
      myAccumulator.add(s)
      s
    }).foreach(println)
    print(myAccumulator)
    sparkContext.stop()
  }
}

  

spark 广播变量 累加器

标签:pre   取数   amp   只读   使用   import   override   main   rdd   

原文地址:https://www.cnblogs.com/ls-oyang/p/13338707.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!