码迷,mamicode.com
首页 > 其他好文 > 详细

Spark系列(七)Master中的资源调度

时间:2016-04-07 06:56:20      阅读:295      评论:0      收藏:0      [点我收藏+]

标签:

资源调度

技术分享

说明:

Application的调度算法有两种,分别为spreadOutApps和非spreadOutApps

spreadOutApps

  • 在spark-submit脚本中,可以指定要多少个executor,executor需要多少个cpu及多少内存,基于该机制,最后executor的实际数量,以及每个executor的cpu可能与配置是不一样的。
  • 因为spreadOutApps调度算法的总是基于总CPU总和来分配,比如要求3个executor每个要3个CPU,如果有9个worker每个有1个CPU,因为总共要分配9个core,所以每个worker分配一个core然后每个worker启动一个executor
  • 最后启动9个executor每个executor1个cput core

非spreadOutApps

  • 每个application都尽可能分配到尽量少的worker上,比如总共有10个worker,每个有10个core app总共要分配20个core,那么其实只会分配到两个worker上,每个worker都占满10个core.

 

Schdule方法源码分析

/**
   * Schedule the currently available resources among waiting apps. This method will be called
   * every time a new app joins or resource availability changes.
   */
  private def schedule() {
    // 判断master状态,不为ALIVE时直接返回
    if (state != RecoveryState.ALIVE) { return }
 
    // First schedule drivers, they take strict precedence over applications
10      // Randomization helps balance drivers
11      // 获取状态为ALIVE的worker,并且随机打乱
12      val shuffledAliveWorkers = Random.shuffle(workers.toSeq.filter(_.state == WorkerState.ALIVE))
13      // 可用worker数量
14      val numWorkersAlive = shuffledAliveWorkers.size
15      var curPos = 0
16   
17      // diriver调度过程(yarn-client模式下)
18      for (driver <- waitingDrivers.toList) { // iterate over a copy of waitingDrivers
19        // We assign workers to each waiting driver in a round-robin fashion. For each driver, we
20        // start from the last worker that was assigned a driver, and continue onwards until we have
21        // explored all alive workers.
22        var launched = false
23        var numWorkersVisited = 0
24        // 判读还有可用的worker且Driver还未启动
25        while (numWorkersVisited < numWorkersAlive && !launched) {
26          val worker = shuffledAliveWorkers(curPos)
27          numWorkersVisited += 1
28          // 判断当前worker空闲内存是否大于等于driver需要的内存,且Worker空闲的core数量大于等于dirver需要的core的数量
29          if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) {
30            // 启动driver
31            launchDriver(worker, driver)
32            waitingDrivers -= driver
33            launched = true
34          }
35          curPos = (curPos + 1) % numWorkersAlive
36        }
37      }
38   
39      // Right now this is a very simple FIFO scheduler. We keep trying to fit in the first app
40      // in the queue, then the second app, etc.
41      // spreadOutApps调度方式
42      if (spreadOutApps) {
43        // Try to spread out each app among all the nodes, until it has all its cores
44        // 遍历需要调度的app(Application),且该app中的core还需要调度
45        for (app <- waitingApps if app.coresLeft > 0) {
46          val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE)
47            .filter(canUse(app, _)).sortBy(_.coresFree).reverse
48          // 可用worker的数量
49          val numUsable = usableWorkers.length
50          // 存放app 需要分配core的结果
51          val assigned = new Array[Int](numUsable) // Number of cores to give on each node
52          // 获取Application剩余需要分配的cpu数量与worker总共可用cpu数量中的最小值
53          var toAssign = math.min(app.coresLeft, usableWorkers.map(_.coresFree).sum)
54          var pos = 0
55          while (toAssign > 0) {
56            // 如果worker空闲的cpu数量大于已经分配出去的cpu数量,那么woker还可继续分配cpu
57            if (usableWorkers(pos).coresFree - assigned(pos) > 0) {
58              // 还需分配core的总数量减1
59              toAssign -= 1
60              // 在已分配app core结果集中加1
61              assigned(pos) += 1
62            }
63            pos = (pos + 1) % numUsable
64          }
65          // Now that we‘ve decided how many cores to give on each node, let‘s actually give them
66          for (pos <- 0 until numUsable) {
67            if (assigned(pos) > 0) {
68              // 根据WorkerInfo和所需的core构建ExecutorDesc
69              val exec = app.addExecutor(usableWorkers(pos), assigned(pos))
70              // 启动Executor
71              launchExecutor(usableWorkers(pos), exec)
72              app.state = ApplicationState.RUNNING
73            }
74          }
75        }
76      } 
77      // 非spreadOutApps调度方式
78      else {
79        // Pack each app into as few nodes as possible until we‘ve assigned all its cores
80        // 过滤出可用的worker
81        for (worker <- workers if worker.coresFree > 0 && worker.state == WorkerState.ALIVE) {
82          // 获取需要分配core的app
83          for (app <- waitingApps if app.coresLeft > 0) {
84            // 判读app是否可以使用该worker
85            if (canUse(app, worker)) {
86              // 取worker空闲core与app需分配core中的最小值
87              val coresToUse = math.min(worker.coresFree, app.coresLeft)
88              if (coresToUse > 0) {
89                // 根据WorkerInfo和所需的core构建ExecutorDesc
90                val exec = app.addExecutor(worker, coresToUse)
91                // 启动Executor
92                launchExecutor(worker, exec)
93                app.state = ApplicationState.RUNNING
94              }
95            }
96          }
97        }
98      }
99    }

Spark系列(七)Master中的资源调度

标签:

原文地址:http://www.cnblogs.com/jianyuan/p/Spark系列之Master资源调度.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!