标签:storm
大家都知道,要提交Storm Topology 到Cluster,需要运行如下命令:
${STORM_HOME}/bin/storm jar xxxxxxxxxxx.jar ${main class} [args ...]
bin/storm 本身是一个 Python 脚本,它的入口函数如下:
def main():
    if len(sys.argv) <= 1:
        print_usage()
        sys.exit(-1)
    global CONFIG_OPTS
    config_list, args = parse_config_opts(sys.argv[1:])
    parse_config(config_list)
    COMMAND = args[0]
    ARGS = args[1:]
    (COMMANDS.get(COMMAND, unknown_command))(*ARGS)
    
if __name__ == "__main__":
    main()

# Dispatch table: maps each sub-command name given on the command line to the
# function that implements it.
COMMANDS = {"jar": jar, "kill": kill, "shell": shell, "nimbus": nimbus, "ui": ui, "logviewer": logviewer,
            "drpc": drpc, "supervisor": supervisor, "localconfvalue": print_localconfvalue,
            "remoteconfvalue": print_remoteconfvalue, "repl": repl, "classpath": print_classpath,
            "activate": activate, "deactivate": deactivate, "rebalance": rebalance, "help": print_usage,
            "list": listtopos, "dev-zookeeper": dev_zookeeper, "version": version, "monitor": monitor}


# NOTE(review): the docstring below is kept byte-for-byte because it may be
# shown to users as help text -- confirm against print_usage.
def jar(jarfile, klass, *args):
    """Syntax: [storm jar topology-jar-path class ...]
    Runs the main method of class with the specified arguments. 
    The storm jars and configs in ~/.storm are put on the classpath. 
    The process is configured so that StormSubmitter 
    (http://storm.incubator.apache.org/apidocs/backtype/storm/StormSubmitter.html)
    will upload the jar at topology-jar-path when the topology is submitted.
    """
    exec_storm_class(
        klass,
        jvmtype="-client",
        extrajars=[jarfile, USER_CONF_DIR, STORM_DIR + "/bin"],
        args=args,
        # -Dstorm.jar hands the jar path to the JVM as a system property.
        jvmopts=JAR_JVM_OPTS + ["-Dstorm.jar=" + jarfile])


def exec_storm_class(klass, jvmtype="-server", jvmopts=None, extrajars=None, args=None, fork=False):
    """Assemble the java command line for ``klass`` and run it.

    The command consists of JAVA_CMD, ``jvmtype``, the configured options,
    the storm.home / storm.log.dir / java.library.path / storm.conf.file
    system properties, the classpath, ``jvmopts``, ``klass`` and ``args``.
    The full command is printed before it is run.

    Args:
        klass: name of the Java class to run.
        jvmtype: JVM mode flag placed right after the java command.
        jvmopts: list of extra JVM options; None means no extra options.
        extrajars: extra entries handed to get_classpath() and to the
            java.library.path lookup; None means none.
        args: arguments passed to the Java class; None means none.
        fork: when true, run java as a child process and wait for it;
            otherwise replace the current process with java.

    Returns:
        None. With ``fork`` false this function never returns.
    """
    global CONFFILE
    # None defaults replace the original shared mutable ``[]`` defaults.
    jvmopts = [] if jvmopts is None else jvmopts
    extrajars = [] if extrajars is None else extrajars
    args = [] if args is None else args
    storm_log_dir = confvalue("storm.log.dir", [CLUSTER_CONF_DIR])
    # Fall back to <STORM_DIR>/logs when no log dir is configured
    # (the lookup yields None or the string "nil" in that case).
    if storm_log_dir is None or storm_log_dir == "nil":
        storm_log_dir = STORM_DIR + "/logs"
    all_args = [
        JAVA_CMD, jvmtype, get_config_opts(),
        "-Dstorm.home=" + STORM_DIR,
        "-Dstorm.log.dir=" + storm_log_dir,
        "-Djava.library.path=" + confvalue("java.library.path", extrajars),
        "-Dstorm.conf.file=" + CONFFILE,
        "-cp", get_classpath(extrajars),
    ] + jvmopts + [klass] + list(args)
    print("Running: " + " ".join(all_args))
    if fork:
        # P_WAIT: block until the child java process exits.
        os.spawnvp(os.P_WAIT, JAVA_CMD, all_args)
    else:
        os.execvp(JAVA_CMD, all_args) # replaces the current process and
        # never returns
进程启动之后,就开始调用你自己写的Topology代码了,我们一般用TopologyBuilder来构建Topology,TopologyBuilder有三个变量
private Map<String, IRichBolt> _bolts = new HashMap<String, IRichBolt>();
    private Map<String, IRichSpout> _spouts = new HashMap<String, IRichSpout>();
    private Map<String, ComponentCommon> _commons = new HashMap<String, ComponentCommon>();
_commons存放各组件的一些额外信息,如并行度、额外配置等。每set一个组件时,都会调用initCommon方法进行初始化:
/**
 * Creates the ComponentCommon record for a component and stores it in
 * _commons under the given id.
 *
 * @param id          key under which the record is stored
 * @param component   component whose configuration map is read
 * @param parallelism parallelism hint; skipped when null
 */
