标签:
DelayedOperation包括两种:DelayedFetch和DelayedProduce,它们的存在是由Kafka Protocol决定的,而Kafka Protocol是由实际需求决定的……
存在DelayedFetch是为了更有效率的fetch,也就是batch fetch;存在DelayedProduce是为了等待更多副本的写入,以达到用户指定的持久性保证(也就是消息更不容易丢)。
对于这些DelayedOperation而言,什么时候不再需要delay是必须指明的,跟据操作的不同,delay被满足的条件有所不同,但也有共通的,比如max wait time。
case class FetchRequest(versionId: Short = FetchRequest.CurrentVersion, correlationId: Int = FetchRequest.DefaultCorrelationId, clientId: String = ConsumerConfig.DefaultClientId, replicaId: Int = Request.OrdinaryConsumerId, maxWait: Int = FetchRequest.DefaultMaxWait, minBytes: Int = FetchRequest.DefaultMinBytes, requestInfo: Map[TopicAndPartition, PartitionFetchInfo]) extends RequestOrResponse(Some(RequestKeys.FetchKey)) { ... }
注意其中的两个参数, 按照Kafka Protocol的说明,它们的含义是这样的:
当一个FetchRequest由于这两个参数而需要等待时,Kafka就生成一个DelayedFetch对象,来表示‘delay‘的语义。
case class PartitionFetchInfo(offset: Long, fetchSize: Int) case class FetchPartitionStatus(startOffsetMetadata: LogOffsetMetadata, fetchInfo: PartitionFetchInfo) { ... } case class FetchMetadata(fetchMinBytes: Int, fetchOnlyLeader: Boolean, fetchOnlyCommitted: Boolean, isFromFollower: Boolean, fetchPartitionStatus: Map[TopicAndPartition, FetchPartitionStatus]) {... } class DelayedFetch(delayMs: Long, fetchMetadata: FetchMetadata, replicaManager: ReplicaManager, responseCallback: Map[TopicAndPartition, FetchResponsePartitionData] => Unit) extends DelayedOperation(delayMs) { ... }
其中
总之,FetchRequest提供了关于原始的FetchRequest的信息,以及在FetchRequest处理的步骤中补充的一些信息。
case class ProducerRequest(versionId: Short = ProducerRequest.CurrentVersion, correlationId: Int, clientId: String, requiredAcks: Short, ackTimeoutMs: Int, data: collection.mutable.Map[TopicAndPartition, ByteBufferMessageSet]) extends RequestOrResponse(Some(RequestKeys.ProduceKey)) { ... }
注意这里边的两个参数
class DelayedProduce(delayMs: Long, produceMetadata: ProduceMetadata, replicaManager: ReplicaManager, responseCallback: Map[TopicAndPartition, ProducerResponseStatus] => Unit) extends DelayedOperation(delayMs) { ... }
case class ProduceMetadata(produceRequiredAcks: Short, produceStatus: Map[TopicAndPartition, ProducePartitionStatus]) { ... }
DelayedProduce基本和DelayedFetch是一个思中,由ProduceMetadata记录了一些加工后的信息,便于以后判断delay条件时使用。对于DelayedProduce,主要是在把消息集append到本地后,就可以获取append之后这个消息集的最后一条消息的offset,由此推出来当follower来请求到至少哪一条消息时,就说明这个replica已经拉取完了这个message set, 这个offset就是requiredOffset。
case class ProducePartitionStatus(requiredOffset: Long, responseStatus: ProducerResponseStatus) { ... }
对于上面的定义里,需要注意的一点时,在单个FetchRequest和ProduceRequest里,都可以produce以及fetch到之个partition。而delay参数,比如ack以及minBytes都是针对整个Request的。所以,对于ProduceRequest而言,只有它包括的所有partition的ISR都已经确认,这个Request也会整体不再被delay。而对于FetchRequest,只要总的bytes超过了minBytes,不管是不是所有Partition的数据都有,它都会不再被delay。
ProduceRequest的这种行为是由于对于它的响应只会有一个,所以需要等所有partition都处理完了,再能给出这个响应。这个对于异步的Producer也是一样。如果需要允许确认一部分Partition就返回响应,那么server和client端的处理就会麻烦很多。
而FetchRequest的这种行为使得无法保证fetch到的数据中不同partition的比例,可能并不是特别好。但是大部分情况下,还都是只针对一个Partition发一个FetchRequest,所以不会受到影响。
总之,这部分的Kafka Protocol设计得并不太完美,但的确比较简洁,算是有得有失吧。
在看源码之前,可以先大致想一下DelayedOperation的通用处理过程,这样会更容易理解源码中的定义。
首先,这两种Request都有max wait时间,为此需要定期检查被delayed的这些request是否超时(expire)了,为了做到这一点,需要一个定时任务。
除了时间触发之外,对DelayedOperation的delay条件是否满足的检查还有一些是事件触发的,比如,对于DelayedProduce,当一个replica发来fetch请求时,leader就获取了replica拉取进度的新信息,因此就需要检查下DelayedProduce是否可以被满足(可以被满足就是说这个DelayedOperation应该摆脱delay状态)。同样的,对于DelayedFetch,当有新的producer request里的消息append到leader,就需要检查下处于delayed状态的DelayedFetch请求是否可以被满足。因此,DelayedOperation必须有接口可以被调用以检查是否delay条件被满足了。
此外,DelayedOperation应该指明在delay条件满足之后应该怎么做,比如此时应该发送response;以及应该在expire之后应该怎么做。
因此,它大概需要以下方法:
源码中的这个接口的定义有些混乱,因为有些方法的定义跟具体的处理过程紧密相关。但是也是有一些情况是上面没有考虑到的,比如同步,比如在源码的DelayedOperation接口的forceComplete方法中
private val completed = new AtomicBoolean(false) /* * Force completing the delayed operation, if not already completed. * This function can be triggered when * * 1. The operation has been verified to be completable inside tryComplete() * 2. The operation has expired and hence needs to be completed right now * * Return true iff the operation is completed by the caller: note that * concurrent threads can try to complete the same operation, but only * the first thread will succeed in completing the operation and return * true, others will still return false */ def forceComplete(): Boolean = { if (completed.compareAndSet(false, true)) { // cancel the timeout timer cancel() onComplete() true } else { false } }
在Kafka的架构中,有多个线程共同处理请求,因此可能会有多个线程同时检查delay条件以及在发现满足条件后执尝试行操作。但是对于onComplete这样的方法,在DelayedOperation的整个生命周期中,只允许被调用一次(因为这个方法会返回给client响应),因此需要对访问它的方法进行同步。而DelayedOperation约定onComplete方法只允许被forceComplete方法调用,因此在forceComplete方法中用AtomicBoolean的CAS操作构造了一段只被执行一次的代码。即,只允许把completed从false设成true的那个线程执行cancel和onComplete。由于CAS是原子操作,而且没有其它地方把它从true改成false,所以if里操作对于这个DelayedOperation,在其生命周期中只会被执行一次。
总的来说,这些DelayedOperation在超时后,就直接调forceComplete,然后调onExpiration(这个顺序有点乱,不过在DelayedFetch和DelayedProduce中,onExpiration都只是做簿记工作)。在由事件触发时,就调用tryComplete,如果在tryComplete中发现delay条件被满足,就调用forceComplete。
ReplicaManager之DelayedOperation
标签:
原文地址:http://www.cnblogs.com/devos/p/5049820.html