Storm's ZooKeeper operations: a source analysis of cluster.clj

Posted: 2014-08-26 22:38:32


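The main functions Storm uses to operate on ZooKeeper are defined in the namespace backtype.storm.cluster (that is, in the file cluster.clj). backtype.storm.cluster defines two important protocols: ClusterState and StormClusterState.
A Clojure protocol can be thought of as a Java interface: it declares a set of methods. The ClusterState protocol declares the basic functions for interacting with ZooKeeper, such as listing a node's children and reading a node's data. It is defined as follows: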

ClusterState protocol
(defprotocol ClusterState
 (set-ephemeral-node [this path data])
 (delete-node [this path])
 (create-sequential [this path data])
 ;; if node does not exist, create persistent with this data
 (set-data [this path data])
 (get-data [this path watch?])
 (get-version [this path watch?])
 (get-data-with-version [this path watch?])
 (get-children [this path watch?])
 (mkdirs [this path])
 (close [this])
 (register [this callback])
 (unregister [this id]))
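To make the "protocol as interface" analogy concrete, here is a minimal sketch in plain Clojure (no Storm or ZooKeeper needed). `MiniClusterState` and `mk-in-memory-state` are hypothetical names: the protocol keeps three of ClusterState's methods, `reify` implements it over an in-memory atom instead of a ZooKeeper client, and `satisfies?` plays the role of Java's `instanceof`.

```clojure
;; Hypothetical, cut-down analogue of ClusterState (not part of Storm).
(defprotocol MiniClusterState
  (set-data [this path data])
  (get-data [this path watch?])
  (get-children [this path watch?]))

(defn mk-in-memory-state
  "Returns an object implementing MiniClusterState, backed by an atom
  that maps full paths such as \"/storms/topo-1\" to data."
  []
  (let [store (atom {})]
    (reify MiniClusterState
      (set-data [this path data]
        (swap! store assoc path data)
        nil)
      (get-data [this path watch?]
        ;; watch? is accepted for signature compatibility but ignored here.
        (get @store path))
      (get-children [this path watch?]
        ;; Children are derived from stored paths only: a direct child is a
        ;; stored path of the form path/<name> with no further "/".
        (let [prefix (str path "/")]
          (->> (keys @store)
               (filter #(.startsWith ^String % prefix))
               (map #(subs % (count prefix)))
               (remove #(.contains ^String % "/"))
               sort
               vec))))))
```

Code written against the protocol does not care which implementation it receives; the same idea lets mk-storm-cluster-state accept either an existing ClusterState or a conf map, as its `satisfies?` check shows.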

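The StormClusterState protocol declares the functions through which Storm interacts with ZooKeeper in terms of its own concepts (assignments, topologies, supervisors, heartbeats, errors). Each of them can be viewed as a "composition" of ClusterState functions: it builds a znode path, calls one or more ClusterState methods, and serializes or deserializes the data. StormClusterState is defined as follows: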

StormClusterState protocol
(defprotocol StormClusterState
 (assignments [this callback])
 (assignment-info [this storm-id callback])
 (assignment-info-with-version [this storm-id callback])
 (assignment-version [this storm-id callback])
 (active-storms [this])
 (storm-base [this storm-id callback])
 (get-worker-heartbeat [this storm-id node port])
 (executor-beats [this storm-id executor->node+port])
 (supervisors [this callback])
 (supervisor-info [this supervisor-id]) ;; returns nil if doesn't exist
 (setup-heartbeats! [this storm-id])
 (teardown-heartbeats! [this storm-id])
 (teardown-topology-errors! [this storm-id])
 (heartbeat-storms [this])
 (error-topologies [this])
 (worker-heartbeat! [this storm-id node port info])
 (remove-worker-heartbeat! [this storm-id node port])
 (supervisor-heartbeat! [this supervisor-id info])
 (activate-storm! [this storm-id storm-base])
 (update-storm! [this storm-id new-elems])
 (remove-storm-base! [this storm-id])
 (set-assignment! [this storm-id info])
 (remove-storm! [this storm-id])
 (report-error [this storm-id task-id node port error])
 (errors [this storm-id task-id])
 (disconnect [this]))
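The "composition" idea can be sketched in a few lines. In the version below, all names are hypothetical, and the `/assignments/{storm-id}` layout is assumed from the comments in mk-storm-cluster-state. A lower layer only stores and returns strings by path; the upper-layer `set-assignment!` and `assignment-info` build the path, call one primitive, and (de)serialize. `pr-str` and `clojure.edn/read-string` stand in for Storm's `Utils/serialize` and `maybe-deserialize`.

```clojure
(require '[clojure.edn :as edn])

;; Lower layer (stand-in for ClusterState): a path -> string store.
(defn mk-store [] (atom {}))

(defn raw-set-data! [store path data]
  (swap! store assoc path data)
  nil)

(defn raw-get-data [store path]
  (get @store path))

;; Assumed znode layout: one child of /assignments per topology id.
(defn assignment-path [storm-id]
  (str "/assignments/" storm-id))

;; Upper layer (stand-in for StormClusterState): each call is
;; "build path + (de)serialize + one lower-layer primitive".
(defn set-assignment! [store storm-id info]
  (raw-set-data! store (assignment-path storm-id) (pr-str info)))

(defn assignment-info [store storm-id]
  ;; Like maybe-deserialize, a missing node yields nil rather than an error.
  (some-> (raw-get-data store (assignment-path storm-id))
          edn/read-string))
```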

Besides these two protocols, the namespace backtype.storm.cluster defines two important functions: mk-distributed-cluster-state and mk-storm-cluster-state.
mk-distributed-cluster-state is shown below. It returns an object implementing the ClusterState protocol; Storm interacts with ZooKeeper through that object.

mk-distributed-cluster-state function
(defn mk-distributed-cluster-state
 ;; conf is bound to the configuration from storm.yaml (a map)
 [conf]
 ;; zk is bound to a ZooKeeper client; Storm uses Curator's CuratorFramework to talk to ZooKeeper
 (let [zk (zk/mk-client conf (conf STORM-ZOOKEEPER-SERVERS) (conf STORM-ZOOKEEPER-PORT) :auth-conf conf)]
   ;; Create Storm's root znode on ZooKeeper (default /storm), then close this temporary client
   (zk/mkdirs zk (conf STORM-ZOOKEEPER-ROOT))
   (.close zk))
 ;; callbacks holds the registered callback functions, as a map from id to function
 (let [callbacks (atom {})
       ;; active records whether this ClusterState is still open; close resets it to false so the watcher stops dispatching events
       active (atom true)
       ;; zk is rebound to a new client that installs a watcher. When something the client watches changes, the ZooKeeper server
       ;; sends it an event, and the watcher hands the event to the functions in callbacks
       ;; When nimbus starts, no event handlers are registered, so events nimbus receives invoke no callback; when the supervisor
       ;; starts, it registers callbacks, so events the supervisor receives invoke the corresponding callback
       ;; mk-client is defined in zookeeper.clj; see its definition there
       zk (zk/mk-client conf
                        (conf STORM-ZOOKEEPER-SERVERS)
                        (conf STORM-ZOOKEEPER-PORT)
                        :auth-conf conf
                        :root (conf STORM-ZOOKEEPER-ROOT)
                        ;; :watcher sets the client's default watcher function: state is the client's connection state,
                        ;; type is the event type, and path is the znode on which the event occurred
                        ;; The watcher's job is to invoke every function in callbacks; those functions are added in
                        ;; mk-storm-cluster-state through ClusterState's register method
                        :watcher (fn [state type path]
                                   (when @active
                                     (when-not (= :connected state)
                                       (log-warn "Received event " state ":" type ":" path " with disconnected Zookeeper."))
                                     (when-not (= :none type)
                                       (doseq [callback (vals @callbacks)]
                                         (callback type path))))))]
   ;; reify is roughly Java's implements: it creates an anonymous object that implements the given protocol
   (reify
    ClusterState
    ;; register adds a callback to callbacks under a freshly generated random UUID and returns that id
    (register
      [this callback]
      (let [id (uuid)]
        (swap! callbacks assoc id callback)
        id))
    ;; unregister removes the callback with the given id from callbacks
    (unregister
      [this id]
      (swap! callbacks dissoc id))
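    ;; Create an ephemeral node on ZooKeeper (creating missing parent nodes first); if the node already exists, only its data is overwritten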
    (set-ephemeral-node
      [this path data]
      (zk/mkdirs zk (parent-path path))
      (if (zk/exists zk path false)
        (try-cause
          (zk/set-data zk path data) ; should verify that it's ephemeral
          (catch KeeperException$NoNodeException e
            (log-warn-error e "Ephemeral node disappeared between checking for existing and setting data")
            (zk/create-node zk path data :ephemeral)
            ))
        (zk/create-node zk path data :ephemeral)))
    ;; Create a sequential node on ZooKeeper (ZooKeeper appends an increasing counter to the node name)
    (create-sequential
      [this path data]
      (zk/create-node zk path data :sequential))
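    ;; Set a node's data; if the node does not exist, create its parent path and then a persistent node holding the data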
    (set-data
      [this path data]
      ;; note: this does not turn off any existing watches
      (if (zk/exists zk path false)
        (zk/set-data zk path data)
        (do
          (zk/mkdirs zk (parent-path path))
          (zk/create-node zk path data :persistent))))
    ;; Delete the given node together with all of its descendants
    (delete-node
      [this path]
      (zk/delete-recursive zk path))
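    ;; Read a node's data. path is the node's path; watch? is a boolean saying whether to set a watch on the node. If watch? is true,
    ;; the next change to the node's data (for example via set-data) sends an event to the zk client, which then calls the default
    ;; watcher given when the client was created (the function bound to :watcher). ZooKeeper watches fire only once, so the watch
    ;; must be set again by a later read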
    (get-data
      [this path watch?]
      (zk/get-data zk path watch?))
    ;; Like get-data, but also returns the data's version, i.e. the number of times the node's data has been modified
    (get-data-with-version
      [this path watch?]
      (zk/get-data-with-version zk path watch?))
    ;; Get the version of a node's data; watch? means the same as in get-data
    (get-version
      [this path watch?]
      (zk/get-version zk path watch?))
    ;; Get the list of a node's children; watch? works as in get-data, except that the watch fires when the node's children change
    (get-children
      [this path watch?]
      (zk/get-children zk path watch?))
    ;; Create a node on ZooKeeper, including any missing parent nodes
    (mkdirs
      [this path]
      (zk/mkdirs zk path))
    ;; Mark this ClusterState inactive, then close the zk client
    (close
      [this]
      (reset! active false)
      (.close zk)))))
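The callback plumbing above (register, unregister, the :watcher function, and the active flag that close clears) can be exercised without ZooKeeper. The sketch below is a hypothetical stand-alone version: `mk-callback-registry` returns a map of functions instead of a reify, `(str (java.util.UUID/randomUUID))` stands in for Storm's `uuid` helper, and event-type keywords such as `:node-data-changed` are assumed for illustration.

```clojure
(defn mk-callback-registry
  "Hypothetical stand-alone version of the callback plumbing in
  mk-distributed-cluster-state. Returns a map of functions."
  []
  (let [callbacks (atom {})      ; id -> (fn [type path])
        active    (atom true)    ; false once :close has been called
        ;; Plays the role of the :watcher fn handed to zk/mk-client.
        ;; (The original also logs a warning when state is not :connected.)
        watcher   (fn [state type path]
                    (when @active
                      (when-not (= :none type)
                        (doseq [callback (vals @callbacks)]
                          (callback type path)))))]
    {:register   (fn [callback]
                   (let [id (str (java.util.UUID/randomUUID))]
                     (swap! callbacks assoc id callback)
                     id))
     :unregister (fn [id] (swap! callbacks dissoc id))
     :close      (fn [] (reset! active false))
     :watcher    watcher}))
```

Calling `((:watcher reg) :connected :node-data-changed "/assignments/topo-1")` runs every registered callback with that type and path; events of type `:none` (session-state changes) and any event after `:close` are ignored.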

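mk-storm-cluster-state is defined below.
It is a very important function: it returns an instance implementing the StormClusterState protocol, through which Storm can interact with ZooKeeper more conveniently. Both the function that starts nimbus and the one that starts the supervisor call mk-storm-cluster-state; how nimbus and the supervisor start up will be covered in later articles.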
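At the core of mk-storm-cluster-state is one dispatcher function that it registers with the ClusterState: it splits the event path into components and picks a callback by the first one. The sketch below isolates that routing. `tokenize-path`, `issue-callback!` and `issue-map-callback!` are helpers the code calls but this article does not show; the versions here are assumptions about their behavior (split the path on "/"; fire a stored callback once and then clear it, which matches ZooKeeper's one-shot watches). Unknown subtrees throw here instead of calling `exit-process!`.

```clojure
(require '[clojure.string :as str])

;; Assumed values, mirroring the subtree names used in the code.
(def ASSIGNMENTS-ROOT "assignments")
(def SUPERVISORS-ROOT "supervisors")
(def STORMS-ROOT "storms")

(defn tokenize-path
  "\"/assignments/topo-1\" -> [\"assignments\" \"topo-1\"]"
  [path]
  (vec (remove empty? (str/split path #"/"))))

(defn issue-callback!
  "Fire the single stored callback (if any) once, then clear it."
  [cb-atom]
  (let [cb @cb-atom]
    (reset! cb-atom nil)
    (when cb (cb))))

(defn issue-map-callback!
  "Fire the callback stored under id (if any) once with id, then remove it."
  [cb-atom id]
  (let [cb (get @cb-atom id)]
    (swap! cb-atom dissoc id)
    (when cb (cb id))))

(defn mk-dispatcher
  "Hypothetical: returns the (fn [type path]) that routes events by subtree."
  [assignments-cb assignment-info-cb supervisors-cb storm-base-cb]
  (fn [type path]
    (let [[subtree & args] (tokenize-path path)]
      (condp = subtree
        ASSIGNMENTS-ROOT (if (empty? args)
                           (issue-callback! assignments-cb)
                           (issue-map-callback! assignment-info-cb (first args)))
        SUPERVISORS-ROOT (issue-callback! supervisors-cb)
        STORMS-ROOT      (issue-map-callback! storm-base-cb (first args))
        ;; Storm calls exit-process! here; the sketch throws instead.
        (throw (ex-info "Unknown callback for subtree"
                        {:subtree subtree :args args}))))))
```

Because each callback is cleared after it fires, a caller that wants further notifications has to pass its callback again on the next read, just as it must re-arm the ZooKeeper watch.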

mk-storm-cluster-state function
(defn mk-storm-cluster-state
 [cluster-state-spec]
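  ;; satisfies? works much like Java's instanceof: it checks whether cluster-state-spec implements ClusterState.
  ;; If it does, it is used as is (solo? = false); otherwise it is treated as a conf map and a new ClusterState is
  ;; created (solo? = true), which disconnect will later close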
 (let [[solo? cluster-state] (if (satisfies? ClusterState cluster-state-spec)
                               [false cluster-state-spec]
                               [true (mk-distributed-cluster-state cluster-state-spec)])
       ;; Map from topology id to callback: when the data of /assignments/{topology id} changes, the zk client runs the callback registered for that topology id
       assignment-info-callback (atom {})
       ;; assignment-info-with-version-callback is similar to assignment-info-callback
       assignment-info-with-version-callback (atom {})
       ;; assignment-version-callback is also similar to assignment-info-callback (a map keyed by topology id)
       assignment-version-callback (atom {})
       ;; When the children of /supervisors change, the zk client runs the function supervisors-callback points to
       supervisors-callback (atom nil)
       ;; When the children of /assignments change, the zk client runs the function assignments-callback points to
       assignments-callback (atom nil)
       ;; When the data of /storms/{topology id} changes, the zk client runs the callback registered in storm-base-callback for that topology id
       storm-base-callback (atom {})
       ;; register adds the callback (the fn below) to cluster-state's callbacks map and returns the UUID identifying it
       state-id (register
                  cluster-state
                  ;; The callback itself: type is the event type, path is the znode
                  (fn [type path]
                    ;; subtree is bound to the first path component, such as "assignments", "storms" or "supervisors"; args holds the rest (the topology id)
                    (let [[subtree & args] (tokenize-path path)]
                      ;; condp works much like Java's switch
                      (condp = subtree
                        ;; When subtree is "assignments": if args is empty, the children of /assignments changed, so run assignments-callback;
                        ;; otherwise the data of /assignments/{topology id} changed, so run the matching callback in assignment-info-callback
                        ASSIGNMENTS-ROOT (if (empty? args)
                                           (issue-callback! assignments-callback)
                                           (issue-map-callback! assignment-info-callback (first args)))
                        ;; When subtree is "supervisors", the children of /supervisors changed, so run supervisors-callback
                        SUPERVISORS-ROOT (issue-callback! supervisors-callback)
                        ;; When subtree is "storms", the data of /storms/{topology id} changed, so run the matching callback in storm-base-callback
                        STORMS-ROOT (issue-map-callback! storm-base-callback (first args))
                        ;; this should never happen
                        (exit-process! 30 "Unknown callback for subtree " subtree args)))))]
   ;; Create the znodes Storm needs to run topologies: /assignments, /storms, /supervisors, /workerbeats and /errors
   (doseq [p [ASSIGNMENTS-SUBTREE STORMS-SUBTREE SUPERVISORS-SUBTREE WORKERBEATS-SUBTREE ERRORS-SUBTREE]]
     (mkdirs cluster-state p))
   ;; Return an instance implementing the StormClusterState protocol
   (reify
     StormClusterState
     ;; Get the children of /assignments; if callback is non-nil, store it in assignments-callback and set a child watch on /assignments
     (assignments
       [this callback]
       (when callback
         (reset! assignments-callback callback))
       (get-children cluster-state ASSIGNMENTS-SUBTREE (not-nil? callback)))
     ;; Get the data of /assignments/{storm-id}, i.e. the topology's assignment; if callback is non-nil, add it to assignment-info-callback and set a data watch on /assignments/{storm-id}
     (assignment-info
       [this storm-id callback]
       (when callback
         (swap! assignment-info-callback assoc storm-id callback))
       (maybe-deserialize (get-data cluster-state (assignment-path storm-id) (not-nil? callback))))
     ;; Get the data of /assignments/{storm-id} together with its version; if callback is non-nil, add it to assignment-info-with-version-callback and set a data watch on /assignments/{storm-id}
     (assignment-info-with-version
       [this storm-id callback]
       (when callback
         (swap! assignment-info-with-version-callback assoc storm-id callback))
       (let [{data :data version :version}
             (get-data-with-version cluster-state (assignment-path storm-id) (not-nil? callback))]
       {:data (maybe-deserialize data)
        :version version}))
     ;; Get the data version of /assignments/{storm-id}; if callback is non-nil, add it to assignment-version-callback and set a data watch on /assignments/{storm-id}
     (assignment-version
       [this storm-id callback]
       (when callback
         (swap! assignment-version-callback assoc storm-id callback))
       (get-version cluster-state (assignment-path storm-id) (not-nil? callback)))
     ;; Get the ids of the topologies running in the cluster, i.e. the children of /storms
     (active-storms
       [this]
       (get-children cluster-state STORMS-SUBTREE false))
     ;; Get the ids of all topologies that have heartbeats, i.e. the children of /workerbeats
     (heartbeat-storms
       [this]
       (get-children cluster-state WORKERBEATS-SUBTREE false))
     ;; Get the ids of all topologies that have errors, i.e. the children of /errors
     (error-topologies
       [this]
       (get-children cluster-state ERRORS-SUBTREE false))
     ;; Get the heartbeat of the worker of topology storm-id at node+port, i.e. the data of /workerbeats/{storm-id}/{node-port}
     (get-worker-heartbeat
       [this storm-id node port]
       (-> cluster-state
           (get-data (workerbeat-path storm-id node port) false)
           maybe-deserialize))
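     ;; Get the heartbeats of all executors of topology storm-id: read the heartbeat of each assigned worker (node+port) and keep only the executors assigned to it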
     (executor-beats
       [this storm-id executor->node+port]
       ;; need to take executor->node+port in explicitly so that we don't run into a situation where a
       ;; long dead worker with a skewed clock overrides all the timestamps. By only checking heartbeats
       ;; with an assigned node+port, and only reading executors from that heartbeat that are actually assigned,
       ;; we avoid situations like that
       (let [node+port->executors (reverse-map executor->node+port)
             all-heartbeats (for [[[node port] executors] node+port->executors]
                              (->> (get-worker-heartbeat this storm-id node port)
                                   (convert-executor-beats executors)
                                   ))]
         (apply merge all-heartbeats)))
     ;; Get the children of /supervisors; if callback is non-nil, store it in supervisors-callback and set a child watch on /supervisors
     (supervisors
       [this callback]
       (when callback
         (reset! supervisors-callback callback))
       (get-children cluster-state SUPERVISORS-SUBTREE (not-nil? callback)))
     ;; Get the data of /supervisors/{supervisor-id}, i.e. the supervisor's heartbeat
     (supervisor-info
       [this supervisor-id]
       (maybe-deserialize (get-data cluster-state (supervisor-path supervisor-id) false)))
     ;; Write a worker's heartbeat
     (worker-heartbeat!
       [this storm-id node port info]
       (set-data cluster-state (workerbeat-path storm-id node port) (Utils/serialize info)))
     ;; Delete a worker's heartbeat
     (remove-worker-heartbeat!
       [this storm-id node port]
       (delete-node cluster-state (workerbeat-path storm-id node port)))
     ;; Create the node that holds the heartbeats of topology storm-id
     (setup-heartbeats!
       [this storm-id]
       (mkdirs cluster-state (workerbeat-storm-root storm-id)))
     ;; Delete the heartbeat node of topology storm-id
     (teardown-heartbeats!
       [this storm-id]
       (try-cause
         (delete-node cluster-state (workerbeat-storm-root storm-id))
         (catch KeeperException e
           (log-warn-error e "Could not teardown heartbeats for " storm-id))))
     ;; Delete the error node of topology storm-id
     (teardown-topology-errors!
       [this storm-id]
       (try-cause
         (delete-node cluster-state (error-storm-root storm-id))
         (catch KeeperException e
           (log-warn-error e "Could not teardown errors for " storm-id))))
     ;; Write the supervisor's heartbeat to an ephemeral node, which ZooKeeper removes when the supervisor's session ends
     (supervisor-heartbeat!
       [this supervisor-id info]
       (set-ephemeral-node cluster-state (supervisor-path supervisor-id) (Utils/serialize info)))
     ;; Write /storms/{storm-id}, holding the topology's serialized StormBase
     (activate-storm!
       [this storm-id storm-base]
       (set-data cluster-state (storm-path storm-id) (Utils/serialize storm-base)))
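     ;; Update /storms/{storm-id}: merge new-elems into the current StormBase; :component->executors is merged into the existing map instead of replacing it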
     (update-storm!
       [this storm-id new-elems]
       (let [base (storm-base this storm-id nil)
             executors (:component->executors base)
             new-elems (update new-elems :component->executors (partial merge executors))]
         (set-data cluster-state (storm-path storm-id)
                   (-> base
                       (merge new-elems)
                       Utils/serialize))))
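     ;; Read /storms/{storm-id} (the topology's StormBase); if callback is non-nil, add it to storm-base-callback and set a data watch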
     (storm-base
       [this storm-id callback]
       (when callback
         (swap! storm-base-callback assoc storm-id callback))
       (maybe-deserialize (get-data cluster-state (storm-path storm-id) (not-nil? callback))))
     ;; Delete /storms/{storm-id}
     (remove-storm-base!
       [this storm-id]
       (delete-node cluster-state (storm-path storm-id)))
     ;; Write the topology's assignment to /assignments/{storm-id}
     (set-assignment!
       [this storm-id info]
       (set-data cluster-state (assignment-path storm-id) (Utils/serialize info)))
     ;; Delete the topology's assignment node and its StormBase node
     (remove-storm!
       [this storm-id]
       (delete-node cluster-state (assignment-path storm-id))
       (remove-storm-base! this storm-id))
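     ;; Record an error as a sequential node under the component's error path, keeping only the 10 most recent errors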
     (report-error
       [this storm-id component-id node port error]
       (let [path (error-path storm-id component-id)
             data {:time-secs (current-time-secs) :error (stringify-error error) :host node :port port}
             _ (mkdirs cluster-state path)
             _ (create-sequential cluster-state (str path "/e") (Utils/serialize data))
             to-kill (->> (get-children cluster-state path false)
                          (sort-by parse-error-path)
                          reverse
                          (drop 10))]
         (doseq [k to-kill]
           (delete-node cluster-state (str path "/" k)))))
     ;; Read all recorded errors of a component, newest first
     (errors
       [this storm-id component-id]
       (let [path (error-path storm-id component-id)
             _ (mkdirs cluster-state path)
             children (get-children cluster-state path false)
             errors (dofor [c children]
                           (let [data (-> (get-data cluster-state (str path "/" c) false)
                                          maybe-deserialize)]
                             (when data
                               (struct TaskError (:error data) (:time-secs data) (:host data) (:port data))
                               )))
             ]
         (->> (filter not-nil? errors)
              (sort-by (comp - :time-secs)))))
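     ;; Unregister the dispatcher callback; close the underlying ClusterState only if this function created it (solo?)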
     (disconnect
       [this]
       (unregister cluster-state state-id)
       (when solo?
         (close cluster-state))))))
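report-error keeps each component's error list bounded: every error becomes a sequential child of the component's error path (named "e" plus ZooKeeper's 10-digit, zero-padded sequence number), and then all but the 10 newest children are deleted. The pruning step is reproduced below in isolation; `errors-to-delete` is a hypothetical name, and `parse-error-path` is assumed to parse the number after the leading "e".

```clojure
(defn parse-error-path
  "Assumed behavior: \"e0000000012\" -> 12 (drop the leading \"e\")."
  [^String p]
  (Long/parseLong (subs p 1)))

(defn errors-to-delete
  "Given the child names under an error path, return the ones report-error
  would delete: everything except the keep-n highest sequence numbers."
  [children keep-n]
  (->> children
       (sort-by parse-error-path)   ; oldest first
       reverse                      ; newest first
       (drop keep-n)))              ; everything past the newest keep-n
```

Sorting on the parsed number rather than the raw string keeps the order correct even if names were not uniformly zero-padded.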

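The comment in executor-beats explains why heartbeats are read only from assigned workers: a long-dead worker with a skewed clock could otherwise overwrite fresh timestamps. The sketch below shows that data flow with plain maps. `reverse-map` and `convert-executor-beats` are helpers the code calls but the article does not show, so the versions here are simplified assumptions, as is the heartbeat shape (a map with `:time-secs` and `:executor-stats` keys); `get-worker-heartbeat` is passed in as a function of node and port.

```clojure
(defn reverse-map
  "{k v} -> {v [k ...]}: group keys by their value."
  [m]
  (reduce (fn [acc [k v]] (update-in acc [v] (fnil conj []) k)) {} m))

(defn convert-executor-beats
  "Keep only the assigned executors that appear in this worker heartbeat."
  [executors worker-hb]
  (let [stats (:executor-stats worker-hb)]
    (into {}
          (for [e executors :when (contains? stats e)]
            [e {:time-secs (:time-secs worker-hb) :stats (get stats e)}]))))

(defn executor-beats
  "Hypothetical: executor->node+port is the current assignment."
  [get-worker-heartbeat executor->node+port]
  (let [node+port->executors (reverse-map executor->node+port)
        all-heartbeats (for [[[node port] executors] node+port->executors]
                         (convert-executor-beats
                          executors (get-worker-heartbeat node port)))]
    (apply merge all-heartbeats)))
```

Any executor that a worker reports but that is not assigned to that worker's node+port is dropped, which is the protection the comment describes.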
 

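update-storm! does not replace `:component->executors` wholesale. It first merges the new per-component executor counts into the existing map, so components that are not mentioned keep their counts, and then merges the result into the stored StormBase. The pure data step can be checked with plain maps: `merge-storm-base` is a hypothetical name, the StormBase is modeled as a map with illustrative fields, and clojure.core/update (Clojure 1.7+) is assumed to behave like the `update` helper the original calls for this three-argument use.

```clojure
(defn merge-storm-base
  "Pure version of the data step in update-storm!: returns the new StormBase map."
  [base new-elems]
  (let [executors (:component->executors base)
        ;; (merge executors new-counts): existing entries survive, new ones win.
        new-elems (update new-elems :component->executors (partial merge executors))]
    (merge base new-elems)))
```

If new-elems carries no `:component->executors` at all, `(merge executors nil)` simply returns the existing map, so an update that only changes, say, the status leaves the executor counts intact.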

Original article: http://www.cnblogs.com/ierbar0604/p/3938252.html
