An Analysis of Delay Scheduling in the Fair Scheduler

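The main goal of delay scheduling is to improve data locality and so reduce the amount of data moved across the network. When a MapTask's input data is not local to the node offering a slot, the scheduler delays that task and gives the slot to a MapTask that does have locality on that node.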




Otherwise, once the job has waited longer than nodeLocalityDelay + rackLocalityDelay, the scheduler looks for an off-switch MapTask and returns it.



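long nodeLocalityDelay;             // how long (ms) a job waits for a node-local slot before rack-local is allowed
long rackLocalityDelay;             // how long (ms) a job waits for a rack-local slot before off-switch is allowed
boolean skippedAtLastHeartbeat;     // whether the job was skipped (delayed) at the last heartbeat
long timeWaitedForLocalMap;         // time waited since the job's last MapTask was assigned
LocalityLevel lastMapLocalityLevel; // locality level of the most recently assigned MapTask

// Default value of both delays
nodeLocalityDelay = rackLocalityDelay =
  Math.min(15000, (long) (1.5 * jobTracker.getNextHeartbeatInterval()));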
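
To make the default concrete, the sketch below evaluates the same expression for two heartbeat intervals. The class name, method name, and the interval values (3000 ms and 20000 ms) are assumptions chosen for illustration; only the formula itself comes from the snippet above.

```java
// Illustrative sketch: DelayDefaults and defaultLocalityDelay are hypothetical names.
class DelayDefaults {
    // Upper bound on the default delay in milliseconds, taken from the snippet above.
    static final long MAX_DEFAULT_DELAY_MS = 15000L;

    // Same expression as the default: 1.5 heartbeat intervals, capped at 15 seconds.
    static long defaultLocalityDelay(long heartbeatIntervalMs) {
        return Math.min(MAX_DEFAULT_DELAY_MS, (long) (1.5 * heartbeatIntervalMs));
    }

    public static void main(String[] args) {
        System.out.println(defaultLocalityDelay(3000));   // prints 4500
        System.out.println(defaultLocalityDelay(20000));  // prints 15000 (capped)
    }
}
```

With a short heartbeat interval the delay is one and a half heartbeats; with a long interval the 15-second cap takes over.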


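In the Fair Scheduler, each job maintains two variables for delay scheduling: the locality level of its most recently scheduled MapTask (lastMapLocalityLevel) and the time it has waited since it was skipped (timeWaitedForLocalMap). The workflow is as follows (the work is done in the getAllowedLocalityLevel() method of FairScheduler.java):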

  /**
   * Get the maximum locality level at which a given job is allowed to
   * launch tasks, based on how long it has been waiting for local tasks.
   * This is used to implement the "delay scheduling" feature of the Fair
   * Scheduler for optimizing data locality.
   * If the job has no locality information (e.g. it does not use HDFS), this
   * method returns LocalityLevel.ANY, allowing tasks at any level.
   * Otherwise, the job can only launch tasks at its current locality level
   * or lower, unless it has waited at least nodeLocalityDelay or
   * rackLocalityDelay milliseconds depending on the current level. If it
   * has waited (nodeLocalityDelay + rackLocalityDelay) milliseconds,
   * it can go to any level.
   */
  protected LocalityLevel getAllowedLocalityLevel(JobInProgress job,
      long currentTime) {
    JobInfo info = infos.get(job);
    if (info == null) { // Job not in infos (shouldn't happen)
      LOG.error("getAllowedLocalityLevel called on job " + job
          + ", which does not have a JobInfo in infos");
      return LocalityLevel.ANY;
    }
    if (job.nonLocalMaps.size() > 0) { // Job doesn't have locality information
      return LocalityLevel.ANY;
    }
    // Don't wait for locality if the job's pool is starving for maps
    Pool pool = poolMgr.getPool(job);
    PoolSchedulable sched = pool.getMapSchedulable();
    long minShareTimeout = poolMgr.getMinSharePreemptionTimeout(pool.getName());
    long fairShareTimeout = poolMgr.getFairSharePreemptionTimeout();
    if (currentTime - sched.getLastTimeAtMinShare() > minShareTimeout ||
        currentTime - sched.getLastTimeAtHalfFairShare() > fairShareTimeout) {
      eventLog.log("INFO", "No delay scheduling for "
          + job.getJobID() + " because it is being starved");
      return LocalityLevel.ANY;
    }
    // In the common case, compute locality level based on time waited
    switch(info.lastMapLocalityLevel) {
    case NODE: // Last task launched was node-local
      if (info.timeWaitedForLocalMap >=
          nodeLocalityDelay + rackLocalityDelay)
        return LocalityLevel.ANY;
      else if (info.timeWaitedForLocalMap >= nodeLocalityDelay)
        return LocalityLevel.RACK;
      else
        return LocalityLevel.NODE;
    case RACK: // Last task launched was rack-local
      if (info.timeWaitedForLocalMap >= rackLocalityDelay)
        return LocalityLevel.ANY;
      else
        return LocalityLevel.RACK;
    default: // Last task was non-local; can launch anywhere
      return LocalityLevel.ANY;
    }
  }

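1. If lastMapLocalityLevel is NODE:

  1) if timeWaitedForLocalMap >= nodeLocalityDelay + rackLocalityDelay, MapTasks at any level (off-switch, rack-local, or node-local) may be scheduled;

  2) otherwise, if timeWaitedForLocalMap >= nodeLocalityDelay, rack-local and node-local MapTasks may be scheduled;

  3) otherwise, only node-local MapTasks may be scheduled.

2. If lastMapLocalityLevel is RACK:

  1) if timeWaitedForLocalMap >= rackLocalityDelay, MapTasks at any level may be scheduled;

  2) otherwise, rack-local and node-local MapTasks may be scheduled.

3. In all other cases, MapTasks at any level may be scheduled.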
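
The three rules can be sketched as a small pure function, separated from the JobTracker state so that it can be run on its own. This is a minimal sketch, not the scheduler's code: the class name, the Level enum, and the parameter names are hypothetical stand-ins, and the starvation and missing-locality checks from getAllowedLocalityLevel() are deliberately left out.

```java
// Illustrative sketch: AllowedLevelSketch and Level are hypothetical stand-ins
// for FairScheduler and LocalityLevel.
class AllowedLevelSketch {
    enum Level { NODE, RACK, ANY }

    // Returns the most permissive level the job may launch at, given the level
    // of its last map task and how long it has waited (all times in ms).
    static Level allowed(Level last, long waited, long nodeDelay, long rackDelay) {
        switch (last) {
            case NODE:
                if (waited >= nodeDelay + rackDelay) return Level.ANY;
                if (waited >= nodeDelay) return Level.RACK;
                return Level.NODE;
            case RACK:
                return waited >= rackDelay ? Level.ANY : Level.RACK;
            default:
                return Level.ANY; // last task was off-switch: no restriction
        }
    }

    public static void main(String[] args) {
        // Both delays are assumed to be 4500 ms for this example.
        System.out.println(allowed(Level.NODE, 3000, 4500, 4500)); // prints NODE
        System.out.println(allowed(Level.NODE, 6000, 4500, 4500)); // prints RACK
        System.out.println(allowed(Level.NODE, 9000, 4500, 4500)); // prints ANY
    }
}
```

Note that a job whose last task was rack-local only has to wait rackLocalityDelay, not the sum of both delays, before it may go off-switch.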



  @Override
  public synchronized List<Task> assignTasks(TaskTracker tracker)
      throws IOException {
    if (!initialized) // Don't try to assign tasks if we haven't yet started up
      return null;
    String trackerName = tracker.getTrackerName();
    eventLog.log("HEARTBEAT", trackerName);
    long currentTime = clock.getTime();

    // Compute total runnable maps and reduces, and currently running ones
    int runnableMaps = 0;
    int runningMaps = 0;
    int runnableReduces = 0;
    int runningReduces = 0;
    for (Pool pool: poolMgr.getPools()) {
      runnableMaps += pool.getMapSchedulable().getDemand();
      runningMaps += pool.getMapSchedulable().getRunningTasks();
      runnableReduces += pool.getReduceSchedulable().getDemand();
      runningReduces += pool.getReduceSchedulable().getRunningTasks();
    }

    ClusterStatus clusterStatus = taskTrackerManager.getClusterStatus();
    // Compute total map/reduce slots
    // In the future we can precompute this if the Scheduler becomes a
    // listener of tracker join/leave events.
    int totalMapSlots = getTotalSlots(TaskType.MAP, clusterStatus);
    int totalReduceSlots = getTotalSlots(TaskType.REDUCE, clusterStatus);

    eventLog.log("RUNNABLE_TASKS",
        runnableMaps, runningMaps, runnableReduces, runningReduces);

    // Update time waited for local maps for jobs skipped on last heartbeat
    // Note 1
    updateLocalityWaitTimes(currentTime);

    // Check for JT safe-mode
    if (taskTrackerManager.isInSafeMode()) {
      LOG.info("JobTracker is in safe-mode, not scheduling any tasks.");
      return null;
    }

    TaskTrackerStatus tts = tracker.getStatus();

    int mapsAssigned = 0; // loop counter for map in the below while loop
    int reducesAssigned = 0; // loop counter for reduce in the below while
    int mapCapacity = maxTasksToAssign(TaskType.MAP, tts);
    int reduceCapacity = maxTasksToAssign(TaskType.REDUCE, tts);
    boolean mapRejected = false; // flag used for ending the loop
    boolean reduceRejected = false; // flag used for ending the loop

    // Keep track of which jobs were visited for map tasks and which had tasks
    // launched, so that we can later mark skipped jobs for delay scheduling
    Set<JobInProgress> visitedForMap = new HashSet<JobInProgress>();
    Set<JobInProgress> visitedForReduce = new HashSet<JobInProgress>();
    Set<JobInProgress> launchedMap = new HashSet<JobInProgress>();

    ArrayList<Task> tasks = new ArrayList<Task>();
    // Scan jobs to assign tasks until neither maps nor reduces can be assigned
    // Note 2
    while (true) {
      // Computing the ending conditions for the loop
      // Reject a task type if one of the following condition happens
      // 1. number of assigned task reaches per heartbeat limit
      // 2. number of running tasks reaches runnable tasks
      // 3. task is rejected by the LoadManager.canAssign
      if (!mapRejected) {
        if (mapsAssigned == mapCapacity ||
            runningMaps == runnableMaps ||
            !loadMgr.canAssignMap(tts, runnableMaps,
                totalMapSlots, mapsAssigned)) {
          eventLog.log("INFO", "Can't assign another MAP to " + trackerName);
          mapRejected = true;
        }
      }
      if (!reduceRejected) {
        if (reducesAssigned == reduceCapacity ||
            runningReduces == runnableReduces ||
            !loadMgr.canAssignReduce(tts, runnableReduces,
                totalReduceSlots, reducesAssigned)) {
          eventLog.log("INFO", "Can't assign another REDUCE to " + trackerName);
          reduceRejected = true;
        }
      }
      // Exit while (true) loop if
      // 1. neither maps nor reduces can be assigned
      // 2. assignMultiple is off and we already assigned one task
      if (mapRejected && reduceRejected ||
          !assignMultiple && tasks.size() > 0) {
        break; // This is the only exit of the while (true) loop
      }

      // Determine which task type to assign this time
      // First try choosing a task type which is not rejected
      TaskType taskType;
      if (mapRejected) {
        taskType = TaskType.REDUCE;
      } else if (reduceRejected) {
        taskType = TaskType.MAP;
      } else {
        // If both types are available, choose the task type with fewer running
        // tasks on the task tracker to prevent that task type from starving
        if (tts.countMapTasks() + mapsAssigned <=
            tts.countReduceTasks() + reducesAssigned) {
          taskType = TaskType.MAP;
        } else {
          taskType = TaskType.REDUCE;
        }
      }

      // Get the map or reduce schedulables and sort them by fair sharing
      List<PoolSchedulable> scheds = getPoolSchedulables(taskType);
      // Sort the pool schedulables with the fair share comparator
      Collections.sort(scheds, new SchedulingAlgorithms.FairShareComparator());
      boolean foundTask = false;
      // Note 3
      for (Schedulable sched: scheds) { // This loop will assign only one task
        eventLog.log("INFO", "Checking for " + taskType +
            " task in " + sched.getName());
        // Note 4
        Task task = taskType == TaskType.MAP ?
                    sched.assignTask(tts, currentTime, visitedForMap) :
                    sched.assignTask(tts, currentTime, visitedForReduce);
        if (task != null) {
          foundTask = true;
          JobInProgress job = taskTrackerManager.getJob(task.getJobID());
          eventLog.log("ASSIGN", trackerName, taskType,
              job.getJobID(), task.getTaskID());
          // Update running task counts, and the job's locality level
          if (taskType == TaskType.MAP) {
            launchedMap.add(job);
            mapsAssigned++;
            runningMaps++;
            // Note 5
            updateLastMapLocalityLevel(job, task, tts);
          } else {
            reducesAssigned++;
            runningReduces++;
          }
          // Add task to the list of assignments
          tasks.add(task);
          break; // This break makes this loop assign only one task
        } // end if(task != null)
      } // end for(Schedulable sched: scheds)

      // Reject the task type if we cannot find a task
      if (!foundTask) {
        if (taskType == TaskType.MAP) {
          mapRejected = true;
        } else {
          reduceRejected = true;
        }
      }
    } // end while (true)

    // Mark any jobs that were visited for map tasks but did not launch a task
    // as skipped on this heartbeat
    for (JobInProgress job: visitedForMap) {
      if (!launchedMap.contains(job)) {
        infos.get(job).skippedAtLastHeartbeat = true;
      }
    }

    // If no tasks were found, return null
    return tasks.isEmpty() ? null : tasks;
  }
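
The last loop of assignTasks() is what feeds delay scheduling: a job counts as skipped on this heartbeat when it was visited for a map task but launched none. In set terms this is visitedForMap minus launchedMap. A minimal sketch of that step follows, with job IDs modeled as plain strings; the class name, method name, and job names are hypothetical.

```java
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.Set;

// Illustrative sketch: SkipMarkingSketch is a hypothetical name, and jobs are
// modeled as strings instead of JobInProgress objects.
class SkipMarkingSketch {
    // Jobs that were offered the slot but launched no map task on this heartbeat.
    static Set<String> skippedJobs(Set<String> visitedForMap, Set<String> launchedMap) {
        Set<String> skipped = new LinkedHashSet<>(visitedForMap);
        skipped.removeAll(launchedMap);
        return skipped;
    }

    public static void main(String[] args) {
        Set<String> visited = new LinkedHashSet<>(Arrays.asList("job1", "job2", "job3"));
        Set<String> launched = new LinkedHashSet<>(Arrays.asList("job2"));
        System.out.println(skippedJobs(visited, launched)); // prints [job1, job3]
    }
}
```

Jobs that were never visited are left alone, so their wait time does not grow on the next heartbeat.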

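Note 1: updateLocalityWaitTimes(). For every job that was skipped at the last heartbeat, this method adds the time elapsed since that heartbeat to timeWaitedForLocalMap and then resets the job's skippedAtLastHeartbeat to false. The code is as follows: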

  /**
   * Update locality wait times for jobs that were skipped at last heartbeat.
   */
  private void updateLocalityWaitTimes(long currentTime) {
    long timeSinceLastHeartbeat =
      (lastHeartbeatTime == 0 ? 0 : currentTime - lastHeartbeatTime);
    lastHeartbeatTime = currentTime;
    for (JobInfo info: infos.values()) {
      if (info.skippedAtLastHeartbeat) {
        info.timeWaitedForLocalMap += timeSinceLastHeartbeat;
        info.skippedAtLastHeartbeat = false;
      }
    }
  }
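
The effect of this bookkeeping is easier to see on a timeline. The sketch below tracks a single job across four heartbeats. It is a simplified model under stated assumptions: the class name is hypothetical, there is one job instead of a map of JobInfo objects, and the heartbeat timestamps are invented.

```java
// Illustrative sketch: WaitTimeSketch is a hypothetical single-job model of
// the fields used by updateLocalityWaitTimes().
class WaitTimeSketch {
    long lastHeartbeatTime = 0;
    long timeWaitedForLocalMap = 0;
    boolean skippedAtLastHeartbeat = false;

    // Mirrors the update: only a job skipped at the previous heartbeat accrues
    // wait time, and the skipped flag is cleared afterwards.
    void onHeartbeat(long currentTime) {
        long sinceLast = (lastHeartbeatTime == 0) ? 0 : currentTime - lastHeartbeatTime;
        lastHeartbeatTime = currentTime;
        if (skippedAtLastHeartbeat) {
            timeWaitedForLocalMap += sinceLast;
            skippedAtLastHeartbeat = false;
        }
    }

    public static void main(String[] args) {
        WaitTimeSketch job = new WaitTimeSketch();
        job.onHeartbeat(1000);               // first heartbeat: nothing to add
        job.skippedAtLastHeartbeat = true;   // the job was skipped on this heartbeat
        job.onHeartbeat(4000);               // 3000 ms accrued
        job.onHeartbeat(7000);               // not skipped in between: unchanged
        System.out.println(job.timeWaitedForLocalMap); // prints 3000
    }
}
```

Wait time therefore measures only the intervals that follow a skip, not wall-clock time since the job was submitted.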



  @Override
  public Task assignTask(TaskTrackerStatus tts, long currentTime,
      Collection<JobInProgress> visited) throws IOException {
    if (isRunnable()) {
      visited.add(job);
      TaskTrackerManager ttm = scheduler.taskTrackerManager;
      ClusterStatus clusterStatus = ttm.getClusterStatus();
      int numTaskTrackers = clusterStatus.getTaskTrackers();

      // check with the load manager whether it is safe to
      // launch this task on this taskTracker.
      LoadManager loadMgr = scheduler.getLoadManager();
      if (!loadMgr.canLaunchTask(tts, job, taskType)) {
        return null;
      }
      if (taskType == TaskType.MAP) {
        // Determine the locality level at which this job may be scheduled
        LocalityLevel localityLevel = scheduler.getAllowedLocalityLevel(
            job, currentTime);
        scheduler.getEventLog().log(
            "ALLOWED_LOC_LEVEL", job.getJobID(), localityLevel);
        switch (localityLevel) {
          case NODE:
            return job.obtainNewNodeLocalMapTask(tts, numTaskTrackers,
                ttm.getNumberOfUniqueHosts());
          case RACK:
            return job.obtainNewNodeOrRackLocalMapTask(tts, numTaskTrackers,
                ttm.getNumberOfUniqueHosts());
          default:
            return job.obtainNewMapTask(tts, numTaskTrackers,
                ttm.getNumberOfUniqueHosts());
        }
      } else {
        return job.obtainNewReduceTask(tts, numTaskTrackers,
            ttm.getNumberOfUniqueHosts());
      }
    } else {
      return null;
    }
  }
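
The switch in assignTask() picks a progressively more permissive lookup: node-local only, node-local or rack-local, then any task. One way to read it is as a filter on the locality a candidate task would have on this tracker. The sketch below expresses that reading; it is an interpretation rather than Hadoop code, and the class, enum, and method names are hypothetical.

```java
// Illustrative sketch: LocalityFilterSketch, Level, and mayLaunch are
// hypothetical names. Levels are declared from strictest to most permissive.
class LocalityFilterSketch {
    enum Level { NODE, RACK, ANY }

    // A candidate may launch if its locality on this tracker is at least as
    // good as the level the job is currently allowed.
    static boolean mayLaunch(Level allowed, Level candidate) {
        return candidate.ordinal() <= allowed.ordinal();
    }

    public static void main(String[] args) {
        System.out.println(mayLaunch(Level.NODE, Level.RACK)); // prints false
        System.out.println(mayLaunch(Level.RACK, Level.NODE)); // prints true
        System.out.println(mayLaunch(Level.ANY, Level.ANY));   // prints true
    }
}
```

When no candidate passes the filter, the lookup returns null, the job launches nothing, and assignTasks() later marks it as skipped.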



  /**
   * Update a job's locality level and locality wait variables given that
   * it has just launched a map task on a given task tracker.
   */
  private void updateLastMapLocalityLevel(JobInProgress job,
      Task mapTaskLaunched, TaskTrackerStatus tracker) {
    JobInfo info = infos.get(job);
    boolean isNodeGroupAware = conf.getBoolean(
        "net.topology.nodegroup.aware", false);
    LocalityLevel localityLevel = LocalityLevel.fromTask(
        job, mapTaskLaunched, tracker, isNodeGroupAware);
    info.lastMapLocalityLevel = localityLevel;
    info.timeWaitedForLocalMap = 0;
    eventLog.log("ASSIGNED_LOC_LEVEL", job.getJobID(), localityLevel);
  }
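
Putting the pieces together, the life cycle of one job can be traced: it is skipped and accrues wait time, its allowed level relaxes, it launches a task, and the launch resets the wait time and records the new level. The following is a rough sketch under stated assumptions: the class and method names are hypothetical, both delays are set to 4500 ms, each skip is assumed to add 3000 ms, and the starting level NODE is an assumption of this example.

```java
// Illustrative sketch: JobLocalitySketch is a hypothetical single-job model
// combining the wait-time update, the allowed-level rule, and the reset on launch.
class JobLocalitySketch {
    enum Level { NODE, RACK, ANY }

    final long nodeDelay;
    final long rackDelay;
    Level lastMapLocalityLevel = Level.NODE; // assumed starting level
    long timeWaitedForLocalMap = 0;

    JobLocalitySketch(long nodeDelay, long rackDelay) {
        this.nodeDelay = nodeDelay;
        this.rackDelay = rackDelay;
    }

    // The job was skipped and the next heartbeat arrived elapsedMs later.
    void onSkipped(long elapsedMs) { timeWaitedForLocalMap += elapsedMs; }

    Level allowedLevel() {
        switch (lastMapLocalityLevel) {
            case NODE:
                if (timeWaitedForLocalMap >= nodeDelay + rackDelay) return Level.ANY;
                return timeWaitedForLocalMap >= nodeDelay ? Level.RACK : Level.NODE;
            case RACK:
                return timeWaitedForLocalMap >= rackDelay ? Level.ANY : Level.RACK;
            default:
                return Level.ANY;
        }
    }

    // Mirrors updateLastMapLocalityLevel(): record the level, reset the wait.
    void onMapLaunched(Level level) {
        lastMapLocalityLevel = level;
        timeWaitedForLocalMap = 0;
    }

    public static void main(String[] args) {
        JobLocalitySketch job = new JobLocalitySketch(4500, 4500);
        job.onSkipped(3000);
        System.out.println(job.allowedLevel()); // prints NODE
        job.onSkipped(3000);
        System.out.println(job.allowedLevel()); // prints RACK
        job.onMapLaunched(Level.RACK);
        System.out.println(job.allowedLevel()); // prints RACK
        job.onMapLaunched(Level.NODE);
        System.out.println(job.allowedLevel()); // prints NODE
    }
}
```

The trace shows that a job which has relaxed to rack-local stays there until it launches a node-local task again, at which point it becomes strict once more.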



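Reference: 《Hadoop技术内幕 深入理解MapReduce架构设计与实现原理》 (Hadoop Internals: An In-Depth Look at MapReduce Architecture Design and Implementation Principles), by 董西成 (Dong Xicheng)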



