标签:style blog color io os 使用 ar for 2014
package org.apache.hadoop.event; public enum JobEventType { JOB_KILL, JOB_INIT, JOB_START }
package org.apache.hadoop.event; public enum TaskEventType { T_KILL, T_SCHEDULE }
package org.apache.hadoop.event;

import org.apache.hadoop.yarn.event.AbstractEvent;

/**
 * Job-level event: a {@link JobEventType} paired with the ID of the job it
 * concerns.
 */
public class JobEvent extends AbstractEvent<JobEventType> {

    /** ID of the job this event refers to; fixed at construction. */
    private final String jobID;

    /**
     * Creates a job event.
     *
     * @param type  the kind of job event
     * @param jobID ID of the job the event refers to
     */
    public JobEvent(JobEventType type, String jobID) {
        super(type);
        this.jobID = jobID;
    }

    /**
     * Returns the ID of the job this event refers to.
     *
     * @return the job ID passed to the constructor
     */
    public String getJobID() {
        return this.jobID;
    }
}
package org.apache.hadoop.event; import org.apache.hadoop.yarn.event.AbstractEvent; public class TaskEvent extends AbstractEvent<TaskEventType> { private String taskID; //TASkID public TaskEvent(TaskEventType type,String taskID) { super(type); this.taskID=taskID; } public String getTaskID() { return taskID; } }
package org.apache.hadoop.event; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.service.CompositeService; import org.apache.hadoop.service.Service; import org.apache.hadoop.yarn.event.AsyncDispatcher; import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.EventHandler; public class SimpleMRAppMaster extends CompositeService { private Dispatcher dispatcher; //中央异步调度器 private String jobID; private int taskNumber; //作业中包含的任务数 private String[] taskIDS; //该作业中包含的所有任务 public SimpleMRAppMaster(String name,String jobID,int taskNumber) { super(name); this.jobID=jobID; this.taskNumber=taskNumber; this.taskIDS=new String[taskNumber]; for(int i=0;i<taskNumber;i++){ this.taskIDS[i]=new String(jobID+"_task_"+i); } } @Override protected void serviceInit(Configuration conf) throws Exception { dispatcher=new AsyncDispatcher(); dispatcher.register(JobEventType.class, new JobEventHandller()); dispatcher.register(TaskEventType.class, new TaskEventHandller()); addService((Service)dispatcher); super.serviceInit(conf); } public Dispatcher getDispatcher(){ return dispatcher; } private class JobEventHandller implements EventHandler<JobEvent>{ @Override public void handle(JobEvent event) { //若收到 杀死 作业 事件 if(event.getType() == JobEventType.JOB_KILL){ System.out.println("收到 杀死作业事件 ,要 杀掉作业"+event.getJobID()+"下的所有任务"); for(int i=0;i<=taskNumber;i++){ dispatcher.getEventHandler().handle(new TaskEvent(TaskEventType.T_KILL, taskIDS[i])); } }else if(event.getType()== JobEventType.JOB_INIT){ System.out.println("收到 启动作业事件 ,要启动 作业"+event.getJobID()+"下的所有任务"); for(int i=0;i<=taskNumber;i++){ dispatcher.getEventHandler().handle(new TaskEvent(TaskEventType.T_SCHEDULE, taskIDS[i])); } } } } private class TaskEventHandller implements EventHandler<TaskEvent>{ @Override public void handle(TaskEvent event) { if(event.getType()==TaskEventType.T_KILL){ System.out.println("收到杀死任务命令,开始杀死任务"+event.getTaskID()); }else if(event.getType()==TaskEventType.T_SCHEDULE){ 
System.out.println("收到启动任务命令,开始启动任务"+event.getTaskID()); } } } }
package org.apache.hadoop.event;

import static org.junit.Assert.*;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.junit.Test;

/**
 * Hand-written example showing how the event library and the service library
 * are used together.
 *
 * @author joqk
 */
public class SimpleMRAppMasterTest {

    /**
     * Brings up a master for a ten-task job and submits a {@code JOB_INIT}
     * event to its dispatcher.
     *
     * @throws Exception if the service fails to initialise or start
     */
    @Test
    public void test() throws Exception {
        String jobID = "job_20140912_01";
        SimpleMRAppMaster appMaster = new SimpleMRAppMaster("作业测试", jobID, 10);
        YarnConfiguration conf = new YarnConfiguration(new Configuration());

        // Use the public lifecycle entry point rather than the protected
        // serviceInit() hook: init() performs the state transition that
        // start() requires, and invokes serviceInit() itself.
        appMaster.init(conf);
        appMaster.start();

        appMaster.getDispatcher().getEventHandler().handle(new JobEvent(JobEventType.JOB_INIT, jobID));
    }
}
标签:style blog color io os 使用 ar for 2014
原文地址:http://www.cnblogs.com/joqk/p/3968912.html