标签:hadoop bzip2 spark 线程安全 shark
我们的Hadoop生产环境有两个版本,其中一个是1.0.3,为了支持日志压缩和split,我们添加了hadoop-1.2中关于Bzip2压缩的feature. 一切运行良好。
为了满足公司对迭代计算的需求(复杂HiveSQL,广告推荐算法,机器学习 etc), 我们构建了自己的Spark集群,最初是Standalone Mode,版本spark-0.9.1,支持Shark。
上线后,问题接踵而来,最为致命的是,shark在处理Hadooop bzip2文件时计算结果通常会有偏差,有时差的特别离谱(比如,用shark统计1个5kw行的日志,结果只有
3kw行).
显然shark+hive+spark+hadoop的某个环节出了bug。第一次面对这么复杂的系统,着实头疼。
于是,开始蛮干,部署shark+hive+spark+hadoop开发环境,debug,查看出问题的环节。(这个过程中把Spark-core的源码也缕了一遍),始终没有发现什么问题。
后来,参加了Spark技术大会,和同行交流的过程中,幡然悔悟: Spark的task是线程级并发的,而Hadoop MR的task是进程级并发的,那么,会不会是Bzip2存在线程安全问题呢?
回来后,查看Bzip2Codec相关的代码,终于发现了问题所在。(话说,凌晨3点,没有eclipse,用vim 改的), 迫不及待的重新编译Hadoop和Spark,测试,发现处理Bzip2结果OK了!
由于最近比较忙,向社区提交path需要漫长的过程,暂时没有提交社区。具体的patch如下,如有同行遇到同类问题,请借鉴.
Index: src/core/org/apache/hadoop/io/compress/bzip2/CBZip2InputStream.java
===================================================================
--- src/core/org/apache/hadoop/io/compress/bzip2/CBZip2InputStream.java (版本 525)
+++ src/core/org/apache/hadoop/io/compress/bzip2/CBZip2InputStream.java (版本 510)
@@ -129,9 +129,7 @@
private int computedBlockCRC, computedCombinedCRC;
private boolean skipResult = false;// used by skipToNextMarker
- //modified by jicheng.song
- //private static boolean skipDecompression = false;
- private boolean skipDecompression = false;
+ private static boolean skipDecompression = false;
// Variables used by setup* methods exclusively
@@ -317,18 +315,13 @@
* @throws IOException
*
*/
- //modified by jicheng.song
- //public static long numberOfBytesTillNextMarker(final InputStream in) throws IOException{
- public long numberOfBytesTillNextMarker(final InputStream in) throws IOException{
- this.skipDecompression = true;
- //
- this.in = new BufferedInputStream(in, 1024 * 9);// >1 MB buffer
- this.readMode = readMode;
- //CBZip2InputStream anObject = null;
+ public static long numberOfBytesTillNextMarker(final InputStream in) throws IOException{
+ CBZip2InputStream.skipDecompression = true;
+ CBZip2InputStream anObject = null;
- //anObject = new CBZip2InputStream(in, READ_MODE.BYBLOCK);
+ anObject = new CBZip2InputStream(in, READ_MODE.BYBLOCK);
- return this.getProcessedByteCount();
+ return anObject.getProcessedByteCount();
}
public CBZip2InputStream(final InputStream in) throws IOException {
@@ -402,9 +395,7 @@
if(skipDecompression){
changeStateToProcessABlock();
- //modified by jicheng.song
- //CBZip2InputStream.skipDecompression = false;
- this.skipDecompression = false;
+ CBZip2InputStream.skipDecompression = false;
}
final int hi = offs + len;
.
线上Spark处理Bzip2引出Hadoop Bzip2线程安全问题
标签:hadoop bzip2 spark 线程安全 shark
原文地址:http://blog.csdn.net/mango_song/article/details/42705855