标签:
了解完Nimbus服务的启动细节后,我们将目光移到Nimbus的处理细节上,也就是nimbus.clj代码中的 service-handler [conf inimbus] 方法。
(defserverfn service-handler [conf inimbus]
;;调用 inimbus的prepare方法,
;;(master-inimbus-dir conf) -> $storm.local.dir/nimbus/inimbus
;; 如果不存在$storm.local.dir/nimbus/ 目录,则创建一个这样的目录
(.prepare inimbus conf (master-inimbus-dir conf))
(log-message "Starting Nimbus with conf " conf)
;; nimbus 是一个Map,包含了storm配置、INimbus实例、与集群内的zookeeper操作接口,不同topology的心跳缓存(Map),
;; executor分配器 :scheduler
(let [nimbus (nimbus-data conf inimbus)]
;;nimbus.topology.validator
(.prepare ^backtype.storm.nimbus.ITopologyValidator (:validator nimbus) conf)
;;清除那些zookeeper上保存的,而在 nimbus的本地目录/nimbus/stormdist没有发现对应的topolgy,清除操作包括
;; 1. 将topology对应的任务信息清除 ./assignments/$storm-id
;;2. 将topology对应的storm信息清除 ./storms/$storm-id
(cleanup-corrupt-topologies! nimbus)
;; 得到 zookeeper下 ./storms 所有的znode节点名称,遍历
(doseq [storm-id (.active-storms (:storm-cluster-state nimbus))]
;;这个过程不是很清楚,这个是怎样得到的,完成状态转化?
(transition! nimbus storm-id :startup))
;;在backtype/storm/timer.clj 中定义schedule-recurring
;; 添加一个定时执行的mk-assignments(如果nimbus.reassign为true)、do-cleanup任务
(schedule-recurring (:timer nimbus)
0
(conf NIMBUS-MONITOR-FREQ-SECS)
(fn []
(when (conf NIMBUS-REASSIGN)
(locking (:submit-lock nimbus)
(mk-assignments nimbus)))
;;do-cleanup清除废弃的topology
;;(jar文件,zookeeper上保存有关topology的信息,包括心跳记录,
;; errors记录以及Nimbus进程保存的该topology的心跳
(do-cleanup nimbus)
))
;; Schedule Nimbus inbox cleaner
(schedule-recurring (:timer nimbus)
0
(conf NIMBUS-CLEANUP-INBOX-FREQ-SECS)
(fn []
(clean-inbox (inbox nimbus) (conf NIMBUS-INBOX-JAR-EXPIRATION-SECS))
))
;; 返回一个service-handler实现
(reify Nimbus$Iface …)
)
在上面的代码中,(schedule-recurring (:timer nimbus) …) 定时执行mk-assignments和do-cleanup 这两个任务是如何实现的呢?
schedule-recurring 方法定义在time.clj中,本篇主要分析的也是这个文件。
在schedule-recurring中调用了schedule方法,首先来看一下schedule方法:
;;timer 是调控队列的一个守护线程实例
;;将afn方法加入到timer的等待队列中
;;队列元素为[执行时间,方法句柄,方法id]
(defnk schedule
[timer delay-secs afn :check-active true]
(when check-active (check-active! timer))
(let [id (uuid)
;; 取出timer控制的队列
^PriorityQueue queue (:queue timer)]
(locking (:lock timer)
;;将[执行时间,方法句柄,方法id] 信息添加到等待队列中
(.add queue [(+ (current-time-millis) (secs-to-millis-long delay-secs)) afn id]))))
在来分析一下schedule-recurring 方法:
(defn schedule-recurring
[timer delay-secs recur-secs afn]
(schedule timer
delay-secs
(fn this []
;; 执行业务逻辑
(afn)
; This avoids a race condition with cancel-timer.
(schedule timer recur-secs this :check-active false))))
在方法体中调用了 schedule方法,传入schedule 的afn参数是:
afn = (fn this []
;; 执行业务逻辑
(afn)
; This avoids a race condition with cancel-timer.
(schedule timer recur-secs this :check-active false))
当执行(afn)时候,首先会执行业务逻辑,然后在调用schedule方法,传入schedule 的afn 的值为:
afn = (fn this []
;; 执行业务逻辑
(afn)
; This avoids a race condition with cancel-timer.
(schedule timer recur-secs this :check-active false))
因此这个会以recur-secs的时间间隔不断的去执行用户定义的逻辑。 在nimbus.clj中,就是不断的执行这两个任务:
(mk-assignments nimbus)
(do-cleanup nimbus)
来看一下队列的创建过程:
(let [queue (PriorityQueue. 10 (reify Comparator
(compare
[this o1 o2]
(- (first o1) (first o2)))
(equals
[this obj]
true)))
…])
前文的描述可以知道,队列中保持的element是:
[执行时间,方法句柄,方法id]
PriorityQueue ,The head of this queue is the least element with respect to the specified ordering. 也就是队首的元素是队列中最小的,在创建PriorityQueue实例的时候,实现了Comparator 的接口,当o1的开始执行时间要大于o2时,此时compare 返回的值>1,也就是o1>o2。 因此在等待队列中,对首的方法开始执行时间永远是最先开始的,那么方法的调度策略也就成为了一个谁急谁先调用。
在time.clj中开启了一个守护线程不断的从这个队列中比较当前时间i与对首元素的开始执行时间j,若j>i,则将这个线程sleep j-i,否则,则取出对首元素的方法,执行调用(afn).
更多有关PriorityQueue(优先级队列)有关的实现细节待续。
标签:
原文地址:http://www.cnblogs.com/dzhchiyu/p/5268206.html