private void makeAssignment(String topologyName, String topologyId, TopologyInitialStatus status) throws FailedAssignTopologyException { //1、创建topology的分配事件 TopologyAssignEvent assignEvent = new TopologyAssignEvent(); assignEvent.setTopologyId(topologyId); assignEvent.setScratch(false); assignEvent.setTopologyName(topologyName); assignEvent.setOldStatus(Thrift .topologyInitialStatusToStormStatus(status)); //2、丢入事件处理队列 TopologyAssign.push(assignEvent); //3、等待时间返回 boolean isSuccess = assignEvent.waitFinish(); if (isSuccess == true) { LOG.info("Finish submit for " + topologyName); } else { throw new FailedAssignTopologyException( assignEvent.getErrorMsg()); } }
/**
 * Main loop of the TopologyAssign thread.
 *
 * <p>Logs a start-up message, sets {@code runFlag} to true, and then, while {@code runFlag}
 * stays true, takes one {@code TopologyAssignEvent} at a time from {@code queue} and passes
 * it to {@code doTopologyAssignment(event)}.
 *
 * <p>An {@code InterruptedException} thrown by {@code queue.take()} is not propagated: the
 * loop simply continues and re-checks {@code runFlag}. A null event is skipped.
 *
 * <p>NOTE(review): the rest of the loop body is elided ("..............") in this excerpt,
 * so how {@code isSuccess} is consumed is not visible here.
 * NOTE(review): if {@code queue} is a java.util.concurrent.BlockingQueue, {@code take()}
 * never returns null and the null check is purely defensive - confirm against the field type.
 */
public void run() { LOG.info("TopologyAssign thread has been started"); runFlag = true; while (runFlag) { TopologyAssignEvent event; try { event = queue.take(); } catch (InterruptedException e1) { continue; } if (event == null) { continue; } boolean isSuccess = doTopologyAssignment(event); .............. }
/**
 * Computes the assignment for the topology named by {@code event}.
 *
 * <p>Visible steps: read the topology id from the event and log it, build a
 * {@code TopologyAssignContext} via {@code prepareTopologyAssign(event)}, then obtain the set
 * of {@code ResourceWorkerSlot}s. When {@code StormConfig.local_mode(nimbusData.getConf())}
 * is false, the scheduler registered under {@code DEFAULT_SCHEDULER_NAME} is asked to
 * {@code assignTasks(context)}; otherwise {@code mkLocalAssignment(context)} is used.
 *
 * <p>NOTE(review): the remainder of the method is elided ("............") in this excerpt,
 * so how the {@code Assignment} return value is built from {@code assignments} is not shown.
 * NOTE(review): no null check is visible on the scheduler looked up from {@code schedulers}.
 *
 * @param event the assignment event identifying the topology
 * @throws Exception declared by the signature; the throwing calls are not identifiable from
 *         this excerpt
 */
public Assignment mkAssignment(TopologyAssignEvent event) throws Exception { String topologyId = event.getTopologyId(); LOG.info("Determining assignment for " + topologyId); TopologyAssignContext context = prepareTopologyAssign(event); Set<ResourceWorkerSlot> assignments = null; if (!StormConfig.local_mode(nimbusData.getConf())) { IToplogyScheduler scheduler = schedulers .get(DEFAULT_SCHEDULER_NAME); //Start scheduling the job's tasks assignments = scheduler.assignTasks(context); } else { assignments = mkLocalAssignment(context); } ............ }
private void putWorkerToSupervisor(List<ResourceWorkerSlot> result, List<SupervisorInfo> supervisors) { int key = 0; //按所需槽位遍历,每次分配一个 for (ResourceWorkerSlot worker : result) { //首先进行必要的判断和置位 if (supervisors.size() == 0) return; if (worker.getNodeId() != null) continue; if (key >= supervisors.size()) key = 0; //1、取出第一个supervisor SupervisorInfo supervisor = supervisors.get(key); worker.setHostname(supervisor.getHostName()); worker.setNodeId(supervisor.getSupervisorId()); worker.setPort(supervisor.getWorkerPorts().iterator().next()); //槽位用完则从集合中删除,不再参与分配 supervisor.getWorkerPorts().remove(worker.getPort()); if (supervisor.getWorkerPorts().size() == 0) supervisors.remove(supervisor); //当一个supervisor分配完后便不再使用,除非supervisor不够用 key++; } }
/**
 * Orders the usable supervisors and hands the workers in {@code result} to
 * {@code putWorkerToSupervisor}.
 *
 * <p>Visible steps: {@code supervisors} is replaced by the result of
 * {@code getCanUseSupervisors(supervisors)}, sorted with a comparator that negates
 * {@code NumberUtils.compare} of the two supervisors' worker-port counts, and then passed
 * together with {@code result} to {@code putWorkerToSupervisor}.
 *
 * <p>NOTE(review): assuming {@code NumberUtils.compare} follows the usual comparator sign
 * convention, the negation sorts supervisors with more free ports first - confirm.
 * NOTE(review): the code before and after this fragment is elided ("...........") in this
 * excerpt. The article text fused onto the end of the line says the ordering is currently
 * by slot count and that later versions may take machine load into account.
 *
 * @param result      workers to be placed
 * @param supervisors candidate supervisors; filtered and sorted before use
 */
private void putAllWorkerToSupervisor(List<ResourceWorkerSlot> result, List<SupervisorInfo> supervisors) { ........... supervisors = this.getCanUseSupervisors(supervisors); Collections.sort(supervisors, new Comparator<SupervisorInfo>() { @Override public int compare(SupervisorInfo o1, SupervisorInfo o2) { // TODO Auto-generated method stub return -NumberUtils.compare(o1.getWorkerPorts().size(), o2 .getWorkerPorts().size()); } }); this.putWorkerToSupervisor(result, supervisors); ............. }可以看到,当前排序规则是按slot多少的,我们后续版本中可能会考虑机器负载的一些因素吧。
原文地址:http://blog.csdn.net/lihm0_1/article/details/44310041