大家都知道,要提交Storm Topology 到Cluster,需要运行如下命令:
${STORM_HOME}/bin/storm jar xxxxxxxxxxx.jar ${main class} [args ...]
def main(): if len(sys.argv) <= 1: print_usage() sys.exit(-1) global CONFIG_OPTS config_list, args = parse_config_opts(sys.argv[1:]) parse_config(config_list) COMMAND = args[0] ARGS = args[1:] (COMMANDS.get(COMMAND, unknown_command))(*ARGS) if __name__ == "__main__": main()
COMMANDS = {"jar": jar, "kill": kill, "shell": shell, "nimbus": nimbus, "ui": ui, "logviewer": logviewer, "drpc": drpc, "supervisor": supervisor, "localconfvalue": print_localconfvalue, "remoteconfvalue": print_remoteconfvalue, "repl": repl, "classpath": print_classpath, "activate": activate, "deactivate": deactivate, "rebalance": rebalance, "help": print_usage, "list": listtopos, "dev-zookeeper": dev_zookeeper, "version": version, "monitor": monitor}
def jar(jarfile, klass, *args): """Syntax: [storm jar topology-jar-path class ...] Runs the main method of class with the specified arguments. The storm jars and configs in ~/.storm are put on the classpath. The process is configured so that StormSubmitter (http://storm.incubator.apache.org/apidocs/backtype/storm/StormSubmitter.html) will upload the jar at topology-jar-path when the topology is submitted. """ exec_storm_class( klass, jvmtype="-client", extrajars=[jarfile, USER_CONF_DIR, STORM_DIR + "/bin"], args=args, jvmopts=JAR_JVM_OPTS + ["-Dstorm.jar=" + jarfile])
def exec_storm_class(klass, jvmtype="-server", jvmopts=[], extrajars=[], args=[], fork=False): global CONFFILE storm_log_dir = confvalue("storm.log.dir",[CLUSTER_CONF_DIR]) if(storm_log_dir == None or storm_log_dir == "nil"): storm_log_dir = STORM_DIR+"/logs" all_args = [ JAVA_CMD, jvmtype, get_config_opts(), "-Dstorm.home=" + STORM_DIR, "-Dstorm.log.dir=" + storm_log_dir, "-Djava.library.path=" + confvalue("java.library.path", extrajars), "-Dstorm.conf.file=" + CONFFILE, "-cp", get_classpath(extrajars), ] + jvmopts + [klass] + list(args) print("Running: " + " ".join(all_args)) if fork: os.spawnvp(os.P_WAIT, JAVA_CMD, all_args) else: os.execvp(JAVA_CMD, all_args) # replaces the current process and # never returns
private Map<String, IRichBolt> _bolts = new HashMap<String, IRichBolt>(); private Map<String, IRichSpout> _spouts = new HashMap<String, IRichSpout>(); private Map<String, ComponentCommon> _commons = new HashMap<String, ComponentCommon>();
private void initCommon(String id, IComponent component, Number parallelism) { ComponentCommon common = new ComponentCommon(); common.set_inputs(new HashMap<GlobalStreamId, Grouping>()); if(parallelism!=null) common.set_parallelism_hint(parallelism.intValue()); Map conf = component.getComponentConfiguration(); if(conf!=null) common.set_json_conf(JSONValue.toJSONString(conf)); _commons.put(id, common); }
private ComponentCommon getComponentCommon(String id, IComponent component) { ComponentCommon ret = new ComponentCommon(_commons.get(id)); OutputFieldsGetter getter = new OutputFieldsGetter(); component.declareOutputFields(getter); ret.set_streams(getter.getFieldsDeclaration()); return ret; }
当所有的bolt和spout都set完毕之后,我们就会调用createTopology方法生成一个StormTopology,由StormSubmitter来submit topology
/** * Submits a topology to run on the cluster. A topology runs forever or until * explicitly killed. * * * @param name the name of the storm. * @param stormConf the topology-specific configuration. See {@link Config}. * @param topology the processing to execute. * @param opts to manipulate the starting of the topology * @param progressListener to track the progress of the jar upload process * @throws AlreadyAliveException if a topology with this name is already running * @throws InvalidTopologyException if an invalid topology was submitted */ public static void submitTopology(String name, Map stormConf, StormTopology topology, SubmitOptions opts, ProgressListener progressListener) throws AlreadyAliveException, InvalidTopologyException { if(!Utils.isValidConf(stormConf)) { throw new IllegalArgumentException("Storm conf is not valid. Must be json-serializable"); } stormConf = new HashMap(stormConf); stormConf.putAll(Utils.readCommandLineOpts()); Map conf = Utils.readStormConfig(); conf.putAll(stormConf); try { String serConf = JSONValue.toJSONString(stormConf); if(localNimbus!=null) { LOG.info("Submitting topology " + name + " in local mode"); localNimbus.submitTopology(name, null, serConf, topology); } else { NimbusClient client = NimbusClient.getConfiguredClient(conf); if(topologyNameExists(conf, name)) { throw new RuntimeException("Topology with name `" + name + "` already exists on cluster"); } submitJar(conf, progressListener); try { LOG.info("Submitting topology " + name + " in distributed mode with conf " + serConf); if(opts!=null) { client.getClient().submitTopologyWithOpts(name, submittedJar, serConf, topology, opts); } else { // this is for backwards compatibility client.getClient().submitTopology(name, submittedJar, serConf, topology); } } catch(InvalidTopologyException e) { LOG.warn("Topology submission exception: "+e.get_msg()); throw e; } catch(AlreadyAliveException e) { LOG.warn("Topology already alive exception", e); throw e; } finally { client.close(); } } LOG.info("Finished submitting topology: " + name); } catch(TException e) { throw new RuntimeException(e); } }
Nimbus可以 说是storm中最核心的部分,它的主要功能有两个: