码迷,mamicode.com
首页 > 其他好文 > 详细

DelayedOperationPurgatory(1)

时间:2015-12-19 17:57:43      阅读:156      评论:0      收藏:0      [点我收藏+]

标签:

purgatory就是炼狱的意思。

当一个DelayedOperation需要被delay时,它就被放到DelayedOperationPurgatory,相当于进行一个等待池。

上一篇blog提到过,DelayedOperation想要摆脱delay状态,需要由事件来触发对它状态的检查,或者是超时时间到了。它一个事件发生时,比如一个Partiton有新的消息写入,那么跟这个Partition有关的FetchRequest就都得检查一遍。

直观的,我们需要一个key绑定到DelayedOperation上,来说明这个DelayedOperation会由哪些事件触发,而且一个DelayedOperation可以绑定到多个key, 一个key也可能跟多个DelayedOperation有关。所以这是一个多对多的映射,一边是事件,一边是DelayedOperation。下面来看一下Kafka用何种数据结构来保存这种映射关系。

DelayedOperation pool

DelayedOperationPurgatory就提供了这种功能。它把这种映射保存到一个Pool里。这个Pool的核心是一个ConcurrentHashMap.

 private val watchersForKey = new Pool[Any, Watchers](Some((key: Any) => new Watchers(key)))

Pool构造器的参数是一个value factory,即当这个pool里一个key没有value时,就用这个函数生成一个value。所以,对于value为空的key,这个pool就会构造一个watch这个key的Watchers.(这个构造器含义可以不必搞清楚)

Watchers

这个Pool的key是Any类型,也就是Scala里所有对象的基类,value是Watchers。注释里这么描述Watchers的

A linked list of watched delayed operations based on some key

Watchers把这些DelayedOperation保存在自己的一个instance field里

 private[this] val operations = new LinkedList[T]()

由于Watchers是DelayedOperationPurgatory的内部类,T就源于DelayedOperationPurgatory的签名

class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: String, brokerId: Int = 0, purgeInterval: Int = 1000)
        extends Logging with KafkaMetricsGroup 

所以operations这个域里保存的就是一个DelayedOperation的列表。

所以,DelayedOperationPurgatory用来保存DelayedOperation的数据结构是一个Map[Any, List[DelayedOpertion]]的结构。

对于Watchers来说,有两件事要做

  • 当它关注的key有事件发生时,需要调用它的方法来遍历operations,找出其中可以被complete(即不再被delay)的operation。这个方法就是tryCompleteWatched
  • 由于一个DelayedOperation可以对应多个key,所以当这个Watchers对应的key没有被触发,它保存的operations里的元素仍然可能由于其它的key触发而而被complete。所以外界需要能主动地检测这个Watchers里的哪些operation已经被complete了,并且移除这些元素。这个方法就是purgeCompleted

DelayedOperationPurgatory的运转

 1. 在一个DelayedOperation被生成后,它被使用DelayedOperationPurgatory的

def tryCompleteElseWatch(operation: T, watchKeys: Seq[Any]): Boolean

方法,把自己和自己想要watch的keys传给purgatory。

2. 当有事件发生时,比如在append消息到leader之后,调用DelayedOperationPurgatory的

def checkAndComplete(key: Any): Int

方法,来尝试complete watch这个key的那些DelayedOperation。

3. 由于即使一个DelayedOperation被complete,然后从一个key对应的Watches里被移除,但是它可能还在其它的key的Watchers里,这些DelayedOperation事实上已是垃圾,还在占用内存。因此有一个线程专门从所有的Watches里移除已经complete的那些。同时这个线程也负责清理timeout的DelayedOperation, 这个线程就是ExpiredOperationReaper。

 

DelayedOperationPurgatory(1)

标签:

原文地址:http://www.cnblogs.com/devos/p/5055656.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!