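NodeManager (NM) is YARN's per-node agent. It manages a single compute node in a Hadoop cluster. Its duties are to stay in contact with the ResourceManager, supervise the lifecycle of Containers, monitor each Container's resource usage (memory, CPU, and so on), track the node's health, and manage logs and the auxiliary services used by different applications.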
[Figure: overall NodeManager architecture]
The analysis below follows the order in which the code executes when the NodeManager starts.
Main code:
public static void main(String[] args) {
  Thread.setDefaultUncaughtExceptionHandler(new YarnUncaughtExceptionHandler());
  // Log the NodeManager startup and shutdown messages
  StringUtils.startupShutdownMessage(NodeManager.class, args, LOG);
  // Create the NodeManager object
  NodeManager nodeManager = new NodeManager();
  // Load the YARN configuration files (yarn-default.xml, yarn-site.xml)
  Configuration conf = new YarnConfiguration();
  // Initialize and start the NodeManager
  nodeManager.initAndStartNodeManager(conf, false);
}
private void initAndStartNodeManager(Configuration conf, boolean hasToReboot) {
  try {
    // Remove the old hook if we are rebooting.
    if (hasToReboot && null != nodeManagerShutdownHook) {
      ShutdownHookManager.get().removeShutdownHook(nodeManagerShutdownHook);
    }
    // Register nodeManagerShutdownHook so that the CompositeService (and with it
    // every child service) is stopped when the NodeManager process shuts down
    nodeManagerShutdownHook = new CompositeServiceShutdownHook(this);
    ShutdownHookManager.get().addShutdownHook(nodeManagerShutdownHook,
        SHUTDOWN_HOOK_PRIORITY);
    // init() calls the overridden serviceInit() to do the initialization
    this.init(conf);
    // start() calls the overridden serviceStart() to start all services
    this.start();
  } catch (Throwable t) {
    LOG.fatal("Error starting NodeManager", t);
    System.exit(-1);
  }
}
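The remove-then-add hook logic above can be illustrated with plain JDK shutdown hooks. The following is a minimal sketch under stated assumptions and is not Hadoop's code. Hadoop's ShutdownHookManager adds a priority to each hook, which java.lang.Runtime does not support. The class and method names (ShutdownHookSketch, initAndStart, stopServices) are hypothetical.

```java
/**
 * Hypothetical sketch: keep exactly one JVM shutdown hook registered across
 * restarts, mirroring the remove-then-add logic in initAndStartNodeManager.
 */
public class ShutdownHookSketch {
    // Currently registered hook; plays the role of nodeManagerShutdownHook.
    private Thread shutdownHook;
    private int startCount;

    /**
     * Registers a new hook, first removing the previous one when rebooting.
     * Returns true if an old hook was found and removed.
     */
    public synchronized boolean initAndStart(boolean hasToReboot) {
        boolean removedOld = false;
        if (hasToReboot && shutdownHook != null) {
            // removeShutdownHook returns true only if the hook was registered.
            removedOld = Runtime.getRuntime().removeShutdownHook(shutdownHook);
        }
        // Each start creates a fresh hook thread wrapping the stop logic.
        shutdownHook = new Thread(this::stopServices, "service-shutdown-hook");
        Runtime.getRuntime().addShutdownHook(shutdownHook);
        startCount++;
        return removedOld;
    }

    public synchronized int getStartCount() {
        return startCount;
    }

    /** Runs at JVM exit; a real service would stop its child services here. */
    private void stopServices() {
        System.out.println("stopping services");
    }

    public static void main(String[] args) {
        ShutdownHookSketch service = new ShutdownHookSketch();
        boolean first = service.initAndStart(false);
        boolean reboot = service.initAndStart(true);
        System.out.println("first start removed a hook: " + first);
        System.out.println("reboot removed the old hook: " + reboot);
        // Only one hook is left, so "stopping services" prints once at exit.
    }
}
```

Removing the old hook before registering the new one prevents a reboot from leaving two hooks that would both try to stop the service at JVM exit.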
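(1) The init method is declared in the Service interface and implemented in the abstract class AbstractService. AbstractService.init calls the protected method serviceInit. NodeManager extends CompositeService, which in turn extends AbstractService, and NodeManager overrides serviceInit.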
Implementation of init in AbstractService:
@Override
public void init(Configuration conf) {
  if (conf == null) {
    throw new ServiceStateException("Cannot initialize service "
        + getName() + ": null configuration");
  }
  if (isInState(STATE.INITED)) {
    return;
  }
  synchronized (stateChangeLock) {
    if (enterState(STATE.INITED) != STATE.INITED) {
      setConfig(conf);
      try {
        serviceInit(config);
        if (isInState(STATE.INITED)) {
          //if the service ended up here during init,
          //notify the listeners
          notifyListeners();
        }
      } catch (Exception e) {
        noteFailure(e);
        ServiceOperations.stopQuietly(LOG, this);
        throw ServiceStateException.convert(e);
      }
    }
  }
}
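The init method follows the template-method pattern. The public method guards the state transition and then calls the protected serviceInit hook, so subclasses such as NodeManager override only the hook. Below is a simplified, hypothetical model of that pattern. Only the state names NOTINITED, INITED, STARTED and STOPPED come from Hadoop, and a String stands in for Configuration. Listeners, failure handling and the stop transition are omitted.

```java
import java.util.ArrayList;
import java.util.List;

/** Simplified, hypothetical model of AbstractService's init/start template methods. */
public class ServiceLifecycleSketch {
    enum State { NOTINITED, INITED, STARTED, STOPPED }

    /** Public lifecycle methods guard the state and delegate to protected hooks. */
    static abstract class SimpleService {
        private final Object stateChangeLock = new Object();
        private State state = State.NOTINITED;

        public State getState() {
            synchronized (stateChangeLock) {
                return state;
            }
        }

        public void init(String conf) {
            if (conf == null) {
                throw new IllegalArgumentException("null configuration");
            }
            synchronized (stateChangeLock) {
                if (state == State.INITED) {
                    return; // repeated init is a no-op
                }
                if (state != State.NOTINITED) {
                    throw new IllegalStateException("cannot init from " + state);
                }
                state = State.INITED; // enter the state first, then run the hook
                serviceInit(conf);
            }
        }

        public void start() {
            synchronized (stateChangeLock) {
                if (state == State.STARTED) {
                    return; // repeated start is a no-op
                }
                if (state != State.INITED) {
                    throw new IllegalStateException("cannot start from " + state);
                }
                state = State.STARTED;
                serviceStart();
            }
        }

        /** Hooks for subclasses; the defaults do nothing. */
        protected void serviceInit(String conf) {}
        protected void serviceStart() {}
    }

    /** A subclass overrides only the hooks, as NodeManager overrides serviceInit. */
    static class RecordingService extends SimpleService {
        final List<String> calls = new ArrayList<>();

        @Override
        protected void serviceInit(String conf) {
            calls.add("serviceInit(" + conf + ")");
        }

        @Override
        protected void serviceStart() {
            calls.add("serviceStart");
        }
    }

    public static void main(String[] args) {
        RecordingService service = new RecordingService();
        service.init("conf");
        service.init("conf"); // ignored: already INITED
        service.start();
        System.out.println(service.calls + " -> " + service.getState());
    }
}
```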
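(2) NodeManager's serviceInit override creates the NodeManager's component services and registers each one with addService. The details are as follows: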
Main code:
@Override
protected void serviceInit(Configuration conf) throws Exception {
  conf.setBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY, true);

  NMContainerTokenSecretManager containerTokenSecretManager =
      new NMContainerTokenSecretManager(conf);
  NMTokenSecretManagerInNM nmTokenSecretManager =
      new NMTokenSecretManagerInNM();
  this.aclsManager = new ApplicationACLsManager(conf);

  // Initialize the ContainerExecutor. ContainerExecutor wraps the operations the
  // NodeManager performs on containers, such as launching a container and
  // checking whether the container with a given id is still alive. The
  // implementation is chosen by yarn.nodemanager.container-executor.class;
  // the default is DefaultContainerExecutor.
  ContainerExecutor exec = ReflectionUtils.newInstance(
      conf.getClass(YarnConfiguration.NM_CONTAINER_EXECUTOR,
        DefaultContainerExecutor.class, ContainerExecutor.class), conf);
  try {
    exec.init();
  } catch (IOException e) {
    throw new YarnRuntimeException("Failed to initialize container executor", e);
  }
  DeletionService del = createDeletionService(exec);
  addService(del);

  // NodeManager level dispatcher (asynchronous event dispatcher)
  this.dispatcher = new AsyncDispatcher();

  // This service reports whether the node is healthy. The node's health
  // combines nodeHealthScriptRunner.isHealthy and dirsHandler.areDisksHealthy.
  nodeHealthChecker = new NodeHealthCheckerService();
  addService(nodeHealthChecker);
  dirsHandler = nodeHealthChecker.getDiskHandler();

  this.context = createNMContext(containerTokenSecretManager,
      nmTokenSecretManager);

  // Create the NodeStatusUpdater, which registers with the RM and sends it
  // heartbeats (status updates) from its own thread. It talks to the RM over
  // the ResourceTracker protocol, built on YarnRPC. ResourceTracker provides
  // two methods: registerNodeManager (registration) and nodeHeartbeat.
  nodeStatusUpdater =
      createNodeStatusUpdater(context, dispatcher, nodeHealthChecker);

  // Monitors the node's resources. Like every service it moves through the
  // service states NOTINITED, INITED, STARTED and STOPPED.
  NodeResourceMonitor nodeResourceMonitor = createNodeResourceMonitor();
  addService(nodeResourceMonitor);

  // Create the ContainerManagerImpl service, which manages containers. It
  // serves ContainerManagementProtocol, the protocol applications (through
  // their ApplicationMasters) use to talk to the NodeManager.
  containerManager =
      createContainerManager(context, exec, del, nodeStatusUpdater,
      this.aclsManager, dirsHandler);
  addService(containerManager);
  ((NMContext) context).setContainerManager(containerManager);

  // Create the WebServer for the NodeManager web UI. Its address is set by
  // yarn.nodemanager.webapp.address; the default port is 8042.
  WebServer webServer = createWebServer(context, containerManager
      .getContainersMonitor(), this.aclsManager, dirsHandler);
  addService(webServer);
  ((NMContext) context).setWebServer(webServer);

  dispatcher.register(ContainerManagerEventType.class, containerManager);
  dispatcher.register(NodeManagerEventType.class, this);
  addService(dispatcher);

  // Initialize the metrics system
  DefaultMetricsSystem.initialize("NodeManager");

  // StatusUpdater should be added last so that it get started last
  // so that we make sure everything is up before registering with RM.
  addService(nodeStatusUpdater);

  super.serviceInit(conf);
  // TODO add local dirs to del
}
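The two dispatcher.register calls map an event-type enum to the handler that receives its events, and AsyncDispatcher delivers those events on its own thread. The sketch below is a hypothetical, stripped-down router in that spirit. It routes each event by the declaring class of its enum constant. SimpleAsyncDispatcher, Event, ContainerEvents and NodeEvents are made-up names. Because serviceInit sets Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY to true, the real dispatcher exits the process when dispatching fails; the sketch only prints a message for an unregistered event type.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;

/** Hypothetical sketch of an AsyncDispatcher-style router; not Hadoop's class. */
public class DispatcherSketch {
    // Stand-in enums playing the roles of ContainerManagerEventType and NodeManagerEventType.
    enum ContainerEvents { FINISH_APPS, FINISH_CONTAINERS }
    enum NodeEvents { SHUTDOWN, RESYNC }

    /** An event is tagged with an enum constant. */
    static final class Event {
        final Enum<?> type;
        Event(Enum<?> type) { this.type = type; }
    }

    static final class SimpleAsyncDispatcher {
        private static final Event POISON = new Event(null); // stop marker
        private final Map<Class<?>, Consumer<Event>> handlers = new ConcurrentHashMap<>();
        private final BlockingQueue<Event> queue = new LinkedBlockingQueue<>();
        private final Thread worker = new Thread(this::loop, "dispatcher");

        /** One handler receives every constant of the given enum class. */
        void register(Class<? extends Enum<?>> eventType, Consumer<Event> handler) {
            handlers.put(eventType, handler);
        }

        void start() { worker.start(); }

        /** Producers only enqueue; handling happens later on the worker thread. */
        void dispatch(Event event) { queue.add(event); }

        /** Lets already-queued events finish, then waits for the worker to exit. */
        void stop() {
            queue.add(POISON);
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }

        private void loop() {
            try {
                while (true) {
                    Event event = queue.take();
                    if (event == POISON) {
                        return;
                    }
                    // Route by the enum's declaring class, e.g. NodeEvents.class.
                    Consumer<Event> handler = handlers.get(event.type.getDeclaringClass());
                    if (handler != null) {
                        handler.accept(event);
                    } else {
                        System.err.println("no handler registered for " + event.type);
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /** Registers two handlers, sends one event of each type, returns what ran. */
    static List<String> demo() {
        // Only the worker thread appends; join() in stop() makes its writes visible.
        List<String> handled = new ArrayList<>();
        SimpleAsyncDispatcher dispatcher = new SimpleAsyncDispatcher();
        dispatcher.register(ContainerEvents.class, e -> handled.add("containerManager:" + e.type));
        dispatcher.register(NodeEvents.class, e -> handled.add("nodeManager:" + e.type));
        dispatcher.start();
        dispatcher.dispatch(new Event(ContainerEvents.FINISH_APPS));
        dispatcher.dispatch(new Event(NodeEvents.SHUTDOWN));
        dispatcher.stop();
        return handled;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

A single worker thread keeps events in arrival order. The caller that posts an event never blocks on the handler, which is the point of putting an asynchronous dispatcher between NodeManager components.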
Implementation of start in AbstractService:
@Override
public void start() {
  if (isInState(STATE.STARTED)) {
    return;
  }
  //enter the started state
  synchronized (stateChangeLock) {
    if (stateModel.enterState(STATE.STARTED) != STATE.STARTED) {
      try {
        startTime = System.currentTimeMillis();
        // Calls the serviceStart method overridden in the subclass (here, NodeManager)
        serviceStart();
        if (isInState(STATE.STARTED)) {
          //if the service started (and isn't now in a later state), notify
          if (LOG.isDebugEnabled()) {
            LOG.debug("Service " + getName() + " is started");
          }
          notifyListeners();
        }
      } catch (Exception e) {
        noteFailure(e);
        ServiceOperations.stopQuietly(LOG, this);
        throw ServiceStateException.convert(e);
      }
    }
  }
}
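The catch block in start() means that a service whose serviceStart throws does not stay half-started. The failure is recorded, the service is stopped quietly so its cleanup still runs, and the exception is rethrown as an unchecked exception. A minimal sketch of that failure path follows. The class names are hypothetical, and a plain RuntimeException stands in for ServiceStateException.

```java
import java.io.IOException;

/** Hypothetical sketch of the failure path in AbstractService.start(). */
public class StartFailureSketch {
    enum State { INITED, STARTED, STOPPED }

    static class FailingService {
        State state = State.INITED;
        Exception failureCause; // plays the role of noteFailure(e)
        boolean cleanedUp;

        public void start() {
            if (state == State.STARTED) {
                return;
            }
            state = State.STARTED; // the state is entered before the hook runs
            try {
                serviceStart();
            } catch (Exception e) {
                if (failureCause == null) {
                    failureCause = e; // keep only the first failure
                }
                stopQuietly(); // undo the partial start
                // Rethrow unchecked, like ServiceStateException.convert(e).
                throw (e instanceof RuntimeException)
                    ? (RuntimeException) e
                    : new RuntimeException(e);
            }
        }

        public void stop() {
            if (state == State.STOPPED) {
                return;
            }
            state = State.STOPPED;
            cleanedUp = true; // a real serviceStop() would release resources here
        }

        /** Stops the service, swallowing any error so the original one wins. */
        private void stopQuietly() {
            try {
                stop();
            } catch (RuntimeException ignored) {
                // intentionally ignored
            }
        }

        /** Simulates a failing hook, e.g. doSecureLogin() throwing. */
        protected void serviceStart() throws Exception {
            throw new IOException("login failed");
        }
    }

    public static void main(String[] args) {
        FailingService service = new FailingService();
        try {
            service.start();
        } catch (RuntimeException e) {
            System.out.println("start failed: " + e.getCause().getMessage());
        }
        System.out.println("state=" + service.state + ", cleanedUp=" + service.cleanedUp);
    }
}
```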
Main code of the serviceStart override in NodeManager:
@Override
protected void serviceStart() throws Exception {
  try {
    doSecureLogin();
  } catch (IOException e) {
    throw new YarnRuntimeException("Failed NodeManager login", e);
  }
  super.serviceStart();
}
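Here super.serviceStart() is CompositeService.serviceStart(), which starts the child services in the order they were passed to addService. CompositeService.serviceInit initializes them in the same order, and serviceStop stops them in reverse. That ordering explains why serviceInit adds nodeStatusUpdater last: the node registers with the ResourceManager only after every other service is running. The following is a minimal, hypothetical sketch of the ordering alone. It leaves out Hadoop's state checks and error handling, and the class names are illustrative.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of CompositeService's child ordering; not Hadoop's class. */
public class CompositeOrderSketch {
    interface Service {
        void start();
        void stop();
    }

    /** A leaf service that only records its lifecycle calls. */
    static class NamedService implements Service {
        private final String name;
        private final List<String> log;

        NamedService(String name, List<String> log) {
            this.name = name;
            this.log = log;
        }

        public void start() { log.add("start " + name); }
        public void stop() { log.add("stop " + name); }
    }

    /** Starts children in insertion order and stops them in reverse order. */
    static class Composite implements Service {
        private final List<Service> children = new ArrayList<>();

        void addService(Service s) { children.add(s); }

        public void start() {
            for (Service s : children) {
                s.start();
            }
        }

        public void stop() {
            for (int i = children.size() - 1; i >= 0; i--) {
                children.get(i).stop();
            }
        }
    }

    /** Uses the same addService order as NodeManager.serviceInit (names only). */
    static List<String> demo() {
        List<String> log = new ArrayList<>();
        Composite nm = new Composite();
        String[] names = {"deletionService", "nodeHealthChecker", "nodeResourceMonitor",
                "containerManager", "webServer", "dispatcher", "nodeStatusUpdater"};
        for (String name : names) {
            nm.addService(new NamedService(name, log));
        }
        nm.start();
        nm.stop();
        return log;
    }

    public static void main(String[] args) {
        demo().forEach(System.out::println);
    }
}
```

Stopping in reverse order makes nodeStatusUpdater the first service to shut down, so the node stops heartbeating before the services it reports on go away.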
NodeManager Code Analysis: The NodeManager Startup Process
Original article: http://blog.csdn.net/wuwenxiang91322/article/details/40384471