码迷,mamicode.com
首页 > 其他好文 > 详细

storm源码剖析(3):topology启动过程

时间:2015-04-16 19:46:05      阅读:239      评论:0      收藏:0      [点我收藏+]

标签:

storm的topology启动过程是执行strom jar topology1.jar MAINCLASS ARG1 ARG2

鉴于前面已经分析了脚本的解析过程,现在重点分析topology1.jar的执行。

以storm-starter中的ExclamationTopology为例,来进行剖析:

public class ExclamationTopology {

  public static class ExclamationBolt extends BaseRichBolt {
    OutputCollector _collector;

    @Override
    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
      _collector = collector;
    }

    @Override
    public void execute(Tuple tuple) {
      _collector.emit(tuple, new Values(tuple.getString(0) + "!!!"));
      _collector.ack(tuple);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("word"));
    }


  }

  public static void main(String[] args) throws Exception {
    TopologyBuilder builder = new TopologyBuilder();

    builder.setSpout("word", new TestWordSpout(), 10);
    builder.setBolt("exclaim1", new ExclamationBolt(), 3).shuffleGrouping("word");
    builder.setBolt("exclaim2", new ExclamationBolt(), 2).shuffleGrouping("exclaim1");

    Config conf = new Config();
    conf.setDebug(true);

    if (args != null && args.length > 0) {
      conf.setNumWorkers(3);

      StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
    }
    else {

      LocalCluster cluster = new LocalCluster();
      cluster.submitTopology("test", conf, builder.createTopology());
      Utils.sleep(10000);
      cluster.killTopology("test");
      cluster.shutdown();
    }
  }
}

可以看到一个topology的启动包括三个步骤:

(1)创建TopologyBuilder,设置输入源spout,设置输出源bolt

(2)创建Config,设置配置项

(3)提交topology

创建TopologyBuilder

TopologyBuilder对象创建很简单,先来看看setSpout():

public SpoutDeclarer setSpout(String id, IRichSpout spout, Number parallelism_hint) {
        validateUnusedId(id);
        initCommon(id, spout, parallelism_hint);
        _spouts.put(id, spout);
        return new SpoutGetter(id);
    }

首先,判断componentId是否使用过了,如果使用过,则直接剖错。

然后,初始化Commponent:创建ComponentCommon对象,并设置属性,然后在TopologyBuilder 的成员变量Map<String, IRichSpout> _commons中记录下common,其key为componentId(这里为“word”)。代码如下:

private void initCommon(String id, IComponent component, Number parallelism) {
        ComponentCommon common = new ComponentCommon();
        common.set_inputs(new HashMap<GlobalStreamId, Grouping>());
        if(parallelism!=null) common.set_parallelism_hint(parallelism.intValue());
        Map conf = component.getComponentConfiguration();
        if(conf!=null) common.set_json_conf(JSONValue.toJSONString(conf));
        _commons.put(id, common); 
    }

其中ComponentCommon是使用thrift定义的,在storm.thrift中定义,代码如下:

struct ComponentCommon {
  1: required map<GlobalStreamId, Grouping> inputs;
  2: required map<string, StreamInfo> streams; //key is stream id
  3: optional i32 parallelism_hint; //how many threads across the cluster should be dedicated to this component

  // component specific configuration respects:
  // topology.debug: false
  // topology.max.task.parallelism: null // can replace isDistributed with this
  // topology.max.spout.pending: null
  // topology.kryo.register // this is the only additive one
  
  // component specific configuration
  4: optional string json_conf;
}

最后,在TopologyBuilder 的成员变量Map<String, IRichSpout> _spouts,记录下spout的记录。其中key也是componentId(这里为“word”)。

 

再来看看setBolt,与setSpout的处理一样,最终在TopologyBuilder 的成员变量Map<String, IRichSpout> _commons中记录下common,其key为componentId(这里为“exclaim1”);在TopologyBuilder 的成员变量Map<String, IRichSpout> _bolts,记录下bolt的记录。其中key也是componentId(这里为“exclaim1”).

之后,.shuffleGrouping("word")这部分,是调用setBolt返回的,BoltDeclarer中的shuffleGrouping。

         最终将会调用到grouping,其中streamId在这里没有指定,会使用"default"来替代。

public BoltDeclarer shuffleGrouping(StringcomponentId) {

   return shuffleGrouping(componentId, Utils.DEFAULT_STREAM_ID);

}

 

public BoltDeclarer shuffleGrouping(StringcomponentId, String streamId) {

   return grouping(componentId, streamId, Grouping.shuffle(newNullStruct()));

}

在这里grouping最后一个参数是生成了Grouping对象,并填充shuffle为NullStruct,其中Grouping是在storm.thrift定义的一个联合体,thrift会生成对应的java代码,内部定义了很多种grouping的方式。

                                                                                                                                      

private BoltDeclarer grouping(StringcomponentId, String streamId, Grouping grouping)

{

    _commons.get(_boltId).put_to_inputs(new GlobalStreamId(componentId,streamId), grouping);

    return this;

}

grouing函数是将之前记录在_commons中的,bolt的componentId对应的ComponentCommon的键值对,取出来设置ComponentCommon中的inputs的值。以第一个setBolt为例,就是取出"exclaim1"这个componentId对应的ComponentCommon,将里面的inputs设置为,这个输入是从哪里来的,也就是"word"这个componentId,streamId为"default"的这个spout流作为第一个bolt的输入源。

创建Config

Config比较简单,继承自Map,通过setXxx()为自身添加配置。

在这个例子中有两个set函数的调用。

conf.setDebug(true);就是在Map中插入一条记录("topology.debug" -> "true"),标记是打开debug模式的。

conf.setNumWorkers(3);同样在Map中插入一条记录("topology.workers" -> 3),标记worker数为3个。

提交Topology

 

storm源码剖析(3):topology启动过程

标签:

原文地址:http://www.cnblogs.com/jerryshao2015/p/4432966.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!