标签:task 过程 情况下 词频统计 reduce 高效 info python语言 工作原理
大纲:
Spark Streaming简介
Spark Streaming的原理和架构
Spark Streaming之基础抽象DStream
DStream相关操作
Spark Streaming与flume整合
Spark Streaming与kafka整合
Spark Streaming可以很容易的构建高吞吐量和容错能力强的流式应用.Spark Streaming类似于Apache Storm,用于流式数据的处理。根据其官方文档介绍,Spark Streaming有高吞吐量和容错能力强等特点。Spark Streaming支持的数据源有很多,例如:Kafka、Flume、Twitter、ZeroMQ和简单的TCP套接字等等。数据输入后可以用Spark的高度抽象操作如:map、reduce、join、window等进行运算。而结果也能保存在很多地方,如HDFS,数据库等。另外Spark Streaming也能和MLlib(机器学习)以及Graphx完美融合。
Spark Streaming是核心Spark Core API的扩展,可以实现实时数据流的可扩展,高吞吐,容错流处理.数据可以来自很多数据源(例如kafka,Flume或者TCP套接字)中获取,并且可以使用高级函数表示的复杂算法进行处理.最后,处理后的数据可以推送到文件系统,数据库和实时仪表板.
可以像编写离线批处理一样去编写流式程序,支持java/scala/python语言。
Spark Streaming在没有额外代码和配置的情况下可以恢复丢失的工作。
流式处理与批处理和交互式查询相结合。
Spark Streaming | Storm |
---|---|
开发语言:Scala/Java/Python | 开发语言:Clojure |
编程模型:DStream | 编程模型:Spout/Bolt |
Spark Streaming是基于spark的流式批处理引擎,其基本原理是把输入数据以某一段时间间隔批量的处理,当批处理间隔缩短到秒级时,便可以用于处理实时数据流.也就是说,在内部,它的工作原理如下图所示,Spark Streaming接收实时输入数据流并将数据分成批处理,然后由Spark引擎处理,以批量生成最终结果流
Spark Streaming是将流式计算分解成一系列短小的批处理作业.这里的批处理引擎是Spark Core.也就是把Spark Streaming的输入数据按照batch size(如1秒)分成一段一段的数据(Discretized Stream),每一段数据都转换成Spark中的RDD(Resilient Distributed Dataset),然后将Spark Streaming中对DStream的transformation操作变为针对Spark中对RDD的transformation操作,将RDD经过操作变成中间结果保存在内存中.整个流式计算根据业务的需求可以对中间的结果进行缓存或者存储到外部设备.下图显示了Spark Streaming的整个流程。
对于流式计算来说,容错性至关重要。首先我们要明确一下Spark中RDD的容错机制。每一个RDD都是一个不可变的分布式可重算的数据集,其记录着确定性的操作继承关系(lineage),所以只要输入数据是可容错的,那么任意一个RDD的分区(Partition)出错或不可用,都是可以利用原始输入数据通过转换操作而重新算出的。对于Spark Streaming来说,其RDD的传承关系如下图所示:
图中的每一个椭圆形表示一个RDD,椭圆形中的每个圆形代表一个RDD中的一个Partition,图中的每一列的多个RDD表示一个DStream(图中有三个DStream),而每一行最后一个RDD则表示每一个Batch Size所产生的中间结果RDD。我们可以看到图中的每一个RDD都是通过lineage相连接的,由于Spark Streaming输入数据可以来自于磁盘,例如HDFS(多份拷贝)或是来自于网络的数据流(Spark Streaming会将网络输入数据的每一个数据流拷贝两份到其他的机器)都能保证容错性,所以RDD中任意的Partition出错,都可以并行地在其他机器上将缺失的Partition计算出来。这个容错恢复方式比连续计算模型(如Storm)的效率更高。
对于实时性的讨论,会牵涉到流式处理框架的应用场景。Spark Streaming将流式计算分解成多个Spark Job,对于每一段数据的处理都会经过Spark DAG图分解以及Spark的任务集的调度过程。对于目前版本的Spark Streaming而言,其最小的Batch Size的选取在0.5~2秒钟之间(Storm目前最小的延迟是100ms左右),所以Spark Streaming能够满足除对实时性要求非常高(如高频实时交易)之外的所有流式准实时计算场景。
Discretized Stream是Spark Streaming的基础抽象,代表持续性的数据流和经过各种Spark算子操作后的结果数据流。在内部实现上,DStream是一系列连续的RDD来表示。每个RDD含有一段时间间隔内的数据,如下图:
应用于DStream的任何操作都转换为底层RDD上的操作,例如,在之前将lines DStream转换为words DStream,flatMap操作应用于lines DStream中的每一个RDD以生成words DStream的words RDD
这些底层RDD转换是由Spark引擎计算的,DStream操作隐藏了大部分细节.
DStream上的操作与RDD的类似,分为Transformations(转换)和OutputOperations(输出)两种,此外转换操作中还有一些比较特殊的操作,如"updateStateByKey(),transform()以及各种Window相关的操作.
Transformation | Meaning |
---|---|
map(func) | 对DStream中的各个元素进行func函数操作,然后返回一个新的DStream |
flatMap(func) | 与map方法类似,只不过各个输入项可以被输出为零个或多个输出项 |
filter(func) | 过滤出所有函数func返回值为true的DStream元素并返回一个新的DStream |
repartition(numPartitions) | 增加或减少DStream中的分区数,从而改变DStream的并行度 |
union(otherStream) | 将源DStream和输入参数为otherDStream的元素合并,并返回一个新的DStream. |
count() | 通过对DStream中的各个RDD中的元素进行计数,然后返回只有一个元素的RDD构成的DStream |
reduce(func) | 对源DStream中的各个RDD中的元素利用func进行聚合操作,然后返回只有一个元素的RDD构成的新的DStream. |
countByValue() | 对于元素类型为K的DStream,返回一个元素为(K,Long)键值对形式的新的DStream,Long对应的值为源DStream中各个RDD的key出现的次数 |
reduceByKey(func[numTasks]) | 利用func函数对源DStream中的key进行聚合操作,然后返回新的(K,V)对构成的DStream |
join(otherStream,[numTasks]) | 输入为(K,V)、(K,W)类型的DStream,返回一个新的(K,(V,W))类型的DStream |
cogroup(otherStream,[numTasks]) | 输入为(K,V)、(K,W)类型的DStream,返回一个新的(K,Seq[V], Seq[W]) 元组类型的DStream |
transform(func) | 通过RDD-to-RDD函数作用于DStream中的各个RDD,可以是任意的RDD操作,从而返回一个新的RDD |
updateStateByKey(func) | 根据key的之前状态值和key的新值,对key进行更新,返回一个新状态的DStream |
UpdateStateByKey用于记录历史记录,保存上次的状态.如果使用UpdateStateByKey,必须执行两个步骤
定义状态,状态可以是任意数据类型.
定义状态更新函数.使用函数指定如何使用先前状态和输入流中的新值更新状态.
滑动窗口转换操作的计算过程如下图所示,我们可以事先设定一个滑动窗口的长度(也就是窗口的持续时间),并且设定滑动窗口的时间间隔(每隔多长时间执行一次计算),然后,就可以让窗口按照指定时间间隔在源DStream上滑动,每次窗口停放的位置上,都会有一部分DStream被框入窗口内,形成一个小段的DStream,这时,就可以启动对这个小段DStream的计算。
红色的矩形就是一个窗口,窗口框住的是一段时间内的数据流。
这里面每一个time都是时间单元,在官方的例子中,每隔window size是3 time unit, 而且每隔2个单位时间,窗口会slide一次。
所以基于窗口的操作,需要指定2个参数:
window length - The duration of the window (3 in the figure).
窗口长度,一段时间内数据的容器.
sliding interval - The interval at which the window operation is performed (2 in the figure).
滑动间隔,每隔多久计算一次.
Output Operations可以将DStream的数据输出到外部的数据库或文件系统,当某个Output Operations被调用时(与RDD的Action相同),Spark Streaming程序才会开始真正的计算过程。
Output Operation | Meaning |
---|---|
print() | 打印到控制台 |
saveAsTextFiles(prefix,[suffix]) | 保存流的内容为文本文件,文件名为"prefix-TIME_IN_MS[.suffix]". |
saveAsObjectFiles(prefix,[suffix]) | 保存流的内容为SequenceFile,文件名为"prefix-TIME_IN_MS[.suffix]". |
saveAsHadoopFiles(prefix,[suffix]) | 保存流的内容为hadoop文件,文件名为"prefix-TIME_IN_MS[.suffix]". |
foreachRDD(func) | 对Dstream里面的每个RDD执行func |
首先在Linux服务器用YUM安装nc工具,nc命令式netcat命令的简称,它式用来设置路由器,我们可以利用它向某个端口发送数据.执行如下命令:
nc -lk 9999
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>2.2.0</version>
</dependency>
/**
* @author dw
* @date 2018-08-24 16:39:39
* @description dw
* 使用Spark Streaming读取Socket数据并实现词频统计
*/
public class SparkStreamingFromSocketWithJava {
public static void main(String[] args) throws InterruptedException {
//1:创建一个SparkConf对象
SparkConf conf = new SparkConf()
.setMaster("local[2]")
.setAppName("SparkStreamingFromSocketWithJava");
//2:根据SparkConf对象创建JavaStreamingContext对象,并指定每个批次处理的时间间隔
JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(5));
jssc.sparkContext().setLogLevel("WARN");
//3:注册一个监听的ip地址和端口,用来收集数据
JavaReceiverInputDStream<String> lines = jssc.socketTextStream("spark-node01.itheima.com", 9999);
//4:切分每一行记录
JavaDStream<String> words = lines.flatMap(line -> Arrays.asList(line.split(" ")).iterator());
//5:每个单词记为1
JavaPairDStream<String, Integer> pairs = words.mapToPair(word -> new Tuple2<>(word, 1));
//6:对所有的单词进行聚合输出
JavaPairDStream<String, Integer> wordCounts = pairs.reduceByKey((a, b) -> a + b);
//7:打印数据
wordCounts.print();
//8:开启流式计算
jssc.start();
//Wait for the computation to terminate 等待计算终止
jssc.awaitTermination();
}
}
?
注意:由于使用的是本地模式"local[2]"所以可以直接在本地运行该程序要指定并行度,如在本地运行设置setMaster("local[2]"),相当于启动两个线程,一个给receiver,一个给computer。如果是在集群中运行,必须要求集群中可用core数大于1。
object SparkStreamingFromSocketWithScala {
def main(args: Array[String]): Unit = {
//1:创建SparkConf对象,并指定主机名称设置AppName
val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
//2:根据SparkConf对象创建StreamingContext对象
val ssc = new StreamingContext(conf, Seconds(1))
ssc.sparkContext.setLogLevel("WARN")
//3:读取socket数据
val lines: ReceiverInputDStream[String] = ssc.socketTextStream("spark-node01.itheima.com",9999)
//4:flatMap
val words: DStream[String] = lines.flatMap(line=>line.split(" "))
//5:map操作
val pairs: DStream[(String, Int)] = words.map(word=>(word,1))
//6:对pairs进行聚合统计
val wordCounts: DStream[(String, Int)] = pairs.reduceByKey((a, b)=>a+b)
//打印结果
wordCounts.print()
//开启流式计算
ssc.start()
ssc.awaitTermination()
}
}
在上面的那个案例中存在这样一个问题:每个批次的单词次数都被正确的统计出来,但是结果不能累加!如果将所有批次的结果数据进行累加使用updateStateByKey(func)来更新状态.
public class SparkStreamingFromSocketTotalWithJava {
public static void main(String[] args) throws InterruptedException {
//1:创建一个SparkConf对象
SparkConf conf = new SparkConf()
.setMaster("local[2]")
.setAppName("SparkStreamingFromSocketTotalWithJava");
//2:根据SparkConf对象创建JavaStreamingContext对象,并指定每个批次处理的时间间隔
JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(5));
jssc.sparkContext().setLogLevel("WARN");
jssc.checkpoint("./ck");
//3:注册一个监听的ip地址和端口,用来收集数据
JavaReceiverInputDStream<String> lines = jssc.socketTextStream("spark-node01.itheima.com", 9999);
//4:切分每一行记录
JavaDStream<String> words = lines.flatMap(line -> Arrays.asList(line.split(" ")).iterator());
//5:每个单词记为1
JavaPairDStream<String, Integer> pairs = words.mapToPair(word -> new Tuple2<>(word, 1));
//6:对所有的单词进行聚合输出
JavaPairDStream<String, Integer> wordCounts = pairs.updateStateByKey((values,state)->{
Integer updateValue = 0;
if(state.isPresent()){
updateValue = state.get();
}
for (Integer value : values) {
updateValue += value;
}
return Optional.of(updateValue);
});
//7:打印数据
wordCounts.print();
//8:开启流式计算
jssc.start();
//Wait for the computation to terminate 等待计算终止
jssc.awaitTermination();
}
}
object SparkStreamingFromSocketTotalWithScala {
?
def updateFunction(newValues: Seq[Int], historyValue: Option[Int]): Option[Int] = {
val newCount =historyValue.getOrElse(0)+newValues.sum
Option.apply(newCount)
}
def main(args: Array[String]): Unit = {
//1:创建SparkConf对象
val conf:SparkConf = new SparkConf()
.setMaster("local[2]")
.setAppName("SparkStreamingFromSocketTotalWithScala")
//2:根据SparkConf对象创建StreamingContext对象,并制定时间间隔
val ssc:StreamingContext = new StreamingContext(conf, Seconds(5))
ssc.sparkContext.setLogLevel("WARN")
ssc.checkpoint("./ckscala")
//3:注册监听端口,指定主机和端口
val lines: ReceiverInputDStream[String] = ssc.socketTextStream("spark-node01.itheima.com",9999)
//4:切分每一行数据
val words: DStream[String] = lines.flatMap(line=>line.split(" "))
//5:单词没出现一次就记为1
val pairs: DStream[(String, Int)] = words.map(word=>(word,1))
//6:统计单词出现的次数
val wordCounts: DStream[(String, Int)] = pairs.updateStateByKey(updateFunction)
//7:打印结果
wordCounts.print()
//8:开启流式计算
ssc.start()
ssc.awaitTermination()
}
}
通过函数updateStateByKey实现。根据key的当前值和key的之前批次值,对key进行更新,返回一个新状态的DStream.
使用Spark Streaming的开窗函数reduceByKeyAndWindow,实现单词统计计数.
public class SparkStreamingWindowWithJava {
public static void main(String[] args) throws InterruptedException {
//1:创建一个SparkConf对象
SparkConf conf = new SparkConf()
.setMaster("local[4]")
.setAppName("SparkStreamingWindowWithJava");
//2:根据SparkConf对象创建JavaStreamingContext对象,并指定每个批次处理的时间间隔
JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(5));
jssc.sparkContext().setLogLevel("WARN");
//3:注册一个监听的ip地址和端口,用来收集数据
JavaReceiverInputDStream<String> lines = jssc.socketTextStream("spark-node01.itheima.com", 9999);
//4:切分每一行记录
JavaDStream<String> words = lines.flatMap(line -> Arrays.asList(line.split(" ")).iterator());
//5:每个单词记为1
JavaPairDStream<String, Integer> pairs = words.mapToPair(word -> new Tuple2<>(word, 1));
//6:进行开窗函数的计算
//reduceByKeyAndWindow 参数说明
//windowDuration:表示window框住的时间长度,如本例5秒切分一次RDD,框10秒,就会保留最近2次切分的RDD
//slideDuration:表示window滑动的时间长度,即每隔多久执行本计算
JavaPairDStream<String, Integer> wordCounts = pairs.reduceByKeyAndWindow((a, b) -> a + b, Durations.seconds(10), Durations.seconds(5));
//7:将结果打印到控制台
wordCounts.print();
//8:开启流式计算
jssc.start();
//9:等待终止退出
jssc.awaitTermination();
}
}
object SparkStreamingWindowWithScala {
def main(args: Array[String]): Unit = {
//1:创建SparkConf对象
val conf:SparkConf = new SparkConf()
.setMaster("local[2]")
.setAppName("SparkStreamingWindowWithScala")
//2:根据SparkConf对象创建StreamingContext对象,并制定时间间隔
val ssc:StreamingContext = new StreamingContext(conf, Seconds(5))
//3:注册监听端口,指定主机和端口
val lines: ReceiverInputDStream[String] = ssc.socketTextStream("spark-node01.itheima.com",9999)
//4:切分每一行
val words: DStream[String] = lines.flatMap(line=>line.split(" "))
//5:单词出现1次就记为1(word,1)
val pairs: DStream[(String, Int)] = words.map(word=>(word,1))
//6:对pairs进行开窗统计
//参数说明
//reduceFunc:集合函数
//windowDuration:窗口的宽度,如本例5秒切分一次RDD,框10秒,就会保留最近2次切分的RDD
//slideDuration:表示window滑动的时间长度,即每隔多久执行本计算
val result: DStream[(String, Int)] = pairs.reduceByKeyAndWindow((a:Int, b:Int)=>a+b,Seconds(10),Seconds(5))
//7:打印结果
result.print()
//8:开启流式计算
ssc.start()
//9:等待终止退出
ssc.awaitTermination()
}
}
现象:Spark Streaming每隔5s计算一次当前在窗口大小为10s内的数据,然后将结果数据输出。窗口的宽度和滑动长度都应该是批次处理时间的整数倍.
public class SparkStreamingWindowHotWordsWithJava {
public static void main(String[] args) throws InterruptedException {
//1:创建一个SparkConf对象
SparkConf conf = new SparkConf()
.setMaster("local[4]")
.setAppName("SparkStreamingWindowWithJava");
//2:根据SparkConf对象创建JavaStreamingContext对象,并指定每个批次处理的时间间隔
JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(5));
jssc.sparkContext().setLogLevel("WARN");
//3:注册一个监听的ip地址和端口,用来收集数据
JavaReceiverInputDStream<String> lines = jssc.socketTextStream("spark-node01.itheima.com", 9999);
//4:切分每一行记录
JavaDStream<String> words = lines.flatMap(line -> Arrays.asList(line.split(" ")).iterator());
//5:每个单词记为1
JavaPairDStream<String, Integer> pairs = words.mapToPair(word -> new Tuple2<>(word, 1));
//6:进行开窗函数的计算
//reduceByKeyAndWindow 参数说明
//windowDuration:表示window框住的时间长度,如本例5秒切分一次RDD,框10秒,就会保留最近2次切分的RDD
//slideDuration:表示window滑动的时间长度,即每隔多久执行本计算
JavaPairDStream<String, Integer> wordCounts = pairs.reduceByKeyAndWindow((a, b) -> a + b, Durations.seconds(10), Durations.seconds(5));
//7:对wordCounts进行排序
JavaPairDStream<String, Integer> finalDStream = wordCounts.transformToPair(wordCount->{
//将统计的每一个单词进行位置替换
JavaPairRDD<Integer, String> reverseWordCountRDD = wordCount.mapToPair(tuple2 -> new Tuple2<Integer, String>(tuple2._2, tuple2._1));
//对反转之后的rdd进行排序
JavaPairRDD<Integer, String> sortedWordCountRDD = reverseWordCountRDD.sortByKey(false);
//对排序之后的rdd进行再次反转
JavaPairRDD<String, Integer> reverSortedWordCount = sortedWordCountRDD.mapToPair(tuple2 -> new Tuple2<String, Integer>(tuple2._2, tuple2._1));
//对排序之后的结果取出前三位
List<Tuple2<String, Integer>> take = reverSortedWordCount.take(3);
//打印前三位
for (Tuple2<String, Integer> tuple2 : take) {
System.out.println(tuple2._1+"............."+tuple2._2);
}
return reverSortedWordCount;
});
//7:对wordCounts进行排序
/*JavaPairDStream<String, Integer> finalDStream = wordCounts.transformToPair(new Function<JavaPairRDD<String, Integer>, JavaPairRDD<String, Integer>>() {
@Override