标签:
一,需求:
在map执行前,即setInputFormatClass过程,会进行数据的读入,默认的是每次读入一行数据,进行计算。现在需要改成每次读入两行数据并且合并结果输出。
二,思路及解决方法:
建议先看看他们的源码,理解思路。
我这里是采用的TextInputFormat.class的输入格式。它的key是每一行的偏移位置,value就是它这一行的内容。其中有创建LineRecordReader类,它就是用来读取数据的封装类,我们需要重写它。
在LineRecordReader类中,观察出其nextKeyValue()方法中,有涉及到读取数据的方法,readLine(),在这个readLine()方法之前加个boolean值,用来控制后面不会将已经读到了的数据清空,然后再加个for循环用来做多次读取。再把这个传到readLine()中重写这个方法。
这事又需要重写它的父类LineReader,在LineRecordReader中是调用的SplitLineReader类,它是继承的LineReader类,还需要重写其他两个类,UncompressedSplitLineReader和CompressedSplitLineReader这两个类好像是用来做压缩的,不用管直接复制就行。
回到LineReader类,我们需要重载他的readLine()方法增加了一个boolean的参数。并将参数传到重载的readCustomLine()和readDefaultLine()在这个两个方法中只需利用boolean值,对数据清除进行判断,其他代码复制即可。
下面一个简图展示这个过程:
1,输入的数据:
2,结果:
源码展示:
,1, 测试类
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class TestUserInputFormat {
public static class UserMapper extends Mapper<LongWritable,Text, LongWritable, Text>{
protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, LongWritable, Text>.Context context) throws IOException, InterruptedException {
context.write(key, value);
}
}
public static void main(String[] args) {
try {
Configuration conf=new Configuration();
Job job=Job.getInstance(conf,"Test lineRecordReader");
job.setJarByClass(TestUserInputFormat.class);
job.setInputFormatClass(TextInputFormat.class);
job.setMapperClass(UserMapper.class);
job.setOutputKeyClass(LongWritable.class);
job.setOutputValueClass(Text.class);
FileInputFormat.addInputPath(job, new Path("hdfs://192.168.61.128:9000/inline/"));
FileOutputFormat.setOutputPath(job, new Path("hdfs://192.168.61.128:9000/outline/"+System.currentTimeMillis()+"/"));
System.exit(job.waitForCompletion(true) ? 0 : 1);
} catch (IllegalStateException e) {
e.printStackTrace();
} catch (IllegalArgumentException e) {
e.printStackTrace();
} catch (ClassNotFoundException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
2,输入格式类
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.io.compress.SplittableCompressionCodec;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import com.google.common.base.Charsets;
/** An {@link InputFormat} for plain text files. Files are broken into lines.
* Either linefeed or carriage-return are used to signal end of line. Keys are
* the position in the file, and values are the line of text.. */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class TextInputFormat extends FileInputFormat<LongWritable, Text> {
@Override
public RecordReader<LongWritable, Text>
createRecordReader(InputSplit split,
TaskAttemptContext context) {
String delimiter = context.getConfiguration().get(
"textinputformat.record.delimiter");
byte[] recordDelimiterBytes = null;
if (null != delimiter)
recordDelimiterBytes = delimiter.getBytes(Charsets.UTF_8);
return new LineRecordReader(recordDelimiterBytes);
}
@Override
protected boolean isSplitable(JobContext context, Path file) {
final CompressionCodec codec =
new CompressionCodecFactory(context.getConfiguration()).getCodec(file);
if (null == codec) {
return true;
}
return codec instanceof SplittableCompressionCodec;
}
}
3,读取数据类:
import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.Seekable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CodecPool;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.SplitCompressionInputStream;
import org.apache.hadoop.io.compress.SplittableCompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.io.compress.Decompressor;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.logging.Log;
/**
* Treats keys as offset in file and value as line.
*/
@InterfaceAudience.LimitedPrivate({"MapReduce", "Pig"})
@InterfaceStability.Evolving
public class LineRecordReader extends RecordReader<LongWritable, Text> {
private static final Log LOG = LogFactory.getLog(LineRecordReader.class);
public static final String MAX_LINE_LENGTH =
"mapreduce.input.linerecordreader.line.maxlength";
private long start;
private long pos;
private long end;
private SplitLineReader in;
private FSDataInputStream fileIn;
private Seekable filePosition;
private int maxLineLength;
private LongWritable key;
private Text value;
private boolean isCompressedInput;
private Decompressor decompressor;
private byte[] recordDelimiterBytes;
public LineRecordReader() {
}
public LineRecordReader(byte[] recordDelimiter) {
this.recordDelimiterBytes = recordDelimiter;
}
public void initialize(InputSplit genericSplit,
TaskAttemptContext context) throws IOException {
FileSplit split = (FileSplit) genericSplit;
Configuration job = context.getConfiguration();
this.maxLineLength = job.getInt(MAX_LINE_LENGTH, Integer.MAX_VALUE);
start = split.getStart();
end = start + split.getLength();
final Path file = split.getPath();
// open the file and seek to the start of the split
final FileSystem fs = file.getFileSystem(job);
fileIn = fs.open(file);
CompressionCodec codec = new CompressionCodecFactory(job).getCodec(file);
if (null!=codec) {
isCompressedInput = true;
decompressor = CodecPool.getDecompressor(codec);
if (codec instanceof SplittableCompressionCodec) {
final SplitCompressionInputStream cIn =
((SplittableCompressionCodec)codec).createInputStream(
fileIn, decompressor, start, end,
SplittableCompressionCodec.READ_MODE.BYBLOCK);
in = new CompressedSplitLineReader(cIn, job,
this.recordDelimiterBytes);
start = cIn.getAdjustedStart();
end = cIn.getAdjustedEnd();
filePosition = cIn;
} else {
in = new SplitLineReader(codec.createInputStream(fileIn,
decompressor), job, this.recordDelimiterBytes);
filePosition = fileIn;
}
} else {
fileIn.seek(start);
in = new UncompressedSplitLineReader(
fileIn, job, this.recordDelimiterBytes, split.getLength());
filePosition = fileIn;
}
// If this is not the first split, we always throw away first record
// because we always (except the last split) read one extra line in
// next() method.
if (start != 0) {
start += in.readLine(new Text(), 0, maxBytesToConsume(start));
}
this.pos = start;
}
private int maxBytesToConsume(long pos) {
return isCompressedInput
? Integer.MAX_VALUE
: (int) Math.max(Math.min(Integer.MAX_VALUE, end - pos), maxLineLength);
}
private long getFilePosition() throws IOException {
long retVal;
if (isCompressedInput && null != filePosition) {
retVal = filePosition.getPos();
} else {
retVal = pos;
}
return retVal;
}
private int skipUtfByteOrderMark() throws IOException {
// Strip BOM(Byte Order Mark)
// Text only support UTF-8, we only need to check UTF-8 BOM
// (0xEF,0xBB,0xBF) at the start of the text stream.
int newMaxLineLength = (int) Math.min(3L + (long) maxLineLength,
Integer.MAX_VALUE);
int newSize = in.readLine(value, newMaxLineLength, maxBytesToConsume(pos));
// Even we read 3 extra bytes for the first line,
// we won‘t alter existing behavior (no backwards incompat issue).
// Because the newSize is less than maxLineLength and
// the number of bytes copied to Text is always no more than newSize.
// If the return size from readLine is not less than maxLineLength,
// we will discard the current line and read the next line.
pos += newSize;
int textLength = value.getLength();
byte[] textBytes = value.getBytes();
if ((textLength >= 3) && (textBytes[0] == (byte)0xEF) &&
(textBytes[1] == (byte)0xBB) && (textBytes[2] == (byte)0xBF)) {
// find UTF-8 BOM, strip it.
LOG.info("Found UTF-8 BOM and skipped it");
textLength -= 3;
newSize -= 3;
if (textLength > 0) {
// It may work to use the same buffer and not do the copyBytes
textBytes = value.copyBytes();
value.set(textBytes, 3, textLength);
} else {
value.clear();
}
}
return newSize;
}
public boolean nextKeyValue() throws IOException {
if (key == null) {
key = new LongWritable();
}
key.set(pos);
if (value == null) {
value = new Text();
}
int newSize = 0;
// We always read one extra line, which lies outside the upper
// split limit i.e. (end - 1)
boolean flag=true;
for(int i=1;i<=2;i++){
if(i==2){
flag=false;
}
while (getFilePosition() <= end || in.needAdditionalRecordAfterSplit()) {
if (pos == 0) {
newSize = skipUtfByteOrderMark();
} else {
newSize = in.readLine(value, maxLineLength, maxBytesToConsume(pos),flag);
pos += newSize;
}
if ((newSize == 0) || (newSize < maxLineLength)) {
break;
}
// line too long. try again
LOG.info("Skipped line of size " + newSize + " at pos " +
(pos - newSize));
}
}
if (newSize == 0) {
key = null;
value = null;
return false;
} else {
return true;
}
}
@Override
public LongWritable getCurrentKey() {
return key;
}
@Override
public Text getCurrentValue() {
return value;
}
/**
* Get the progress within the split
*/
public float getProgress() throws IOException {
if (start == end) {
return 0.0f;
} else {
return Math.min(1.0f, (getFilePosition() - start) / (float)(end - start));
}
}
public synchronized void close() throws IOException {
try {
if (in != null) {
in.close();
}
} finally {
if (decompressor != null) {
CodecPool.returnDecompressor(decompressor);
}
}
}
}
4,读取数据的父类LineReader:
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
/**
* A class that provides a line reader from an input stream.
* Depending on the constructor used, lines will either be terminated by:
* <ul>
* <li>one of the following: ‘\n‘ (LF) , ‘\r‘ (CR),
* or ‘\r\n‘ (CR+LF).</li>
* <li><em>or</em>, a custom byte sequence delimiter</li>
* </ul>
* In both cases, EOF also terminates an otherwise unterminated
* line.
*/
@InterfaceAudience.LimitedPrivate({"MapReduce"})
@InterfaceStability.Unstable
public class LineReader implements Closeable {
private static final int DEFAULT_BUFFER_SIZE = 64 * 1024;
private int bufferSize = DEFAULT_BUFFER_SIZE;
private InputStream in;
private byte[] buffer;
// the number of bytes of real data in the buffer
private int bufferLength = 0;
// the current position in the buffer
private int bufferPosn = 0;
private static final byte CR = ‘\r‘;
private static final byte LF = ‘\n‘;
// The line delimiter
private final byte[] recordDelimiterBytes;
/**
* Create a line reader that reads from the given stream using the
* default buffer-size (64k).
* @param in The input stream
* @throws IOException
*/
public LineReader(InputStream in) {
this(in, DEFAULT_BUFFER_SIZE);
}
/**
* Create a line reader that reads from the given stream using the
* given buffer-size.
* @param in The input stream
* @param bufferSize Size of the read buffer
* @throws IOException
*/
public LineReader(InputStream in, int bufferSize) {
this.in = in;
this.bufferSize = bufferSize;
this.buffer = new byte[this.bufferSize];
this.recordDelimiterBytes = null;
}
/**
* Create a line reader that reads from the given stream using the
* <code>io.file.buffer.size</code> specified in the given
* <code>Configuration</code>.
* @param in input stream
* @param conf configuration
* @throws IOException
*/
public LineReader(InputStream in, Configuration conf) throws IOException {
this(in, conf.getInt("io.file.buffer.size", DEFAULT_BUFFER_SIZE));
}
/**
* Create a line reader that reads from the given stream using the
* default buffer-size, and using a custom delimiter of array of
* bytes.
* @param in The input stream
* @param recordDelimiterBytes The delimiter
*/
public LineReader(InputStream in, byte[] recordDelimiterBytes) {
this.in = in;
this.bufferSize = DEFAULT_BUFFER_SIZE;
this.buffer = new byte[this.bufferSize];
this.recordDelimiterBytes = recordDelimiterBytes;
}
/**
* Create a line reader that reads from the given stream using the
* given buffer-size, and using a custom delimiter of array of
* bytes.
* @param in The input stream
* @param bufferSize Size of the read buffer
* @param recordDelimiterBytes The delimiter
* @throws IOException
*/
public LineReader(InputStream in, int bufferSize,
byte[] recordDelimiterBytes) {
this.in = in;
this.bufferSize = bufferSize;
this.buffer = new byte[this.bufferSize];
this.recordDelimiterBytes = recordDelimiterBytes;
}
/**
* Create a line reader that reads from the given stream using the
* <code>io.file.buffer.size</code> specified in the given
* <code>Configuration</code>, and using a custom delimiter of array of
* bytes.
* @param in input stream
* @param conf configuration
* @param recordDelimiterBytes The delimiter
* @throws IOException
*/
public LineReader(InputStream in, Configuration conf,
byte[] recordDelimiterBytes) throws IOException {
this.in = in;
this.bufferSize = conf.getInt("io.file.buffer.size", DEFAULT_BUFFER_SIZE);
this.buffer = new byte[this.bufferSize];
this.recordDelimiterBytes = recordDelimiterBytes;
}
/**
* Close the underlying stream.
* @throws IOException
*/
public void close() throws IOException {
in.close();
}
/**
* Read one line from the InputStream into the given Text.
*
* @param str the object to store the given line (without newline)
* @param maxLineLength the maximum number of bytes to store into str;
* the rest of the line is silently discarded.
* @param maxBytesToConsume the maximum number of bytes to consume
* in this call. This is only a hint, because if the line cross
* this threshold, we allow it to happen. It can overshoot
* potentially by as much as one buffer length.
*
* @return the number of bytes read including the (longest) newline
* found.
*
* @throws IOException if the underlying stream throws
*/
public int readLine(Text str, int maxLineLength,
int maxBytesToConsume) throws IOException {
if (this.recordDelimiterBytes != null) {
return readCustomLine(str, maxLineLength, maxBytesToConsume);
} else {
return readDefaultLine(str, maxLineLength, maxBytesToConsume);
}
}
public int readLine(Text str, int maxLineLength,
int maxBytesToConsume,boolean flag) throws IOException {
if (this.recordDelimiterBytes != null) {
return readCustomLine(str, maxLineLength, maxBytesToConsume,flag);
} else {
return readDefaultLine(str, maxLineLength, maxBytesToConsume,flag);
}
}
protected int fillBuffer(InputStream in, byte[] buffer, boolean inDelimiter)
throws IOException {
return in.read(buffer);
}
/**
* Read a line terminated by one of CR, LF, or CRLF.
*/
private int readDefaultLine(Text str, int maxLineLength, int maxBytesToConsume)
throws IOException {
/* We‘re reading data from in, but the head of the stream may be
* already buffered in buffer, so we have several cases:
* 1. No newline characters are in the buffer, so we need to copy
* everything and read another buffer from the stream.
* 2. An unambiguously terminated line is in buffer, so we just
* copy to str.
* 3. Ambiguously terminated line is in buffer, i.e. buffer ends
* in CR. In this case we copy everything up to CR to str, but
* we also need to see what follows CR: if it‘s LF, then we
* need consume LF as well, so next call to readLine will read
* from after that.
* We use a flag prevCharCR to signal if previous character was CR
* and, if it happens to be at the end of the buffer, delay
* consuming it until we have a chance to look at the char that
* follows.
*/
str.clear();
int txtLength = 0; //tracks str.getLength(), as an optimization
int newlineLength = 0; //length of terminating newline
boolean prevCharCR = false; //true of prev char was CR
long bytesConsumed = 0;
do {
int startPosn = bufferPosn; //starting from where we left off the last time
if (bufferPosn >= bufferLength) {
startPosn = bufferPosn = 0;
if (prevCharCR) {
++bytesConsumed; //account for CR from previous read
}
bufferLength = fillBuffer(in, buffer, prevCharCR);
if (bufferLength <= 0) {
break; // EOF
}
}
for (; bufferPosn < bufferLength; ++bufferPosn) { //search for newline
if (buffer[bufferPosn] == LF) {
newlineLength = (prevCharCR) ? 2 : 1;
++bufferPosn; // at next invocation proceed from following byte
break;
}
if (prevCharCR) { //CR + notLF, we are at notLF
newlineLength = 1;
break;
}
prevCharCR = (buffer[bufferPosn] == CR);
}
int readLength = bufferPosn - startPosn;
if (prevCharCR && newlineLength == 0) {
--readLength; //CR at the end of the buffer
}
bytesConsumed += readLength;
int appendLength = readLength - newlineLength;
if (appendLength > maxLineLength - txtLength) {
appendLength = maxLineLength - txtLength;
}
if (appendLength > 0) {
str.append(buffer, startPosn, appendLength);
txtLength += appendLength;
}
} while (newlineLength == 0 && bytesConsumed < maxBytesToConsume);
if (bytesConsumed > Integer.MAX_VALUE) {
throw new IOException("Too many bytes before newline: " + bytesConsumed);
}
return (int)bytesConsumed;
}
private int readDefaultLine(Text str, int maxLineLength, int maxBytesToConsume,boolean flag)
throws IOException {
/* We‘re reading data from in, but the head of the stream may be
* already buffered in buffer, so we have several cases:
* 1. No newline characters are in the buffer, so we need to copy
* everything and read another buffer from the stream.
* 2. An unambiguously terminated line is in buffer, so we just
* copy to str.
* 3. Ambiguously terminated line is in buffer, i.e. buffer ends
* in CR. In this case we copy everything up to CR to str, but
* we also need to see what follows CR: if it‘s LF, then we
* need consume LF as well, so next call to readLine will read
* from after that.
* We use a flag prevCharCR to signal if previous character was CR
* and, if it happens to be at the end of the buffer, delay
* consuming it until we have a chance to look at the char that
* follows.
*/
if(flag){
str.clear();
}
int txtLength = 0; //tracks str.getLength(), as an optimization
int newlineLength = 0; //length of terminating newline
boolean prevCharCR = false; //true of prev char was CR
long bytesConsumed = 0;
do {
int startPosn = bufferPosn; //starting from where we left off the last time
if (bufferPosn >= bufferLength) {
startPosn = bufferPosn = 0;
if (prevCharCR) {
++bytesConsumed; //account for CR from previous read
}
bufferLength = fillBuffer(in, buffer, prevCharCR);
if (bufferLength <= 0) {
break; // EOF
}
}
for (; bufferPosn < bufferLength; ++bufferPosn) { //search for newline
if (buffer[bufferPosn] == LF) {
newlineLength = (prevCharCR) ? 2 : 1;
++bufferPosn; // at next invocation proceed from following byte
break;
}
if (prevCharCR) { //CR + notLF, we are at notLF
newlineLength = 1;
break;
}
prevCharCR = (buffer[bufferPosn] == CR);
}
int readLength = bufferPosn - startPosn;
if (prevCharCR && newlineLength == 0) {
--readLength; //CR at the end of the buffer
}
bytesConsumed += readLength;
int appendLength = readLength - newlineLength;
if (appendLength > maxLineLength - txtLength) {
appendLength = maxLineLength - txtLength;
}
if (appendLength > 0) {
str.append(buffer, startPosn, appendLength);
txtLength += appendLength;
}
} while (newlineLength == 0 && bytesConsumed < maxBytesToConsume);
if (bytesConsumed > Integer.MAX_VALUE) {
throw new IOException("Too many bytes before newline: " + bytesConsumed);
}
return (int)bytesConsumed;
}
private int readCustomLine(Text str, int maxLineLength, int maxBytesToConsume,boolean flag)
throws IOException {
/* We‘re reading data from inputStream, but the head of the stream may be
* already captured in the previous buffer, so we have several cases:
*
* 1. The buffer tail does not contain any character sequence which
* matches with the head of delimiter. We count it as a
* ambiguous byte count = 0
*
* 2. The buffer tail contains a X number of characters,
* that forms a sequence, which matches with the
* head of delimiter. We count ambiguous byte count = X
*
* // *** eg: A segment of input file is as follows
*
* " record 1792: I found this bug very interesting and
* I have completely read about it. record 1793: This bug
* can be solved easily record 1794: This ."
*
* delimiter = "record";
*
* supposing:- String at the end of buffer =
* "I found this bug very interesting and I have completely re"
* There for next buffer = "ad about it. record 179 ...."
*
* The matching characters in the input
* buffer tail and delimiter head = "re"
* Therefore, ambiguous byte count = 2 **** //
*
* 2.1 If the following bytes are the remaining characters of
* the delimiter, then we have to capture only up to the starting
* position of delimiter. That means, we need not include the
* ambiguous characters in str.
*
* 2.2 If the following bytes are not the remaining characters of
* the delimiter ( as mentioned in the example ),
* then we have to include the ambiguous characters in str.
*/
if(flag){
str.clear();
}
int txtLength = 0; // tracks str.getLength(), as an optimization
long bytesConsumed = 0;
int delPosn = 0;
int ambiguousByteCount=0; // To capture the ambiguous characters count
do {
int startPosn = bufferPosn; // Start from previous end position
if (bufferPosn >= bufferLength) {
startPosn = bufferPosn = 0;
bufferLength = fillBuffer(in, buffer, ambiguousByteCount > 0);
if (bufferLength <= 0) {
if (ambiguousByteCount > 0) {
str.append(recordDelimiterBytes, 0, ambiguousByteCount);
bytesConsumed += ambiguousByteCount;
}
break; // EOF
}
}
for (; bufferPosn < bufferLength; ++bufferPosn) {
if (buffer[bufferPosn] == recordDelimiterBytes[delPosn]) {
delPosn++;
if (delPosn >= recordDelimiterBytes.length) {
bufferPosn++;
break;
}
} else if (delPosn != 0) {
bufferPosn--;
delPosn = 0;
}
}
int readLength = bufferPosn - startPosn;
bytesConsumed += readLength;
int appendLength = readLength - delPosn;
if (appendLength > maxLineLength - txtLength) {
appendLength = maxLineLength - txtLength;
}
bytesConsumed += ambiguousByteCount;
if (appendLength >= 0 && ambiguousByteCount > 0) {
//appending the ambiguous characters (refer case 2.2)
str.append(recordDelimiterBytes, 0, ambiguousByteCount);
ambiguousByteCount = 0;
// since it is now certain that the split did not split a delimiter we
// should not read the next record: clear the flag otherwise duplicate
// records could be generated
unsetNeedAdditionalRecordAfterSplit();
}
if (appendLength > 0) {
str.append(buffer, startPosn, appendLength);
txtLength += appendLength;
}
if (bufferPosn >= bufferLength) {
if (delPosn > 0 && delPosn < recordDelimiterBytes.length) {
ambiguousByteCount = delPosn;
bytesConsumed -= ambiguousByteCount; //to be consumed in next
}
}
} while (delPosn < recordDelimiterBytes.length
&& bytesConsumed < maxBytesToConsume);
if (bytesConsumed > Integer.MAX_VALUE) {
throw new IOException("Too many bytes before delimiter: " + bytesConsumed);
}
return (int) bytesConsumed;
}
/**
* Read a line terminated by a custom delimiter.
*/
private int readCustomLine(Text str, int maxLineLength, int maxBytesToConsume)
throws IOException {
/* We‘re reading data from inputStream, but the head of the stream may be
* already captured in the previous buffer, so we have several cases:
*
* 1. The buffer tail does not contain any character sequence which
* matches with the head of delimiter. We count it as a
* ambiguous byte count = 0
*
* 2. The buffer tail contains a X number of characters,
* that forms a sequence, which matches with the
* head of delimiter. We count ambiguous byte count = X
*
* // *** eg: A segment of input file is as follows
*
* " record 1792: I found this bug very interesting and
* I have completely read about it. record 1793: This bug
* can be solved easily record 1794: This ."
*
* delimiter = "record";
*
* supposing:- String at the end of buffer =
* "I found this bug very interesting and I have completely re"
* There for next buffer = "ad about it. record 179 ...."
*
* The matching characters in the input
* buffer tail and delimiter head = "re"
* Therefore, ambiguous byte count = 2 **** //
*
* 2.1 If the following bytes are the remaining characters of
* the delimiter, then we have to capture only up to the starting
* position of delimiter. That means, we need not include the
* ambiguous characters in str.
*
* 2.2 If the following bytes are not the remaining characters of
* the delimiter ( as mentioned in the example ),
* then we have to include the ambiguous characters in str.
*/
str.clear();
int txtLength = 0; // tracks str.getLength(), as an optimization
long bytesConsumed = 0;
int delPosn = 0;
int ambiguousByteCount=0; // To capture the ambiguous characters count
do {
int startPosn = bufferPosn; // Start from previous end position
if (bufferPosn >= bufferLength) {
startPosn = bufferPosn = 0;
bufferLength = fillBuffer(in, buffer, ambiguousByteCount > 0);
if (bufferLength <= 0) {
if (ambiguousByteCount > 0) {
str.append(recordDelimiterBytes, 0, ambiguousByteCount);
bytesConsumed += ambiguousByteCount;
}
break; // EOF
}
}
for (; bufferPosn < bufferLength; ++bufferPosn) {
if (buffer[bufferPosn] == recordDelimiterBytes[delPosn]) {
delPosn++;
if (delPosn >= recordDelimiterBytes.length) {
bufferPosn++;
break;
}
} else if (delPosn != 0) {
bufferPosn--;
delPosn = 0;
}
}
int readLength = bufferPosn - startPosn;
bytesConsumed += readLength;
int appendLength = readLength - delPosn;
if (appendLength > maxLineLength - txtLength) {
appendLength = maxLineLength - txtLength;
}
bytesConsumed += ambiguousByteCount;
if (appendLength >= 0 && ambiguousByteCount > 0) {
//appending the ambiguous characters (refer case 2.2)
str.append(recordDelimiterBytes, 0, ambiguousByteCount);
ambiguousByteCount = 0;
// since it is now certain that the split did not split a delimiter we
// should not read the next record: clear the flag otherwise duplicate
// records could be generated
unsetNeedAdditionalRecordAfterSplit();
}
if (appendLength > 0) {
str.append(buffer, startPosn, appendLength);
txtLength += appendLength;
}
if (bufferPosn >= bufferLength) {
if (delPosn > 0 && delPosn < recordDelimiterBytes.length) {
ambiguousByteCount = delPosn;
bytesConsumed -= ambiguousByteCount; //to be consumed in next
}
}
} while (delPosn < recordDelimiterBytes.length
&& bytesConsumed < maxBytesToConsume);
if (bytesConsumed > Integer.MAX_VALUE) {
throw new IOException("Too many bytes before delimiter: " + bytesConsumed);
}
return (int) bytesConsumed;
}
/**
* Read from the InputStream into the given Text.
* @param str the object to store the given line
* @param maxLineLength the maximum number of bytes to store into str.
* @return the number of bytes read including the newline
* @throws IOException if the underlying stream throws
*/
public int readLine(Text str, int maxLineLength) throws IOException {
return readLine(str, maxLineLength, Integer.MAX_VALUE);
}
/**
* Read from the InputStream into the given Text.
* @param str the object to store the given line
* @return the number of bytes read including the newline
* @throws IOException if the underlying stream throws
*/
public int readLine(Text str) throws IOException {
return readLine(str, Integer.MAX_VALUE, Integer.MAX_VALUE);
}
protected int getBufferPosn() {
return bufferPosn;
}
protected int getBufferSize() {
return bufferSize;
}
protected void unsetNeedAdditionalRecordAfterSplit() {
// needed for custom multi byte line delimiters only
// see MAPREDUCE-6549 for details
}
}
5,几个打酱油的类:
import java.io.IOException;
import java.io.InputStream;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
@InterfaceAudience.Private
@InterfaceStability.Unstable
public class SplitLineReader extends LineReader {
public SplitLineReader(InputStream in, byte[] recordDelimiterBytes) {
super(in, recordDelimiterBytes);
}
public SplitLineReader(InputStream in, Configuration conf,
byte[] recordDelimiterBytes) throws IOException {
super(in, conf, recordDelimiterBytes);
}
public boolean needAdditionalRecordAfterSplit() {
return false;
}
}
import java.io.IOException;
import java.io.InputStream;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.SplitCompressionInputStream;
/**
* Line reader for compressed splits
*
* Reading records from a compressed split is tricky, as the
* LineRecordReader is using the reported compressed input stream
* position directly to determine when a split has ended. In addition the
* compressed input stream is usually faking the actual byte position, often
* updating it only after the first compressed block after the split is
* accessed.
*
* Depending upon where the last compressed block of the split ends relative
* to the record delimiters it can be easy to accidentally drop the last
* record or duplicate the last record between this split and the next.
*
* Split end scenarios:
*
* 1) Last block of split ends in the middle of a record
* Nothing special that needs to be done here, since the compressed input
* stream will report a position after the split end once the record
* is fully read. The consumer of the next split will discard the
* partial record at the start of the split normally, and no data is lost
* or duplicated between the splits.
*
* 2) Last block of split ends in the middle of a delimiter
* The line reader will continue to consume bytes into the next block to
* locate the end of the delimiter. If a custom delimiter is being used
* then the next record must be read by this split or it will be dropped.
* The consumer of the next split will not recognize the partial
* delimiter at the beginning of its split and will discard it along with
* the next record.
*
* However for the default delimiter processing there is a special case
* because CR, LF, and CRLF are all valid record delimiters. If the
* block ends with a CR then the reader must peek at the next byte to see
* if it is an LF and therefore part of the same record delimiter.
* Peeking at the next byte is an access to the next block and triggers
* the stream to report the end of the split. There are two cases based
* on the next byte:
*
* A) The next byte is LF
* The split needs to end after the current record is returned. The
* consumer of the next split will discard the first record, which
* is degenerate since LF is itself a delimiter, and start consuming
* records after that byte. If the current split tries to read
* another record then the record will be duplicated between splits.
*
* B) The next byte is not LF
* The current record will be returned but the stream will report
* the split has ended due to the peek into the next block. If the
* next record is not read then it will be lost, as the consumer of
* the next split will discard it before processing subsequent
* records. Therefore the next record beyond the reported split end
* must be consumed by this split to avoid data loss.
*
* 3) Last block of split ends at the beginning of a delimiter
* This is equivalent to case 1, as the reader will consume bytes into
* the next block and trigger the end of the split. No further records
* should be read as the consumer of the next split will discard the
* (degenerate) record at the beginning of its split.
*
* 4) Last block of split ends at the end of a delimiter
* Nothing special needs to be done here. The reader will not start
* examining the bytes into the next block until the next record is read,
* so the stream will not report the end of the split just yet. Once the
* next record is read then the next block will be accessed and the
* stream will indicate the end of the split. The consumer of the next
* split will correctly discard the first record of its split, and no
* data is lost or duplicated.
*
* If the default delimiter is used and the block ends at a CR then this
* is treated as case 2 since the reader does not yet know without
* looking at subsequent bytes whether the delimiter has ended.
*
* NOTE: It is assumed that compressed input streams *never* return bytes from
* multiple compressed blocks from a single read. Failure to do so will
* violate the buffering performed by this class, as it will access
* bytes into the next block after the split before returning all of the
* records from the previous block.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
public class CompressedSplitLineReader extends SplitLineReader {
SplitCompressionInputStream scin;
private boolean usingCRLF;
private boolean needAdditionalRecord = false;
private boolean finished = false;
public CompressedSplitLineReader(SplitCompressionInputStream in,
Configuration conf,
byte[] recordDelimiterBytes)
throws IOException {
super(in, conf, recordDelimiterBytes);
scin = in;
usingCRLF = (recordDelimiterBytes == null);
}
@Override
protected int fillBuffer(InputStream in, byte[] buffer, boolean inDelimiter)
throws IOException {
int bytesRead = in.read(buffer);
// If the split ended in the middle of a record delimiter then we need
// to read one additional record, as the consumer of the next split will
// not recognize the partial delimiter as a record.
// However if using the default delimiter and the next character is a
// linefeed then next split will treat it as a delimiter all by itself
// and the additional record read should not be performed.
if (inDelimiter && bytesRead > 0) {
if (usingCRLF) {
needAdditionalRecord = (buffer[0] != ‘\n‘);
} else {
needAdditionalRecord = true;
}
}
return bytesRead;
}
@Override
public int readLine(Text str, int maxLineLength, int maxBytesToConsume)
throws IOException {
int bytesRead = 0;
if (!finished) {
// only allow at most one more record to be read after the stream
// reports the split ended
if (scin.getPos() > scin.getAdjustedEnd()) {
finished = true;
}
bytesRead = super.readLine(str, maxLineLength, maxBytesToConsume);
}
return bytesRead;
}
@Override
public boolean needAdditionalRecordAfterSplit() {
return !finished && needAdditionalRecord;
}
}
import java.io.IOException;
import java.io.InputStream;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.io.Text;
/**
* SplitLineReader for uncompressed files.
* This class can split the file correctly even if the delimiter is multi-bytes.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
public class UncompressedSplitLineReader extends SplitLineReader {
private boolean needAdditionalRecord = false;
private long splitLength;
/** Total bytes read from the input stream. */
private long totalBytesRead = 0;
private boolean finished = false;
private boolean usingCRLF;
public UncompressedSplitLineReader(FSDataInputStream in, Configuration conf,
byte[] recordDelimiterBytes, long splitLength) throws IOException {
super(in, conf, recordDelimiterBytes);
this.splitLength = splitLength;
usingCRLF = (recordDelimiterBytes == null);
}
@Override
protected int fillBuffer(InputStream in, byte[] buffer, boolean inDelimiter)
throws IOException {
int maxBytesToRead = buffer.length;
if (totalBytesRead < splitLength) {
maxBytesToRead = Math.min(maxBytesToRead,
(int)(splitLength - totalBytesRead));
}
int bytesRead = in.read(buffer, 0, maxBytesToRead);
// If the split ended in the middle of a record delimiter then we need
// to read one additional record, as the consumer of the next split will
// not recognize the partial delimiter as a record.
// However if using the default delimiter and the next character is a
// linefeed then next split will treat it as a delimiter all by itself
// and the additional record read should not be performed.
if (totalBytesRead == splitLength && inDelimiter && bytesRead > 0) {
if (usingCRLF) {
needAdditionalRecord = (buffer[0] != ‘\n‘);
} else {
needAdditionalRecord = true;
}
}
if (bytesRead > 0) {
totalBytesRead += bytesRead;
}
return bytesRead;
}
@Override
public int readLine(Text str, int maxLineLength, int maxBytesToConsume)
throws IOException {
int bytesRead = 0;
if (!finished) {
// only allow at most one more record to be read after the stream
// reports the split ended
if (totalBytesRead > splitLength) {
finished = true;
}
bytesRead = super.readLine(str, maxLineLength, maxBytesToConsume);
}
return bytesRead;
}
@Override
public boolean needAdditionalRecordAfterSplit() {
return !finished && needAdditionalRecord;
}
@Override
protected void unsetNeedAdditionalRecordAfterSplit() {
needAdditionalRecord = false;
}
}
标签:
原文地址:http://blog.csdn.net/young_so_nice/article/details/51334294