标签:摘要 table 监听 tps collect 术语 任务 ati doc
摘要:
1.基本术语
2.运行架构
2.1基本架构
2.2运行流程
2.3相关的UML类图
2.4调度模块:
2.4.1作业调度简介
2.4.2任务调度简介
3.运行模式
3.1 standalone模式
4.RDD实战
总结:
2.运行架构
2.1基本架构:
图示:
Spark Application在集群中以一组独立的进程运行,通过你的驱动程序(driver program)中的SparkContext 对象进行协作。
具体来说,SparkContext可以连接到多种类型的集群管理器 cluster managers (standalone cluster manager, Mesos ,YARN),这些 cluster managers 负责跨应用程序分配资源。一旦连接,Spark就获得集群中的节点上的executors,接下来,它会将应用程序代码发送到executors。最后,SparkContext发送tasks到executors运行。
注意:该驱动程序会一直监听并接受其executor传入的连接(spark.driver.port在网络配置部分)。这样,driver program必须可以寻找到工作节点的网络地址。数据不能跨应用程序(SparkContext)访问,除非写入外部系统
2.1.1 SparkContext类(代表连接到spark集群,现在一个jvm只能有一个sc,以后会取消):
几个重要的属性(包含DAGScheduler,TaskScheduler调度,获取executor,心跳与监听等):
说明:这里的下划线_代表默认值,比如Int 默认值就是0,String默认值就是None 参考知乎
2.1.2 Executor(一个运行任务的线程池,通过RPC与Driver通信):
心跳报告(心跳进程,记录心跳失败次数和接受task的心跳):
这里有两个参数:spark.executor.heartbeat.maxFailures = 60,spark.executor.heartbeatInterval = 10s,意味着最多每隔10min会重新发送一次心跳
Task管理(taskRunner类的启动,停止)
1
2
|
// Maintains the list of running tasks. private val runningTasks = new ConcurrentHashMap[Long, TaskRunner] |
下面是TaskRunner 的run方法,贴出来,以后研究
2.2运行流程:
图示:
注意这里的StandaloneExecutorBackend是一个概念(我在spark项目中没找到),实际上的spark standalone的资源调度类是 CoarseGrainedExecutorBackend
1.构建Spark Application的运行环境(启动SparkContext),SparkContext向资源管理器(ClusterManager)(可以是Standalone、Mesos或YARN)注册并申请运行Executor资源;
2.资源管理器分配Executor资源并启动StandaloneExecutorBackend,Executor运行情况将随着心跳发送到资源管理器上;
3.SparkContext构建成DAG图,将DAG图分解成Stage,并把Taskset发送给Task Scheduler。
Executor向SparkContext申请Task,Task Scheduler将Task发放给Executor运行同时SparkContext将应用程序代码发放给Executor。
4.Task在Executor上运行,运行完毕释放所有资源。
2.3相关的类:
ExecutorBackend:
特质签名(Executor用来向集群调度发送更新的插件)
各种运行模式的类图:
其中standalone是用SparkDeploySchedulerBackend配合TeskSchedulerImpl工作,相关类图应该是:
SchedulerBackend特质(核心函数:reviveOffers())
CoarseGrainedExecutorBackend(receive方法里是若干模式匹配,类似于switch case,根据相关模式执行相应操作。主要有注册Executor,运行Task等)
最后一个类SparkDeploySchedulerBackend(start):
2.4调度模块:
2.4.1作业调度简介
DAGScheduler: 根据Job构建基于Stage的DAG(Directed Acyclic Graph有向无环图),并提交Stage给TASkScheduler。 其划分Stage的依据是RDD之间的依赖的关系找出开销最小的调度方法,如下图
注:从最后一个Stage开始倒推,如果有依赖关系 就先解决父节点,如果没有依赖关系 就直接运行;这里做了一个简单的实验:Spark DAGSheduler生成Stage过程分析实验
2.4.2 任务调度简介:
TaskSchedulter: 将TaskSet提交给worker运行,每个Executor运行什么Task就是在此处分配的. TaskScheduler维护所有TaskSet,当Executor向Driver发生心跳时,TaskScheduler会根据资源剩余情况分配相应的Task。
另外TaskScheduler还维护着所有Task的运行标签,重试失败的Task。下图展示了TaskScheduler的作用
在不同运行模式中任务调度器具体为:
3.运行模式
3.1 standalone模式
4 RDD实战
1
2
3
4
5
|
sc.makeRDD(Seq( "arachis" , "tony" , "lily" , "tom" )).map{ name = > (name.charAt( 0 ),name) }.groupByKey().mapValues{ names = > names.toSet.size //unique and count }.collect() |
提交Job collect
划分Stage
提交Stage , 开始Task 运行调度
Stage0的DAG图,makeRDD => map ; 相应生成了两个RDD:ParallelCollectionRDD,MapPartitionsRDD
Stage1 的DAG图,groupByKey => mapValues; 相应生成两个RDD:ShuffledRDD, MapPartitionsRDD
链接:
Spark官网:http://spark.apache.org/docs/latest/cluster-overview.html
http://www.cnblogs.com/tgzhu/p/5818374.html
标签:摘要 table 监听 tps collect 术语 任务 ati doc
原文地址:http://www.cnblogs.com/yechanglv/p/6938019.html