标签:
Spark 定制版~Spark Streaming(一)
本讲内容:
a. Spark Streaming在线另类实验
b. 瞬间理解Spark Streaming的本质
源码定制为什么从Spark Streaming切入?
a. Spark 最初只有Spark Core,通过逐步的发展,扩展出了Spark SQL、Spark Streaming、Spark MLlib(machine learning)、GraphX(graph)、Spark R等。 而Spark Streaming本是Spark Core上的一个子框架,如果我们试着去精通这个子框架,不仅仅能写出非常复杂的应用程序,还能够很好的驾驭Spark,进而研究并达到精通Spark的地步,及其寻找到Spark问题的解决之道。
b. 由于Spark SQL涉及到很多SQL语法解析和优化的细节,对于我们集中精力研究Spark有所干扰。Spark R还不是很成熟,支持功能有限。GraphX最近几个版本基本没有改进,里面有许多数学算法。MLlib也涉及到相当多的数学知识。
c. Spark Streaming的优势是在于可以结合SparkSQL、图计算、机器学习,使其功能更加强大。同时,在Spark中Spark Streaming也是最容易出现问题的,因为它是不断的运行,内部比较复杂。掌握好Spark Streaming,可以去窥视Spark的一切!
对Spark Streaming的理解。
a. Spark Streaming是流式计算。
b. Spark Streaming可以在线的直接使用机器学习、图计算、SparkSQL、Spark R。
c. 整个Spark的程序中,Spark Streaming内部复杂、难易掌控。
d. Spark Streaming很像基于Spark Core之上的应用程序。
e. 如果说Spark是DT的龙脉,那么Spark Streaming就是龙脉中的穴位。
实验源码
案例过程及其分析
a、打包程序,上传到集群上
b、 集群中需要先执行nc,启动 9999端口
nc -lk 9999
c、执行shell代码
sh内容
/usr/local/spark-1.6.1-bin-hadoop2.6/bin/spark-submit –class com.dt.spark.sparkstreaming.OnlineBlackListFilter –master spark://Master:7077 /root/Documents/SparkApps/WordCount.jar
我们运行完程序,看到过滤结果以后,停止程序,打开HistoryServer http://master:18080/
点击App ID进去,打开,会看到如下图所示的4个Job,从实际执行的Job是1个Job,但是图中显示有4个Job,从这里可以看出Spark Streaming运行的时候自己会启动一些Job。
先看看job id 为0 的详细信息
很明显是我们定义的blackListRDD数据的生成。对应的代码为
val blackList = Array((“Hadoop”, true), (“Mathou”, true))
//把Array变成RDD
val blackListRDD = ssc.sparkContext.parallelize(blackList)
并且它做了reduceBykey的操作(代码中并没有此步操作,SparkStreaming框架自行生成的)。
这里有两个Stage,Stage 0和Stage 1 。
Job 1的详细信息
一个makeRDD,这个RDD是receiver不断的接收数据流中的数据,在时间间隔达到batchInterval后,将所有数据变成一个RDD。并且它的耗时也是最长的,59s 。
特别说明:此处可以看出,receiver也是一个独立的job。由此我们可以得出一个结论:我们在应用程序中,可以启动多个job,并且不用的job之间可以相互配合,这就为我们编写复杂的应用程序打下了基础。
我们点击上面的start at OnlineBlackListFilter.scala:64查看详细信息
根据上图的信息,只有一个Executor在接收数据,最最重要的是红色框中的数据本地性为PROCESS_LOCAL,由此可以知道receiver接收到数据后会保存到内存中,只要内存充足是不会写到内存中的。
即便在创建receiver时,指定的存储默认策略为MEMORY_AND_DISK_SER_2
def socketTextStream(
hostname: String,
port: Int,
storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
): ReceiverInputDStream[String] = withNamedScope(“socket text stream”) {
socketStream[String](hostname, port, SocketReceiver.bytesToLines, storageLevel)
}
job 2的详细信息
Job 2 将前两个job生成的RDD进行leftOuterJoin操作。
从Stage Id的编号就可以看出,它是依赖于上两个Job的。
Receiver接收数据时是在spark-master节点上,但是Job 2在处理数据时,数据已经到了spark-worker1上了(因为我的环境只有两个worker,数据并没有分散到所有worker节点,worker节点如果多一点,情况可能不一样,每个节点都会处理数据)
点击上面的Stage Id 3查看详细信息:
Executor上运行,并且有5个Task 。
Job 3的详细信息
总结:我们可以看出,一个batchInterval并不是仅仅触发一个Job。
根据上面的描述,我们更细致的了解了DStream和RDD的关系了。DStream就是一个个batchInterval时间内的RDD组成的。只不过DStream带上了时间维度,是一个无边界的集合。
对DStream的操作会构建成DStream Graph
在每到batchInterval时间间隔后,Job被触发,DStream Graph将会被转换成RDD Graph
备注:
1、DT大数据梦工厂微信公众号DT_Spark
2、Spark大神级专家:王家林
3、新浪微博: http://www.weibo.com/ilovepains
标签:
原文地址:http://blog.csdn.net/zisheng_wang_data/article/details/51319176