标签:
ByteBufferMessageSet类的源代码位于源代码目录下的message目录中.这个类主要封装了Message,MessageSet,MessageAndOffset等类的对象.在Log类中读写log时,基本上都是以这个类的对象作为基本操作对象.
下面看看类的具体代码.首先是初始化部分.
/**
 * A message set whose contents live in a single ByteBuffer.
 *
 * @param buffer the buffer holding the serialized entries; the primary
 *               constructor stores it as-is.
 */
class ByteBufferMessageSet(val buffer: ByteBuffer) extends MessageSet with Logging {

  // Starts at -1. The code that updates it is not part of this listing
  // (presumably a cached count of valid bytes -- confirm against the full class).
  private var shallowValidByteCount = -1

  // Every auxiliary constructor below delegates to the primary constructor,
  // passing a buffer built by ByteBufferMessageSet.create in the companion object.

  /**
   * Builds a set from `messages` using `compressionCodec`; offsets are drawn
   * from a fresh counter starting at 0.
   */
  def this(compressionCodec: CompressionCodec, messages: Message*) {
    this(ByteBufferMessageSet.create(new AtomicLong(0), compressionCodec, messages: _*))
  }

  /**
   * Same as the previous constructor, except that offsets are drawn from the
   * caller-supplied `offsetCounter`.
   */
  def this(compressionCodec: CompressionCodec, offsetCounter: AtomicLong, messages: Message*) {
    this(ByteBufferMessageSet.create(offsetCounter, compressionCodec, messages: _*))
  }

  /** Simplest form: no compression, offsets starting at 0. */
  def this(messages: Message*) {
    this(NoCompressionCodec, new AtomicLong(0), messages: _*)
  }

  // (the remaining members of the class follow in the later listing)
上面是几个不同的构造函数部分.构造函数应用到的ByteBufferMessageSet.create函数代码如下.
/** Companion helpers that build, decompress and serialize the buffers used by ByteBufferMessageSet. */
object ByteBufferMessageSet {

  /**
   * Serializes `messages` into a newly built buffer positioned at 0.
   *
   * @param offsetCounter    source of offsets; incremented once per message written
   * @param compressionCodec codec to apply; `NoCompressionCodec` writes the messages verbatim
   * @param messages         the messages to serialize
   * @return the buffer of the empty message set when `messages` is empty, otherwise a
   *         new buffer containing either every message as its own entry (no compression)
   *         or one wrapper message whose payload is the compressed entries
   */
  private def create(offsetCounter: AtomicLong,
                     compressionCodec: CompressionCodec,
                     messages: Message*): ByteBuffer = {
    if (messages.isEmpty) {
      // Nothing to write: hand back the buffer of the empty message set.
      MessageSet.Empty.buffer
    } else if (compressionCodec == NoCompressionCodec) {
      // Uncompressed: allocate a buffer sized for the whole set and write each
      // message with its own offset.
      val buffer = ByteBuffer.allocate(MessageSet.messageSetSize(messages))
      for (message <- messages)
        writeMessage(buffer, message, offsetCounter.getAndIncrement)
      buffer.rewind() // move the position back to the start for readers
      buffer
    } else {
      // Compressed: stream every (offset, size, bytes) entry through the codec
      // into an in-memory byte array.
      val byteArrayStream = new ByteArrayOutputStream(MessageSet.messageSetSize(messages))
      val output = new DataOutputStream(CompressionFactory(compressionCodec, byteArrayStream))
      var offset = -1L
      try {
        for (message <- messages) {
          offset = offsetCounter.getAndIncrement
          output.writeLong(offset)
          output.writeInt(message.size)
          // NOTE(review): reads the backing array directly, so this assumes the
          // message buffer is array-backed and that its content is the first
          // `limit` bytes from `arrayOffset` -- confirm against Message.
          output.write(message.buffer.array, message.buffer.arrayOffset, message.buffer.limit)
        }
      } finally {
        output.close()
      }
      val bytes = byteArrayStream.toByteArray
      // Wrap the compressed bytes in a single message tagged with the codec.
      val message = new Message(bytes, compressionCodec)
      // Allocate room for the wrapper message plus the per-entry overhead.
      val buffer = ByteBuffer.allocate(message.size + MessageSet.LogOverhead)
      // The wrapper is written with the offset of the last inner message.
      writeMessage(buffer, message, offset)
      buffer.rewind()
      buffer
    }
  }

