码迷,mamicode.com
首页 > 其他好文 > 详细

executor源码分析

时间:2018-08-26 20:54:22      阅读:209      评论:0      收藏:0      [点我收藏+]

标签:没有   ===   park   exp   got   isl   start   bst   win   


/**
 * Actor-based executor backend.
 *
 * Per the original author's note, the "executor" that a worker starts for an application is
 * in fact this CoarseGrainedExecutorBackend process. On start it registers itself with the
 * driver; once the driver acknowledges the registration it creates the [[Executor]] that does
 * the actual work, and from then on it relays driver commands (launch/kill task, stop) to it.
 *
 * @param driverUrl     actor path used to look up the driver
 * @param executorId    identifier sent to the driver in the registration and in status updates
 * @param hostPort      "host:port" string; validated at construction, its host part is handed
 *                      to the Executor
 * @param cores         core count reported to the driver at registration
 * @param userClassPath passed through to the Executor
 * @param env           SparkEnv; supplies the closure serializer used to decode tasks
 */
private[spark] class CoarseGrainedExecutorBackend(
    driverUrl: String,
    executorId: String,
    hostPort: String,
    cores: Int,
    userClassPath: Seq[URL],
    env: SparkEnv)
  extends Actor with ActorLogReceive with ExecutorBackend with Logging {

  Utils.checkHostPort(hostPort, "Expected hostport")

  // Remains null until the driver answers the registration with RegisteredExecutor.
  var executor: Executor = null
  // Assigned in preStart().
  var driver: ActorSelection = null

  /**
   * Actor initialization hook: resolves the driver actor, sends it a RegisterExecutor message
   * and subscribes this actor to remoting lifecycle events.
   */
  override def preStart(): Unit = {
    logInfo(s"Connecting to driver: $driverUrl")

    // Obtain a handle on the driver's actor.
    driver = context.actorSelection(driverUrl)

    // Ask the driver to register this executor.
    driver ! RegisterExecutor(executorId, hostPort, cores, extractLogUrls)
    context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
  }

  /**
   * Collects the environment variables whose name starts with "SPARK_LOG_URL_".
   *
   * @return map from the lower-cased remainder of the variable name (prefix stripped) to the
   *         variable's value
   */
  def extractLogUrls: Map[String, String] = {
    val prefix = "SPARK_LOG_URL_"
    sys.env.collect {
      case (name, url) if name.startsWith(prefix) =>
        (name.substring(prefix.length).toLowerCase, url)
    }
  }

  /** Message handler for driver commands and remoting events. */
  override def receiveWithLogging = {

    // Sent back by the driver after a successful registration. Only now is the Executor
    // created; most of the real functionality is implemented by that object.
    case RegisteredExecutor =>
      logInfo("Successfully registered with driver")
      val (hostname, _) = Utils.parseHostPort(hostPort)
      executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false)

    case RegisterExecutorFailed(message) =>
      logError(s"Slave registration failed: $message")
      System.exit(1)

    // Start a task.
    case LaunchTask(data) =>
      if (executor == null) {
        logError("Received LaunchTask command but executor was null")
        System.exit(1)
      } else {
        // Deserialize the task description.
        val ser = env.closureSerializer.newInstance()
        val taskDesc = ser.deserialize[TaskDescription](data.value)
        logInfo(s"Got assigned task ${taskDesc.taskId}")

        // Delegate to the Executor, which actually launches the task.
        executor.launchTask(this, taskId = taskDesc.taskId, attemptNumber = taskDesc.attemptNumber,
          taskDesc.name, taskDesc.serializedTask)
      }

    case KillTask(taskId, _, interruptThread) =>
      if (executor == null) {
        logError("Received KillTask command but executor was null")
        System.exit(1)
      } else {
        executor.killTask(taskId, interruptThread)
      }

    case x: DisassociatedEvent =>
      if (x.remoteAddress == driver.anchorPath.address) {
        logError(s"Driver $x disassociated! Shutting down.")
        System.exit(1)
      } else {
        logWarning(s"Received irrelevant DisassociatedEvent $x")
      }

    case StopExecutor =>
      logInfo("Driver commanded a shutdown")
      // The executor is still null if the stop command arrives before registration completed.
      // Calling stop() unguarded would throw a NullPointerException and skip the actor and
      // actor-system shutdown below, so only stop an executor that exists.
      if (executor != null) {
        executor.stop()
      } else {
        logWarning("Received StopExecutor command but executor was null")
      }
      context.stop(self)
      context.system.shutdown()
  }

  /**
   * Forwards a task status change to the driver as a StatusUpdate message.
   *
   * @param taskId id of the task being reported
   * @param state  new task state
   * @param data   payload sent along with the update
   */
  override def statusUpdate(taskId: Long, state: TaskState, data: ByteBuffer): Unit = {
    driver ! StatusUpdate(executorId, taskId, state, data)
  }
}
===============================================================================
/**
 * Wraps a task in a TaskRunner, records it as running and submits it for execution.
 *
 * @param context        backend handed to the TaskRunner
 * @param taskId         id of the task; also the key under which the runner is tracked
 * @param attemptNumber  attempt number passed to the TaskRunner
 * @param taskName       task name passed to the TaskRunner
 * @param serializedTask serialized task passed to the TaskRunner
 */
def launchTask(
    context: ExecutorBackend,
    taskId: Long,
    attemptNumber: Int,
    taskName: String,
    serializedTask: ByteBuffer): Unit = {

  // One TaskRunner is created per task. Per the original author's note, TaskRunner
  // implements Java's Runnable interface.
  val runner = new TaskRunner(
    context,
    taskId = taskId,
    attemptNumber = attemptNumber,
    taskName,
    serializedTask)

  // Track the runner in the in-memory table of running tasks.
  runningTasks.put(taskId, runner)

  // Submit the runner to threadPool (per the original note: the Executor's internal Java
  // thread pool, which queues submissions while no worker thread is free).
  threadPool.execute(runner)
}

executor源码分析

标签:没有   ===   park   exp   got   isl   start   bst   win   

原文地址:https://www.cnblogs.com/jackshen00/p/9538462.html

(0)
(0)
   
举报
评论 一句话评论(0)
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!