标签:
功能:提交一个新的Topology,并为Topology创建storm-id(topology-id),校验其结构,设置必要的元数据,最后为Topology分配任务.
实现源码:
1 | (^void submitTopology |
2 | [this ^String storm-name ^String uploadedJarLocation ^String serializedConf ^StormTopology topology] |
3 | (.submitTopologyWithOpts this storm-name uploadedJarLocation serializedConf topology |
4 | (SubmitOptions. TopologyInitialStatus/ACTIVE))) |
从以上源码中看出submitTopology内部是对submitTopologyWithOpts方法的调用。
submitTopologyWithOpts函数原型如下:
1 | ^void submitTopologyWithOpts |
2 | [this ^String storm-name ^String uploadedJarLocation ^String serializedConf ^StormTopology topology |
3 | ^SubmitOptions submitOptions] |
在submitTopologyWithOpts中主要做了以下几件事情:
normalize-topology
实现源码:
1 | (defn normalize-topology [storm-conf ^StormTopology topology] |
2 | (let [ret (.deepCopy topology)] |
3 | (doseq [[_ component] (all-components ret)] |
4 | (.set_json_conf |
5 | (.get_common component) |
6 | (->> {TOPOLOGY-TASKS (component-parallelism storm-conf component)} |
7 | (merge (component-conf component)) |
8 | to-json ))) |
9 | ret )) |
实现说明:
component-parallelism实现源码(计算组件并行度):
1 | (defn- component-parallelism [storm-conf component] |
2 | (let [storm-conf (merge storm-conf (component-conf component)) |
3 | num-tasks (or (storm-conf TOPOLOGY-TASKS) (num-start-executors component)) |
4 | max-parallelism (storm-conf TOPOLOGY-MAX-TASK-PARALLELISM) |
5 | ] |
6 | (if max-parallelism |
7 | (min max-parallelism num-tasks) |
8 | num-tasks))) |
1 | TopologyBuilder builder = new TopologyBuilder(); |
2 | // 4对应对用用户设置的组件并行度,10对应TOPOLOGY-TASK配置项的值 |
3 | builder.setBolt("transfer", new TransferBolt(), 4).shuffleGrouping("random").setNumTasks(6); Config conf = new Config(); |
4 | // 8对应 TOPOLOGY-MAX-TASK-PARALLELISM配置项的值 |
5 | Conf.setMaxTaskParallelism(8); |
system-topology!
功能:
验证用户提交的Topology,同时为提交的topology添加一些系统组件和流。
实现源码:
1 | (defn system-topology! [storm-conf ^StormTopology topology] |
2 | (validate-basic! topology) |
3 | (let [ret (.deepCopy topology)] |
4 | (add-acker! storm-conf ret) |
5 | (add-metric-components! storm-conf ret) |
6 | (add-system-components! storm-conf ret) |
7 | (add-metric-streams! ret) |
8 | (add-system-streams! ret) |
9 | (validate-structure! ret) |
10 | ret |
11 | )) |
实现说明:
验证过程:
获取Topology中所有组件和组件的输入(包括component-id、stream-id、Grouping),对输入组件依次判断输入组件ID(component-id)是否在该Topology中,不存在则抛出异常,存在则再判断该组件的流类型是否为所对应的stream-id,若不存在则抛出异常,存在则继续检查该流的分组方式(Grouping)是否与能对应,所有组件检查完毕后没有异常抛出表示该Topology有效.
标签:
原文地址:http://www.cnblogs.com/jianyuan/p/4792443.html