码迷,mamicode.com
首页 > 其他好文 > 详细

Kafka 源代码分析之ByteBufferMessageSet

时间:2016-08-04 19:20:30      阅读:184      评论:0      收藏:0      [点我收藏+]

标签:

这里分析一下message的封装类ByteBufferMessageSet类

ByteBufferMessageSet类的源代码在源代码目录message目录下.这个类主要封装了message,messageset,messageandoffset等类的对象.在Log类中读写log的时候基本上都是以这个类的对象为基本操作对象的.

下面看看类的具体代码.首先是初始化部分.

class ByteBufferMessageSet(val buffer: ByteBuffer) extends MessageSet with Logging { //这里作为入口传入ByteBuffer类型的buffer
  private var shallowValidByteCount = -1  //设定了全局的验证字节统计变量.
  //这里是几个不同的构造函数.但是都是以最初的构造函数为基础的.这里调用了BBMS对象的create方法创建新buffer.
  def this(compressionCodec: CompressionCodec, messages: Message*) {    //参数是压缩标志和message
    this(ByteBufferMessageSet.create(new AtomicLong(0), compressionCodec, messages:_*))
  }
  //同上.只是参数不同而已.
  def this(compressionCodec: CompressionCodec, offsetCounter: AtomicLong, messages: Message*) {
    this(ByteBufferMessageSet.create(offsetCounter, compressionCodec, messages:_*))
  }
 //最简单的消息构造函数.
  def this(messages: Message*) {
    this(NoCompressionCodec, new AtomicLong(0), messages: _*)
  }

  上面是几个不同的构造函数部分.构造函数应用到的ByteBufferMessageSet.create函数代码如下.

object ByteBufferMessageSet {
  //创建函数的参数是offset,压缩,message.返回的是bytebuffer类型.
  private def create(offsetCounter: AtomicLong, compressionCodec: CompressionCodec, messages: Message*): ByteBuffer = {
    if(messages.size == 0) {  //消息为空的话就返回一个空的buffer
      MessageSet.Empty.buffer
    } else if(compressionCodec == NoCompressionCodec) {  //没有压缩的话.就分配一个新的messages大小的buffer.
      val buffer = ByteBuffer.allocate(MessageSet.messageSetSize(messages))
      for(message <- messages)
        writeMessage(buffer, message, offsetCounter.getAndIncrement) //在这里将消息内容写入buffer中.
      buffer.rewind() //将指针设置到开始位置.
      buffer //返回buffer
    } else { //这里则是启用压缩的处理动作了.
      val byteArrayStream = new ByteArrayOutputStream(MessageSet.messageSetSize(messages)) //获取一个messages大小的字节流对象
      val output = new DataOutputStream(CompressionFactory(compressionCodec, byteArrayStream)) //创建写入对象.
      var offset = -1L
      try {
        for(message <- messages) { //将消息压缩写入字节流对象中去.
          offset = offsetCounter.getAndIncrement  
          output.writeLong(offset)
          output.writeInt(message.size)
          output.write(message.buffer.array, message.buffer.arrayOffset, message.buffer.limit)
        }
      } finally {
        output.close()
      }
      val bytes = byteArrayStream.toByteArray //转成字节
      val message = new Message(bytes, compressionCodec) //创建一个压缩的消息对象.
      val buffer = ByteBuffer.allocate(message.size + MessageSet.LogOverhead) /分配一个buffer
      writeMessage(buffer, message, offset) //将压缩过的message写入到buffer.
      buffer.rewind()
      buffer
    }
  }
  
  def decompress(message: Message): ByteBufferMessageSet = {
    val outputStream: ByteArrayOutputStream = new ByteArrayOutputStream
    val inputStream: InputStream = new ByteBufferBackedInputStream(message.payload)
    val intermediateBuffer = new Array[Byte](1024)
    val compressed = CompressionFactory(message.compressionCodec, inputStream)
    try {
      Stream.continually(compressed.read(intermediateBuffer)).takeWhile(_ > 0).foreach { dataRead =>
        outputStream.write(intermediateBuffer, 0, dataRead)
      }
    } finally {
      compressed.close()
    }
    val outputBuffer = ByteBuffer.allocate(outputStream.size)
    outputBuffer.put(outputStream.toByteArray)
    outputBuffer.rewind
    new ByteBufferMessageSet(outputBuffer)
  }
    //将message写入buffer
  private[kafka] def writeMessage(buffer: ByteBuffer, message: Message, offset: Long) {
    buffer.putLong(offset) //先写入offset 之后是size,最后是消息本身.
    buffer.putInt(message.size)
    buffer.put(message.buffer)
    message.buffer.rewind()
  }
}

