码迷,mamicode.com
首页 > 其他好文 > 详细

Azkaban源码学习笔记(一)

时间:2016-07-29 18:51:30      阅读:209      评论:0      收藏:0      [点我收藏+]

标签:

1. ConnectorParams (interface): 定义了各种常量参数,没有声明任何方法。

2. ExecutorServlet.java类
  2.1 继承类HttpServlet和接口ConnectorParams,用于处理Http请求,主要是Get请求,处理方式都写在doGet方法中。
  2.2 init()方法:创建AzkabanExecutorServer实例,通过该executor server实例获取flowRunnerManager,以及jobRunnerManager。
  2.3 doGet(HttpServletRequest req, HttpServletResponse resp)方法:处理具体的请求,并返回resp。 针对不同的action,进行不同的处理。
        action可以分为两类:
        第一类不需要获取execid和user,有三个action,分别是:update,ping,reloadJobTypePlugins;
        第二类action,会先获取execid和user,包含:metadata,metadata_jobRunnerMgr,log,attachments,execute,status,cancel,pause,resume,modifyExecution,job_execute,job_cancel
 
        action=execute时,ExecutorServlet类调handleAjaxExecute()方法去调flowRunnerManager.submitFlow(execId)来执行该工作流。 
 
3. FlowRunnerManager.java类
    private Map<Integer, FlowRunner> runningFlows = new ConcurrentHashMap<Integer, FlowRunner>();   记录当前正在执行的Flows,key是execId
    private Map<Integer, ExecutableFlow> recentlyFinishedFlows = new ConcurrentHashMap<Integer, ExecutableFlow>();
    void submitFlow(int execId) 方法:
        先判断runningFlows是不是已经包含该execId对应的作业流,获取execId对应的executableFlow实例flow,然后执行setupFlow(flow)配置flow(创建项目和执行的目录等)。
        
 
4. EventListener接口,声明了一个方法:void handleEvent(Event event)
5. EventHandler.java类:包含一个HashSet<EventListener>,包含方法addListener,fireEventListener(该方法调用每个listener.handleEvent()),removeListener。
6. ExecutableFlow.java: 包含可执行流的相关信息和设置信息的方法。
 
7.pipelineExecId:pipeline就是并发策略里的流水线,而pipelineExecId指该次execId对应的flow所有正在执行的其他实例中最后次提交的实例的execId
8. ExecutorManager.java类:
    8.1 public String submitExecutableFlow(ExecutableFlow exflow, String userId)方法:将作业流提交到执行队列
          方法过程:
          根据exflow获取flowId -》判断queuedFlows是否满-》
            queuedFlows满了-》提交失败,打出log提示error
            queuedFlows未满-》获取该流正在跑的runningflows,获取流执行设置(ExecutionOptions)-》获取流的执行参数(是否enable,如果enable则将参数生效)-》判断runningflows是否为空,如果不为空-》获取并发设置
            并发设置:流水线(pipeline)-》设置流水线执行Id(PipelineExecutionId)为正在执行的最后次提交的执行流id,获取pipeline level
            并发设置:忽略本次执行(skip)-》抛出异常ExecutorManagerException,给出提示该流已经有实例已经在执行,本次执行被skip了
            并发设置:并行执行-》仅修改message提示
            -》白名单设置?options.setMemoryCheck(memoryCheck);-》判断是否多节点模式(isMultiExecutorMode)
            多节点模式:是-》将该flow记录为正在执行的flow(executorLoader.addActiveExecutableReference(reference);),将作业流放入队列queuedFlows.enqueue(exflow, reference);
            多节点模式:否-》将该flow记录为正在执行的flow,选择本地executor,下发作业流dispatch(reference, exflow, choosenExecutor);
 
9. ExecutableFlow.java类:一个可执行流的相关信息。                  
10.ExecutionReference.java类:存储execId,executor,updateTime,nextCheckTime,numErrors信息;一个具体的执行实例
11. FlowWatcher.java类:检测某个execId的各个作业的执行状态。
    private int execId;
    private ExecutableFlow flow;
    private Map<String, BlockingStatus> map = new ConcurrentHashMap<String, BlockingStatus>();   该map用于存储各个job的执行状态,key为jobId,value为job的状态
12. BlockingStatus.java类:管理特定作业的状态,以同步的方式改变作业的状态。当状态处于block状态时,线程会处于等待状态,等待其他线程的通知(notify),最多等待时长为5分钟。
      private static final long WAIT_TIME = 5 * 60 * 1000;
      private final int execId;
      private final String jobId;
      private Status status;

Azkaban源码学习笔记(一)

标签:

原文地址:http://www.cnblogs.com/znicy/p/5719304.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!