Each partition is an ordered, immutable sequence of messages that is continually appended to—a commit log
Offset MessageSize Message
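Note that in Kafka's documentation and source code, a message (Message) does not include its offset. Kafka's log consists of a sequence of records. Kafka gives these records no dedicated name, but keep in mind that such a "record" is not the same thing as a "Message": Offset, MessageSize, and Message together make up one record. In the Kafka protocol, the format of a Message is: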
Message => Crc MagicByte Attributes Key Value
Crc => int32
MagicByte => int8
Attributes => int8
Key => bytes
Value => bytes
| Field | Description |
| --- | --- |
| Attributes | This byte holds metadata attributes about the message. The lowest 2 bits contain the compression codec used for the message. The other bits should be set to 0. |
| Crc | The CRC is the CRC32 of the remainder of the message bytes. This is used to check the integrity of the message on the broker and consumer. |
| Key | The key is an optional message key that was used for partition assignment. The key can be null. |
| MagicByte | This is a version id used to allow backwards compatible evolution of the message binary format. The current value is 0. |
| Offset | This is the offset used in kafka as the log sequence number. When the producer is sending messages it doesn't actually know the offset and can fill in any value here it likes. |
| Value | The value is the actual message contents as an opaque byte array. Kafka supports recursive messages in which case this may itself contain a message set. The message can be null. |
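To make the layout concrete, here is a minimal Python sketch that serializes a magic-0 Message by hand. The helper names `encode_bytes` and `encode_message` are made up for this illustration; the sketch assumes the protocol's `bytes` type is an int32 length followed by the data, with -1 standing for null, and that the CRC covers everything after the Crc field.

```python
import struct
import zlib


def encode_bytes(data):
    """Encode the protocol's `bytes` type: int32 length, then the data; -1 means null."""
    if data is None:
        return struct.pack(">i", -1)
    return struct.pack(">i", len(data)) + data


def encode_message(key, value, magic=0, attributes=0):
    """Build Crc + MagicByte + Attributes + Key + Value (hypothetical helper)."""
    body = struct.pack(">bb", magic, attributes) + encode_bytes(key) + encode_bytes(value)
    # CRC32 of the remainder of the message bytes, i.e. everything after the Crc field.
    crc = zlib.crc32(body) & 0xFFFFFFFF
    return struct.pack(">I", crc) + body


msg = encode_message(key=None, value=b"hello")
print(len(msg))  # 4 (Crc) + 1 + 1 + 4 (null key) + 4 + 5 (value) = 19
```

The Crc is declared as int32 in the grammar; packing the unsigned CRC32 value produces the same four bytes.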
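The reason for stressing the difference between a record and a Message is to make the concept of a MessageSet easier to understand. The Kafka protocol defines MessageSet as follows: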
MessageSet => [Offset MessageSize Message]
Offset => int64
MessageSize => int32
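In other words, a MessageSet is made up of records, not bare messages. This means that a MessageSet can be split into its messages from its byte stream alone, without any outside information, which leads to a more important property: Kafka compresses at the granularity of a MessageSet. Because a whole MessageSet is compressed, nothing about its structure needs to be recorded outside the compressed data, and so Kafka messages can contain each other recursively. This is what the description of the Value field above means by "Kafka supports recursive messages in which case this may itself contain a message set".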
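The self-delimiting property can be sketched in a few lines of Python. The functions `encode_message_set` and `split_message_set` are hypothetical names, and the message bodies are opaque placeholder bytes rather than real Messages; the only thing the sketch relies on is the Offset (int64) and MessageSize (int32) prefix of each record.

```python
import struct

LOG_OVERHEAD = 12  # 8-byte Offset + 4-byte MessageSize


def encode_message_set(entries):
    """entries: iterable of (offset, message_bytes) pairs."""
    return b"".join(struct.pack(">qi", off, len(m)) + m for off, m in entries)


def split_message_set(buf):
    """Yield (offset, message_bytes) using nothing but the bytes in buf."""
    pos = 0
    while len(buf) - pos >= LOG_OVERHEAD:
        offset, size = struct.unpack_from(">qi", buf, pos)
        pos += LOG_OVERHEAD
        if len(buf) - pos < size:
            break  # trailing partial record: stop here
        yield offset, buf[pos:pos + size]
        pos += size


raw = encode_message_set([(0, b"first"), (1, b"second message")])
print(list(split_message_set(raw)))
```

Because each record announces its own length, the same routine works unchanged on the bytes obtained by decompressing the value of a compressed Message.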
if(isShallow) { // is deep iteration required?
  new MessageAndOffset(newMessage, offset)
} else { // deep iteration was requested
  newMessage.compressionCodec match {
    case NoCompressionCodec =>
      innerIter = null
      new MessageAndOffset(newMessage, offset) // not compressed: return it directly as a Message
    case _ =>
      innerIter = ByteBufferMessageSet.deepIterator(newMessage) // compressed: iterate over its contents
      if(!innerIter.hasNext)
        innerIter = null
      makeNext()
  }
}
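The deepIterator method of ByteBufferMessageSet decompresses the value of such a Message and then reads records from it one at a time in the Offset MessageSize Message format. Messages read this way are not iterated deeply again. Below is the makeNext method of deepIterator, which is called repeatedly to produce the iterator's elements: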
override def makeNext(): MessageAndOffset = {
  try {
    // read the offset
    val offset = compressed.readLong()
    // read record size
    val size = compressed.readInt()

    if (size < Message.MinHeaderSize)
      throw new InvalidMessageException("Message found with corrupt size (" + size + ") in deep iterator")

    // read the record into an intermediate record buffer
    // and hence has to do extra copy
    val bufferArray = new Array[Byte](size)
    compressed.readFully(bufferArray, 0, size)
    val buffer = ByteBuffer.wrap(bufferArray)

    val newMessage = new Message(buffer)

    // the decompressed message should not be a wrapper message since we do not allow nested compression
    new MessageAndOffset(newMessage, offset)
  } catch {
    case eofe: EOFException =>
      compressed.close()
      allDone()
    case ioe: IOException =>
      throw new KafkaException(ioe)
  }
}
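The difference between shallow and deep iteration can be mimicked in a small Python sketch. It is not Kafka's implementation: the record layout is simplified to Offset, MessageSize, and a message consisting of one Attributes byte plus the value (no Crc, MagicByte, or Key), only gzip (codec 1) is handled, and all function names are invented for the example.

```python
import gzip
import struct

CODEC_MASK = 0x03  # lowest 2 bits of Attributes hold the codec
NO_COMPRESSION, GZIP = 0, 1


def pack_record(offset, attributes, value):
    """Simplified record: Offset, MessageSize, then Attributes byte + value."""
    message = struct.pack(">b", attributes) + value
    return struct.pack(">qi", offset, len(message)) + message


def shallow_iter(buf):
    """Yield (offset, attributes, value) for each top-level record."""
    pos = 0
    while pos + 12 <= len(buf):
        offset, size = struct.unpack_from(">qi", buf, pos)
        end = pos + 12 + size
        if end > len(buf):
            return  # incomplete trailing record
        message = buf[pos + 12:end]
        yield offset, message[0], message[1:]
        pos = end


def deep_iter(buf):
    """Yield (offset, value), descending one level into compressed messages."""
    for offset, attributes, value in shallow_iter(buf):
        codec = attributes & CODEC_MASK
        if codec == NO_COMPRESSION:
            yield offset, value
        elif codec == GZIP:
            # Inner records are read shallowly: nested compression is not expected.
            for inner_offset, _, inner_value in shallow_iter(gzip.decompress(value)):
                yield inner_offset, inner_value
        else:
            raise ValueError("codec %d is not handled in this sketch" % codec)


inner = pack_record(5, NO_COMPRESSION, b"a") + pack_record(6, NO_COMPRESSION, b"b")
outer = pack_record(6, GZIP, gzip.compress(inner)) + pack_record(7, NO_COMPRESSION, b"c")

print([offset for offset, _, _ in shallow_iter(outer)])  # [6, 7]
print(list(deep_iter(outer)))  # [(5, b'a'), (6, b'b'), (7, b'c')]
```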
As for the rule that a MessageSet must not contain more than one compressed Message (a compressed Message being a Message whose value is a compressed MessageSet), the Kafka protocol says:
The outer MessageSet should contain only one compressed "Message" (see KAFKA-1718 for details).
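The issue was reported by the author of the Go client. He found that the Messages he sent were clearly not too large, yet a MessageSizeTooLargeException occurred. Discussion with others revealed the cause: when the broker calls Log.append, it decompresses the MessageSet passed to that method and then reassembles it into a single compressed MessageSet (a ByteBufferMessageSet). The MessageSet sent by the Go client contained several compressed Messages, so even though no Message exceeded the message.max.bytes limit when it was sent, the Message regenerated on the broker did. That is why the Kafka protocol calls this case out explicitly: The outer MessageSet should contain only one compressed "Message".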
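The size effect behind KAFKA-1718 can be reproduced with plain gzip in Python. This is only an illustration under simplifying assumptions: record framing is ignored, `MAX_MESSAGE_BYTES` is a stand-in for message.max.bytes, and the payloads are pseudo-random so that compression cannot shrink them.

```python
import gzip
import random

MAX_MESSAGE_BYTES = 1000  # stand-in for message.max.bytes

# Three incompressible payloads of 600 bytes each (seeded for repeatability).
rng = random.Random(0)
payloads = [bytes(rng.getrandbits(8) for _ in range(600)) for _ in range(3)]

# Client side: each payload is compressed into its own wrapper message.
sent = [gzip.compress(p) for p in payloads]

# Broker side: decompress everything, then recompress into a single wrapper.
recombined = gzip.compress(b"".join(gzip.decompress(s) for s in sent))

print(all(len(s) <= MAX_MESSAGE_BYTES for s in sent))  # every sent wrapper fits
print(len(recombined) > MAX_MESSAGE_BYTES)             # the rebuilt wrapper does not
```

Each wrapper the client sent is within the limit, but the single wrapper rebuilt on the broker holds all three payloads and exceeds it.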
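The offset of a compressed Message can only be one of two things: (1) the largest offset among the Messages in the compressed MessageSet, or (2) the smallest offset among them.
A FetchRequest's offset asks the broker for messages whose offset is greater than or equal to it, so the broker checks the log, finds the messages that qualify, and sends them. Because the message at the requested offset may sit inside a compressed Message, the broker has to decide whether a given compressed Message belongs in the response.
This is exactly how the broker handles a FetchRequest. Handling a FetchRequest calls Log#read(startOffset: Long, maxLength: Int, maxOffset: Option[Long] = None), which in turn calls the read method of LogSegment. Many further calls follow, so the code is not reproduced here; the method's comment describes the read logic: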
* Read a message set from this segment beginning with the first offset >= startOffset. The message set will include
* no more than maxSize bytes and will end before maxOffset if a maxOffset is specified
That is, the offset of the first Message in the returned MessageSet is >= startOffset.
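A small Python sketch shows why the choice between the two candidate offsets matters under this ">= startOffset" rule. The function `first_wrapper` and the sample data are invented for the illustration; each wrapper is modeled simply as the list of offsets of the messages it contains.

```python
def first_wrapper(wrappers, start_offset, wrapper_offset):
    """Return the first wrapper whose assigned offset is >= start_offset.

    wrappers: lists of inner offsets, in log order.
    wrapper_offset: function giving the offset stored on a wrapper (max or min).
    """
    for inner_offsets in wrappers:
        if wrapper_offset(inner_offsets) >= start_offset:
            return inner_offsets
    return None


wrappers = [[0, 1, 2, 3, 4], [5, 6, 7, 8, 9], [10, 11, 12, 13, 14]]

with_max = first_wrapper(wrappers, 7, max)  # wrapper offsets are 4, 9, 14
with_min = first_wrapper(wrappers, 7, min)  # wrapper offsets are 0, 5, 10

print(with_max)  # [5, 6, 7, 8, 9]: contains offset 7
print(with_min)  # [10, 11, 12, 13, 14]: offsets 7 to 9 would be skipped
```

With the largest inner offset on the wrapper, the first wrapper that satisfies the rule is the one that contains the requested offset; the consumer can then discard the inner messages below it (5 and 6 here).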
Likewise, when the broker assigns an offset to a compressed Message, it gives it the largest offset among the messages it contains. This logic lives in the create method of ByteBufferMessageSet:
messageWriter.write(codec = compressionCodec) { outputStream =>
  val output = new DataOutputStream(CompressionFactory(compressionCodec, outputStream)) // create the compression stream
  try {
    for (message <- messages) {
      // offsetCounter is an AtomicLong: its current value becomes this Message's offset,
      // and the incremented value will be the offset of the next message
      offset = offsetCounter.getAndIncrement
      output.writeLong(offset) // write this log record's offset
      output.writeInt(message.size) // write this log record's size
      output.write(message.buffer.array, message.buffer.arrayOffset, message.buffer.limit) // write this record's Message
    }
  } finally {
    output.close()
  }
}
val buffer = ByteBuffer.allocate(messageWriter.size + MessageSet.LogOverhead)
writeMessage(buffer, messageWriter, offset) // the offset of the last Message becomes the offset of this compressed Message
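The same bookkeeping can be sketched in Python, with `itertools.count` standing in for the AtomicLong and gzip standing in for whichever codec is configured. `build_wrapper` is a hypothetical helper and the messages are placeholder bytes; the point is only that the wrapper ends up with the offset of its last inner record.

```python
import gzip
import itertools
import struct


def build_wrapper(messages, offset_counter):
    """Write Offset + MessageSize + Message for each message, then compress.

    Returns (wrapper_offset, compressed_value); wrapper_offset is the offset
    given to the last inner message.
    """
    out = bytearray()
    offset = -1
    for message in messages:
        offset = next(offset_counter)  # take the current value, then advance
        out += struct.pack(">qi", offset, len(message)) + message
    return offset, gzip.compress(bytes(out))


counter = itertools.count(100)  # next offset of the log in this example
wrapper_offset, value = build_wrapper([b"a", b"bb", b"ccc"], counter)
print(wrapper_offset)  # 102: inner offsets are 100, 101, 102
```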
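Messages must be validated both before the broker appends the MessageSet from a produce request to the Log and after a consumer or follower fetches messages.
1. Before a broker (leader or follower) appends received messages to the log
2. After a consumer receives messages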
def append(messages: ByteBufferMessageSet, assignOffsets: Boolean = true): LogAppendInfo
When a replica's fetcher thread calls append, it sets assignOffsets to false; when the leader handles a produce request, it sets assignOffsets to true.
val appendInfo = analyzeAndValidateMessageSet(messages) // validate the messages

// if we have any valid messages, append them to the log
if(appendInfo.shallowCount == 0)
  return appendInfo

// trim any invalid bytes or partial messages before appending it to the on-disk log
var validMessages = trimInvalidBytes(messages, appendInfo)

try {
  // they are valid, insert them in the log
  lock synchronized {
    appendInfo.firstOffset = nextOffsetMetadata.messageOffset

    if(assignOffsets) { // offsets have to be reassigned
      // assign offsets to the message set
      val offset = new AtomicLong(nextOffsetMetadata.messageOffset)
      try {
        // validate the messages and assign offsets
        validMessages = validMessages.validateMessagesAndAssignOffsets(offset, appendInfo.sourceCodec, appendInfo.targetCodec, config.compact)
      } catch {
        case e: IOException => throw new KafkaException("Error in validating messages while appending to log '%s'".format(name), e)
      }
      appendInfo.lastOffset = offset.get - 1
    } else {
      // we are taking the offsets we are given
      if(!appendInfo.offsetsMonotonic || appendInfo.firstOffset < nextOffsetMetadata.messageOffset)
        throw new IllegalArgumentException("Out of order offsets found in " + messages)
    }

    // re-validate message sizes since after re-compression some may exceed the limit
    // (check again that no re-compressed message exceeds the allowed maximum size)
    for(messageAndOffset <- validMessages.shallowIterator) {
      if(MessageSet.entrySize(messageAndOffset.message) > config.maxMessageSize) {
        // we record the original message set size instead of trimmed size
        // to be consistent with pre-compression bytesRejectedRate recording
        BrokerTopicStats.getBrokerTopicStats(topicAndPartition.topic).bytesRejectedRate.mark(messages.sizeInBytes)
        BrokerTopicStats.getBrokerAllTopicsStats.bytesRejectedRate.mark(messages.sizeInBytes)
        throw new MessageSizeTooLargeException("Message size is %d bytes which exceeds the maximum configured message size of %d."
          .format(MessageSet.entrySize(messageAndOffset.message), config.maxMessageSize))
      }
    }
    ...
def computeChecksum(): Long =
CoreUtils.crc32(buffer.array, buffer.arrayOffset + MagicOffset, buffer.limit - MagicOffset)
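An equivalent check is easy to sketch in Python. The sketch assumes that the magic byte sits directly after the 4-byte Crc field (so the checksum covers everything from byte 4 onward) and that `zlib.crc32` computes the same CRC32 as `CoreUtils.crc32`; `compute_checksum` and `is_valid` are names chosen for the example.

```python
import struct
import zlib

MAGIC_OFFSET = 4  # assumed: the magic byte follows the 4-byte Crc field


def compute_checksum(message):
    """CRC32 over everything from the magic byte to the end of the message."""
    return zlib.crc32(message[MAGIC_OFFSET:]) & 0xFFFFFFFF


def is_valid(message):
    """Compare the stored Crc with the one computed from the message bytes."""
    (stored,) = struct.unpack_from(">I", message, 0)
    return stored == compute_checksum(message)


# magic 0, attributes 0, null key, value b"hi"
body = b"\x00\x00" + struct.pack(">i", -1) + struct.pack(">i", 2) + b"hi"
good = struct.pack(">I", zlib.crc32(body) & 0xFFFFFFFF) + body
bad = good[:-1] + b"!"  # corrupt the last byte of the value

print(is_valid(good), is_valid(bad))  # True False
```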
def entrySize(message: Message): Int = LogOverhead + message.size
---------------------------------
val MessageSizeLength = 4
val OffsetLength = 8
val LogOverhead = MessageSizeLength + OffsetLength
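As a worked example of the arithmetic, the following Python sketch mirrors entrySize and the size check made during append. The names are transliterations chosen for the example, messages are plain byte strings, and `ValueError` stands in for MessageSizeTooLargeException.

```python
MESSAGE_SIZE_LENGTH = 4
OFFSET_LENGTH = 8
LOG_OVERHEAD = MESSAGE_SIZE_LENGTH + OFFSET_LENGTH  # 12 bytes per log entry


def entry_size(message):
    """Bytes a message occupies in the log, including its Offset and MessageSize."""
    return LOG_OVERHEAD + len(message)


def check_entry_sizes(messages, max_message_size):
    """Raise if any entry is larger than the configured maximum."""
    for message in messages:
        size = entry_size(message)
        if size > max_message_size:
            raise ValueError(
                "Message size is %d bytes which exceeds the maximum configured "
                "message size of %d." % (size, max_message_size))


print(entry_size(b"x" * 19))  # 12 + 19 = 31
```

Note that the limit is compared against the entry size, so the 12 bytes of overhead count toward it.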
val targetCodec = BrokerCompressionCodec.getTargetCompressionCodec(config.compressionType, sourceCodec)
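config.compressionType is the value of compression.type in the broker configuration. If it is "producer", the compression codec used by the produce request is kept; otherwise the codec named by config.compressionType is used. Note that if the Messages in a MessageSet use different codecs, the codec of the last compressed message is the one taken as sourceCodec.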
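The selection rule can be written down as a short Python sketch. Codecs are represented as plain strings here, and both function names are invented for the illustration; this mirrors the rule as described above, not the broker's actual code.

```python
def source_codec_of(shallow_codecs):
    """Codec of the last compressed message in the set, or "none" if there is none."""
    source = "none"
    for codec in shallow_codecs:
        if codec != "none":
            source = codec
    return source


def target_codec(broker_compression_type, source_codec):
    """Keep the producer's codec when the broker says "producer", else use the broker's."""
    if broker_compression_type == "producer":
        return source_codec
    return broker_compression_type


source = source_codec_of(["gzip", "none", "snappy", "none"])
print(source)                            # snappy
print(target_codec("producer", source))  # snappy
print(target_codec("gzip", source))      # gzip
```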
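This method of ByteBufferMessageSet (validateMessagesAndAssignOffsets) is called only when the leader handles a produce request. It does not repeat the checks already done by analyzeAndValidateMessageSet, but it does iterate over the MessageSet deeply (that is, if a message inside is compressed, it is decompressed and then iterated), so it can perform a check that analyzeAndValidateMessageSet cannot: for a compacted topic it checks whether the key is empty, and throws InvalidMessageException if it is.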
def makeNextOuter: MessageAndOffset = {
  // if there isn't at least an offset and size, we are done
  if (topIter.remaining < 12)
    return allDone()
  val offset = topIter.getLong()
  val size = topIter.getInt()
  if(size < Message.MinHeaderSize)
    throw new InvalidMessageException("Message found with corrupt size (" + size + ") in shallow iterator")

  // we have an incomplete message
  if(topIter.remaining < size)
    return allDone()
  ...
}
public static final String CHECK_CRCS_CONFIG = "check.crcs";
private static final String CHECK_CRCS_DOC = "Automatically check the CRC32 of the records consumed. This ensures no on-the-wire or on-disk corruption to the messages occurred. This check adds some overhead, so it may be disabled in cases seeking extreme performance.";
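The configuration doc says the checksum is checked because it "ensures no on-the-wire or on-disk corruption to the messages occurred", that is, to make sure messages were not corrupted in network transit or on disk. Computing the checksum has a cost, though, so the check can be turned off when maximum performance matters.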
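As mentioned earlier, after the leader receives a ProduceRequest it decompresses the compressed message (called a compressed message set in this KIP; the two terms are admittedly used somewhat inconsistently) and assigns a new offset to every message in the message set it contains. This is the correct thing to do, and at first glance there is nothing wrong with it. The problem is that it is not as simple as changing an offset in place: after the change, the messages have to be compressed again and more has to be recomputed, which makes the operation expensive. KIP-31 aims to reduce this performance cost. (The KIP is already in the accepted state.)
The approach is to store, in the offset field of each message inside a compressed message set, that message's offset relative to the outer wrapper message. This is awkward to put into words; the KIP says: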
When the producer compresses a message, write the relative offset value in the raw message's offset field. Leave the wrapped message's offset blank.
When broker receives a compressed message, it only needs to
- Decompress the message to verify the CRC and relative offset.
- Set outer message's base offset. The outer message's base offset will be the offset of the last inner message. (Since the broker only needs to update the message-set header, there is no need to re-compress message sets.)
Note that the base offset recorded in the wrapper message is the offset of the last message in the message set it contains. This matches how the offset of a compressed message is assigned today.
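Under this scheme a reader can recover absolute offsets without the broker ever rewriting the inner messages. The Python sketch below assumes that the relative offsets are stored in increasing order, so that the last one belongs to the last inner message; `absolute_offsets` is a name made up for the example.

```python
def absolute_offsets(wrapper_offset, relative_offsets):
    """Turn inner relative offsets into absolute ones.

    wrapper_offset is the absolute offset of the last inner message, so the
    base is wrapper_offset minus the last relative offset.
    """
    base = wrapper_offset - relative_offsets[-1]
    return [base + relative for relative in relative_offsets]


print(absolute_offsets(102, [0, 1, 2]))  # [100, 101, 102]
```

The broker only has to write 102 into the wrapper; the compressed payload with relative offsets 0, 1, 2 stays byte-for-byte as the producer sent it.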
This KIP tries to address the following issues in Kafka.
- Log retention might not be honored: Log retention is currently at the log segment level, and is driven off the last modification time of a log segment. This approach does not quite work when a replica reassignment happens because the newly created log segment will effectively have its modification time reset to now.
- Log rolling might break for a newly created replica as well because of the same reason as (1).
- Some use cases such as streaming processing needs a timestamp in messages.
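1. Log retention becomes unreliable. Retention is currently enforced per log segment, using the segment's last modification time to decide whether to delete it. When a replica is reassigned, however, the log segments of the newly assigned replica have their modification time set to the current time, so they are not deleted the way log retention intends (which is to remove messages older than a given age).
2. For the same reason as 1, log rolling can also misbehave for a newly created replica (this presumably means a replica that was moved, not a replica added when partitions are increased).
3. Some use cases, such as stream processing, need messages to carry a timestamp.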
Earlier discussion of this KIP was mostly about which time to use: LogAppendTime (broker time) or CreateTime (application time).
The good things about LogAppendTime are:
The good things about CreateTime are:
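In my view, the choice between the two really is a hard one. Users will certainly want to use the time at which they produced the message; otherwise it is hard to locate a message accurately. But if the user-specified time is used, broker-side behavior gets more complicated: for example, how should a time index be built if the user-supplied times are not monotonically increasing? Wildly wrong timestamps from users, at least, can be bounded by the max.message.time.difference.ms configuration. Perhaps another configuration could be added that lets the broker adjust CreateTime within a limited range, say by at most 1000 ms. That would keep message timestamps monotonically increasing while still leaving users a fairly accurate estimate of a message's time. However, broker time might then have to mean the time the broker received the message rather than the time it was appended to the log; otherwise it would be hard to decide when to reject a message whose timestamp cannot be adjusted within the allowed range.
Looking up an offset by timestamp currently gives very coarse results, only at log-segment granularity. (For a reassigned replica the result is hopelessly off.) This KIP therefore proposes building a time-based index over the log so that searching for messages by timestamp returns more accurate results.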