private void initCommon(String id, IComponent component, Number parallelism) {
    final ComponentCommon componentCommon = new ComponentCommon();
    // The record starts with an empty input map.
    componentCommon.set_inputs(new HashMap<GlobalStreamId, Grouping>());

    if (parallelism != null) {
        componentCommon.set_parallelism_hint(parallelism.intValue());
    }

    // The component's own configuration, if any, is stored as a JSON string.
    final Map componentConf = component.getComponentConfiguration();
    if (componentConf != null) {
        componentCommon.set_json_conf(JSONValue.toJSONString(componentConf));
    }

    _commons.put(id, componentCommon);
}

/**
 * Returns a copy of the ComponentCommon stored for the given id, with its
 * streams set from the output fields the component declares.
 *
 * @param id        key of the stored record
 * @param component component asked to declare its output fields
 * @return the copied record with streams filled in
 */
private ComponentCommon getComponentCommon(String id, IComponent component) {
    // Copy first so the record kept in _commons is not modified.
    final ComponentCommon copy = new ComponentCommon(_commons.get(id));

    final OutputFieldsGetter outputGetter = new OutputFieldsGetter();
    component.declareOutputFields(outputGetter);
    copy.set_streams(outputGetter.getFieldsDeclaration());

    return copy;
}
当所有的bolt和spout都set完毕之后,我们就会调用createTopology方法生成一个StormTopology,由StormSubmitter来submit topology
 /**
     * Submits a topology to run on the cluster. A topology runs forever or until
     * explicitly killed.
     *
     * <p>If a local Nimbus is set, the topology is handed to it directly. Otherwise
     * the name is checked against the cluster, the jar is uploaded, and the topology
     * is submitted through a {@link NimbusClient}, which is always closed afterwards.
     *
     * @param name the name of the topology.
     * @param stormConf the topology-specific configuration. See {@link Config}.
     * @param topology the processing to execute.
     * @param opts to manipulate the starting of the topology; when null the plain
     *             submitTopology call is used for backwards compatibility
     * @param progressListener to track the progress of the jar upload process
     * @throws AlreadyAliveException if a topology with this name is already running
     * @throws InvalidTopologyException if an invalid topology was submitted
     * @throws IllegalArgumentException if stormConf fails the validity check
     * @throws RuntimeException if a topology with this name already exists on the
     *         cluster, or if a TException occurs during submission
     */
    public static void submitTopology(String name, Map stormConf, StormTopology topology, SubmitOptions opts, ProgressListener progressListener) throws AlreadyAliveException, InvalidTopologyException {
        if(!Utils.isValidConf(stormConf)) {
            throw new IllegalArgumentException("Storm conf is not valid. Must be json-serializable");
        }
        // Work on a copy so the caller's map is not modified by the merge below.
        stormConf = new HashMap(stormConf);
        stormConf.putAll(Utils.readCommandLineOpts());
        // conf starts from Utils.readStormConfig() and is overlaid with stormConf;
        // it is used for the cluster calls, while only stormConf is serialized.
        Map conf = Utils.readStormConfig();
        conf.putAll(stormConf);
        try {
            String serConf = JSONValue.toJSONString(stormConf);
            if(localNimbus!=null) {
                LOG.info("Submitting topology " + name + " in local mode");
                localNimbus.submitTopology(name, null, serConf, topology);
            } else {
                if(topologyNameExists(conf, name)) {
                    throw new RuntimeException("Topology with name `" + name + "` already exists on cluster");
                }
                submitJar(conf, progressListener);
                // Create the client only now: the name check and jar upload above can
                // throw, and a client opened before them would never be closed because
                // they run outside the try/finally below.
                NimbusClient client = NimbusClient.getConfiguredClient(conf);
                try {
                    LOG.info("Submitting topology " +  name + " in distributed mode with conf " + serConf);
                    if(opts!=null) {
                        client.getClient().submitTopologyWithOpts(name, submittedJar, serConf, topology, opts);
                    } else {
                        // this is for backwards compatibility
                        client.getClient().submitTopology(name, submittedJar, serConf, topology);
                    }
                } catch(InvalidTopologyException e) {
                    LOG.warn("Topology submission exception: "+e.get_msg());
                    throw e;
                } catch(AlreadyAliveException e) {
                    LOG.warn("Topology already alive exception", e);
                    throw e;
                } finally {
                    client.close();
                }
            }
            LOG.info("Finished submitting topology: " +  name);
        } catch(TException e) {
            throw new RuntimeException(e);
        }
    }
提交Topology的操作包括:初始化NimbusClient、检查该Topology是否已存在、上传Jar包;一切完工后,接下来就交由Nimbus来做了。
Nimbus可以说是Storm中最核心的部分,它的主要功能有两个:
抱歉,太晚了。后续补上
标签:storm
原文地址:http://blog.csdn.net/wzhg0508/article/details/41402003