标签:之间 classes 总数 顺序 调优 不同 require 需要 运行时
1、使用Sparkconf配置Spark
对Spark进行性能调优,通常就是修改Spark应用的运行时配置选项。
Spark中最主要的配置机制通过SparkConf类对Spark进行配置,当创建出一个SparkContext时,就需要创建出一个SparkConf实例。
Sparkconf实例包含用户要重载的配置选项的键值对。调用set()方法来添加配置项的设置,然后把这个对象传给SparkContext的构造方法。
调用setAppName()和setMaster()来分别设置spark.app.name和spark.master的值。
例如:
//创建一个conf对象 val conf = new SparkConf() conf.set("spark.app.name","My Spark App") conf.set("spark.master","local[4]") conf.set("spark.ui.port","36000") //使用这个配置对象创建一个SparkContext val sc = new SparkContext(conf)
Spark运行通过spark-submit工具动态设置配置项。当应用被spark-submit脚本启动时,脚本会把这些配置项设置到运行环境中。
例如:
$ bin/spark-submit --class com.example.MyAPP \ --master local[4] --name "My Spark App" --conf spark.ui.port=36000 myApp.jar
Spark有特定的优先级顺序来选择实际配置,优先级最高的是在用户代码中显示调用set()方法设置的选项,其次是通过spark-submit传递的参数,再次是写在配置文件中的值,最后是系统的默认值。
2、Spark执行的组成部分:作业、任务和步骤
通过Spark示例展示Spark执行的各个阶段,以了解用户代码如何被编译为下层的执行计划。
val input = sc.textFile("input.txt") val tokenized = input.map(line => line.split(" ")).filter(words => words.size > 0) val counts = tokenized.map(words => (words(0),1)).reduceByKey{(a,b) => a+b}
以上示例执行了三次转化操作,最终生成一个叫做counts的RDD。程序定义了一个RDD对象的有向无环图,每个RDD维护了其指向一个或多个父节点的引用,以及表示其与父节点之间关系的信息。
这里counts的谱系图如下:
在调用行动操作之前,RDD都只是存储着可以让我们计算出具体数据的描述信息。要触发实际计算,需要对counts调用一个行动操作,比如使用collect()将数据收集到驱动器程序。
counts.collect()
Spark调度器会创建出用于计算行动操作的RDD物理执行计划。Spark调度器从最终需要被调用行动操作的RDD出发,向上回溯所有必须计算的RDD。调度器会访问RDD的父节点,父节点的父节点,以此类推,递归向上生成计算所有必要的祖先RDD的物理计划。如下:
流水线执行:当RDD不需要混洗数据就可以从父节点计算出来时,调度器就会自动进行流水线执行。在物理执行时,执行计划输出的缩进等级与父节点相同的RDD会与父节点在同一个步骤中进行流水线执行。
除了流水线执行的优化,当一个RDD已经缓存在集群内存或磁盘上时,Spark的内部调度器也会自动截短RDD谱系图。这种情况下,Spark会短路求值,直接基于缓存下来的RDD进行计算。
特定的行动操作所生成的步骤的集合被称为一个作业。
一个物理步骤会启动很多任务,每个任务都是在不同的数据分区上做同样的事情。任务内部的流程是一样的,包括:(1)从数据存储或已有RDD或数据混洗的输出中获取输入数据。(2)执行必要的操作来计算出这些操作所代表的RDD。(3)把输出写到一个数据混洗文件中,写入外部存储或者是发回驱动器程序。
3、Spark优化的关键性能
RDD的逻辑表示其实是一个对象集合。在物理执行期间,RDD会被分为一系列的分区,每个分区都是整个数据的子集。当Spark调度并运行任务时,Spark会为每个分区中的数据创建出一个任务。输入RDD一般会根据其底层的存储系统选择并行度。
并行度会从两方面影响程序的性能:当并行度过低时,Spark集群会出现资源闲置的情况,而当并行度过高时,每个分区产生的间接开销累计起来就会更大。
Spark有两种方法来对操作的并行度进行调优:一种是在数据混洗操作时,使用参数的方式为混洗后的RDD指定并行度。第二种方法是对于任何已有的RDD,可以进行重新分区来获取更多或者更少的分区数。可以使用repartition()实现重新分区操作,该操作会把RDD随机打乱并分成设定的分区数目。使用coalesce()操作没有打乱数据,比repartition()更为高效。
当Spark需要通过网络传输数据,或者将数据溢写到磁盘上时,Spark需要把数据序列化为二进制格式。序列化会在数据进行混洗操作时发生,此时有可能需要通过网络传输大量数据。
Spark默认会使用Java内建的序列化库。Spark也支持第三方序列化库Kryo,可以提供比Java的序列化工具更短的序列化时间和更高压缩比的二进制表示。
使用Kryo序列化工具示例如下:
val conf = new SparkConf().setMaster("local").setAppName("partitions") conf.set("spark.serializer","org.apache.spark.serializer.KryoSerializer") //严格要求注册类 获得最佳性能 conf.set("spark.kryo.registrationRequired","true") conf.registerKryoClasses(Array(classOf[MyClass],classOf[MyotherClass]))
Spark内存有以下用途:
可以通过调整调整内存各区域比例得到更好的性能表现。
其它优化:
Spark默认的cache()操作会以MEMORY_ONLY的存储等级持久化数据,当缓存新的RDD时分区空间不够,旧的分区会被删除。当用到这些分取数据时,在进行重算。使用persist()方法以MEMORY_AND_DISK存储等级存储,内存中放不下的分区会被写入磁盘,需要时再从磁盘读取回来。这种方式会有更好的性能。
还有一种是缓存序列化后的对象而非直接缓存。通过MEMORY_ONLY_SER 或者 MEMORY_AND_DISK_SER的存储等级实现。
提供给Spark的硬件资源会显著影响应用的完成时间,影响集群规模的主要参数包括:分配给没各执行器节点的内存大小,每个执行器节点占用的核心数,执行器节点总数,以及用来存储临时数据的本地磁盘数量。
标签:之间 classes 总数 顺序 调优 不同 require 需要 运行时
原文地址:http://www.cnblogs.com/caiyisen/p/7541836.html