标签:告诉 spring框架 加载 功能 书签 产生 manifest stun ogg
在第一节中,我们简单讲了一下Ambari的系统架构。我们这一节主要分析Ambari的源代码,总览Ambari的具体实现方式及其工作细节。
一、Ambari-Server启动
Ambari-Server是一个WEB Server,提供统一的REST API接口,同时向web和agent开放了两个不同的端口(默认前者是8080, 后者是8440或者8441)。它是由Jetty Server容器构建起来的,通过Spring Framework构建出来的WEB服务器,其中大量采用了google提供的Guice注解完成spring框架所需要的注入功能(想一想,之前spring框架需要加载一个applicationcontext.xml文件来把bean注入进来,现在可以用Guice注解的方式就可以轻松完成)。 REST框架由JAX-RS标准来构建。
Ambari-Server接受来自两处的REST请求,Agent过来的请求处理逻辑由包org.apache.ambari.server.agent处理, 而API所的处理逻辑来自org.apache.ambari.server.api。详见如下代码:
- ServletHolder sh = new ServletHolder(ServletContainer.class);
-
- sh.setInitParameter("com.sun.jersey.config.property.resourceConfigClass",
- "com.sun.jersey.api.core.PackagesResourceConfig");
-
- sh.setInitParameter("com.sun.jersey.config.property.packages",
- "org.apache.ambari.server.api.rest;" +
- "org.apache.ambari.server.api.services;" +
- "org.apache.ambari.eventdb.webservice;" +
- "org.apache.ambari.server.api");
- sh.setInitParameter("com.sun.jersey.api.json.POJOMappingFeature",
- "true");
- root.addServlet(sh, "/api/v1/*");
- sh.setInitOrder(2);
-
-
- ServletHolder agent = new ServletHolder(ServletContainer.class);
- agent.setInitParameter("com.sun.jersey.config.property.resourceConfigClass",
- "com.sun.jersey.api.core.PackagesResourceConfig");
- agent.setInitParameter("com.sun.jersey.config.property.packages",
- "org.apache.ambari.server.agent.rest;" + "org.apache.ambari.server.api");
- agent.setInitParameter("com.sun.jersey.api.json.POJOMappingFeature",
- "true");
- agentroot.addServlet(agent, "/agent/v1/*");
- agent.setInitOrder(3);
-
-
- ServletHolder cert = new ServletHolder(ServletContainer.class);
- cert.setInitParameter("com.sun.jersey.config.property.resourceConfigClass",
- "com.sun.jersey.api.core.PackagesResourceConfig");
- cert.setInitParameter("com.sun.jersey.config.property.packages",
- "org.apache.ambari.server.security.unsecured.rest;" + "org.apache.ambari.server.api");
- cert.setInitParameter("com.sun.jersey.api.json.POJOMappingFeature",
- "true");
- agentroot.addServlet(cert, "/*");
- cert.setInitOrder(4);
-
-
- ServletHolder resources = new ServletHolder(ServletContainer.class);
- resources.setInitParameter("com.sun.jersey.config.property.resourceConfigClass",
- "com.sun.jersey.api.core.PackagesResourceConfig");
- resources.setInitParameter("com.sun.jersey.config.property.packages",
- "org.apache.ambari.server.resources.api.rest;" + "org.apache.ambari.server.api");
- resources.setInitParameter("com.sun.jersey.api.json.POJOMappingFeature",
- "true");
- root.addServlet(resources, "/resources/*");
- resources.setInitOrder(6)
正如上一节所述,Ambari-Server有一个状态机管理模块,所有节点的状态信息更改都最终提供给状态机进行更改操作,因此状态机是一个很忙的组件。在Ambari-Server里面,把每一次更改操作都把它当作是一类事件,采用事件驱动机制完成对应的任务。这种思想有点借鉴已经运用在hadoop 2.x YARN里面的事件驱动机制。事件驱动机制能够一种高效的异步RPC请求方式,直接调用需要执行相应的代码逻辑,而事件驱动只需要产生事件统一提交给事件处理器,因此事件驱动需要一个更复杂的有限状态机结合起来一同使用。
二、Ambari-Server处理Ambari-Agent请求
Agent发送过来的心跳请求由org.apache.ambari.server.agent.HeartBeatHandler.handleHeartBeat(HeartBeat)来处理,执行完后,同时会返回org.apache.ambari.server.agent.HeartBeatResponse给agent。 org.apache.ambari.server.agent.HeartBeat里面主要含了两类信息:节点的状态信息nodeStatus和服务状态信息componentStatus。
- public HeartBeatResponse handleHeartBeat(HeartBeat heartbeat)
- throws AmbariException {
- String hostname = heartbeat.getHostname();
- Long currentResponseId = hostResponseIds.get(hostname);
- HeartBeatResponse response;
- if (currentResponseId == null) {
-
- LOG.error("CurrentResponseId unknown - send register command");
- return createRegisterCommand();
- }
-
- LOG.info("Received heartbeat from host"
- + ", hostname=" + hostname
- + ", currentResponseId=" + currentResponseId
- + ", receivedResponseId=" + heartbeat.getResponseId());
-
- if (heartbeat.getResponseId() == currentResponseId - 1) {
- LOG.warn("Old responseId received - response was lost - returning cached response");
- return hostResponses.get(hostname);
- } else if (heartbeat.getResponseId() != currentResponseId) {
- LOG.error("Error in responseId sequence - sending agent restart command");
- return createRestartCommand(currentResponseId);
-
- }
-
- response = new HeartBeatResponse();
- response.setResponseId(++currentResponseId);
- Host hostObject = clusterFsm.getHost(hostname);
-
- if (hostObject.getState().equals(HostState.HEARTBEAT_LOST)) {
-
- LOG.warn("Host is in HEARTBEAT_LOST state - sending register command");
- return createRegisterCommand();
- }
-
- hostResponseIds.put(hostname, currentResponseId);
- hostResponses.put(hostname, response);
-
- long now = System.currentTimeMillis();
- HostState hostState = hostObject.getState();
-
- if (heartbeat.componentStatus.size() > 0
- && hostObject.getState().equals(HostState.WAITING_FOR_HOST_STATUS_UPDATES)) {
- try {
- LOG.debug("Got component status updates");
- hostObject.handleEvent(new HostStatusUpdatesReceivedEvent(hostname, now));
- } catch (InvalidStateTransitionException e) {
- LOG.warn("Failed to notify the host about component status updates", e);
- }
- }
-
- try {
- if (heartbeat.getNodeStatus().getStatus().equals(HostStatus.Status.HEALTHY)) {
- hostObject.handleEvent(new HostHealthyHeartbeatEvent(hostname, now,
- heartbeat.getAgentEnv()));
- } else {
- hostObject.handleEvent(new HostUnhealthyHeartbeatEvent(hostname, now,
- null));
- }
- if (hostState != hostObject.getState()) scanner.updateHBaseMaster(hostObject);
- } catch (InvalidStateTransitionException ex) {
- LOG.warn("Asking agent to reregister due to " + ex.getMessage(), ex);
- hostObject.setState(HostState.INIT);
- return createRegisterCommand();
- }
-
-
- processCommandReports(heartbeat, hostname, clusterFsm, now);
-
-
- processStatusReports(heartbeat, hostname, clusterFsm);
-
-
- if (hostObject.getState().equals(HostState.HEALTHY)) {
- sendCommands(hostname, response);
- }
- return response;
- }
下面我们学习一下Ambari-Agent是如何处理heartbeat请求的。agent是由Python代码所写,每个节点上都会有一个python的daemon进程与server进行交互。
三、Ambari-Agent执行流程
安装ambari-agent 服务时会把相应在的python代码置于python执行的环境上下文中,例如其入口代码可能是/usr/lib/python2.6/site-packages/ambari_agent/main.py,并且进行相关初始化工作(例如验证参数,与server建立连接,初始化安全验证证书),最后会产生一个新的控制器Controller子线程来统一管理节点的状态。Controller线程里面有一个动作队列ActionQueue线程,并且开启向Server注册和发心跳服务。可以看出来,ambari-agent主要由两个线程组成,Controller线程向Server发送注册或心跳请求,请求到的Action数据放到ActionQueue线程里面,ActionQueue线程维护着两个队列:commandQueue和resultQueue。ActionQueue线程会监听commandQueue的状况。
- class Controller(threading.Thread):
- def __init__(self, config, range=30): // 在初始化Controller之前,ambari-agent就会在main.py里面进行判断:ambari-server是否正常,正常才会初始化Controller
- // 省略初始化代码
- def run(self):
- self.actionQueue = ActionQueue(self.config) // 初始化队列线程
- self.actionQueue.start()
- self.register = Register(self.config) // 初始化注册类
- self.heartbeat = Heartbeat(self.actionQueue) // 初始化心跳类
-
- opener = urllib2.build_opener()
- urllib2.install_opener(opener)
-
- while True:
- self.repeatRegistration = False
- self.registerAndHeartbeat() //开始注册 并且 定时发心跳
- if not self.repeatRegistration:
- break
-
- pass
CommandQueue队列主要有3类command:
- REGISTER_COMMAND:该类命令主要通知agent重新向server发送注册请求。
- STATUS_COMMAND:该类命令主要告诉agent需要向server发送某组件的状态信息。
- EXECUTION_COMMAND:要求agent执行puppet或者软件集升级任务
ActionQueue线程在执行STATUS_COMMAND时,会通过LiveStatus类构建一个StatusCheck检测器,并且通过ps命令来检测该组件是否是活着。
- def getIsLive(self, pidPath):
- // ....
- //检测该组件pid文件是否存在...
- res = self.sh.run([‘ps -p‘, str(pid), ‘-f‘]) //运行shell命令,检测该进程是否存在
- lines = res[‘output‘].strip().split(os.linesep)
- try:
- procInfo = lines[1]
- isLive = not procInfo == None
- except IndexError:
- logger.info(‘Process is dead‘)
- return isLive
ActionQueue线程在执行EXECUTION_COMMAND任务时,通常是用于执行相关Puppet任务,它会在[agent].prefix目录下产生一个puppet文件,然后执行puppet apply命令执行一批puppet module文件完成配置更改和节点管理任务。
- def runCommand(self, command, tmpoutfile, tmperrfile):
- taskId = 0
- if command.has_key("taskId"):
- taskId = command[‘taskId‘]
- siteppFileName = os.path.join(self.tmpDir, "site-" + str(taskId) + ".pp") // self.tmpdir是ambari-agent.ini里面配置的agent.prefix参数, site-{taskId:int}.pp文件里面主要是一组服务的配置参数
- generateManifest(command, siteppFileName, self.modulesdir, self.config) //生成一个puppet配置文件
- result = self.run_manifest(command, siteppFileName, tmpoutfile, tmperrfile) // 会根据command命令里repo_info参数值,执行相应的puppet命令
- return result
Ambari深入学习(II)-实现细节
标签:告诉 spring框架 加载 功能 书签 产生 manifest stun ogg
原文地址:https://www.cnblogs.com/felixzh/p/10899575.html