Tags: storm jstorm cloud computing real-time computing hadoop
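Before a topology is submitted, Nimbus first checks whether a job with the same name already exists in the cluster. If one does, the submission fails. Otherwise, Nimbus increments the cluster's submission counter SubmittedCount and assigns the job an id. The increment happens as soon as the name check passes, before the rest of the validation, so the counter goes up by 1 for every submission that gets past the name check, not only for fully successful ones. The id is generated as follows:

```java
public static String TopologyNameToId(String topologyName, int counter) {
    return topologyName + "-" + counter + "-" + TimeUtils.current_time_secs();
}
```

So a job's id tells us its name, the value of the cluster's submission counter at that point, and its submission time (in seconds). The topology itself is validated fairly thoroughly, including the case where no acker count was specified: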
```java
public void submitTopologyWithOpts(String topologyname,
        String uploadedJarLocation, String jsonConf, StormTopology topology,
        SubmitOptions options) throws AlreadyAliveException,
        InvalidTopologyException, TopologyAssignException, TException {
    LOG.info("Receive " + topologyname + ", uploadedJarLocation:"
            + uploadedJarLocation);
    // @@@ Move validate topologyname in client code
    try {
        // Check whether a job with the same name already exists in the cluster
        checkTopologyActive(data, topologyname, false);
    } catch (AlreadyAliveException e) {
        LOG.info(topologyname + " is already exist ");
        throw e;
    } catch (Throwable e) {
        LOG.info("Failed to check whether topology is alive or not", e);
        throw new TException(e);
    }

    // Increment the submission counter by 1
    int counter = data.getSubmittedCount().incrementAndGet();
    String topologyId = Common.TopologyNameToId(topologyname, counter);

    try {
        // Adjust the configuration and add some default settings
        Map<Object, Object> serializedConf = (Map<Object, Object>) JStormUtils
                .from_json(jsonConf);
        if (serializedConf == null) {
            LOG.warn("Failed to serialized Configuration");
            throw new InvalidTopologyException(
                    "Failed to serilaze topology configuration");
        }

        serializedConf.put(Config.TOPOLOGY_ID, topologyId);
        serializedConf.put(Config.TOPOLOGY_NAME, topologyname);

        Map<Object, Object> stormConf;
        stormConf = NimbusUtils.normalizeConf(conf, serializedConf, topology);

        Map<Object, Object> totalStormConf = new HashMap<Object, Object>(conf);
        totalStormConf.putAll(stormConf);

        StormTopology normalizedTopology = NimbusUtils.normalizeTopology(
                stormConf, topology, false);

        // Validate ids and fields, and check that worker and acker counts are legal
        Common.validate_basic(normalizedTopology, totalStormConf, topologyId);

        // don't need generate real topology, so skip Common.system_topology
        // Common.system_topology(totalStormConf, topology);

        StormClusterState stormClusterState = data.getStormClusterState();

        // create /local-dir/nimbus/topologyId/xxxx files
        // Copy jar to /local-dir/nimbus/topologyId/stormjar.jar
        setupStormCode(conf, topologyId, uploadedJarLocation, stormConf,
                normalizedTopology);

        // generate TaskInfo for every bolt or spout in ZK
        // Creates the corresponding znodes and stores data such as:
        /*
         * {
         *   1=TaskInfo[componentId=__acker,componentType=bolt],
         *   2=TaskInfo[componentId=step1Bolt,componentType=bolt],
         *   3=TaskInfo[componentId=step1Bolt,componentType=bolt],
         *   4=TaskInfo[componentId=step1Bolt,componentType=bolt],
         *   5=TaskInfo[componentId=step1Bolt,componentType=bolt],
         *   6=TaskInfo[componentId=step1Bolt,componentType=bolt],
         *   7=TaskInfo[componentId=step1Bolt,componentType=bolt],
         *   8=TaskInfo[componentId=spout,componentType=spout],
         *   9=TaskInfo[componentId=spout,componentType=spout]
         * }
         */
        // ZooKeeper layout (one child znode per task id):
        // [zk: localhost:2181(CONNECTED) 20] ls /jstorm/tasks/test-3-1421404402
        // [3, 2, 1, 6, 5, 4]
        setupZkTaskInfo(conf, topologyId, stormClusterState);

        // make assignments for a topology
        LOG.info("Submit for " + topologyname + " with conf " + serializedConf);
        // Task assignment starts here and is handled by TopologyAssign.
        // makeAssignment (in ServiceHandler) only wraps the request in an event
        // and puts it on a queue; another thread performs the actual assignment.
        // makeAssignment then waits on the event for the result.
        makeAssignment(topologyname, topologyId, options.get_initial_status());

    } catch (FailedAssignTopologyException e) {
        // Exception handling omitted here
    }
}
```
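The first steps of submitTopologyWithOpts above (duplicate-name check, counter increment, id generation) and the reverse direction (reading name, counter and submit time back out of an id) fit in a small model. This is a hypothetical in-memory sketch, not JStorm code: TopologyIdSketch, submit and parse are illustrative names, the Set stands in for the ZooKeeper-backed checkTopologyActive, the AtomicInteger stands in for data.getSubmittedCount(), and the time is passed in as nowSecs instead of calling TimeUtils.current_time_secs() so the output is deterministic.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical in-memory model of the name check, counter and id generation
// done at the start of submitTopologyWithOpts. Not a JStorm API.
public class TopologyIdSketch {

    // Stand-in for the ZooKeeper-backed check in checkTopologyActive (assumption).
    private final Set<String> activeNames = ConcurrentHashMap.newKeySet();
    // Stand-in for data.getSubmittedCount().
    private final AtomicInteger submittedCount = new AtomicInteger(0);

    // Same "name-counter-seconds" format as Common.TopologyNameToId; the time
    // is passed in instead of calling TimeUtils.current_time_secs().
    public static String topologyNameToId(String topologyName, int counter, long nowSecs) {
        return topologyName + "-" + counter + "-" + nowSecs;
    }

    // Rejects a duplicate name; otherwise bumps the counter and builds the id.
    // Simplification: the name is registered here, whereas a real topology
    // only becomes active later in the submission flow.
    public String submit(String topologyName, long nowSecs) {
        if (!activeNames.add(topologyName)) {
            throw new IllegalStateException(topologyName + " already exists");
        }
        int counter = submittedCount.incrementAndGet();
        return topologyNameToId(topologyName, counter, nowSecs);
    }

    // The three parts encoded in an id.
    public static final class ParsedId {
        public final String name;
        public final int counter;
        public final long submitSecs;

        ParsedId(String name, int counter, long submitSecs) {
            this.name = name;
            this.counter = counter;
            this.submitSecs = submitSecs;
        }
    }

    // Split from the right, because the topology name itself may contain '-'.
    public static ParsedId parse(String topologyId) {
        int last = topologyId.lastIndexOf('-');
        int prev = topologyId.lastIndexOf('-', last - 1);
        if (last < 0 || prev < 0) {
            throw new IllegalArgumentException("not a topology id: " + topologyId);
        }
        return new ParsedId(
                topologyId.substring(0, prev),
                Integer.parseInt(topologyId.substring(prev + 1, last)),
                Long.parseLong(topologyId.substring(last + 1)));
    }

    public static void main(String[] args) {
        TopologyIdSketch nimbus = new TopologyIdSketch();
        String id = nimbus.submit("word-count", 1421404402L);
        System.out.println(id); // word-count-1-1421404402

        ParsedId p = parse("test-3-1421404402");
        System.out.println(p.name + " " + p.counter + " " + p.submitSecs); // test 3 1421404402
    }
}
```

As in the original flow, a rejected duplicate does not consume a counter value, because the check runs before incrementAndGet().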
```java
private void makeAssignment(String topologyName, String topologyId,
        TopologyInitialStatus status) throws FailedAssignTopologyException {
    TopologyAssignEvent assignEvent = new TopologyAssignEvent();
    assignEvent.setTopologyId(topologyId);
    assignEvent.setScratch(false);
    assignEvent.setTopologyName(topologyName);
    assignEvent.setOldStatus(Thrift
            .topologyInitialStatusToStormStatus(status));

    // push() only puts the event on the queue and returns immediately...
    TopologyAssign.push(assignEvent);

    // ...but waitFinish() then blocks until the assignment thread has
    // processed the event and reported success or failure.
    boolean isSuccess = assignEvent.waitFinish();
    if (isSuccess) {
        LOG.info("Finish submit for " + topologyName);
    } else {
        throw new FailedAssignTopologyException(
                assignEvent.getErrorMsg());
    }
}
```
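makeAssignment is a hand-off between two threads: the submitting thread enqueues an event, and a background thread processes it and signals completion, which waitFinish() waits for. A minimal sketch of that pattern, assuming a bounded LinkedBlockingQueue and a CountDownLatch, is below. It is not JStorm's TopologyAssign or TopologyAssignEvent: the class names, the queue capacity of 100 and the placeholder rule that an empty id "fails" are all hypothetical.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical model of the makeAssignment hand-off; not JStorm code.
public class AssignQueueSketch {

    static final class AssignEvent {
        final String topologyId;
        private final CountDownLatch latch = new CountDownLatch(1);
        private volatile boolean success;
        private volatile String errorMsg;

        AssignEvent(String topologyId) {
            this.topologyId = topologyId;
        }

        // Called by the assignment thread once it has handled the event.
        void done(boolean ok, String msg) {
            success = ok;
            errorMsg = msg;
            latch.countDown();
        }

        // Called by the submitting thread; blocks until done() has run.
        boolean waitFinish() {
            try {
                latch.await();
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
                return false;
            }
            return success;
        }

        String getErrorMsg() {
            return errorMsg;
        }
    }

    // Bounded queue; the capacity of 100 is an arbitrary choice for the sketch.
    private final BlockingQueue<AssignEvent> queue = new LinkedBlockingQueue<>(100);

    // Only enqueues; put() blocks only when the queue is full.
    void push(AssignEvent e) {
        try {
            queue.put(e);
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while enqueuing", ie);
        }
    }

    // Single daemon consumer standing in for the real assignment logic.
    public void startWorker() {
        Thread worker = new Thread(() -> {
            try {
                while (true) {
                    AssignEvent ev = queue.take();
                    // Placeholder rule for the demo: an empty id "fails".
                    if (ev.topologyId.isEmpty()) {
                        ev.done(false, "empty topology id");
                    } else {
                        ev.done(true, null);
                    }
                }
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
            }
        });
        worker.setDaemon(true);
        worker.start();
    }

    // Mirrors makeAssignment: push the event, then wait for the verdict.
    public String makeAssignment(String topologyId) {
        AssignEvent e = new AssignEvent(topologyId);
        push(e);
        if (e.waitFinish()) {
            return "Finish submit for " + topologyId;
        }
        throw new IllegalStateException(e.getErrorMsg());
    }

    public static void main(String[] args) {
        AssignQueueSketch nimbus = new AssignQueueSketch();
        nimbus.startWorker();
        System.out.println(nimbus.makeAssignment("test-3-1421404402")); // Finish submit for test-3-1421404402
    }
}
```

In this sketch push() itself is cheap, but makeAssignment returns only after the worker thread calls done(), so the caller's wait includes the time spent on the assignment.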
Original article: http://blog.csdn.net/lihm0_1/article/details/42780873