-
从部署图了解
Spark
部署了什么, 有什么组件运行在集群中 -
通过对
WordCount
案例的解剖, 来理解执行逻辑计划的生成 -
通过对逻辑执行计划的细化, 理解如何生成物理计划
标签:常用 组件 extern bool 分数 字符串表 优化 做什么 hdfs
如何判断宽窄依赖:
===================================
从部署图了解 Spark
部署了什么, 有什么组件运行在集群中
通过对 WordCount
案例的解剖, 来理解执行逻辑计划的生成
通过对逻辑执行计划的细化, 理解如何生成物理计划
如无特殊说明, 以下部分均针对于 |
如何生成 RDD
如何控制 RDD 之间的关系
本章要回答如下三个问题
如何生成 RDD
生成什么 RDD
如何计算 RDD 中的数据
val sc = ...
val textRDD = sc.parallelize(Seq("Hadoop Spark", "Hadoop Flume", "Spark Sqoop"))
val splitRDD = textRDD.flatMap(_.split(" "))
val tupleRDD = splitRDD.map((_, 1))
val reduceRDD = tupleRDD.reduceByKey(_ + _)
val strRDD = reduceRDD.map(item => s"${item._1}, ${item._2}")
println(strRDD.toDebugString)
strRDD.collect.foreach(item => println(item))
textFile
算子的背后map
算子的背后flatMap
算子的背后textRDD
→ splitRDD
→ tupleRDD
RDD
?RDD
?RDD
中的数据 ?讨论什么是 RDD 之间的依赖关系
继而讨论 RDD 分区之间的关系
最后确定 RDD 之间的依赖关系分类
完善案例的逻辑关系图
RDD
之间的依赖关系?reduceByKey
算子会生成 ShuffledRDD
上个小节通过例子演示了 RDD 的分区间的关系有两种形式
一对一, 一般是直接转换
多对一, 一般是 Shuffle
本小节会说明如下问题:
如果分区间得关系是一对一或者多对一, 那么这种情况下的 RDD 之间的关系的正式命名是什么呢?
RDD 之间的依赖关系, 具体有几种情况呢?
RDD 的逻辑图本质上是对于计算过程的表达, 例如数据从哪来, 经历了哪些步骤的计算
每一个步骤都对应一个 RDD, 因为数据处理的情况不同, RDD 之间的依赖关系又分为窄依赖和宽依赖 *
常见的窄依赖其实也是有分类的, 而且宽窄以来不太容易分辨, 所以通过本章, 帮助同学明确窄依赖的类型
物理图的意义
如何划分 Task
如何划分 Stage
生成逻辑图和物理图的系统组件
Job
和 Stage
, Task
之间的关系
如何调度 Job
Job
是什么 ?Job
和 Stage
的关系Stage
和 Task
的关系本章节重点是介绍 Shuffle
的流程, 因为根据 ShuffleWriter
的实现不同, 其过程也不同, 所以前半部分根据默认的存储引擎 SortShuffleWriter
来讲解
后半部分简要介绍一下其它的 ShuffleWriter
Shuffle
过程的组件结构ShuffleWriter
?SortShuffleWriter
的执行过程理解闭包以及 Spark 分布式运行代码的根本原理
理解累加变量的使用场景
理解广播的使用场景
闭包就是一个封闭的作用域, 也是一个对象
Spark 算子所接受的函数, 本质上是一个闭包, 因为其需要封闭作用域, 并且序列化自身和依赖, 分发到不同的节点中运行
var count = 0
val config = new SparkConf().setAppName("ip_ana").setMaster("local[6]")
val sc = new SparkContext(config)
sc.parallelize(Seq(1, 2, 3, 4, 5))
.foreach(count += _)
println(count)
上面这段代码是一个非常错误的使用, 请不要仿照, 这段代码只是为了证明一些事情
先明确两件事, var count = 0
是在 Driver 中定义的, foreach(count += _)
这个算子以及传递进去的闭包运行在 Executor 中
这段代码整体想做的事情是累加一个变量, 但是这段代码的写法却做不到这件事, 原因也很简单, 因为具体的算子是闭包, 被分发给不同的节点运行, 所以这个闭包中累加的并不是 Driver 中的这个变量
Accumulators(累加器) 是一个只支持 added
(添加) 的分布式变量, 可以在分布式环境下保持一致性, 并且能够做到高效的并发.
原生 Spark 支持数值型的累加器, 可以用于实现计数或者求和, 开发者也可以使用自定义累加器以实现更高级的需求
val config = new SparkConf().setAppName("ip_ana").setMaster("local[6]")
val sc = new SparkContext(config)
val counter = sc.longAccumulator("counter")
sc.parallelize(Seq(1, 2, 3, 4, 5))
.foreach(counter.add(_))
// 运行结果: 15
println(counter.value)
注意点:
Accumulator 是支持并发并行的, 在任何地方都可以通过 add
来修改数值, 无论是 Driver 还是 Executor
只能在 Driver 中才能调用 value
来获取数值
在 WebUI 中关于 Job 部分也可以看到 Accumulator 的信息, 以及其运行的情况
累计器件还有两个小特性, 第一, 累加器能保证在 Spark 任务出现问题被重启的时候不会出现重复计算. 第二, 累加器只有在 Action 执行的时候才会被触发.
val config = new SparkConf().setAppName("ip_ana").setMaster("local[6]")
val sc = new SparkContext(config)
val counter = sc.longAccumulator("counter")
sc.parallelize(Seq(1, 2, 3, 4, 5))
.map(counter.add(_)) // 这个地方不是 Action, 而是一个 Transformation
// 运行结果是 0
println(counter.value)
开发者可以通过自定义累加器来实现更多类型的累加器, 累加器的作用远远不只是累加, 比如可以实现一个累加器, 用于向里面添加一些运行信息
class InfoAccumulator extends AccumulatorV2[String, Set[String]] {
private val infos: mutable.Set[String] = mutable.Set()
override def isZero: Boolean = {
infos.isEmpty
}
override def copy(): AccumulatorV2[String, Set[String]] = {
val newAccumulator = new InfoAccumulator()
infos.synchronized {
newAccumulator.infos ++= infos
}
newAccumulator
}
override def reset(): Unit = {
infos.clear()
}
override def add(v: String): Unit = {
infos += v
}
override def merge(other: AccumulatorV2[String, Set[String]]): Unit = {
infos ++= other.value
}
override def value: Set[String] = {
infos.toSet
}
}
@Test
def accumulator2(): Unit = {
val config = new SparkConf().setAppName("ip_ana").setMaster("local[6]")
val sc = new SparkContext(config)
val infoAccumulator = new InfoAccumulator()
sc.register(infoAccumulator, "infos")
sc.parallelize(Seq("1", "2", "3"))
.foreach(item => infoAccumulator.add(item))
// 运行结果: Set(3, 1, 2)
println(infoAccumulator.value)
sc.stop()
}
注意点:
可以通过继承 AccumulatorV2
来创建新的累加器
有几个方法需要重写
reset 方法用于把累加器重置为 0
add 方法用于把其它值添加到累加器中
merge 方法用于指定如何合并其他的累加器
value
需要返回一个不可变的集合, 因为不能因为外部的修改而影响自身的值
理解为什么需要广播变量, 以及其应用场景
能够通过代码使用广播变量
广播变量用于将变量缓存在集群中的机器中, 避免机器内的 Executors 多次使用网络拉取数据
广播变量的使用步骤: (1) 创建 (2) 在 Task 中获取值 (3) 销毁
标签:常用 组件 extern bool 分数 字符串表 优化 做什么 hdfs
原文地址:https://www.cnblogs.com/mediocreWorld/p/11610059.html