  /**
   * Decompresses the payload of `message` using the codec recorded on the
   * message and returns the result as a new message set.
   *
   * @param message a message whose payload holds compressed entries
   * @return a ByteBufferMessageSet over the decompressed bytes
   */
  def decompress(message: Message): ByteBufferMessageSet = {
    val outputStream: ByteArrayOutputStream = new ByteArrayOutputStream
    val inputStream: InputStream = new ByteBufferBackedInputStream(message.payload)
    val intermediateBuffer = new Array[Byte](1024)
    val compressed = CompressionFactory(message.compressionCodec, inputStream)
    try {
      // Copy chunks until read() reports no more data. Iterator.continually
      // does not retain already-consumed results, unlike a memoizing Stream.
      Iterator.continually(compressed.read(intermediateBuffer)).takeWhile(_ > 0).foreach { dataRead =>
        outputStream.write(intermediateBuffer, 0, dataRead)
      }
    } finally {
      compressed.close()
    }
    // toByteArray already returns a fresh copy, so wrap it directly; the
    // resulting buffer has position 0 and limit == length.
    val outputBuffer = ByteBuffer.wrap(outputStream.toByteArray)
    new ByteBufferMessageSet(outputBuffer)
  }

  /**
   * Appends one entry to `buffer`: the offset, then the message size, then
   * the message bytes.
   *
   * @param buffer  destination; its position advances past the written entry
   * @param message the message to write; its buffer is rewound afterwards
   * @param offset  the offset recorded in front of the message
   */
  private[kafka] def writeMessage(buffer: ByteBuffer, message: Message, offset: Long): Unit = {
    buffer.putLong(offset)
    buffer.putInt(message.size)
    buffer.put(message.buffer)
    // put() consumed message.buffer; reset its position so it can be read again.
    message.buffer.rewind()
  }
}
上面就是初始化部分所做的工作.下面看看具体的函数.internalIterator是这个类的核心函数:把buffer中的每条message映射成对应的MessageAndOffset就是在这里实现的,对外提供的iterator,shallowIterator等函数也都依赖它来实现.
/** Deep iteration: delegates to internalIterator with its default (non-shallow) mode. */
override def iterator: Iterator[MessageAndOffset] = internalIterator()

/** iterator over compressed messages without decompressing */
def shallowIterator: Iterator[MessageAndOffset] = internalIterator(isShallow = true)

/**
 * Walks the entries stored in `buffer` and yields each as a MessageAndOffset.
 *
 * When `isShallow` is true only the first level of messages is traversed.
 * Otherwise a message with a compression codec is decompressed and its inner
 * messages are yielded in its place.
 *
 * @param isShallow whether to stop at the first level of messages
 * @return an iterator built on IteratorTemplate (an anonymous subclass)
 */
private def internalIterator(isShallow: Boolean = false): Iterator[MessageAndOffset] = {
  new IteratorTemplate[MessageAndOffset] {
    // slice() shares the underlying bytes but has its own position/limit, so
    // iterating here does not move the position of `buffer` itself.
    val topIter = buffer.slice()
    // Iterator over the decompressed content of the current wrapper message,
    // or null when there is none.
    var innerIter: Iterator[MessageAndOffset] = null

    def innerDone(): Boolean = (innerIter == null || !innerIter.hasNext)

    /** Reads the next top-level entry and maps it to a MessageAndOffset. */
    def makeNextOuter: MessageAndOffset = {
      // if there isn't at least an offset and size, we are done
      // (12 = 8-byte offset read by getLong + 4-byte size read by getInt)
      if (topIter.remaining < 12)
        return allDone()
      val offset = topIter.getLong()
      val size = topIter.getInt()
      if (size < Message.MinHeaderSize)
        throw new InvalidMessageException("Message found with corrupt size (" + size + ")")

      // we have an incomplete message
      if (topIter.remaining < size)
        return allDone()

      // read the current message and check correctness
      val message = topIter.slice()
      message.limit(size) // restrict the slice to exactly this message
      topIter.position(topIter.position + size) // advance past the message
      val newMessage = new Message(message)

      if (isShallow) {
        new MessageAndOffset(newMessage, offset)
      } else {
        newMessage.compressionCodec match {
          case NoCompressionCodec =>
            innerIter = null
            new MessageAndOffset(newMessage, offset)
          case _ =>
            // Compressed wrapper: iterate over its decompressed content instead.
            innerIter = ByteBufferMessageSet.decompress(newMessage).internalIterator()
            if (!innerIter.hasNext)
              innerIter = null
            makeNext()
        }
      }
    }

    // Called by IteratorTemplate to produce the next element: drain the inner
    // iterator first (deep mode only), then move on to the next outer entry.
    override def makeNext(): MessageAndOffset = {
      if (isShallow) {
        makeNextOuter
      } else {
        if (innerDone())
          makeNextOuter
        else
          innerIter.next
      }
    }
  }
}
以上就是一个核心函数的处理过程.看看对应的抽象类IteratorTemplate的定义.
/**
 * Skeleton for iterators that compute their elements lazily, one at a time.
 *
 * Subclasses implement `makeNext()` to return the next element, or call
 * `allDone()` (and return its result) when there are no more elements.
 * The class implements both the Scala and the Java iterator interfaces.
 *
 * @tparam T the element type
 */
abstract class IteratorTemplate[T] extends Iterator[T] with java.util.Iterator[T] {

