标签:task 保存 操作 模块 部分 contex row 复制 使用
我的个人博客:https://www.luozhiyun.com/
第一,MapReduce模型的抽象层次低,大量的底层逻辑都需要开发者手工完成。
第二,只提供Map和Reduce两个操作。
举个例子,两个数据集的Join是很基本而且常用的功能,但是在MapReduce的世界中,需要对这两个数据集 做一次Map和Reduce才能得到结果。
第三,在Hadoop中,每一个Job的计算结果都会存储在HDFS文件存储系统中,所以每一步计算都要进行硬 盘的读取和写入,大大增加了系统的延迟。
第四,只支持批数据处理,欠缺对流数据处理的支持。
Spark最基本的数据抽象叫作弹性分布式数据集(Resilient Distributed Dataset, RDD),它代表一个可以被 分区(partition)的只读数据集,它内部可以有很多分区,每个分区又有大量的数据记录(record)。
RDD是Spark最基本的数据结构。Spark提供了很多对RDD的操作,如Map、Filter、flatMap、groupByKey和Union等等,极大地提升了对各 种复杂场景的支持。开发者既不用再绞尽脑汁挖掘MapReduce模型的潜力,也不用维护复杂的MapReduce 状态机。
相对于Hadoop的MapReduce会将中间数据存放到硬盘中,Spark会把中间数据缓存在内存中,从而减少了 很多由于硬盘读写而导致的延迟。
在任务(task)级别上,Spark的并行机制是多线程模型,而MapReduce是多进程模型。
多进程模型便于细粒度控制每个任务占用的资源,但会消耗较多的启动时间。
而Spark同一节点上的任务以多线程的方式运行在一个JVM进程中,可以带来更快的启动速度、更高的CPU 利用率,以及更好的内存共享。
RDD表示已被分区、不可变的,并能够被并行操作的数据集合。
分区代表同一个RDD包含的数据被存储在系统的不同节点中。逻辑上,我们可以认为RDD是一个大的数组。数组中的每个元素代表一个分区(Partition)。
在物理存储中,每个分区指向一个存放在内存或者硬盘中的数据块(Block),而这些数据块是独立的,它 们可以被存放在系统中的不同节点。
RDD中的每个分区存有它在该RDD中的index。通过RDD的ID和分区的index可以唯一确定对应数据块的编 号,从而通过底层存储层的接口中提取到数据进行处理。
不可变性代表每一个RDD都是只读的,它所包含的分区信息不可以被改变。
我们只可以对现有的RDD进行转换转换(Transformation)操作,得到新的RDD作为中间计算的结果。
如:
lines = sc.textFile("data.txt") lineLengths = lines.map(lambda s: len(s)) totalLength = lineLengths.reduce(lambda a, b: a + b)
我们首先读入文本文件data.txt,创建了第一个RDD lines,它的每一个元素是一行文 本。然后调用map函数去映射产生第二个RDD lineLengths,每个元素代表每一行简单文本的字数。最后调 用reduce函数去得到第三个RDD totalLength,它只有一个元素,代表整个文本的总字数。
对于代表中间结果的RDD,我们需要记录它是通过哪个RDD进行哪些转 换操作得来,即依赖关系依赖关系,而不用立刻去具体存储计算出的数据本身。
在一个有N步的计算模型中,如果记载第N步输出RDD的节点发生故障,数据丢失,我们可以从第N-1 步的RDD出发,再次计算,而无需重复整个N步计算过程。
Spark不需要将每个中间计算结果进行数据复制以防数据丢失,因为每一步产生的RDD里都会存储它的依赖关系。
所以并行操作的前提是不同的RDD之间有着怎样的依赖关系。
例如在一个有N步的计算模型中,第N-1 步的RDD就是第N步RDD的父RDD,相反则是子RDD。
Spark支持两种依赖关系:窄依赖(Narrow Dependency)和宽依赖(Wide Dependency)。
窄依赖就是父RDD的分区可以一一对应到子RDD的分区,宽依赖就是父RDD的每个分区可以被多个子RDD的 分区使用。
显然,窄依赖允许子RDD的每个分区可以被并行处理产生,而宽依赖则必须等父RDD的所有分区都被计算好 之后才能开始处理。
所以需要考虑以下两点:
在计算过程中,对于一些计算过程比较耗时的RDD,我们可以将它缓存至硬盘或HDFS中,标记这个RDD有 被检查点处理过,并且清空它的所有依赖关系。同时,给它新建一个依赖于CheckpointRDD的依赖关系,CheckpointRDD可以用来从硬盘中读取RDD和生成新的分区信息。
当某个子RDD需要错误恢复时,回溯至该RDD,发现它被检查点记录过,就可以直接去硬盘中读取这 个RDD,而无需再向前回溯计算。
用来记录RDD持久化时的存储级别,常用的有以下几个:
RDD的数据操作分为两种:转换(Transformation)和动作(Action)。
转换是用来把一个RDD转换成另一个RDD
Map
它把一个RDD中的所有数据通过一个函数,映射成一个新的RDD,任何原 RDD中的元素在新RDD中都有且只有一个元素与之对应。
rdd = sc.parallelize(["b", "a", "c"]) rdd2 = rdd.map(lambda x: (x, 1)) // [('b', 1), ('a', 1), ('c', 1)]
Filter
filter这个操作,是选择原RDD里所有数据中满足某个特定条件的数据,去返回一个新的RDD。
动作则是通过计算返回一个结果
Reduce
它会把RDD中的元素根据一个输入函数聚合起来。
from operator import add sc.parallelize([1, 2, 3, 4, 5]).reduce(add)// 15
Count
Count会返回RDD中元素的个数。
sc.parallelize([2, 3, 4]).count() // 3
Spark在每次转换操作的时候,使用了新产生的 RDD 来记录计算逻辑,这样就把作用在 RDD 上的所有计算 逻辑串起来,形成了一个链条。当对 RDD 进行动作时,Spark 会从计算链的最后一个RDD开始,依次从上 一个RDD获取数据并执行计算逻辑,最后输出结果。
每当我们对RDD调用一个新的action操作时,整个RDD都会从头开始运算。因此,我们应该对多次使用的RDD进行一个持久化操作。
Spark的persist()和cache()方法支持将RDD的数据缓存至内存或硬盘中。
rdd = sc.parallelize([1, 2, 3, 4, 5])
rdd1 = rdd.map(lambda x: x+5)
rdd2 = rdd1.filter(lambda x: x % 2 == 0)
rdd2.persist()
count = rdd2.count() // 3
first = rdd2.first() // 6
rdd2.unpersist()
在缓存RDD的时候,它所有的依赖关系也会被一并存下来。所以持久化的RDD有自动的容错机制。如果RDD 的任一分区丢失了,通过使用原先创建它的转换操作,它将会被自动重算。
持久化可以选择不同的存储级别。正如我们讲RDD的结构时提到的一样,有MEMORY_ONLY,MEMORY_AND_DISK,DISK_ONLY等。cache()方法会默认取MEMORY_ONLY这一级别。
如上图所示,Spark SQL提供类似于SQL的操作接口,允许数据仓库应用程序直接获取数据,允许使用者通过命令行 操作来交互地查询数据,还提供两个API:DataFrame API和DataSet API。
DataSet也是不可变分布式的数据单元,它既有与RDD类似的各种转换和动作函 数定义,而且还享受Spark SQL优化过的执行引擎,使得数据搜索效率更高。
DataSet支持的转换和动作也和RDD类似,比如map、filter、select、count、show及把数据写入文件系统 中。
DataSet上的转换操作也不会被立刻执行,只是先生成新的DataSet,只有当遇到动作操作,才会把 之前的转换操作一并执行,生成结果。
当动作操作执行时,Spark SQL的查询优化器会优化这个逻辑计划,并生成一个可以分布式执行的、包含分 区信息的物理计划。
DataSet所描述的数据都被组织到有名字的列中。
如上图所示,左侧的RDD虽然以People为类型参数,但Spark框架本身不了解People类的内部结构。所有的 操作都以People为单位执行。
而右侧的DataSet却提供了详细的结构信息与每列的数据类型
其次,由于DataSet存储了每列的数据类型。所以,在程序编译时可以执行类型检测。
DataFrame可以被看作是一种特殊的DataSet。它也是关系型数据库中表一样的结构化存储机制,也是分布 式不可变的数据结构。
DataFrame每一行的类型固定为 Row,他可以被当作DataSet[Row]来处理,我们必须要通过解析才能获取各列的值。
在性能方面,DataFrame和DataSet的性能要比RDD更好。
Spark程序运行时,Spark SQL中的查询优化器会对语句进行分析,并生成优化过的RDD在底层执行。
对于错误检测而言,RDD和DataSet都是类型安全的,而DataFrame并不是类型安全的。这是因为它不存储每一列的信息如名字 和类型。
无论是DataFrame API还是DataSet API,都是基于批处理模式对静态数据进行处理的。比如,在每天 某个特定的时间对一天的日志进行处理分析。
而Spark Streaming就是针对流处理的组件。
Spark Streaming会像微积分一样用时间片拆分了无限的数据流,然后对每一个数据片用类似于批处理的方法进行处理,输 出的数据也是一块一块的。
Spark Streaming提供一个对于流数据的抽象DStream。DStream可以由来自Apache Kafka、Flume或者 HDFS的流数据生成,也可以由别的DStream经过各种转换操作得来。
底层DStream也是由很多个序列化的RDD构成,按时间片(比如一秒)切分成的每个数据单位都是一 个RDD。然后,Spark核心引擎将对DStream的Transformation操作变为针对Spark中对 RDD的 Transformation操作,将RDD经过操作变成中间结果保存在内存中。
下图就是DStream的内部形式,即一个连续的RDD序列,每一个RDD代表一个时间窗口的输入数据流。
对DStream的转换操作,意味着对它包含的每一个RDD进行同样的转换操作。比如下边的例子。
sc = SparkContext(master, appName) ssc = StreamingContext(sc, 1) lines = sc.socketTextStream("localhost", 9999) words = lines.flatMap(lambda line: line.split(" "))
上面的操作本质上,对一个DStream进行flatMap操作,就是对它里边的每一个RDD进行flatMap操作,生成了一系列新 的RDD,构成了一个新的代表词语的DStream。
任何Spark Streaming的程序都要首先创建一个StreamingContext的对象,它是所有Streaming操作的入口。
StreamingContext中最重要的参数是批处理的时间间隔,即把流数据细分成数据块的粒度。
这个时间间隔决定了流处理的延迟性,所以,需要我们根据需求和资源来权衡间隔的长度。
滑动窗口操作有两个基本参数:
比如,对热点搜索词语进行统计,每隔10秒钟输出过去60秒内排名前十位的热点词。
统计窗口长度就是60s,滑动间隔就是10s。
优点
2016年,Spark在其2.0版本中推出了结构化流数据处理的模块Structured Streaming。
Structured Streaming是基于Spark SQL引擎实现的,依靠Structured Streaming,在开发者眼里,流数据和 静态数据没有区别。我们完全可以像批处理静态数据那样去处理流数据。
Spark Streaming就是把流数据按一定的时间间隔分割成许多个小的数据块进行批处理。
而在Structured Streaming的模型中,我们要把数据看成一个无边界的关系型的数据表。每一个数据都是表中的一行,不断会有新的数据行被添加到表里来。
Structured Streaming的三种输出模式:
需要注意的是,Structured Streaming并不会完全存储输入数据。每个时间间隔它都会读取最新的输入,进 行处理,更新输出表,然后把这次的输入删除。Structured Streaming只会存储更新输出表所需要的信息。
简易度和性能
Spark Streaming提供的DStream API与RDD API很类似,相对比较低level。
而Structured Streaming提供的DataFrame API就是这么一个相对高level的API,大部分开发者都很熟悉关系型 数据库和SQL。这样的数据抽象可以让他们用一套统一的方案去处理批处理和流处理,不用去关心具体的执 行细节。
而且,DataFrame API是在Spark SQL的引擎上执行的,Spark SQL有非常多的优化功能。
实时性
Structured Streaming它更像是实时处理,能做到用更小的时间间 隔,最小延迟在100毫秒左右。
而且在Spark 2.3版本中,Structured Streaming引入了连续处理的模式,可以做到真正的毫秒级延迟。
标签:task 保存 操作 模块 部分 contex row 复制 使用
原文地址:https://www.cnblogs.com/luozhiyun/p/12388899.html