标签:基于 场景 分布式 个数 防止 路由器 .so 部分 server
首先谈一下什么是数据倾斜?
答:map /reduce程序执行时,reduce节点大部分执行完毕,但是有一个或者几个reduce节点运行很慢,导致整个程序的处理时间很长。
现象是 : 进度长时间维持在99%(或100%),查看任务监控页面,发现只有少量(1个或几个)reduce子任务未完成;查看未完成的子任务,可以看到本地读写数据量积累非常大,通常超过10GB可以认定为发生数据倾斜。
如何发生的?
数据的倾斜主要是两个的数据相差的数量不在一个级别上 ,这是因为某一个key的条数比其他key多很多(有时是百倍或者千倍之多),这条key所在的reduce节点所处理的数据量比其他节点就大很多,从而导致某几个节点迟迟运行不完。
优化方案 :
方式一 : reduce 本身的计算需要以合适的内存作为支持,在硬件环境容许的情况下,增加reduce 的JVM内存大小显然有改善数据倾斜的可能,这种方式尤其适合数据分布第一种情况,单个值有大量记录, 这种值的所有纪录已经超过了分配给reduce 的内存,无论你怎么样分区这种情况都不会改变. 当然这种情况的限制也非常明显, 1.内存的限制存在,2.可能会对集群其他任务的运行产生不稳定的影响。
方式二 : 这个对于数据分布第二种情况有效,情况(一值较多,单个唯一值的记录数不会超过分配给reduce 的内存). 如果发生了偶尔的数据倾斜情况,增加reduce 个数可以缓解偶然情况下的某些reduce 不小心分配了多个较多记录数的情况. 但是对于第一种数据分布无效。
方式三 : 一种情况是某个领域知识告诉你数据分布的显著类型,比如<<hadoop权威指南>> 里面的温度问题,一个固定的组合(观测站点的位置和温度) 的分布是固定的, 对于特定的查询如果前面两种方式都没用,实现自己的partitioner 也许是一个好的方式。
总结 : 数据倾斜没有一劳永逸的方式可以解决,了解你的数据集的分布情况,然后了解你所使用计算框架的运行机制和瓶颈,针对特定的情况做特定的优化,做多种尝试,观察是否有效。
这样处理是不合理的,因为那么 namenode 格式化操作,是对文件系统进行格式化,namenode 格式化时清空 dfs/name 下空两个目录下的所有文件,之后,会在目录 dfs.name.dir 下创建文件。文本不兼容,有可能时 namenode 与 datanode 的 数据里的 namespaceID、clusterID 不一致,找到两个 ID 位置,修改为一样即可解决。
HDFS是解决海量数据存储问题的存储系统。具有高可靠,读写效率高等特点。通过将大文件拆分成一个一个block块,Hadoop2.x默认是128M,分布式的存储到各个DataNode节点中并且备份,通过横向扩展解决了纵向扩展的问题,大大提升了读写的效率和降低了成本。同时,通过设置NameNode主节点来记录每个block的元数据信息,包括块名,所在DataNode节点,备份所在位置,大小等等信息,实现文件的高可靠存储和高效率读取。而且在Hadoop2.0以上版本,通过HA解决了NameNode的单点故障问题,使得HDFS更为可靠。
先将这些小文件保存到本地的一个路径中同一个文件中,通过shell脚本,可以设置这个新文件达到多大再上传,一般设置为128M,上传到HDFS中,这样就实现了小文件上传之前的合并。还有,一般当天的日志和数据都存在一个HDFS路径中,如果没有达到上传大小,可以设置每天凌晨对前一天的本地文件路径的扫描,如果发现有文件,不管多大,都上传到前一天的HDFS文件目录下。
1. Collect阶段 : 将MapTask的结果输出到默认大小为100M的环形缓冲区 . 保存的是key/value ,Partition分区信息等。
2. Spill阶段 : 当内存中的数据量达到一定的阈值的时候,就会将数据写入到本地磁盘 , 在将数据写入到磁盘之前需要对数据进行一次排序的操作。如果配置了combiner,还会将相同分区号和key的数据进行排序。
3. Merge阶段 : 把所有的溢出的临时文件进行一次合并操作 , 以确保一个MapTask最终只产生一个中间数据文件。
4 . Copy=阶段 : ReduceTask启动Fetcher线程到已经完成MapTask的节点上复制一份属于自己的数据 , 这些数据默认会保存到内存的缓冲区中 , 当内存的缓冲区达到一定的阈值的时候,就会将数据写到磁盘之上
5 . 在reduceTask远程复制数据的同时 , 会在后台开启两个线程对内存到本地的数据文件进行合并操作
6 . Sort阶段 : 在对数据进行合并的同时 , 会进行排序操作 , 由于MapTask阶段已经对数据进行了局部的排序 , ReduceTask只需要保证Copy的数据最终整体有效性即可。
Shuffer中的缓冲区大小会影响到MapReduce程序的执行效率 , 原则上说 , 缓冲区越大 , 磁盘IO的次数越少,执行速度越快.
6 . 两个文件合并的问题
给定a、b两个文件,各存放50亿个url,每个url各占用64字节,内存限制是4G,如何找出a、b文件共同的url?主要的思想是把文件分开进行计算,在对每个文件进行对比,得出相同的URL,因为以上说是含有相同的URL所以不用考虑数据倾斜的问题。详细的解题思路为:可以估计每个文件的大小为5G*64=300G,远大于4G。所以不可能将其完全加载到内存中处理。考虑采取分而治之的方法。
遍历文件a,对每个url求取hash(url)%1000,然后根据所得值将url分别存储到1000个小文件(设为a0,a1,...a999)当中。这样每个小文件的大小约为300M。遍历文件b,采取和a相同的方法将url分别存储到1000个小文件(b0,b1....b999)中。这样处理后,所有可能相同的url都在对应的小文件(a0 vs b0, a1 vs b1....a999 vs b999)当中,不对应的小文件(比如a0 vs b99)不可能有相同的url。然后我们只要求出1000对小文件中相同的url即可。 比如对于a0 vs b0,我们可以遍历a0,将其中的url存储到hash_map当中。然后遍历b0,如果url在hash_map中,则说明此url在a和b中同时存在,保存到文件中即可。
如果分成的小文件不均匀,导致有些小文件太大(比如大于2G),可以考虑将这些太大的小文件再按类似的方法分成小小文件即可。
按照你的队列的形式!给不同的队列划分不同的百分比,当然如果一个队列空闲!那么另外一个队列就会不断的“吞噬”剩下的资源直至达到(自己设定的最大“吞噬”量的百分比),如果这个时候被吞噬的资源又开始忙的话!那么就会释放资源!最终达到自己设定的那个动态的平衡点!
例子 :
Root
队列一 : prod 40% 剩余25%
队列二 :env 60% 剩余75%
队列二的子队列 : mapreduce 50%
队列二的子队列 : spark 50%
动态平衡点 : 队列一 :40% 队列二占 60% 当队列一空闲 或者队列二空闲的时候 , 另外一个不空闲的队列最多吞噬到75% 。
而队列二的子队列 的50%+50% 等于队列二的60%
主要使用的硬件 : CPU+memory
如果当前进程是进行存储的 , 对CPU和内存要求并不是很高。如果当前进程是进行计算的,那么对CPU和memory的要求都是很高的
硬件的主要配置有
:memory 当CPU不够的话 : 程序执行运算会慢一点,当内存不够的
: 会导致heap溢出,GC。因为在内存不够的时候,Yarn里面有一个动态的线程就会检测到 , 然后就给你杀死了
二、Linux层面的一些调优 :
1、调整Linux最大文件打开数和最大进程数
vi /etc/security/limits.conf
soft nofile 65535 单个用户可用的最大进程数量(软限制)
hard nofile 65535 单个用户可用的最大进程数量(硬限制)
soft nproc 65535 可打开的文件描述符的最大数(软限制)
hard nproc 65535 可打开的文件描述符的最大数(硬限制)
2、网络参数net.core.somaxconn(定义每一个端口最大的监听队列的长度,默认值是128)
more
/etc/sysct1.conf | grep net.core.somaxconn
sysct1 -w net.core.somaxconn=37628
echo vm.swappiness=0 >> /etc/sysct1.conf
3、设置swap(虚拟内存)分区 ->0 使用内存,这样会更快
改成0不是禁用swap交换,而是优先考虑内存
More
/etc/sysctl.conf |vm.swappiness
Echo vm.swappiness=0 >> /etc/sysctl.conf
三、NameNode上JVM的参数调优 :
假想一种情况 ,当namnode挂掉以后再次重启的时候 , 会加载元数据到内存中 , 那么在我们的堆存中还分年轻代和老年代。我们创建的对象之处都是在年轻带(新生代)中的。在年轻带中还分:eden、from space、to space。最开始的对象是在eden区中,当eden区放满以后JVM会触发GC,就会将对象放在生还区1(From space)和生还区2(to space)。然后在经过多次GC以后还能存活的对象放在老年代。(下面是新生代的详解,跟本次钓友无关)
(-------新生代 (转载)
新创建的对象分配的新生代中,经历多次GC之后进入老年代。新生代具体细分有分为Eden区和survivor区,survivor区又包含S0和S1两个区域。一般来说,我们创建了一个对象,它首先被放到新生代的Eden区。当Eden填满时执行一次Minor GC,此时存活下来的对象进入survivor区第一块survivor
space S0,并且年龄加1。Eden被清空;等Eden区再满了,就再触发一次Minor GC,Eden和S0中的存活对象又会被复制送入第二块survivor
space S1(这个过程非常重要,因为这种复制算法保证了S1中来自S0和Eden两部分的存活对象占用连续的内存空间,避免了碎片化的发生)。S0和Eden被清空,然后下一轮S0与S1交换角色,如此循环往复。如果对象的复制次数达到16次,该对象就会被送到老年代中。-------)
对于调优我们应该怎么样namenode上JVM的参数:
/etc/hadoop/hadoop-env.sh
找到 export HADOOP_NAMAENODE_OPTS="${HADOOP_NAMENODE_OPTS}
-Xms10240m
-Xmx10240m
-XX:+UseParNewGC
设置年轻代为多线程并行收集
-XX:+UseConcMarkSweepGC
年老代激活CMS收集器(标记算法),可以尽量减少fullGC
-XX:+CMSConcurrentMTEnabled
当该标志被启用时,并发的CMS阶段将以多线程执行
-XX:CMSInitiatingOccupancyFraction=70
当年老代空间被占用70%的时候触发CMS垃圾收集
-XX:+CMSClassUnloadingEnabled 设置这个参数表示对永久带进行垃圾回收,CMS默认不对永久代进行垃圾回收"
总结 : 把年轻带中的内存分配点给老年代
--XX:OldSize : 设置JVM启动分配的老年代内存大小, 新生代内存的初始值大小 -XX:NewSize
我们的namenode启动后,所有的元数据信息一直加载到年老代中。name年老代的压力会比较大,同时年轻带那边会剩余很多的空间,所以有时候我们需要将年老代内存调大。 官方推荐的是3/8或者(1/4--1/3)的内存给年轻带。剩下的给年老代。如果内存足够全部调大也无所谓。比如 : Namenode节点分配的内存是15个G。年轻代给了5个G。年老代给了10G。实际年轻代可能1G都用不上,还空余了4G。反倒年老代已经触发了警戒线。这时候就要适当的调整。 当然这是基于存储来说的
Mr/Hive的调优 : 这个刚好与(namenode相反):我们做mr/hive计算时候,创建的对象就会频繁的发生在年轻代。因为年老代的对象可能就不怎么用了。而且我们经常会出现井喷式的计算,会不断的触发GC。这样就导致了频繁的垃圾回收。所以我们在mr的时候,需要将年轻代给调大了。
四、配置方面的调优 :
一、core-site.xml 的调整
1、
Ipc.server.listen.queue.size
控制了服务端socket的监听队列长度,默认值128.这个要和
net.core.somaxconn 配合使用(定义每一个端口最大的监听队列的长度,默认值是128)
<property>
<name>Ipc.server.listen.queue.size</name>
<value>65535</value>
</property>
2、
Io.file.buffer.size默认值4096(4K),作为hadoop的缓冲区,用于hdfs文件的读写。还有map的输出都用到这个缓冲区,较大的缓存能提高数据传输这是读写sequence file的buffer size,可减少I/O次数。如果集群比较大,建议把参数调到65535-131072
Name :Io.file.buffer.size
Value:65535
3、
Hdfs的回收站,一但操作事物,删掉了HDFS中的数据,可以找回。默认不开启
Name:fs.trash.interval Value:1440(分钟)1天
恢复方式:
Hdfs的回收站在: /user/$USER/.Trash/Current/user/$USER/目录下
Hadoop dfs -mv /xxxxxx/xxxx /oooo/oooo
二、hdfs-site.xml 配置文件的调整:
1、#HDFS中的block块大小的设置
#Name:dfs.blockSize
#Value:134217728(128M)
#但是有时候我们场景中可能出现大量小文件,这时候我们可以适当的调小,比如16M
#虽然可以用hadoop的压缩方案,但是压缩比例太高了,几百M的东西能压缩成几百K,
#所以把block块变小
2、带宽:默认是1M(在balancer的时候,设置hdfs中数据移动速度)
集群一般是千M路由器,所以尽量改大
<property>
<name>dfs.datanode.balance.bandwidthPerSec</name>
<value>1048576</value>
</property>
3、真正datanode数据保存路径,可以写多块硬盘。主要用来实现IO平衡,因此会显著改进磁盘IO性能
Name:dfs.datanode.data.dir
Value:/sda,/sda2/sda3
但是这样做,会导致慢磁盘的现象。
4、Namenode
server RPC 的处理线程数,默认是10,namenode线程通过RPC的方式跟datanode通信,如果datanode数量太多时可能出现RPC timeout
解决:
提升网络速度或者提高这个值。但是thread数量增多也代表着namenode消耗的内存也随着增加
Dfs.namenode.hadler.count 30
Datanode serverRPC 处理的线程数,默认10
Dfs.datanode.hadler.count 20
5、
Datanode上负责文件操作的线程数
Dfs.datanode.maxtrasfer.threads 8192
6、
datanode所保留的空间大小,需要设置一些,默认是不保留
Name:dfs.datanode.du.reserved
Value:字节为单位
三、MapReduce层面的调优; mapred-site.xml的参数
1、
Mapreduce.framework.name
yarn 已yarn为基础来调度
Mapreduce.job.reduces
3 默认开启的reduce数量,多台机器分担一台机器的压力
2、
Mapreduce环形缓冲区所占内存大小默认100M
Mapreduce.task.io.sort.mb
200
3、
环形缓冲区的阈值 默认的是0.8
Mapreduce.map.sort.spill.percent
0.6
4、
Reduce
Task中合并小文件时,一次合并的文件数据,每次合并的时候选择最小的前10(默认值)进行合并。
Mapreduce.task.io.sort.factor
50
5、
map输出是否进行压缩,如果压缩就会多耗cpu,但是减少传输时间,如果不压缩,就需要较多的传输带宽。需要配 合mapreduce.map.output.compress.codec(指定压缩方式)
Name:Mapreduce.map.output.compress
Value:true
Name:Mapreduce.map.output.compress.codec
Value:org.apache.hadoop.io.compress.SnappyCodec(牺牲CPU换IO和磁盘的方式)
6、
Reduce
shuffle阶段并行传输的数量。根据集群大小可调(牛人做的事)
Name:mapreduce.reduce.shuffle.parallelcopies
Value:20
7、
Map和reduce是通过http传输的,这个是设置传输的并行数
Name:Mapreduce.tasktracker.http.threads
Value:40
8、还有一点效率稳定性参数
(1)
mapreduce.map.speculative: 是否为 Map Task 打开推测执行机制,默认为 true, 如果为 true,则可以并行执行一些 Map 任务的多个实例。
(2)
mapreduce.reduce.speculative: 是否为 Reduce Task 打开推测执行机制,默认为 true
(3)
mapreduce.input.fileinputformat.split.minsize: FileInputFormat做切片时最小切片大小,默认 1。
(5)mapreduce.input.fileinputformat.split.maxsize:
FileInputFormat做切片时最大切片大小
推测执行机制(Speculative
Execution):它根据一定的法则推测出“拖后腿”的任务,并为这样的任务启动一个备份任务,让该任务与原始任务同时处理同一份数据,并最终选用最先成功运行完成任务的计算结果作为最终结果。
mapreduce的计数器 :
在实际生产代码中,常常需要将数据处理过程中遇到的不合规数据行进行全局计数,类似这种需求可以借助mapreduce 框架中提供的全局计数器来实现。
9、容错性相关的参数 :
(1)
mapreduce.map.maxattempts: 每个 Map Task 最大重试次数,一旦重试参数超过该值,则认为 Map Task 运行失败,默认值:4。
(2) mapreduce.reduce.maxattempts: 每个Reduce Task最大重试次数,一旦重试参数超过该值,则认为 Map Task 运行失败,默认值:4。
(3)
mapreduce.map.failures.maxpercent: 当失败的 Map Task 失败比例超过该值,整个作业则失败,默认值为 0.
如果你的应用程序允许丢弃部分输入数据,则该该值设为一个大于0的值,比如5,表示如果有低于 5%的 Map
Task 失败(如果一个 Map Task 重试次数超过mapreduce.map.maxattempts,则认为这个 Map Task 失败,其对应的输入数据将不会产生任何结果),整个作业扔认为成功。
(4)
mapreduce.reduce.failures.maxpercent: 当失败的 Reduce Task 失败比例超过该值为,整个作业则失败,默认值为 0.
(5) mapreduce.task.timeout:如果一个task在一定时间内没有任何进入,即不会读取新的数据,也没有输出数据,则认为该 task
处于 block 状态,可能是临时卡住,也许永远会卡住。
为了防止因为用户程序永远 block 不退出,则强制设置了一个超时时间(单位毫秒),默认是600000,值为 0 将禁用超时。
--------------------------------重要参数配置--------------------------
1、
Map阶段,map端的内存分配多少 个 其实就是 : Map Task 可使用的内存上限(单位:MB),默认为 1024。如果 Map Task 实际使用的资源量超过该值,则会被强制杀死。可以适当的调大
Name:mapreduce.map.memory.mb
Value:1280
2、
Map端启用JVM所占用的内存大小 (官方----->200M heap over/GC)
-XX:-UseGCOverheadLimit(禁用GC,我们使用parNew+CMS方式)
Name:Mapreduce.map.java.opts
Value:-Xmx1024m
-XX:-UseGCOverheadLimit
3、
reduce阶段,reduce端的内存分配多少 一个 Reduce Task 可使用的资源上限(单位:MB),默认为 1024。如果
Reduce Task 实际使用的资源量超过该值,则会被强制杀死。
Name:mapreduce.reduce.memory.mb
Value:1280
4、
reduce端启用JVM所占用的内存大小
-XX:-UseGCOverheadLimit(禁用GC)
Name:Mapreduce.reduce.java.opts
Value:-Xmx1024m
-XX:-UseGCOverheadLimit
【注意:java的堆栈肯定要小于memory的,因为还要考虑非堆和其他】上面最大能分配多少,还取决于yarn的配置。
mapreduce.map.cpu.vcores:
每个 Maptask 可用的最多 cpu core 数目, 默认值: 1
mapreduce.reduce.cpu.vcores:
每个 Reducetask 可用最多 cpu core 数
默认值: 1
(这两个参数调整的是核数)
Yarn-site.xml配置 //以下在yarn启动之前就配置在服务器的配置文件中才能生效 :
1、
给nodemanager可用的物理内存
Name:yarn.nodemanager.resource.memory-mb
Value:8192
注意,如果你的节点内存资源不够 8GB,则需要调减小这个值,而 YARN不会智能的探测节点的物理内存总量。
2、
单个任务可申请的最少内存 --》 也就是RM 中每个容器请求的最小配置,以 MB 为单位,默认 1024。
Name:Yarn.scheduler.minimum-allocation-mb
Value:1024
3、
单个任务可申请的最大内存--》也就是RM 中每个容器请求的最大分配,以 MB 为单位,默认 8192。
Name:yarn.scheduler.maximum-allocation-mb
Value:8192
4、
Yarn这个节点可使用的虚拟CPU个数,默认是8.但我们通常配置成跟物理CPU个数一样
Name:Yarn.nodemanager.resource.cpu-vcores
Value:4
标签:基于 场景 分布式 个数 防止 路由器 .so 部分 server
原文地址:https://www.cnblogs.com/fjdsj/p/10092530.html