Spark 优化
· 数据序列化
· 内存优化
确定内存使用
调整数据结构
序列化的RDD存储
垃圾回收调整
· 其它注意事项
并行粒度
Reduce任务内存使用
广播大的变量
数据本地性
· 总结
基于内存的计算(当然也可以用磁盘)是Spark的一个重要特性,在一个Spark程序中,CPU、带宽和内存都可能成为瓶颈。更多情况下,在内存合适的情况下,带宽往往成为程序的瓶颈,但是有时候你也需要做一些调整,例如,调整RDD的序列化方式,来减少内存的使用。本篇文章分成两个主题:数据序列化和内存调整。数据序列化对于网络优化是至关重要的,对于减少内存的使用也很重要。详情如下。
数据序列化
在任何的分布式应用中,序列化都扮演着重要的角色。序列化的速度缓慢,在很大程度上会拖累计算的速度。一般情况下,如果你想优化你的Spark程序,先优化你的序列化方式吧。
Spark在便利性和性能之间做了一个平衡,提供了两种序列化的方式。
· Java序列化: Java序列化是Spark默认的序列化方式,只要你实现了Java.io.Serializable接口,就可以使用该序列化方式,你也可以通过java.io.Externalizable实现自己的序列化方式,以达到提供性能的目的。Java序列化很灵活,但是速度比较慢,并且会导致大量的序列化formats.
· Kryo序列化:序列化是Google提供的一种序列化方式,速度更快,使用内存更少,速度是Java序列化的10倍。但是并不支持Java序列化的所有形式(无所谓),并且使用之前需要对需要序列化的类进行注册。建议使用这种方式。
conf.set("spark.serializer","org.apache.spark.serializer.KryoSerializer").
设置完后,Shuffle操作是work节点间的数据交换会使用KryoSerializer,RDD持久化到硬盘也会使用KryoSerializer。Spark为什么没有使用KryoSerializer作为默认的序列化方式,仅仅因为它需要注册(有点烦),但是我们推荐在任务的网络敏感性应用中都使用它。
一个KryoSerializer使用例子(Spark1.1.0版本中还没有支持这种写法,1.2.0中有)
val conf=newSparkConf().setMaster(...).setAppName(...)
conf.registerKryoClasses(Seq(classOf[MyClass1], classOf[MyClass2]))
val sc=newSparkContext(conf)
如果序列化对象较大,调大spark.kryoserializer.buffer.mb配置.默认 2。如果你不注册需要序列化的class,Kryo照样工作,但是会报错类全名,会浪费一些内存。
内存调整
内存调整需要考虑的三个方面:你的对象使用了多少内存,对象访问的代价,垃圾回收的代价。
默认情况下,Java对象访问很快,但是很容易就消耗掉“原生”数据2到5倍的内存。原因如下:
· 每个不同的Java对象都有一个“对象头”,大概有16个bytes大小,包含了class的一些原信息,例如一个指向class的指针。如果一个对象只有一点点的数据,那么它的元数据就比数据本身还有大好几倍。
· Java字符串会比“原生”字符多占用40个byte(以数组的形式存储每个char并保存其它数据,例如长度)。例如1个10字符的字符串占用内存是60byte = 2 * 10 + 40
· 普通的集合类,例如HashMap和 LinkedList,是使用链表结构实现的,这样的结构针对每个entry有一个“包装”对象。这个对象不仅有一个“对象”头,还有一个指向下一个对象的指针(通常会占用8个byte)
· 原生数据类型的集合通常以“盒子”对象存储,例如java.lang.Integer.
这一节会讨论如何知道内存消耗了多少,如何减少内存的使用:通过改变你的数据结构或者以序列化的形式存储你的数据。我们也会讲到改变Spark缓存大小和调整Java垃圾回收这两种方式。
确定内存使用
把你的数据放入RDD,然后把RDD放入缓存,看SparkContext打印到drivier上的日志。把每个partition消耗的内存加起来就Ok了。
INFOBlockManagerMasterActor: Added rdd_0_1 in memory on mbk.local:50311 (size:717.5 KB, free: 332.3 MB)
partition 1 of RDD 0 消耗内存717.5 KB.
调整数据结构
想减少内存的消耗,第一步是减少使用带有额外内存消耗的Java特性,例如,基于指针的数据结构和包装对象。方法如下:
1. 使用数组、原生数据类型而不是标准的Java或者Scala集合类来设计你的数据结构。“fastutil”库提供了原生数据类型的集合类,这些集合类和Java标准库兼容。
2. 尽量减少带有很多小对象和指针的嵌套数据结构。
3. 使用数值ID代替字符串ID
4. 如果你的内存少于32GB ,JVM配置请使用-XX:+UseCompressedOops。该配置会使用4个bytes来做指针而不是8个bytes.你可以把这个配置加到spark-env.sh配置文件中。
序列化RDD存储
尽管做了上述的优化,你的对象依然太大,这时最简单的方式是使用序列化的方式存储你的数据,例如使用RDD persistence API中的MEMORY_ONLY_SER.这时Spark会把每个RDD partition存储成一个大的byte数组。序列化存储唯一的缺点是导致对象的访问时间加长,因为在访问的时候还要做一个反序列化。如果你要使用序列化的形式存储数据,我们建议使用Kryo,这会比java的序列化节省内存(当然不是原生的Java对象)
垃圾回收调整
就存储在你的程序中的RDD而言,当你的RDD有较大的“搅动”的时候,垃圾回收也可能会是一个问题。(这在一个RDD一次读,然后在它上面对此操作的情况下并不是问题)。当Java需要驱逐旧的对象为新的对象腾出空间的时候,他会追中你所有的Java对象并找到没有用的对象。需要记住的一点是,Java垃圾回收的成本与Java对象的数量成正比。因此,使用具有较少对象的数据结构(int数组而不是LinkedList)会大大降低垃圾回收的成本、一个更好的方式是使用序列化的方式持久化对象,正如上面描述的:使用序列化RDD存储会让每个Partition一个byte数组。如果GC是个问题,你首先应该使用序列化缓存。
由于你的工作节点上的RDD缓存大小和计算任务可用内存大小的干扰,垃圾回收也可能是一个问题。我们下面会讨论通过调整RDD cache的大小来解决这种问题。
测量GC的影响
GC调整的第一步是收集统计信息(垃圾回收的频率,垃圾回收消耗的时间)。做法:添加“-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps”到Java options(See the configurationguide for info on passing Java optionsto Spark jobs.)。然后运行spark job,然后在你的work节点的日志上就可以看到这些信息了。
缓存大小调整
spark.executor.memory :Spark可以用来缓存RDD的内存比例,剩下的内存可以被程序的其它任何对象使用。
conf.set("spark.storage.memoryFraction","0.5"):调整RDD可用缓存大小,解决JVM垃圾回收过于频繁的问题或者是内存溢出的问题。
高级GC调整
JVM内存管理基础知识:
· Java堆空间分为两个部分:Young和Old。Young存储生命周期较短的对象,Old存储生命周期较长的对象。
· Young分成三个部分:Eden、Survivor1、Survivor2
· 垃圾回收过程:Eden空间占满,在Eden上开启垃圾回收主线程,Eden和Survivor1中活着的对象会被拷贝到Survivor2。Survivor区是用来做交换的。如果一个对象太老了或者Survivor2满了,对象被移送到Old.最后当Old快满的时候,开启一个full GC
Spark GC调优的目的是Old用来存储long-livedRDD,Young用来存储short-lived对象。以此避免full GC.下面这些步骤可能是有用的:
· 一个任务完成之前,full GC被多次调用,这就意味着运行成的内存不够。
· 根据GC日志,如果OldGen快满了,可以减少RDD缓存可用的内存比例。少缓存一些数据让task运行的跟快些是一个明智的选择。
· 如果有太多的minor collections,但是full GC很少,增加Eden的内存,例如-Xmn=4/3*E,原来是E
· 举例,Spark程序从HDFS加载数据,HDFS数据块64M,则希望同时运行4个Task.则Eden size大约为=4 * 3 * 64M。这里我们假设HDFS的块是压缩的,解压缩的块是普通块的2 ~ 3倍。
· 新的设置生效后,从新观察GC的频率域时间。
GC优化效果取决于你的程序和可用的内存数量。更多优化见many more tuningoptions
其它需要注意的事项
并行粒度
tasks num = CPU核心数的2 ~ 3倍
任务内存使用
会为每个task建一个hash表进行group操作,这个表有可能会非常大,把内存挤爆了。可以提高并行度,把单个task的输入降下来。Spark task基于JVM的多线程,启动的代价可以短刀200ms,你可以放心的多开几个task.
广播大的变量
Spark 共享变量可以极大地降低序列化任务的大小和job启动的代价,如果你的任务使用了driver程序中的比较大的对象,可以考虑把对象转化成共享变量。Spark会打印每个task的序列化大小,你可以根据它判断你的task是否太大。一般情况下,如果你的task大于 20K,你就可以考虑使用共享变量进行优化了。
数据本地化
数据本地化是影响Spark job性能的一个重要的指标。如果数据和代码在一起,计算的速度应该会加快。如果数据和代码不在一块呢?我们必须移动其中的一个,使二者在一起。通常情况下,移动代码比移动数据更加高效(代码的体积通常比较小)。Spark在进行任务调度的时候就围绕这样一个原则:数据本地化原则(数据不动,代码动的原则)。
什么是数据本地化呢?,从近到远,我们分了几个等级。详情如下:
· PROCESS_LOCAL:数据和代码在同一个JVM里。这是最理想的数据本地性。
· NODE_LOCAL数据和代码在相同的节点上,例如在同一HDFS的存储节点上,或者在同一节点的两个不同的executor里。这种级别的数据本地性会比PROCESS_LOCAL慢一点,因为数据需要在两个进程间传递。
· NO_PREF数据可以被从任何位置进行访问,我们都看成是一样的。
· RACK_LOCAL数据在同一机架的不同机器上,因此数据传输需要通过网络,典型情况下需要通过一个交换机。
· ANY data不在同一机架的互联网上的任意位置。
Spark优先选择使用最佳的本地性级别,但是如果在每个空闲的executor里没有未处理的数据,Spark会降低这种级别。牵涉到两点:1等待忙着的CPU空闲了重新开一个task,这样数据就不用传输了。2在远端开一个新的任务,把数据传过去。
Spark的做法是先等待一段时间(spark.locality),如果在等待时间内忙着的CPU闲下来了,采用1,否者采用2.
总结
Spark调优的一个简短的说明,重点讲了数据序列化和内存调优。大部分情况下,使用Kryo序列化和以序列化方式持久化数据会解决大部分的性能问题。
原文地址:http://blog.csdn.net/hi_1234567/article/details/43700717