标签:
Spark 算子大致可以分为以下两类:
1)Transformation 变换/转换算子:这种变换并不触发提交作业,完成作业中间过程处理。
Transformation 操作是延迟计算的,也就是说从一个RDD 转换生成另一个 RDD 的转换操作不是马上执行,需要等到有 Action 操作的时候才会真正触发运算。
1、map算子
2、flatMap算子
3、mapPartitions算子
4、union算子
5、cartesian算子
6、grouBy算子
7、filter算子
8、sample算子
9、cache算子
10、persist算子
11、mapValues算子
12、combineByKey算子
13、reduceByKey算子
14、join算子
Spark算子的作用,详细见http://www.cnblogs.com/zlslch/p/5723979.html
2)Action 行动算子:这类算子会触发 SparkContext 提交 Job 作业。
Action 算子会触发 Spark 提交作业(Job),并将数据输出 Spark系统。
1、foreach算子
2、saveAsTextFile算子
3、collect算子
4、count算子
Spark算子的作用,详细见http://www.cnblogs.com/zlslch/p/5723979.html
1. Transformations 算子
(1) map
将原来 RDD 的每个数据项通过 map 中的用户自定义函数 f 映射转变为一个新的元素。源码中 map 算子相当于初始化一个 RDD, 新 RDD 叫做 MappedRDD(this, sc.clean(f))。
图 2 中每个方框表示一个 RDD 分区,左侧的分区经过用户自定义函数 f:T->U 映射为右侧的新 RDD 分区。但是,实际只有等到 Action算子触发后这个 f 函数才会和其他函数在一个stage 中对数据进行运算。在图 1 中的第一个分区,数据记录 V1 输入 f,通过 f 转换输出为转换后的分区中的数据记录 V‘1。
图1 map 算子对 RDD 转换 图2 flapMap 算子对 RDD 转换
(2) flatMap
将原来 RDD 中的每个元素通过函数 f 转换为新的元素,并将生成的 RDD 的每个集合中的元素合并为一个集合,内部创建 FlatMappedRDD(this,sc.clean(f))。
图 2 表 示 RDD 的 一 个 分 区 进 行 flatMap函 数 操 作, flatMap 中 传 入 的 函 数 为 f:T->U, T和 U 可以是任意的数据类型。将分区中的数据通过用户自定义函数 f 转换为新的数据。外部大方框可以认为是一个 RDD 分区,小方框代表一个集合。 V1、 V2、 V3 在一个集合作为 RDD 的一个数据项,可能存储为数组或其他容器,转换为V‘1、 V‘2、 V‘3 后,将原来的数组或容器结合拆散,拆散的数据形成为 RDD 中的数据项。
(3) mapPartitions
mapPartitions 函 数 获 取 到 每 个 分 区 的 迭 代器,在 函 数 中 通 过 这 个 分 区 整 体 的 迭 代 器 对整 个 分 区 的 元 素 进 行 操 作。 内 部 实 现 是 生 成
MapPartitionsRDD。图 1-8 中的方框代表一个 RDD 分区。图 3 中,用户通过函数 f (iter)=>iter.f ilter(_>=3) 对分区中所有数据进行过滤,大于和等于 3 的数据保留。一个方块代表一个 RDD 分区,含有 1、 2、 3 的分区过滤只剩下元素 3。
图3 mapPartitions 算子对 RDD 转换
(4) union
使用 union 函数时需要保证两个 RDD 元素的数据类型相同,返回的 RDD 数据类型和被合并的 RDD 元素数据类型相同。并不进行去重操作,保存所有元素,如果想去重
可以使用 distinct()。同时 Spark 还提供更为简洁的使用 union 的 API,通过 ++ 符号相当于 union 函数操作。
图 4 中左侧大方框代表两个 RDD,大方框内的小方框代表 RDD 的分区。右侧大方框代表合并后的 RDD,大方框内的小方框代表分区。合并后, V1、 V2、 V3……V8
形成一个分区,其他元素同理进行合并。
图 4 union 算子对 RDD 转换
(5) cartesian
对 两 个 RDD 内 的 所 有 元 素 进 行 笛 卡 尔 积 操 作。 操 作 后, 内 部 实 现 返 回CartesianRDD。图5 中左侧大方框代表两个 RDD,大方框内的小方框代表 RDD 的分区。右侧大方框代表合并后的 RDD,大方框内的小方框代表分区。
例 如: V1 和 另 一 个 RDD 中 的 W1、 W2、 Q5 进 行 笛 卡 尔 积 运 算 形 成 (V1,W1)、(V1,W2)、 (V1,Q5)。
图 5 cartesian 算子对 RDD 转换
(6) groupBy
groupBy :将元素通过函数生成相应的 Key,数据就转化为 Key-Value 格式,之后将 Key 相同的元素分为一组。
函数实现如下:
1)将用户函数预处理:
val cleanF = sc.clean(f)
2)对数据 map 进行函数操作,最后再进行 groupByKey 分组操作。
this.map(t => (cleanF(t), t)).groupByKey(p)
其中, p 确定了分区个数和分区函数,也就决定了并行化的程度。
图6 中方框代表一个 RDD 分区,相同key 的元素合并到一个组。例如 V1 和 V2 合并为 V, Value 为 V1,V2。形成 V,Seq(V1,V2)。
图 6 groupBy 算子对 RDD 转换
(7) filter
filter 函数功能是对元素进行过滤,对每个 元 素 应 用 f 函 数, 返 回 值 为 true 的 元 素 在RDD 中保留,返回值为 false 的元素将被过滤
掉。 内 部 实 现 相 当 于 生 成 FilteredRDD(this,sc.clean(f))。
下面代码为函数的本质实现:
deffilter(f:T=>Boolean):RDD[T]=newFilteredRDD(this,sc.clean(f))
图 7 中每个方框代表一个 RDD 分区, T 可以是任意的类型。通过用户自定义的过滤函数 f,对每个数据项操作,将满足条件、返回结果为 true 的数据项保留。例如,
过滤掉 V2 和 V3 保留了 V1,为区分命名为 V‘1。
(8) sample
sample 将 RDD 这个集合内的元素进行采样,获取所有元素的子集。用户可以设定是否有放回的抽样、百分比、随机种子,进而决定采样方式。内部实现是生成 SampledRDD(withReplacement, fraction, seed)。
函数参数设置:
? withReplacement=true,表示有放回的抽样。
? withReplacement=false,表示无放回的抽样。
图 8 中 的 每 个 方 框 是 一 个 RDD 分 区。 通 过 sample 函 数, 采 样 50% 的 数 据。V1、 V2、 U1、 U2……U4 采样出数据 V1 和 U1、 U2 形成新的 RDD。
图 7 filter 算子对 RDD 转换 图 8 sample 算子对 RDD 转换
(9) cache
cache 将 RDD 元素从磁盘缓存到内存。 相当于 persist(MEMORY_ONLY) 函数的功能。
图9 中每个方框代表一个 RDD 分区,左侧相当于数据分区都存储在磁盘,通过 cache 算子将数据缓存在内存。
图 9 Cache 算子对 RDD 转换
(10) persist
persist 函数对 RDD 进行缓存操作。数据缓存在哪 里 依 据 StorageLevel 这 个 枚 举 类 型 进 行 确 定。 有以下几种类型的组合(见图 1-14), DISK 代表磁盘,
MEMORY 代表内存, SER 代表数据是否进行序列化存储。
下面为函数定义, StorageLevel 是枚举类型,代表存储模式,用户可以通过图 9按需进行选择。
persist(newLevel:StorageLevel)
图 10 中 列 出 persist 函 数 可 以 进 行 缓 存 的 模 式。 例 如, MEMORY_AND_DISK_SER 代表数据可以存储在内存和磁盘,并且以序列化的方式存储,其他
同理。
图 10 persist 算子对 RDD 转换
图 11中方框代表 RDD 分区。 disk 代表存储在磁盘, mem 代表存储在内存。数据最初全部存储在磁盘,通过 persist(MEMORY_AND_DISK) 将数据缓存到内存,但是
有的分区无法容纳在内存,将含有 V1、 V2、 V3 的分区存储到磁盘。
(11) mapValues
mapValues :针对(Key, Value)型数据中的 Value 进行 Map 操作,而不对 Key 进行处理。
图 12 中的方框代表 RDD 分区。 a=>a+2 代表对 (V1,1) 这样的 Key Value 数据对,数据只对 Value 中的 1 进行加 2 操作,返回结果为 3。
图 11 Persist 算子对 RDD 转换 图 12 mapValues 算子 RDD 对转换
(12) combineByKey
下面代码为 combineByKey 函数的定义:
combineByKey[C](createCombiner:(V) C,
mergeValue:(C, V) C,
mergeCombiners:(C, C) C,
partitioner:Partitioner,
mapSideCombine:Boolean=true,
serializer:Serializer=null):RDD[(K,C)]
说明:
? createCombiner: V => C, C 不存在的情况下,比如通过 V 创建 seq C。
? mergeValue: (C, V) => C,当 C 已经存在的情况下,需要 merge,比如把 item V
加到 seq C 中,或者叠加。
mergeCombiners: (C, C) => C,合并两个 C。
? partitioner: Partitioner, Shuff le 时需要的 Partitioner。
? mapSideCombine : Boolean = true,为了减小传输量,很多 combine 可以在 map
端先做,比如叠加,可以先在一个 partition 中把所有相同的 key 的 value 叠加,
再 shuff le。
? serializerClass: String = null,传输需要序列化,用户可以自定义序列化类:
例如,相当于将元素为 (Int, Int) 的 RDD 转变为了 (Int, Seq[Int]) 类型元素的 RDD。图 13中的方框代表 RDD 分区。如图,通过 combineByKey, 将 (V1,2), (V1,1)数据合并为( V1,Seq(2,1))。
图 13 comBineByKey 算子对 RDD 转换
(13) reduceByKey
reduceByKey 是比 combineByKey 更简单的一种情况,只是两个值合并成一个值,( Int, Int V)(Int, Int C),比如叠加。所以 createCombiner red
ceBykey 很简单,就是直接返回 v,而 mergeValue和 mergeCombiners 逻辑是相同的,没有区别。
函数实现:
def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)]
= {
combineByKey[V]((v: V) => v, func, func, partitioner)
}
图14中的方框代表 RDD 分区。通过用户自定义函数 (A,B) => (A + B) 函数,将相同 key 的数据 (V1,2) 和 (V1,1) 的 value 相加运算,结果为( V1,3)。
图 14 reduceByKey 算子对 RDD 转换
(14) join
join 对两个需要连接的 RDD 进行 cogroup函数操作,将相同 key 的数据能够放到一个分区,在 cogroup 操作之后形成的新 RDD 对每个
key 下的元素进行笛卡尔积的操作,返回的结果再展平,对应 key 下的所有元组形成一个集合。最后返回 RDD[(K, (V, W))]。
下 面 代 码 为 join 的 函 数 实 现, 本 质 是通 过 cogroup 算 子 先 进 行 协 同 划 分, 再 通 过flatMapValues 将合并的数据打散。
this.cogroup(other,partitioner).f latMapValues{case(vs,ws)=>
for(v<-vs;w<-ws)yield(v,w) }
图 15 是对两个 RDD 的 join 操作示意图。大方框代表 RDD,小方框代表 RDD 中的分区。函数对相同 key 的元素,如 V1 为 key 做连接后结果为 (V1,(1,1)) 和 (V1,(1,2))。
图 15 join 算子对 RDD 转换
2. Actions 算子
本质上在 Action 算子中通过 SparkContext 进行了提交作业的 runJob 操作,触发了RDD DAG 的执行。
例如, Action 算子 collect 函数的代码如下,感兴趣的读者可以顺着这个入口进行
码剖析:
/**
* Return an array that contains all of the elements in this RDD.
*/
def collect(): Array[T] = {
/* 提交 Job*/
val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
Array.concat(results: _*)
}
(1) foreach
foreach 对 RDD 中的每个元素都应用 f 函数操作,不返回 RDD 和 Array, 而是返回Uint。图 16 表示 foreach 算子通过用户自定义函数对
每个数据项进行操作。本例中自定义函数为 println(),控制台打印所有数据项。
图 16 foreach 算子对 RDD 转换
(2) saveAsTextFile
函数将数据输出,存储到 HDFS 的指定目录。下面为 saveAsTextFile 函数的内部实现,其内部
通过调用 saveAsHadoopFile 进行实现:
this.map(x => (NullWritable.get(), new Text(x.toString)))
.saveAsHadoopFile[TextOutputFormat[NullWritable, Text]](path)
将 RDD 中的每个元素映射转变为 (null, x.toString),然后再将其写入 HDFS。
图 17 中左侧方框代表 RDD 分区,右侧方框代表 HDFS 的 Block。通过函数将RDD 的每个分区存储为 HDFS 中的一个 Block。
图 17 saveAsHadoopFile 算子对 RDD 转换
(3) collect
collect 相当于 toArray, toArray 已经过时不推荐使用, collect 将分布式的 RDD 返回为一个单机的 scala Array 数组。在这个数组上运用 scala 的函数式操作。
图 18中左侧方框代表 RDD 分区,右侧方框代表单机内存中的数组。通过函数操作,将结果返回到 Driver 程序所在的节点,以数组形式存储。
图 18 Collect 算子对 RDD 转换
(4) count
count 返回整个 RDD 的元素个数。
内部函数实现为:
defcount():Long=sc.runJob(this,Utils.getIteratorSize_).sum
图 19中,返回数据的个数为 5。一个方块代表一个 RDD 分区。
图19 count 对 RDD 算子转换
标签:
原文地址:http://www.cnblogs.com/zlslch/p/5723857.html