PAI FrameworkLauncher (2) -- An overview of the AM, and what does the AM do after containers are allocated?
The ApplicationMaster here is responsible for managing only one Framework. It is therefore designed as a micro kernel that ties together a set of subservices (a schematic sketch follows the list):
ZookeeperStore, HdfsStore
YarnClient, LauncherClient
AMRMClientAsync, NMClientAsync
StatusManager, RequestManager
RMResyncHandler, SelectionManager
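In other words, the AM class itself does little more than create, wire up, start, and stop these subservices. A schematic sketch of that composition (the field names are my assumption; the role comments follow how each subservice is used later in this post, hedged where the post does not show it):

// Schematic only -- the AM as a micro kernel that owns its subservices.
public class ApplicationMaster {
  private ZookeeperStore zkStore;            // persists Framework/Task status (presumably)
  private HdfsStore hdfsStore;               // HDFS files, e.g. the container IP list (used below)
  private YarnClient yarnClient;             // synchronous RM client for cluster info
  private LauncherClient launcherClient;     // talks to the Launcher service (presumably)
  private AMRMClientAsync<?> rmClient;       // async RM client: container allocation callbacks
  private NMClientAsync nmClient;            // async NM client: container launch (used below)
  private StatusManager statusManager;       // tracks and transitions Task states (used below)
  private RequestManager requestManager;     // holds the user's Framework request (used below)
  private RMResyncHandler rmResyncHandler;
  private SelectionManager selectionManager;
}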
In the flow below, the subservices that matter most are AMRMClientAsync, NMClientAsync, StatusManager, and RequestManager.
Besides the subservices above, the ApplicationMaster also has these members:
conf -- the configuration
SystemTaskQueue -- a thread pool of the ScheduledThreadPool kind with exactly one thread (see the sketch after this list)
Map<String, Container> allocatedContainers -- records every container that has been allocated
Map<String, Integer> containerConnectionExceedCount -- records each container's connection-exceed count (the name and value type are taken from its use in the code below)
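From the description above, the SystemTaskQueue serializes all state-changing work onto a single thread. A minimal sketch of the idea, assuming only the class and method names visible in the code below; the internals here are my guess:

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;

public class SystemTaskQueue {
  // One thread means tasks run strictly in submission order, so Task state
  // transitions never race with one another.
  private final ScheduledExecutorService executor =
      Executors.newSingleThreadScheduledExecutor();

  public void queueSystemTask(Runnable task) {
    executor.execute(task);
    // The post notes that after enqueueing, the size of the waiting queue is
    // checked against a limit; that check would live here.
  }
}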
Every custom ApplicationMaster has to implement an AMCallbackHandler and an NMCallbackHandler, and AMCallbackHandler contains the onContainersAllocated() method, so let's step into that method and see what the AM does once containers come down.
As the code below shows, it simply hands the allocateContainers(containers) job to the single-threaded SystemTaskQueue pool described above (here the transitionTaskStateQueue field); after enqueueing, it also checks whether the number of tasks waiting in the queue has exceeded the configured limit.
public void onContainersAllocated(List<Container> containers) {
  if (containers.size() <= 0) {
    return;
  }

  LOGGER.logInfo(
      "onContainersAllocated: Allocated Containers: %s.", containers.size());

  transitionTaskStateQueue.queueSystemTask(() -> {
    allocateContainers(containers);
  });
}
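For context, this handler reaches YARN through the standard AMRMClientAsync wiring; the snippet below is the stock YARN API (the handler class name is from this post, and the heartbeat interval is an arbitrary example):

import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
import org.apache.hadoop.yarn.conf.YarnConfiguration;

// The AM hands its callback handler to the async RM client; the client's
// heartbeat thread then invokes onContainersAllocated whenever the RM
// grants containers.
AMRMClientAsync<ContainerRequest> rmClient =
    AMRMClientAsync.createAMRMClientAsync(1000 /* heartbeat ms */, new AMCallbackHandler());
rmClient.init(new YarnConfiguration());
rmClient.start();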
Next, let's look at allocateContainers(containers). Since this is a batch of containers, it just loops over them and calls allocateContainer(container) on each one.
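The batch method itself is not shown in the post; presumably it is nothing more than that loop:

// Presumed shape of the batch method: delegate each container to
// allocateContainer(), which is shown below.
private void allocateContainers(List<Container> containers) throws Exception {
  for (Container container : containers) {
    allocateContainer(container);
  }
}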
Containers can be launched in two ways: either each Task's container is launched as soon as it is allocated, or the AM waits until every Task has been allocated a container and launches them all together. In the second case, each container's working directory will contain a file listing the IP addresses of the containers of all the other Tasks.
The Boolean generateContainerIpList selects between these two modes.
The code below shows the concrete flow:
private void allocateContainer(Container container) throws Exception {
  String containerId = container.getId().toString();
  Boolean generateContainerIpList =
      requestManager.getPlatParams().getGenerateContainerIpList();

  LOGGER.logInfo(
      "[%s]: allocateContainer: Try to Allocate Container to Task: Container: %s",
      containerId, HadoopExts.toString(container));

  // 0. findTask
  TaskStatus taskStatus = findTask(container);
  if (taskStatus == null) {
    LOGGER.logDebug(
        "[%s]: Cannot find a suitable Task to accept the Allocate Container. It should be exceeded.",
        containerId);
    tryToReleaseContainer(containerId);
    return;
  }
  String taskRoleName = taskStatus.getTaskRoleName();
  TaskStatusLocator taskLocator = new TaskStatusLocator(taskRoleName, taskStatus.getTaskIndex());

  // 1. removeContainerRequest
  removeContainerRequest(taskStatus);

  // 2. testContainer
  if (!testContainer(container)) {
    LOGGER.logInfo(
        "%s[%s]: Container is Rejected, Release Container and Request again",
        taskLocator, containerId);
    tryToReleaseContainer(containerId);
    statusManager.transitionTaskState(taskLocator, TaskState.TASK_WAITING);
    addContainerRequest(taskStatus);
    return;
  }

  // 3. allocateContainer
  try {
    statusManager.transitionTaskState(taskLocator, TaskState.CONTAINER_ALLOCATED,
        new TaskEvent().setContainer(container));
    LOGGER.logInfo("%s[%s]: Succeeded to Allocate Container to Task", taskLocator, containerId);

    if (containerConnectionExceedCount.containsKey(containerId)) {
      // Pending Exceed Container now is settled to live associated Container
      containerConnectionExceedCount.remove(containerId);
    }
  } catch (Exception e) {
    LOGGER.logWarning(e,
        "%s[%s]: Failed to Allocate Container to Task, Release Container and Request again",
        taskLocator, containerId);
    tryToReleaseContainer(containerId);
    statusManager.transitionTaskState(taskLocator, TaskState.TASK_WAITING);
    addContainerRequest(taskStatus);
    return;
  }

  // 4. launchContainer
  if (!generateContainerIpList) {
    launchContainer(taskStatus, container);
  } else {
    allocatedContainers.put(containerId, container);

    int neverBeenAllocatedTaskCount = statusManager.getTaskCount(
        new HashSet<>(Arrays.asList(TaskState.TASK_WAITING, TaskState.CONTAINER_REQUESTED)));
    if (neverBeenAllocatedTaskCount == 0) {
      launchContainersTogether();
    } else {
      LOGGER.logInfo(
          "Waiting for %s never been CONTAINER_ALLOCATED Tasks to become CONTAINER_ALLOCATED, " +
          "since GenerateContainerIpList enabled",
          neverBeenAllocatedTaskCount);
    }
  }
}
Following the numbered comments, the steps are:
0. Find a suitable Task for this container. If one is found, a TaskStatus object is returned; that TaskStatus is later used to obtain the container's runtime environment, command, local resources, and so on. If no Task is found, the container is released. (A hypothetical sketch of findTask follows this list.)
1. Remove this Task's outstanding container request.
2. Test whether the container is usable. (The ordering here is puzzling: why not test first? The test does not appear to involve the Task at all.)
3. Allocate the container: the StatusManager transitions the Task's state to CONTAINER_ALLOCATED, and a new TaskEvent object records information about the Task and this container.
4. Launch the container, in one of the two modes described above:
1) If no ContainerIpList needs to be generated, launchContainer() is called directly to start this container.
2) If a ContainerIpList is needed, the container is first recorded in allocatedContainers, and then the AM checks on everyone else, i.e. counts the Tasks whose state is TASK_WAITING or CONTAINER_REQUESTED. If that count is 0, launchContainersTogether() starts them all at once; if not, the code merely logs a message and does nothing further.
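findTask's internals are not shown in this post; conceptually it looks for a waiting Task whose resource request the allocated container can satisfy (the launcher has a SelectionManager subservice for exactly this kind of matching). A purely hypothetical sketch:

import java.util.Collections;
import java.util.HashSet;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.util.resource.Resources;

// Hypothetical: pick a Task in CONTAINER_REQUESTED state whose requested
// resource fits inside the allocated container. getTaskResource is an
// invented helper; the real matching logic lives in SelectionManager.
private TaskStatus findTask(Container container) throws Exception {
  for (TaskStatus task : statusManager.getTaskStatus(
      new HashSet<>(Collections.singletonList(TaskState.CONTAINER_REQUESTED)))) {
    Resource requested = requestManager.getTaskResource(task.getTaskRoleName());
    if (Resources.fitsIn(requested, container.getResource())) {
      return task;
    }
  }
  return null; // no suitable Task: the container is an exceeded allocation
}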
launchContainer() takes a TaskStatus and a Container as parameters. Once the launch context is set up, the container is started through NMClientAsync, and the StatusManager then transitions the Task's state to CONTAINER_LAUNCHED.
private void launchContainer(TaskStatus taskStatus, Container container) throws Exception {
  String taskRoleName = taskStatus.getTaskRoleName();
  TaskStatusLocator taskLocator = new TaskStatusLocator(taskRoleName, taskStatus.getTaskIndex());
  String containerId = container.getId().toString();
  assert containerId.equals(taskStatus.getContainerId());

  LOGGER.logInfo("%s[%s]: launchContainer", taskLocator, containerId);

  ContainerLaunchContext launchContext = setupContainerLaunchContext(taskStatus);
  nmClient.startContainerAsync(container, launchContext);
  statusManager.transitionTaskState(taskLocator, TaskState.CONTAINER_LAUNCHED);
}
Setting up the ContainerLaunchContext involves three things: the command (i.e. the entryPoint), the localResources, and the environment variables.
private ContainerLaunchContext setupContainerLaunchContext(TaskStatus taskStatus) throws Exception {
  String taskRoleName = taskStatus.getTaskRoleName();
  Integer taskIndex = taskStatus.getTaskIndex();
  Integer serviceVersion = getServiceVersion(taskRoleName);

  UserDescriptor user = requestManager.getUser();
  Boolean generateContainerIpList = requestManager.getPlatParams().getGenerateContainerIpList();
  List<String> sourceLocations = requestManager.getTaskServices().get(taskRoleName).getSourceLocations();
  String entryPoint = requestManager.getTaskServices().get(taskRoleName).getEntryPoint();

  // SetupLocalResources
  Map<String, LocalResource> localResources = new HashMap<>();
  try {
    for (String location : sourceLocations) {
      HadoopUtils.addToLocalResources(localResources, location);
    }
  } catch (Exception e) {
    // User is likely to set an invalid SourceLocations, and it contains HDFS OP,
    // so handle the corresponding Exception ASAP
    handleException(e);
  }

  if (generateContainerIpList) {
    String location = hdfsStore.getHdfsStruct().getContainerIpListFilePath(conf.getFrameworkName());
    HadoopUtils.addToLocalResources(localResources, location);
  }

  // SetupLocalEnvironment
  Map<String, String> localEnvs = new HashMap<>();
  localEnvs.put(GlobalConstants.ENV_VAR_HADOOP_USER_NAME, user.getName());
  localEnvs.put(GlobalConstants.ENV_VAR_FRAMEWORK_NAME, conf.getFrameworkName());
  localEnvs.put(GlobalConstants.ENV_VAR_FRAMEWORK_VERSION, conf.getFrameworkVersion().toString());
  localEnvs.put(GlobalConstants.ENV_VAR_TASK_ROLE_NAME, taskRoleName);
  localEnvs.put(GlobalConstants.ENV_VAR_TASK_INDEX, taskIndex.toString());
  localEnvs.put(GlobalConstants.ENV_VAR_SERVICE_VERSION, serviceVersion.toString());
  localEnvs.put(GlobalConstants.ENV_VAR_ZK_CONNECT_STRING, conf.getZkConnectString());
  localEnvs.put(GlobalConstants.ENV_VAR_ZK_ROOT_DIR, conf.getZkRootDir());
  localEnvs.put(GlobalConstants.ENV_VAR_ZK_COMPRESSION_ENABLE, conf.getZkCompressionEnable().toString());
  localEnvs.put(GlobalConstants.ENV_VAR_AM_VERSION, conf.getAmVersion().toString());
  localEnvs.put(GlobalConstants.ENV_VAR_APP_ID, conf.getApplicationId());
  localEnvs.put(GlobalConstants.ENV_VAR_ATTEMPT_ID, conf.getAttemptId());
  localEnvs.put(GlobalConstants.ENV_VAR_CONTAINER_GPUS, taskStatus.getContainerGpus().toString());

  if (generateContainerIpList) {
    // Since one machine may have many external IPs, we assigned a specific one to
    // help the UserService to locate itself in CONTAINER_IP_LIST_FILE
    localEnvs.put(GlobalConstants.ENV_VAR_CONTAINER_IP, taskStatus.getContainerIp());
  }

  // SetupEntryPoint
  String command = String.format(
      "%1$s 1>%2$sstdout 2>%2$sstderr",
      entryPoint,
      ApplicationConstants.LOG_DIR_EXPANSION_VAR + File.separator);

  ContainerLaunchContext launchContext = Records.newRecord(ContainerLaunchContext.class);
  launchContext.setLocalResources(localResources);
  launchContext.setCommands(Collections.singletonList(command));
  launchContext.setServiceData(new HashMap<>());
  launchContext.setEnvironment(localEnvs);
  return launchContext;
}
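The SetupEntryPoint step just wraps the user's entryPoint with stdout/stderr redirection into the container's log directory. With a hypothetical entryPoint of python train.py, the format string expands as follows (LOG_DIR_EXPANSION_VAR is the literal placeholder <LOG_DIR>, which the NodeManager substitutes at launch time):

String entryPoint = "python train.py"; // hypothetical, for illustration only
String command = String.format(
    "%1$s 1>%2$sstdout 2>%2$sstderr",
    entryPoint, ApplicationConstants.LOG_DIR_EXPANSION_VAR + File.separator);
// command == "python train.py 1><LOG_DIR>/stdout 2><LOG_DIR>/stderr"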
As you can see, both the command and the localResources are obtained from the requestManager, keyed by the Task's TaskRoleName.
And if a ContainerIpList is needed, that file is also registered as a local resource so it gets pulled from HDFS into the container's local working directory.
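HadoopUtils.addToLocalResources is the launcher's own helper, but the standard YARN way to build such a LocalResource from an HDFS path looks roughly like this (a sketch, not the launcher's actual code):

import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.util.ConverterUtils;

// Sketch: register an HDFS file so that the NodeManager downloads it into
// the container's working directory before launching the command.
static void addToLocalResources(Map<String, LocalResource> localResources,
    String location) throws Exception {
  Path path = new Path(location);
  FileStatus status = FileSystem.get(new Configuration()).getFileStatus(path);
  localResources.put(path.getName(), LocalResource.newInstance(
      ConverterUtils.getYarnUrlFromPath(path),
      LocalResourceType.FILE, LocalResourceVisibility.APPLICATION,
      status.getLen(), status.getModificationTime()));
}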
launchContainersTogether() has one extra step compared with launchContainer() above: it asks the StatusManager for all Tasks in state CONTAINER_ALLOCATED, writes their container IPs into a file, and uploads that file to HDFS.
It then loops over the Tasks and calls launchContainer() on each one.
The code:
private void launchContainersTogether() throws Exception {
  List<TaskStatus> taskStatuses = statusManager.getTaskStatus(
      new HashSet<>(Collections.singletonList(TaskState.CONTAINER_ALLOCATED)));
  Boolean generateContainerIpList = requestManager.getPlatParams().getGenerateContainerIpList();

  LOGGER.logInfo("launchContainersTogether: %s Tasks", taskStatuses.size());

  if (generateContainerIpList) {
    StringBuilder fileContent = new StringBuilder();
    for (TaskStatus taskStatus : taskStatuses) {
      fileContent.append(taskStatus.getContainerIp());
      fileContent.append("\n");
    }
    CommonUtils.writeFile(GlobalConstants.CONTAINER_IP_LIST_FILE, fileContent.toString());

    try {
      hdfsStore.uploadContainerIpListFile(conf.getFrameworkName());
      HadoopUtils.invalidateLocalResourcesCache();
    } catch (Exception e) {
      // It contains HDFS OP, so handle the corresponding Exception ASAP
      handleException(e);
    }
  }

  for (TaskStatus taskStatus : taskStatuses) {
    String containerId = taskStatus.getContainerId();
    assert allocatedContainers.containsKey(containerId);
    launchContainer(taskStatus, allocatedContainers.get(taskStatus.getContainerId()));
  }
}
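On the container side, a UserService can then combine the localized IP-list file with the ENV_VAR_CONTAINER_IP environment variable (set in setupContainerLaunchContext above) to find its own position among all Tasks. A hypothetical sketch; the actual file name and env-var name are defined by GlobalConstants and are not shown in this post:

import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.List;

// Hypothetical consumer-side code. The IP-list file is localized into the
// container's working directory; the names below are assumptions.
List<String> allIps = Files.readAllLines(Paths.get("ContainerIpList")); // file name assumed
String myIp = System.getenv("CONTAINER_IP");                            // env-var name assumed
int myPosition = allIps.indexOf(myIp);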
That's it. The key to understanding the AM seems to be knowing how the work is divided among its SubServices.
Original article: https://www.cnblogs.com/chenqy930/p/9468810.html