Tags: cloud computing, jstorm, real-time computing, storm, hadoop
A topology contains one or more spouts and bolts. A spout reads data from a data source and emits it to bolts; each bolt processes what it receives and passes the result on to the next bolt. A topology is normally created through TopologyBuilder, which records the spouts and bolts it contains and validates them, for example by checking that no two components share an id. The check looks like this:

    private void validateUnusedId(String id) {
        if (_bolts.containsKey(id)) {
            throw new IllegalArgumentException(
                    "Bolt has already been declared for id " + id);
        }
        if (_spouts.containsKey(id)) {
            throw new IllegalArgumentException(
                    "Spout has already been declared for id " + id);
        }
        if (_stateSpouts.containsKey(id)) {
            throw new IllegalArgumentException(
                    "State spout has already been declared for id " + id);
        }
    }

TopologyBuilder stores each component in a matching data structure, as follows:

    public class TopologyBuilder {
        // Holds all bolts
        private Map<String, IRichBolt> _bolts = new HashMap<String, IRichBolt>();
        // Holds all spouts
        private Map<String, IRichSpout> _spouts = new HashMap<String, IRichSpout>();
        // Holds the configuration of each component
        private Map<String, ComponentCommon> _commons = new HashMap<String, ComponentCommon>();
        ..........
    }

Component configuration is stored by the following method:

    private void initCommon(String id, IComponent component, Number parallelism) {
        ComponentCommon common = new ComponentCommon();
        common.set_inputs(new HashMap<GlobalStreamId, Grouping>());
        if (parallelism != null)
            common.set_parallelism_hint(parallelism.intValue());
        else {
            common.set_parallelism_hint(Integer.valueOf(1));
        }
        Map conf = component.getComponentConfiguration();
        if (conf != null)
            common.set_json_conf(Utils.to_json(conf));
        _commons.put(id, common);
    }

Once this information is saved, the builder uses it to create a StormTopology instance when the topology is built, and StormSubmitter.submitTopology then submits it. Submission happens in two steps:
1. Upload the jar file
2. Submit the job
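To make the builder-side bookkeeping concrete, here is a minimal sketch that combines the two behaviours shown earlier: the id check from validateUnusedId and the default parallelism of 1 from initCommon. It is not JStorm code. The class ComponentRegistrySketch and its methods are hypothetical stand-ins that use plain maps, so the example runs without Storm or JStorm on the classpath.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for TopologyBuilder: tracks component ids and
// parallelism hints only, with no dependency on Storm or JStorm.
final class ComponentRegistrySketch {
    // id -> kind of component ("Spout" or "Bolt")
    private final Map<String, String> kinds = new HashMap<>();
    // id -> parallelism hint
    private final Map<String, Integer> parallelism = new HashMap<>();

    // Mirrors validateUnusedId: an id may be declared only once,
    // regardless of whether it names a spout or a bolt.
    private void validateUnusedId(String id) {
        String existing = kinds.get(id);
        if (existing != null) {
            throw new IllegalArgumentException(
                    existing + " has already been declared for id " + id);
        }
    }

    // Mirrors initCommon: a null hint falls back to a parallelism of 1.
    private void register(String kind, String id, Number hint) {
        validateUnusedId(id);
        kinds.put(id, kind);
        parallelism.put(id, hint != null ? hint.intValue() : 1);
    }

    void setSpout(String id, Number hint) { register("Spout", id, hint); }

    void setBolt(String id, Number hint) { register("Bolt", id, hint); }

    int parallelismOf(String id) { return parallelism.get(id); }

    public static void main(String[] args) {
        ComponentRegistrySketch builder = new ComponentRegistrySketch();
        builder.setSpout("reader", null);  // no hint given
        builder.setBolt("counter", 4);
        System.out.println(builder.parallelismOf("reader"));
        System.out.println(builder.parallelismOf("counter"));
        try {
            builder.setBolt("reader", 2);  // reuses the spout's id
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Running main prints 1, then 4, then "Spout has already been declared for id reader". Because the check spans all component kinds, a bolt cannot reuse a spout's id.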

    public static void submitTopology(String name, Map stormConf,
            StormTopology topology, SubmitOptions opts)
            throws AlreadyAliveException, InvalidTopologyException {
        // Read the configuration and validate it
        if (!Utils.isValidConf(stormConf)) {
            throw new IllegalArgumentException(
                    "Storm conf is not valid. Must be json-serializable");
        }
        stormConf = new HashMap(stormConf);
        stormConf.putAll(Utils.readCommandLineOpts());
        Map conf = Utils.readStormConfig();
        conf.putAll(stormConf);
        putUserInfo(conf, stormConf);
        try {
            String serConf = Utils.to_json(stormConf);
            if (localNimbus != null) {
                LOG.info("Submitting topology " + name + " in local mode");
                localNimbus.submitTopology(name, null, serConf, topology);
            } else {
                NimbusClient client = NimbusClient.getConfiguredClient(conf);
                // Check whether a topology with the same name is already running on the cluster
                if (topologyNameExists(conf, name)) {
                    throw new RuntimeException("Topology with name `" + name
                            + "` already exists on cluster");
                }
                // Upload the jar file; this method is explained in detail below
                submitJar(conf);
                try {
                    LOG.info("Submitting topology " + name
                            + " in distributed mode with conf " + serConf);
                    // Submit the topology. This calls submitTopology on the server-side
                    // ServiceHandler, which starts the topology; from here on it is the server's job
                    if (opts != null) {
                        client.getClient().submitTopologyWithOpts(name, path,
                                serConf, topology, opts);
                    } else {
                        // this is for backwards compatibility
                        client.getClient().submitTopology(name, path, serConf, topology);
                    }
                } finally {
                    client.close();
                }
            }
            LOG.info("Finished submitting topology: " + name);
        } catch (InvalidTopologyException e) {
            .......
        }
    }

The jar upload has two parts: both the jar file itself and the library files it depends on are sent to the server. The default upload buffer size is 512K and can be adjusted through nimbus.thrift.max_buffer_size. On the server, the files are stored in the following directory layout:

    [hongmin.lhm@rt2l02045 ~]$ tree /home/hongmin.lhm/jstorm_data/nimbus/inbox/
    /home/hongmin.lhm/jstorm_data/nimbus/inbox/
    `-- 7c1b7d1e-9134-4ed8-b664-836271b49bd3
        `-- stormjar-7c1b7d1e-9134-4ed8-b664-836271b49bd3.jar
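
The names in this listing follow a simple pattern: the last segment of the upload directory is reused in the jar file name. The sketch below isolates the string handling that submitJar applies to the value returned by beginFileUpload(), assuming that value is the per-upload directory seen in the listing. UploadLocationSketch and its two methods are hypothetical helpers written for illustration, not part of JStorm.

```java
// Hypothetical helper, not part of JStorm: reproduces the path handling
// used in submitJar so the resulting names can be inspected in isolation.
final class UploadLocationSketch {

    // Builds <uploadDir>/stormjar-<last segment of uploadDir>.jar
    static String jarLocation(String uploadDir) {
        String[] segments = uploadDir.split("/");
        String key = segments[segments.length - 1]; // last path segment
        return uploadDir + "/stormjar-" + key + ".jar";
    }

    // Dependency libraries are placed next to the jar under their own name.
    static String libLocation(String uploadDir, String libName) {
        return uploadDir + "/" + libName;
    }

    public static void main(String[] args) {
        String dir = "/home/hongmin.lhm/jstorm_data/nimbus/inbox/"
                + "7c1b7d1e-9134-4ed8-b664-836271b49bd3";
        System.out.println(jarLocation(dir));
    }
}
```

With the directory from the listing as input, main prints the same stormjar path that tree reports.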

    private static void submitJar(Map conf) {
        if (submittedJar == null) {
            NimbusClient client = NimbusClient.getConfiguredClient(conf);
            try {
                LOG.info("Jar not uploaded to master yet. Submitting jar...");
                String localJar = System.getProperty("storm.jar");
                path = client.getClient().beginFileUpload();
                String[] pathCache = path.split("/");
                String uploadLocation = path + "/stormjar-"
                        + pathCache[pathCache.length - 1] + ".jar";
                List<String> lib = (List<String>) conf
                        .get(GenericOptionsParser.TOPOLOGY_LIB_NAME);
                Map<String, String> libPath = (Map<String, String>) conf
                        .get(GenericOptionsParser.TOPOLOGY_LIB_PATH);
                if (lib != null && lib.size() != 0) {
                    for (String libName : lib) {
                        String jarPath = path + "/" + libName;
                        client.getClient().beginLibUpload(jarPath);
                        submitJar(conf, libPath.get(libName), jarPath, client);
                    }
                    if (localJar != null)
                        submittedJar = submitJar(conf, localJar, uploadLocation, client);
                } else {
                    submittedJar = submitJar(conf, localJar, uploadLocation, client);
                }
            } catch (Exception e) {
                throw new RuntimeException(e);
            } finally {
                client.close();
            }
        } else {
            LOG.info("Jar already uploaded to master. Not submitting jar.");
        }
    }
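
The four-argument submitJar overload that does the actual transfer is not shown here. As a rough illustration of what an upload buffer implies, the sketch below streams data to a sink in buffer-sized chunks. It is an assumption about the general technique, not the JStorm implementation: ChunkedUploadSketch is a hypothetical class, and the Consumer stands in for whatever remote call receives each chunk.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical sketch, not JStorm code: sends a stream to a sink in
// buffer-sized chunks, the way a client might upload a jar piece by piece.
final class ChunkedUploadSketch {
    // 512K, the default upload buffer size mentioned in the text.
    static final int DEFAULT_BUFFER_SIZE = 512 * 1024;

    // Reads the stream until EOF and passes each chunk to the sink.
    // Returns the number of chunks handed over.
    static int upload(InputStream in, int bufferSize, Consumer<byte[]> sink)
            throws IOException {
        byte[] buffer = new byte[bufferSize];
        int chunks = 0;
        int read;
        while ((read = in.read(buffer)) > 0) {
            // Copy only the bytes actually read; the last chunk is usually shorter.
            sink.accept(Arrays.copyOf(buffer, read));
            chunks++;
        }
        return chunks;
    }

    public static void main(String[] args) throws IOException {
        // An in-memory stand-in for a jar slightly larger than two buffers.
        byte[] fakeJar = new byte[DEFAULT_BUFFER_SIZE * 2 + 100];
        List<byte[]> received = new ArrayList<>();
        int chunks = upload(new ByteArrayInputStream(fakeJar),
                DEFAULT_BUFFER_SIZE, received::add);
        System.out.println(chunks + " chunks sent");
    }
}
```

A larger buffer means fewer round trips per jar at the cost of bigger individual messages, which is the trade-off a setting such as nimbus.thrift.max_buffer_size would control.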

Original article: http://blog.csdn.net/lihm0_1/article/details/42777709