单位有一组业务一直都是使用Streaming压缩文本日志,大体上就是设置作业输出为BZ2格式,怎么输入就怎么输出,没有任何处理功能在里面。但是每行结尾都多出来一个TAB。终于,有一个业务需要使用TAB前的最后一个字段,不去掉不行了。
虽然是个小问题,但是网上搜了一圈,也没有很好的解决。很多人都遇到了,但是单位的业务比较特殊,只有map没有reduce。http://stackoverflow.com/questions/20137618/hadoop-streaming-api-how-to-remove-unwanted-delimiters这个上面直接说“As I discussed with friends, there‘s no easy way to achieve the goal,...”。
Streaming有个特点,默认是按照TAB去区分Key和Value。如果没有设置Key字段的数目,默认一行里面第一个TAB之前的做Key,后面的是Value。如果没有找到Tab,就全都是Key字段,Value是空。之所以后面会多出个Tab,正是Key和Value之间的那个Tab。
首先是考察Streaming的Map,在PipeMapper.java。InputWriter处理输出,所以尝试实现自定义输出。在MapReduce作业配置里面,stream.map.input.writer.class负责指定InputWriter是哪一个,默认是TextInputWriter。Streaming在这里比较坑,增加-Dstream.map.input.writer.class=XXX的选项并不能令Streaming使用自定义的实现类,必须实现自己的IdentifierResolver,然后在其中对不同类型的输入设定不同类型的InputWriter,而其中的输入类型,必须由stream.map.input选项传入。是否设置成功以作业运行时候JobTracker的配置参数表为准。
不巧的是,使用自定义的InputWriter代替TextInputWriter,行尾的Tab是没了,行首又多了个数字。估计是Hadoop给Mapper传入的Key被打印出来了。oooorz....不能瞎猜了,还是看看代码吧。
好在代码蛮短的还是。
Streaming会把本身、以及用户-file -cacheFile -cacheArchive 等选项指定的文件,打成一个Jar包提交到集群进行MR作业。把集群的输出,作为用户实现Mapper的输入;读取用户实现Mapper的输出,作为整个Map作业的输出。Input/Output相对于用户自定义作业,Writer/Reader体现为Streaming的行为,因此是InputWriter和OutputReader。简单来讲,
Hadoop给出的(K,V)---streaming---> 用户自定义Mapper ---streaming--->Hadoop的Mapper输出
Streaming由PipeMapRunner启动作业,异步收集用户作业输出,进而向Hadoop汇报作业进度。整个作业的基础设置、作业提交都是由StreamJob类完成。
作业的执行是PipeMapRed/PipeMapper/PipReducer/PipCombiner这几个类。解决方案也就在这里。在MROutputThread的run方法里面,outCollector.collect(key, value);这句之前,加上下面的代码片段即可。
if (value instanceof Text) { if (value.toString().isEmpty()) value = NullWritable.get(); }
是不是很简单。
为什么这样做是可行的?还是源于org.apache.hadoop.mapreduce.lib.output.TextOutputFormat。直接上代码。
package org.apache.hadoop.mapreduce.lib.output; import java.io.DataOutputStream; import java.io.IOException; import java.io.UnsupportedEncodingException; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.compress.CompressionCodec; import org.apache.hadoop.io.compress.GzipCodec; import org.apache.hadoop.mapreduce.OutputFormat; import org.apache.hadoop.mapreduce.RecordWriter; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.util.*; /** An {@link OutputFormat} that writes plain text files. */ @InterfaceAudience.Public @InterfaceStability.Stable public class TextOutputFormat<K, V> extends FileOutputFormat<K, V> { public static String SEPERATOR = "mapreduce.output.textoutputformat.separator"; protected static class LineRecordWriter<K, V> extends RecordWriter<K, V> { private static final String utf8 = "UTF-8"; private static final byte[] newline; static { try { newline = "\n".getBytes(utf8); } catch (UnsupportedEncodingException uee) { throw new IllegalArgumentException("can‘t find " + utf8 + " encoding"); } } protected DataOutputStream out; private final byte[] keyValueSeparator; public LineRecordWriter(DataOutputStream out, String keyValueSeparator) { this.out = out; try { this.keyValueSeparator = keyValueSeparator.getBytes(utf8); } catch (UnsupportedEncodingException uee) { throw new IllegalArgumentException("can‘t find " + utf8 + " encoding"); } } public LineRecordWriter(DataOutputStream out) { this(out, "\t"); } /** * Write the object to the byte stream, handling Text as a special * case. * @param o the object to print * @throws IOException if the write throws, we pass it on */ private void writeObject(Object o) throws IOException { if (o instanceof Text) { Text to = (Text) o; out.write(to.getBytes(), 0, to.getLength()); } else { out.write(o.toString().getBytes(utf8)); } } public synchronized void write(K key, V value) throws IOException { boolean nullKey = key == null || key instanceof NullWritable; boolean nullValue = value == null || value instanceof NullWritable; if (nullKey && nullValue) { return; } if (!nullKey) { writeObject(key); } if (!(nullKey || nullValue)) { out.write(keyValueSeparator); } if (!nullValue) { writeObject(value); } out.write(newline); } public synchronized void close(TaskAttemptContext context) throws IOException { out.close(); } } public RecordWriter<K, V> getRecordWriter(TaskAttemptContext job ) throws IOException, InterruptedException { Configuration conf = job.getConfiguration(); boolean isCompressed = getCompressOutput(job); String keyValueSeparator= conf.get(SEPERATOR, "\t"); CompressionCodec codec = null; String extension = ""; if (isCompressed) { Class<? extends CompressionCodec> codecClass = getOutputCompressorClass(job, GzipCodec.class); codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, conf); extension = codec.getDefaultExtension(); } Path file = getDefaultWorkFile(job, extension); FileSystem fs = file.getFileSystem(conf); if (!isCompressed) { FSDataOutputStream fileOut = fs.create(file, false); return new LineRecordWriter<K, V>(fileOut, keyValueSeparator); } else { FSDataOutputStream fileOut = fs.create(file, false); return new LineRecordWriter<K, V>(new DataOutputStream (codec.createOutputStream(fileOut)), keyValueSeparator); } } }
注意到LineRecordWriter.write了么?
后记:
A. 网上很多是想办法修改分隔符,把TAB换成空字符。这是一个非常粗暴的做法,基本上就是埋坑!为什么呢?
日志文本内容可以是很丰富的,这次出问题是因为每行没有TAB。如果换做含有TAB的文本,把分隔符变为空串,就把日志中原有的TAB去掉了。
B. 之所以这么搞,也是受到了stackoverflow的这个Q&A的启发。http://stackoverflow.com/questions/18133290/hadoop-streaming-remove-trailing-tab-from-reducer-output。类似的,Q&A也是采用修改分隔符的办法,是不可取的。但是仔细发现,是可以在自己重写的TextOutputFormat<K,V>里,修改LineRecordWriter.write方法的。
重写TextOutputFormat是十分优雅的解决,看似修改了Hadoop本身的东西,但是在Streaming最新版没有加入这个fix之前,防止对每个版本的Streaming都要变更、重新编译打包。另外,Streaming不是独立的项目,编译它需要同时编译Hadoop!
用vim写Java打包确实略蛋疼,周一上班试试这个更加优雅的办法。
C. 虽然是修改了Streaming代码,但是不需要考虑会影响同一机器所有用户的问题,也不用修改$HADOOP_HOME下的Streaming包。streaming提供了这个参数stream.shipped.hadoopstreaming。
D. 有些设置似乎是指对Reducer生效,对于这种只有Mapper的作业不起作用。比如
mapred.textoutputformat.ignoreseparator mapred.textoutputformat.separator
设置了,没看到什么效果。
再有就是,命令行选项里面如果写-DXXX= \ 这样的语句,似乎也没有把这个参数设置为空串的效果,写-DXXX= ""也是一样。
本文出自 “新青年” 博客,请务必保留此出处http://luckybins.blog.51cto.com/786164/1601722
原文地址:http://luckybins.blog.51cto.com/786164/1601722