  private var state: State = NOT_READY
  private var nextItem = null.asInstanceOf[T]

  /**
   * Returns the next element.
   *
   * @throws NoSuchElementException if the iterator is exhausted
   * @throws IllegalStateException  if an element was expected but the stored item is null
   */
  def next(): T = {
    if (!hasNext())
      throw new NoSuchElementException()
    // Mark the stored item as consumed so the next hasNext() computes a new one.
    state = NOT_READY
    if (nextItem == null)
      throw new IllegalStateException("Expected item but none found.")
    nextItem
  }

  /**
   * Returns the next element without consuming it.
   *
   * @throws NoSuchElementException if the iterator is exhausted
   */
  def peek(): T = {
    if (!hasNext())
      throw new NoSuchElementException()
    nextItem
  }

  /**
   * Reports whether another element is available, computing it if necessary.
   *
   * @throws IllegalStateException if the iterator is in the FAILED state
   */
  def hasNext(): Boolean = {
    if (state == FAILED)
      throw new IllegalStateException("Iterator is in failed state")
    state match {
      case DONE => false
      case READY => true
      case _ => maybeComputeNext()
    }
  }

  /** Produces the next element, or returns the result of `allDone()` when finished. */
  protected def makeNext(): T

  /**
   * Computes and stores the next element.
   *
   * @return false if `makeNext()` signalled completion via `allDone()`, true otherwise
   */
  def maybeComputeNext(): Boolean = {
    // Set FAILED first: if makeNext() throws, the state stays FAILED and
    // later hasNext() calls raise IllegalStateException.
    state = FAILED
    nextItem = makeNext()
    if (state == DONE) {
      false
    } else {
      state = READY
      true
    }
  }

  /** Marks the iterator as exhausted; returns null typed as T for use as `makeNext()`'s result. */
  protected def allDone(): T = {
    state = DONE
    null.asInstanceOf[T]
  }

  /** Removal is not supported; always throws UnsupportedOperationException. */
  def remove =
    throw new UnsupportedOperationException("Removal not supported")

  /** Puts the iterator back into the NOT_READY state. */
  protected def resetState(): Unit = {
    state = NOT_READY
  }
}
定义很简单直白.
Kafka 源代码分析之ByteBufferMessageSet
标签:
原文地址:http://www.cnblogs.com/cloud-zhao/p/5737711.html