标签:目的 connect 内存 除了 out 时间 scheduler 检查 array
Spark start-all>>
"""Master启动流程"""
Master类
class Master(
host: String,
port: Int,
webUiPort: Int,
val securityMgr: SecurityManager,
val conf: SparkConf) extends Actor with ActorLogReceive with Logging with LeaderElectable
Master端
def main(){
val (actorSystem, _, _, _) = startSystemAndActor(args.host, args.port, args.webUiPort, conf)
actorSystem.awaitTermination()
}
Master端
def startSystemAndActor(System, Int, Int, Option[Int]) = {
//调用AkkaUtils创建ActorSystem
val (actorSystem, boundPort) = AkkaUtils.createActorSystem(systemName, host, port, conf = conf,
securityManager = securityMgr)
//创建属于Master的actor, 在创建actor的同时, 会使用classOf[Master]初始化Master
val actor = actorSystem.actorOf(Props(classOf[Master], host, boundPort, webUiPort, securityMgr, conf), actorName)
}
Master端
"""初始化Master时由于Master继承了 trait Actor 重写了preStart方法,
Actor的初始化会启动preStart方法 因此找到Master的 override def preStart()
preStart属于生命周期方法, 在构造器之后, receiver之前"""
override def preStart() {
// 启动一个定时器, 定时检查超时的Worker, WORKER_TIMEOUT:每六十秒检查一次,
// self:先对着自己来一下(检查)试试
context.system.scheduler.schedule(0 millis, WORKER_TIMEOUT millis, self, CheckForWorkerTimeOut)
// 调用 timeOutDeadWorkers() 方法,
override def receiveWithLogging = {
case CheckForWorkerTimeOut => {
timeOutDeadWorkers()
}
}
// 用来检查并移除所有超时的workers
def timeOutDeadWorkers(){
// 事实上是移除了一个存有WorkInfo的HashSet[WrokInfo]中的对象
val toRemove = workers.filter(_.lastHeartbeat < currentTime - WORKER_TIMEOUT_MS).toArray
for (worker <- toRemove) {
if (worker.state != WorkerState.DEAD) {
removeWorker(worker)
}
}
}
def removeWorker(worker: WorkerInfo){
// 删除内存里的workInfo
idToWorker -= worker.id
addressToWorker -= worker.endpoint.address
}
}
"""之后执行receive方法(1.3版本), 在后来的1.6版本中叫 def receive: PartialFunction[Any, Unit]"""
Master端
override def receiveWithLogging () {}
会不断的接收actor发送过来的请求
"""Worker启动流程"""
Worker类
class Worker(
host: String,
port: Int,
webUiPort: Int,
cores: Int,
memory: Int,
masterAkkaUrls: Array[String],
actorSystemName: String,
actorName: String,
workDirPath: String = null,
val conf: SparkConf,
val securityMgr: SecurityManager)
extends Actor
def preStart() => {
registerWithMaster()
}
// 向Master注册的方法
def registerWithMaster() {
// 向所有的Master注册Worker
tryRegisterAllMasters()
// 其中内容
def tryRegisterAllMasters()=>{
// 通过Master的Url获取Master的actor
val actor = context.actorSelection(masterAkkaUrl)
// 向Master发送注册信息
actor ! RegisterWorker(workerId, host, port, cores, memory, webUi.boundPort, publicAddress)
}
}
Master端
// 接收Worker发送的注册信息
override def receiveWithLogging = {
case RegisterWorker(id, workerHost, workerPort, cores, memory, workerUiPort, publicAddress) =>{
// 判断是否是StandBy状态, doNothing
idToWorker.contains(id), 已经注册过, doNothing
正常情况下(Active状态, 且没有注册过):{
// 把发送来的 WorkerInfo 添加到 Master的 WorkerInfo中
val worker = new WorkerInfo(id, workerHost, workerPort, cores, memory, sender, workerUiPort, publicAddress)
}
// 如果将Worker Info存入内存成功, 则调用持久化引擎, 将信息存入磁盘中,
// 目的是防止数据丢失. 如果Master宕机, 内存中会丢失数据,
// 切换状态(Standby和Active)后, 需要切换的节点拿不到WorkerInfo, Worker会再次注册, 非常消耗资源, 存在磁盘则可以直接去磁盘拿取数据不需要重新注册
if (registerWorker(worker)) {
persistenceEngine.addWorker(worker)
sender ! RegisteredWorker(masterUrl, masterWebUiUrl)
schedule()
}
// 向worker响应注册成功信息
sender ! RegisteredWorker(masterUrl, masterWebUiUrl)
// 开始调度资源, 调度资源不仅仅是集群启动的时候调动资源, 运行Job的时候也会调度资源, 其有两种方式 一种是尽量分散, 一种是尽量集中
schedule()
}
}
Worker端
// 接收注册成功的信息, 其实是将 Active Master 的Url和rWebUiUrl传回并更新, 之后向他发送心跳~
def receiveWithLogging() = {
case RegisteredWorker(masterUrl, masterWebUiUrl) =>{
//更新MasterUrl
changeMaster(masterUrl, masterWebUiUrl)
//向Master发送心跳信息, HEARTBEAT_MILLIS =15秒, 每十五秒发送一次心跳信息, 发送逻辑为 SendHeartbeat
context.system.scheduler.schedule(0 millis, HEARTBEAT_MILLIS millis, self, SendHeartbeat)
}
//向Master发送心跳信息, 实际上是将自己的WorkerId发送给Master
case SendHeartbeat =>
if (connected) { master ! Heartbeat(workerId) }
}
Master端
def receiveWithLogging() = {
case Heartbeat(workerId) => {
//正常情况下, 更新上次心跳时间
workerInfo.lastHeartbeat = System.currentTimeMillis()
//启动完成
}
}
Spark-源码-Spark-StartAll Master Worler启动流程
标签:目的 connect 内存 除了 out 时间 scheduler 检查 array
原文地址:https://www.cnblogs.com/chinashenkai/p/9977672.html