码迷,mamicode.com
首页 > Web开发 > 详细

JStorm之Topology提交服务端

时间:2015-01-16 20:58:00      阅读:534      评论:0      收藏:0      [点我收藏+]

标签:storm   jstorm   云计算   实时计算   hadoop   

  topology提交前会先判断集群中是否存在同名作业,如果存在在提交失败,如果没有则会增加集群提交次数SubmittedCount,每次提交成功,该变量都会加1,然后会为该作业分配一个id,生成规则如下:
public static String TopologyNameToId(String topologyName, int counter) {
    return topologyName + "-" + counter + "-" + TimeUtils.current_time_secs();
}<span style="font-family: 'Courier New'; background-color: rgb(255, 255, 255);">    </span>
因此我们从作业id中就可以判断集群作业成功提交次数、提交时间、还有作业名称了,如果我们没有指定acker数量,对topology本身的校验比较细致:
1、组件id是否合法
2、是否存在同名id
3、woker数量是否合法,小于0或null
4、ack数量校验同worker一样
public void submitTopologyWithOpts(String topologyname,
		String uploadedJarLocation, String jsonConf,
		StormTopology topology, SubmitOptions options)
		throws AlreadyAliveException, InvalidTopologyException,
		TopologyAssignException, TException {
	LOG.info("Receive " + topologyname + ", uploadedJarLocation:"
			+ uploadedJarLocation);
	// @@@ Move validate topologyname in client code
	try {
		//校验集群中是否存在同名作业
		checkTopologyActive(data, topologyname, false);
	} catch (AlreadyAliveException e) {
		LOG.info(topologyname + " is already exist ");
		throw e;
	} catch (Throwable e) {
		LOG.info("Failed to check whether topology is alive or not", e);
		throw new TException(e);
	}
	//成功提交次数加1
	int counter = data.getSubmittedCount().incrementAndGet();
	String topologyId = Common.TopologyNameToId(topologyname, counter);


	try {
		//调整相关配置,增加一些默认配置项
		Map<Object, Object> serializedConf = (Map<Object, Object>) JStormUtils
				.from_json(jsonConf);
		if (serializedConf == null) {
			LOG.warn("Failed to serialized Configuration");
			throw new InvalidTopologyException(
					"Failed to serilaze topology configuration");
		}


		serializedConf.put(Config.TOPOLOGY_ID, topologyId);
		serializedConf.put(Config.TOPOLOGY_NAME, topologyname);
		
		Map<Object, Object> stormConf;


		stormConf = NimbusUtils.normalizeConf(conf, serializedConf,
				topology);


		Map<Object, Object> totalStormConf = new HashMap<Object, Object>(
				conf);
		totalStormConf.putAll(stormConf);


		StormTopology normalizedTopology = NimbusUtils.normalizeTopology(
				stormConf, topology, false);


		// 校验ID、字段合法性,worker和acker数量合法性
		Common.validate_basic(normalizedTopology, totalStormConf,
				topologyId);
		// don't need generate real topology, so skip Common.system_topology
		// Common.system_topology(totalStormConf, topology);


		StormClusterState stormClusterState = data.getStormClusterState();


		// create /local-dir/nimbus/topologyId/xxxx files
		// Copy jar to /local-dir/nimbus/topologyId/stormjar.jar
		setupStormCode(conf, topologyId, uploadedJarLocation, stormConf,
				normalizedTopology);


		// generate TaskInfo for every bolt or spout in ZK
		// 为每个组件创建相应znode,并存放相应数据,数据如下:
		/****
		{
			1=TaskInfo[componentId=__acker,componentType=bolt], 
			2=TaskInfo[componentId=step1Bolt,componentType=bolt], 
			3=TaskInfo[componentId=step1Bolt,componentType=bolt], 
			4=TaskInfo[componentId=step1Bolt,componentType=bolt], 
			5=TaskInfo[componentId=step1Bolt,componentType=bolt], 
			6=TaskInfo[componentId=step1Bolt,componentType=bolt], 
			7=TaskInfo[componentId=step1Bolt,componentType=bolt], 
			8=TaskInfo[componentId=spout,componentType=spout], 
			9=TaskInfo[componentId=spout,componentType=spout]
		}
		**/
		//zk的目录结构如下:
		//[zk: localhost:2181(CONNECTED) 20] ls /jstorm/tasks/test-3-1421404402
		//   [3, 2, 1, 6, 5, 4]
		setupZkTaskInfo(conf, topologyId, stormClusterState);


		// make assignments for a topology
		LOG.info("Submit for " + topologyname + " with conf "
				+ serializedConf);
		//这里开始任务分发,任务分发由TopologyAssign完成,这步仅仅是创建一个事件对象放入队列中,然后返回
		//真正的任务分发由其他线程来操作,所以这里返回比较快,除非队列是满的
		// ServiceHandler中
		makeAssignment(topologyname, topologyId, options.get_initial_status());


	} catch (FailedAssignTopologyException e) {
		//异常处理代码就不贴了,见谅
	}


}
private void makeAssignment(String topologyName, String topologyId, 
		TopologyInitialStatus status) throws FailedAssignTopologyException {
	TopologyAssignEvent assignEvent = new TopologyAssignEvent();
	assignEvent.setTopologyId(topologyId);
	assignEvent.setScratch(false);
	assignEvent.setTopologyName(topologyName);
	assignEvent.setOldStatus(Thrift
			.topologyInitialStatusToStormStatus(status));
        //这里放入事件队列中立即返回,所以提交是很快的
	TopologyAssign.push(assignEvent);


	boolean isSuccess = assignEvent.waitFinish();
	if (isSuccess == true) {
		LOG.info("Finish submit for " + topologyName);
	} else {
		throw new FailedAssignTopologyException(
				assignEvent.getErrorMsg());
	}
}


JStorm之Topology提交服务端

标签:storm   jstorm   云计算   实时计算   hadoop   

原文地址:http://blog.csdn.net/lihm0_1/article/details/42780873

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!