码迷,mamicode.com
首页 > 其他好文 > 详细

文章标题

时间:2016-05-05 02:05:53      阅读:205      评论:0      收藏:0      [点我收藏+]

标签:

Spark 定制版~Spark Streaming(一)

本讲内容:

a. Spark Streaming在线另类实验
b. 瞬间理解Spark Streaming的本质

源码定制为什么从Spark Streaming切入?

a. Spark 最初只有Spark Core,通过逐步的发展,扩展出了Spark SQL、Spark Streaming、Spark MLlib(machine learning)、GraphX(graph)、Spark R等。 而Spark Streaming本是Spark Core上的一个子框架,如果我们试着去精通这个子框架,不仅仅能写出非常复杂的应用程序,还能够很好的驾驭Spark,进而研究并达到精通Spark的地步,及其寻找到Spark问题的解决之道。
b. 由于Spark SQL涉及到很多SQL语法解析和优化的细节,对于我们集中精力研究Spark有所干扰。Spark R还不是很成熟,支持功能有限。GraphX最近几个版本基本没有改进,里面有许多数学算法。MLlib也涉及到相当多的数学知识。
c. Spark Streaming的优势是在于可以结合SparkSQL、图计算、机器学习,使其功能更加强大。同时,在Spark中Spark Streaming也是最容易出现问题的,因为它是不断的运行,内部比较复杂。掌握好Spark Streaming,可以去窥视Spark的一切!

对Spark Streaming的理解。

a. Spark Streaming是流式计算。
b. Spark Streaming可以在线的直接使用机器学习、图计算、SparkSQL、Spark R。
c. 整个Spark的程序中,Spark Streaming内部复杂、难易掌控。
d. Spark Streaming很像基于Spark Core之上的应用程序。
e. 如果说Spark是DT的龙脉,那么Spark Streaming就是龙脉中的穴位。

实验源码

技术分享
技术分享
技术分享

案例过程及其分析

a、打包程序,上传到集群上

技术分享
技术分享
技术分享
技术分享
技术分享

b、 集群中需要先执行nc,启动 9999端口

nc -lk 9999

技术分享

c、执行shell代码

sh内容
/usr/local/spark-1.6.1-bin-hadoop2.6/bin/spark-submit –class com.dt.spark.sparkstreaming.OnlineBlackListFilter –master spark://Master:7077 /root/Documents/SparkApps/WordCount.jar

技术分享

我们运行完程序,看到过滤结果以后,停止程序,打开HistoryServer http://master:18080/

技术分享

点击App ID进去,打开,会看到如下图所示的4个Job,从实际执行的Job是1个Job,但是图中显示有4个Job,从这里可以看出Spark Streaming运行的时候自己会启动一些Job。

技术分享

先看看job id 为0 的详细信息

技术分享

很明显是我们定义的blackListRDD数据的生成。对应的代码为
val blackList = Array((“Hadoop”, true), (“Mathou”, true))
//把Array变成RDD
val blackListRDD = ssc.sparkContext.parallelize(blackList)
并且它做了reduceBykey的操作(代码中并没有此步操作,SparkStreaming框架自行生成的)。
这里有两个Stage,Stage 0和Stage 1 。

Job 1的详细信息

技术分享

一个makeRDD,这个RDD是receiver不断的接收数据流中的数据,在时间间隔达到batchInterval后,将所有数据变成一个RDD。并且它的耗时也是最长的,59s 。

特别说明:此处可以看出,receiver也是一个独立的job。由此我们可以得出一个结论:我们在应用程序中,可以启动多个job,并且不用的job之间可以相互配合,这就为我们编写复杂的应用程序打下了基础。
我们点击上面的start at OnlineBlackListFilter.scala:64查看详细信息

技术分享

根据上图的信息,只有一个Executor在接收数据,最最重要的是红色框中的数据本地性为PROCESS_LOCAL,由此可以知道receiver接收到数据后会保存到内存中,只要内存充足是不会写到内存中的。
即便在创建receiver时,指定的存储默认策略为MEMORY_AND_DISK_SER_2
def socketTextStream(
hostname: String,
port: Int,
storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
): ReceiverInputDStream[String] = withNamedScope(“socket text stream”) {
socketStream[String](hostname, port, SocketReceiver.bytesToLines, storageLevel)
}

job 2的详细信息

技术分享
技术分享

Job 2 将前两个job生成的RDD进行leftOuterJoin操作。
从Stage Id的编号就可以看出,它是依赖于上两个Job的。
Receiver接收数据时是在spark-master节点上,但是Job 2在处理数据时,数据已经到了spark-worker1上了(因为我的环境只有两个worker,数据并没有分散到所有worker节点,worker节点如果多一点,情况可能不一样,每个节点都会处理数据)
点击上面的Stage Id 3查看详细信息:
技术分享
Executor上运行,并且有5个Task 。

Job 3的详细信息

技术分享
技术分享

总结:我们可以看出,一个batchInterval并不是仅仅触发一个Job。

根据上面的描述,我们更细致的了解了DStream和RDD的关系了。DStream就是一个个batchInterval时间内的RDD组成的。只不过DStream带上了时间维度,是一个无边界的集合。

技术分享

对DStream的操作会构建成DStream Graph

技术分享

在每到batchInterval时间间隔后,Job被触发,DStream Graph将会被转换成RDD Graph

技术分享

备注:
1、DT大数据梦工厂微信公众号DT_Spark
2、Spark大神级专家:王家林
3、新浪微博: http://www.weibo.com/ilovepains

文章标题

标签:

原文地址:http://blog.csdn.net/zisheng_wang_data/article/details/51319176

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!