码迷,mamicode.com
首页 > 其他好文 > 详细

Spark-RDD

时间:2019-10-18 16:09:55      阅读:83      评论:0      收藏:0      [点我收藏+]

标签:容错性   ike   迭代   split   输入   flat   程序   本地   sub   

RDD(Resilient Distributed Datasets)弹性分布式数据集,是在集群应用中分享数据的一种高效,通用,容错的抽象,是Spark提供的最重要的抽象的概念,它是一种有容错机制的特殊集合,可以分布在集群的节点上,以函数式编操作集合的方式,进行各种并行操作。

RDD是只读的,不可变的数据集。RDD也是容错的,假如其中一个RDD坏掉,RDD中有记录之前的依赖关系,依赖关系中记录算子和分区,可以重新生成。RDD实现分布式数据集容错方法有两种:数据检查点和记录更新。同时RDD是高效的,不需要物化。它也是分区记录的集合,可以缓存的。
每个RDD都包含有一组RDD分区(partition),数据集的原子组成部分,还有对父RDD的一组依赖,这些依赖描述了RDD的Lineage;以及一个函数,说明在父RDD上执行何种计算;还包含元数据,描述分区模式和数据存放的位置。

RDD依赖

RDD之间的依赖关系分为宽依赖和窄依赖两类。对于窄依赖,子RDD的每个分区依赖于常数个父分区,它与数据规模无关。输入输出是一对一的算子,但是其中一种方式的结果RDD的分区结构不变,主要是map,flatMap。但是如union,coalesce结果RDD的分区结构会发生变化。对于宽依赖,子RDD的每个分区都依赖于所有的父RDD分区。
对于两种依赖关系,窄依赖允许在一个集群节点上以流水线的方式(pipeline)计算所有父分区。而宽依赖则需要首先计算好所有父分区数据,然后在节点之间进行Shuffle。窄依赖能够更有效地进行失效节点的恢复,重新计算丢失RDD分区的父分区,而且不同节点之间可以并行计算;而对于一个宽依赖关系的Lineage图,单个节点失效可能导致这个RDD的所有祖先丢失部分分区,因而需要整体重新计算。

RDD特征

同时RDD有五个特征,其中分区,一系列的依赖关系和函数是三个基本特征,最佳位置和分区策略是可选。RDD是移动计算而不是移动数据。
RDD和spark之间,RDD是一种具有容错性基于内存的集群计算抽象方法,Spark则是这个抽象方法的实现。

RDD与Mapreduce

Hadoop Mapreduce
1.不适于大量的迭代
2.不适于交互式查询
基于数据流方式不能复用曾经的结果或中间计算结果
RDD
1.自动进行内存,磁盘切换
2.高效容错
3.Task失败,会进行特定次数的重试
4.Stage失败,会自动进行特定次数的重试
5.只会计算失败的分片
6.checkpoint persist
7.数据分配的高度弹性

RDD为何高效

RDD是不可变集合类型,且操作为lazy
RDD的写操作为粗粒度,读操作可以是粗粒度也可为细粒度
所有的RDD操作都返回迭代器,可以让框架集成

创建RDD

1.集合创建
程序中一个已有的集合传给SparkContext的parallelize()

1
2
3
4
5
Scala创建
val lines=sc.parallelize(List("padas","i like padas"))

Java创建
JavaRDD<String> lines=sc.parallelize(Arrays.asList("padas","i like padas"))

2.本地文件系统

1
2
3
4
5
Scala 
val input=sc.textFile("本地路径")

Java
JavaRDD<String> input=sc.textFile("本地路径"")

3.HDFS
1
2
3
4
5
Scala 
val input=sc.textFile("HDFS路径")

Java
JavaRDD<String> input=sc.textFile("HDFS路径"")

4.基于数据库 SQL NOSQL
5.基于S3
6.基于数据流

RDD操作

转化操作

返回新的RDD,RDD的转化操作都是惰性求值,事实上为回溯求值操作
map() flatMap() filter() distinct() sample()
union() intersection() subtract() cartesian()
filter

大专栏  Spark-RDDe">1
2
3
4
5
6
7
8
9
JavaRDD filter = parallelize.filter(new Function<Integer, Boolean>() {


public Boolean (Integer arg0) throws Exception {

return arg0>4;
}

});

foreach

1
2
3
4
5
6
7
8
9
10
filter.foreach(new VoidFunction(){


public void (Object arg0) throws Exception {

System.out.println(arg0);
}


});

flatMap()

1
2
3
4
5
6
7
8
9
10
JavaRDD<String> flatMap = mapToPair.flatMap(new FlatMapFunction<Tuple2<Integer,String>,String>(){


public Iterable<String> (Tuple2<Integer, String> arg0) throws Exception {

return Arrays.asList(arg0._2.split(" "));
}


});

Map()

1
2
3
4
5
6
7
8
9
10
JavaPairRDD<Integer, String> mapToPair = textFile.mapToPair(new PairFunction<String,Integer,String>(){


public Tuple2<Integer, String> (String arg0) throws Exception {

return new Tuple2(1,arg0);
}


});

行动操作

不产生RDD
collect() count() countByValue() take(num) top(num) reduce(func) foreach(func)

controller

persist cache checkpoint
persist适用条件
某步计算耗时,计算链条过长,checkpoint所在的RDD必须持久化(触发job),shuffle之后,shuffle之前(系统默认)

1
2
3
val cached=sc.textFile("path").flatMap(_.split(" ")).map(word=>(word,1)).reduceBykey(_+_).cache
cached.count
速度明显提高,但注意cache后别放其他算子,否则每次重新cache

汇总

1.reduce为Action操作,不产生RDD,看源码中有无runjob;
每一个shuffle产生一个新的RDD,触发stage
2.并行度问题
并行=分区=task
Spark的并行度看内存使用,看CPU,与数据规模无关
并行度=文件大小/128
3.RDD上的一系列数据分片的计算逻辑相同

Spark-RDD

标签:容错性   ike   迭代   split   输入   flat   程序   本地   sub   

原文地址:https://www.cnblogs.com/sanxiandoupi/p/11698412.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!