在Worker Actor中,每次LaunchExecutor会创建一个CoarseGrainedExecutorBackend进程,Executor和CoarseGrainedExecutorBackend是1对1的关系。也就是说集群里启动多少Executor实例就有多少CoarseGrainedExecutorBackend进程。
那么到底是如何分配Executor的呢?怎么控制调节Executor的个数呢?
下面主要介绍一下Spark Executor分配策略:
我们仅看,当Application提交注册到Master后,Master会返回RegisteredApplication,之后便会调用schedule()这个方法,来分配Driver的资源,和启动Executor的资源。
schedule()方法是来调度当前可用资源的调度方法,它管理还在排队等待的Apps资源的分配,这个方法是每次在集群资源发生变动的时候都会调用,根据当前集群最新的资源来进行Apps的资源分配。
    // First schedule drivers, they take strict precedence over applications
    val shuffledWorkers = Random.shuffle(workers) // 把当前workers这个HashSet的顺序随机打乱
    for (worker <- shuffledWorkers if worker.state == WorkerState.ALIVE) { //遍历活着的workers
      for (driver <- waitingDrivers) { //在等待队列中的Driver们会进行资源分配
        if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) { //当前的worker内存和cpu均大于当前driver请求的mem和cpu,则启动
          launchDriver(worker, driver) //启动Driver 内部实现是发送启动Driver命令给指定Worker,Worker来启动Driver。
          waitingDrivers -= driver //把启动过的Driver从队列移除
        }
      }
    }
val spreadOutApps = conf.getBoolean("spark.deploy.spreadOut", true)  /**
   * Can an app use the given worker? True if the worker has enough memory and we haven't already
   * launched an executor for the app on it (right now the standalone backend doesn't like having
   * two executors on the same worker).
   */
  def canUse(app: ApplicationInfo, worker: WorkerInfo): Boolean = {
    worker.memoryFree >= app.desc.memoryPerSlave && !worker.hasExecutor(app)
  }
// Right now this is a very simple FIFO scheduler. We keep trying to fit in the first app
    // in the queue, then the second app, etc.
    if (spreadOutApps) {
      // Try to spread out each app among all the nodes, until it has all its cores
      for (app <- waitingApps if app.coresLeft > 0) { //对还未被完全分配资源的apps处理
        val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE)
          .filter(canUse(app, _)).sortBy(_.coresFree).reverse //根据core Free对可用Worker进行降序排序。
        val numUsable = usableWorkers.length //可用worker的个数 eg:可用5个worker
        val assigned = new Array[Int](numUsable) //候选Worker,每个Worker一个下标,是一个数组,初始化默认都是0
        var toAssign = math.min(app.coresLeft, usableWorkers.map(_.coresFree).sum)//还要分配的cores = 集群中可用Worker的可用cores总和(10), 当前未分配core(5)中找最小的
        var pos = 0
        while (toAssign > 0) { 
          if (usableWorkers(pos).coresFree - assigned(pos) > 0) { //以round robin方式在所有可用Worker里判断当前worker空闲cpu是否大于当前数组已经分配core值
            toAssign -= 1
            assigned(pos) += 1 //当前下标pos的Worker分配1个core +1
          }
          pos = (pos + 1) % numUsable //round-robin轮询寻找有资源的Worker
        }
        // Now that we've decided how many cores to give on each node, let's actually give them
        for (pos <- 0 until numUsable) {
          if (assigned(pos) > 0) { //如果assigned数组中的值>0,将启动一个executor在,指定下标的机器上。
            val exec = app.addExecutor(usableWorkers(pos), assigned(pos)) //更新app里的Executor信息
            launchExecutor(usableWorkers(pos), exec)  //通知可用Worker去启动Executor
            app.state = ApplicationState.RUNNING
          }
        }
      }
    } else {
 ,跑个题,有些困了。。
,跑个题,有些困了。。 } else {
      // Pack each app into as few nodes as possible until we've assigned all its cores
      for (worker <- workers if worker.coresFree > 0 && worker.state == WorkerState.ALIVE) {
        for (app <- waitingApps if app.coresLeft > 0) {
          if (canUse(app, worker)) { //直接问当前worker是有空闲的core
            val coresToUse = math.min(worker.coresFree, app.coresLeft) //有则取,不管多少
            if (coresToUse > 0) { //有
              val exec = app.addExecutor(worker, coresToUse) //直接启动
              launchExecutor(worker, exec)
              app.state = ApplicationState.RUNNING
            }
          }
        }
      }
    }
  }2、针对同一个App,每个Worker里只能有一个针对该App的Executor存在,切记。如果想让整个App的Executor变多,设置SPARK_WORKER_INSTANCES,让Worker变多。
3、Executor的资源分配有2种策略:
3.1、SpreadOut :一种轮询集群各个Worker,为Executor比较平均的分配Worker资源,来启动创建Executor的策略,好处是负载均衡,坏处是会造成启动等待。
3.2、非SpreadOut:会尽可能的根据每个Worker的剩余资源来启动Executor,这样启动的Executor的core是不均匀的。好处是加快了App的Executor启动,坏处,每个Executor的并行度和负载均衡就不能够保证了。
行文仓促,如有不正之处,请指出,欢迎讨论 :)
——EOF——
原创文章,转载请注明出自:http://blog.csdn.net/oopsoom/article/details/38763985
原文地址:http://blog.csdn.net/oopsoom/article/details/38763985