   上面就是关于初始化部分做的工作.下面看看具体的函数.这个函数是作为核心函数.messages映射成对应的messageandoffset就在这里实现.被外部调用的好几个函数也都是依赖这个函数实现.

override def iterator: Iterator[MessageAndOffset] = internalIterator()

  /** iterator over compressed messages without decompressing */
  def shallowIterator: Iterator[MessageAndOffset] = internalIterator(true)

  /** When flag isShallow is set to be true, we do a shallow iteration: just traverse the first level of messages. **/
  private def internalIterator(isShallow: Boolean = false): Iterator[MessageAndOffset] = { //可以看见返回的是一个迭代对象,对象类想是messageandoffset
    new IteratorTemplate[MessageAndOffset] {  //这是一个抽象类.在这个地方返回了一个匿名类的实例.
      var topIter = buffer.slice()  //复制一份buffer
      var innerIter: Iterator[MessageAndOffset] = null

      def innerDone():Boolean = (innerIter == null || !innerIter.hasNext)

      def makeNextOuter: MessageAndOffset = { //这里主要把没个message映射成messageandoffset
        // if there isn‘t at least an offset and size, we are done
        if (topIter.remaining < 12)  
          return allDone()
        val offset = topIter.getLong()
        val size = topIter.getInt()
        if(size < Message.MinHeaderSize)
          throw new InvalidMessageException("Message found with corrupt size (" + size + ")")
        
        // we have an incomplete message
        if(topIter.remaining < size)
          return allDone()
          //上面这些主要是检查啥的.不细说了.主要映射在下面完成.
        // read the current message and check correctness
        val message = topIter.slice()  
        message.limit(size) //通过size截取第一个message
        topIter.position(topIter.position + size) //将指针往前移.
        val newMessage = new Message(message) //构建新message

        if(isShallow) {
          new MessageAndOffset(newMessage, offset) //做映射返回.
        } else { //剩下是判断是否有压缩的情况.
          newMessage.compressionCodec match {
            case NoCompressionCodec =>
              innerIter = null
              new MessageAndOffset(newMessage, offset)
            case _ =>
              innerIter = ByteBufferMessageSet.decompress(newMessage).internalIterator() //有压缩用对象里的方法处理.
              if(!innerIter.hasNext)
                innerIter = null
              makeNext()  
          }
        }
      }
     //在这个函数里调用映射函数.最后这个函数有next函数调用.
      override def makeNext(): MessageAndOffset = {
        if(isShallow){
          makeNextOuter
        } else {
          if(innerDone())
            makeNextOuter
          else
            innerIter.next
        }
      }
      
    }
  }

  以上就是一个核心函数的处理过程.看看对应的抽象类IteratorTemplate的定义.

abstract class IteratorTemplate[T] extends Iterator[T] with java.util.Iterator[T] {
  
  private var state: State = NOT_READY
  private var nextItem = null.asInstanceOf[T]

  def next(): T = {
    if(!hasNext())
      throw new NoSuchElementException()
    state = NOT_READY
    if(nextItem == null)
      throw new IllegalStateException("Expected item but none found.")
    nextItem
  }
  
  def peek(): T = {
    if(!hasNext())
      throw new NoSuchElementException()
    nextItem
  }
  
  def hasNext(): Boolean = {
    if(state == FAILED)
      throw new IllegalStateException("Iterator is in failed state")
    state match {
      case DONE => false
      case READY => true
      case _ => maybeComputeNext()
    }
  }
  
  protected def makeNext(): T
  
  def maybeComputeNext(): Boolean = {
    state = FAILED
    nextItem = makeNext()
    if(state == DONE) {
      false
    } else {
      state = READY
      true
    }
  }
  
  protected def allDone(): T = {
    state = DONE
    null.asInstanceOf[T]
  }
  
  def remove = 
    throw new UnsupportedOperationException("Removal not supported")

  protected def resetState() {
    state = NOT_READY
  }
}

  定义很简单直白.

 

Kafka 源代码分析之ByteBufferMessageSet

标签:

原文地址:http://www.cnblogs.com/cloud-zhao/p/5737711.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!