标签:cas 进入 操作 hive line group 常用sql avr meta
解决方案二
1. 增加reduce 的jvm内存
2. 增加reduce 个数
3. customer partition
4. 其他优化的讨论.
5. reduce sort merge排序算法的讨论
6. 正在实现中的hive skewed join.
7. pipeline
8. distinct
9. index 尤其是bitmap index
方式1
既然reduce 本身的计算需要以合适的内存作为支持,在硬件环境容许的情况下,增加reduce 的内存大小显然有改善数据倾斜的可能,这种方式尤其适合数据分布第一种情况,单个值有大量记录, 这种值的所有纪录已经超过了分配给reduce 的内存,无论你怎么样分区这种情况都不会改变. 当然这种情况的限制也非常明显,
1. 内存的限制存在
2. 可能会对集群其他任务的运行产生不稳定的影响.
方式2
这个对于数据分布第二种情况有效,唯一值较多,单个唯一值的记录数不会超过分配给reduce 的内存. 如果发生了偶尔的数据倾斜情况,增加reduce 个数可以缓解偶然情况下的某些reduce 不小心分配了多个较多记录数的情况. 但是对于第一种数据分布无效.
方式3
一种情况是某个领域知识告诉你数据分布的显著类型,比如hadoop definitive guide 里面的温度问题,一个固定的组合(观测站点的位置和温度) 的分布是固定的, 对于特定的查询如果前面两种方式都没用,实现自己的partitioner 也许是一个好的方式.
方式5
reduce 分配的内存远小于处理的数据量时,会产生multi-pass sort 的情况是瓶颈,那么就要问
1. 这种排序是有必要的嘛?
2. 是否有其他排序算法或优化可以根据特定情况降低他瓶颈的阈值?
3. map reduce 适合处理这种情况嘛?
关于问题1. 如果是group by , 那么对于数据分布情况1 ,hash 比sort 好非常多,即使某一个reduce 比其他reduce 处理多的多的数据,hash 的计算方式也不会差距太大.
问题2. 一个是如果实现block shuffle 肯定会极大的减少排序本身的成本, 另外,如果分区之后的reduce 不是使用copy –> sort-merge –> reduce 的计算方式, 在copy 之后将每个block 的头部信息保存在内存中,不用sort – merge 也可以直接计算reduce, 只不过这时候变成了随机访问,而不是现在的sort-merge 之后的顺序访问. block shuffle 的实现有两种类型,一种是当hadoop 中真正有了列数据格式的时候,数据有更大的机会已经排过序并且按照block 来切分,一般block 为1M ( 可以关注avro-806 ) , 这时候的mapper 什么都不做,甚至连计算分区的开销都小了很多倍,直接进入reduce 最后一步,第二种类型为没有列数据格式的支持,需要mapper 排序得到之后的block 的最大最小值,reduce 端在内存中保存最大最小值,copy 完成后直接用这个值来做随机读然后进行reduce. ( block shuffle 的实现可以关注 MAPREDUCE-4039 , hash 计算可以关注 MAPREDUCE-1639)
问题3 . map reduce 只有两个函数,一个map 一个 reduce, 一旦发生数据倾斜就是partition 失效了,对于join 的例子,某一个key 分配了过多的记录数,对于只有一次partittion的机会,分配错了数据倾斜的伤害就已经造成了,这种情况很难调试,但是如果你是基于map-reduce-reduce 的方式计算,那么对于同一个key 不需要分配到同一个reduce 中,在第一个reduce 中得到的结果可以在第二个reduce 才汇总去重,第二个reduce 不需要sort – merge 的步骤,因为前一个reduce 已经排过序了,中间的reduce 处理的数据不用关心partition 怎么分,处理的数据量都是一样大,而第二个reduce 又不使用sort-merge 来排序,不会遇到现在的内存大小的问题,对于skewed join 这种情况瓶颈自然小很多.
方式6
目前hive 有几个正在开发中的处理skewed join 情况的jira case, HIVE-3086 , HIVE-3286 ,HIVE-3026 . 简单介绍一下就是facebook 希望通过手工处理提前枚举的方式列出单个倾斜的值,在join 的时候将这些值特殊列出当作map join 来处理,对于其他值使用原来的方式. 我个人觉得这太不伸缩了,值本身没有考虑应用过滤条件和优化方式之后的数据量大小问题,他们提前列出的值都是基于整个分区的. join key 如果为组合key 的情况也应该没有考虑,对metastore 的储存问题有限制,对输入的大表和小表都会scan 两次( 一次处理非skew key , 一次处理skew key 做map join), 对输出表也会scan 两次(将两个结果进行merge) , skew key 必须提前手工列出这又存在额外维护的成本,目前因为还没有完整的开发完到能够投入生产的情况,所以等所有特性处理完了有了文档在看看这个处理方式是否有效,我个人认为的思路应该是接着bucked map join 的思路往下走,只不过不用提前处理cluster key 的问题, 这时候cluster key 的选择应该是join key + 某个能分散join key 的列, 这等于将大表的同一个key 的值分散到了多个不同的reduce 中,而小表的join key 也必须cluster 到跟大表对应的同一个key , join 中对于数据分布第二种情况不用太难,增加reduce 个数就好,主要是第一种,需要大表的join key 能够分散,对于同样join key 的小表又能够匹配到所有大表中的记录. 这种思路就是不用扫描大表两遍或者结果输出表,不需要提前手工处理,数据是动态sample 的应用了过滤条件之后的数据,而不是提前基于统计数据的不准确结果. 这个基本思路跟tenzing 里面描述的distributed hash join 是一样的,想办法切成合适的大小然后用hash 和 map join .
方式7
当同时出现join 和group 的时候, 那么这两个操作应该是以pipeline (管道) 的方式执行. 在join 的时候就可以直接使用group 的操作符减少大量的数据,而不是等待join 完成,然后写入磁盘,group 又读取磁盘做group操作. HIVE-2206 正在做这个优化. hive 里面是没有pipeline 这个概念的. 像是cloudera 的crunch 或者twitter 的Scalding 都是有这种概念的.
方式8
distinct 本身就是group by 的一种简写,我原先以为count(distinct x)这种跟group by 是一样的,但是发现hive 里面distinct 明显比group by 要慢,可能跟group by 会有map 端的combiner有关, 另外观察到hive 在预估count(distinct x) 的reduce 个数比group by 的个数要少 , 所以hive 中使用count(distinct x) , 要么尽量把reduce 个数设置大,直接设置reduce 个数或者hive.exec.reducers.bytes.per.reducer 调小,我个人比较喜欢调后面一个,hive 目前的reduce 个数没有统计信息的情况下就是用map端输入之前的数值, 如果你是join 之后还用count(distinct x) 的话,这个默认值一般都会悲剧,如果有where 条件并能过滤一定数量的数据,那么默认reduce 个数可能就还好一点. 不管怎样,多浪费一点reduce slot 总比等十几甚至几十分钟要好, 或者转换成group by 的写法也不错,写成group by 的时候distributed by 也很有帮助.
方式9
hive 中的index 就是物化视图,对于group by 和distinct 的情况等于变成了map 端在做计算,自然不存在倾斜. 尤其是bitmap index , 对于唯一值比较少的列优势更大,不过index 麻烦的地方在于需要判断你的sql 是不是常用sql , 另外如果create index 的时候没有选你查询的时候用的字段,这个index 是不能用的( hive 中是永远不可能有DBMS中的用index 去lookup 或者join 原始表这种概念的)
总结
数据倾斜没有一劳永逸的方式可以解决,了解你的数据集的分布情况,然后了解你所使用计算框架的运行机制和瓶颈,针对特定的情况做特定的优化,做多种尝试,观察是否有效
标签:cas 进入 操作 hive line group 常用sql avr meta
原文地址:https://www.cnblogs.com/Vowzhou/p/10455558.html