码迷,mamicode.com
首页 > 其他好文 > 详细

Kafka:ZK+Kafka+Spark Streaming集群环境搭建(二十二)Spark Streaming接收流数据及使用窗口函数

时间:2018-08-10 01:29:33      阅读:200      评论:0      收藏:0      [点我收藏+]

标签:spark   分享   evel   win   form   example   cond   void   bsp   

官网文档:《http://spark.apache.org/docs/latest/streaming-programming-guide.html#a-quick-example》

Spark Streaming官网的例子reduceByKeyAndWindow

简单的介绍了spark streaming接收socket流的数据,并把接收到的数据进行windows窗口函数对数据进行批量处理。

import java.util.Arrays;                
                
import org.apache.spark.SparkConf;                
import org.apache.spark.api.java.JavaSparkContext;                
import org.apache.spark.streaming.Durations;                
import org.apache.spark.streaming.api.java.JavaDStream;                
import org.apache.spark.streaming.api.java.JavaPairDStream;                
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;                
import org.apache.spark.streaming.api.java.JavaStreamingContext;                
                
import scala.Tuple2;                
                
public class HelloWord {                
    public static void main(String[] args) throws InterruptedException {            
        // Create a local StreamingContext with two working thread and batch interval of 1 second        
        SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("NetworkWordCount");        
        JavaSparkContext jsc=new JavaSparkContext(conf);        
        jsc.setLogLevel("WARN");        
        JavaStreamingContext jssc = new JavaStreamingContext(jsc, Durations.seconds(1));        
                
        // Create a DStream that will connect to hostname:port, like localhost:9999        
        JavaReceiverInputDStream<String> lines = jssc.socketTextStream("xx.xx.xx.xx", 19999);        
                
        // Split each line into words        
        JavaDStream<String> words = lines.flatMap(x -> Arrays.asList(x.split(" ")).iterator());        
                
        // Count each word in each batch        
        JavaPairDStream<String, Integer> pairs = words.mapToPair(s -> new Tuple2<>(s, 1));        
                
        // Reduce last 60 seconds of data, every 30 seconds        
        JavaPairDStream<String, Integer> windowedWordCounts = pairs.reduceByKeyAndWindow((i1, i2) -> i1 + i2,         
                Durations.seconds(60), 
                Durations.seconds(30));
                
        // Print the first ten elements of each RDD generated in this DStream to the console        
        windowedWordCounts.print();        
                
        jssc.start(); // Start the computation        
        jssc.awaitTermination(); // Wait for the computation to terminate        
    }
}

输入数据:

技术分享图片

窗口中数据随着时间的变化:

技术分享图片

实际工作中上边的代码统计出的结果:

技术分享图片

Window操作解读:

        // Reduce last 60 seconds of data, every 30 seconds        
        JavaPairDStream<String, Integer> windowedWordCounts = pairs.reduceByKeyAndWindow((i1, i2) -> i1 + i2,         
                Durations.seconds(60), 
                Durations.seconds(30));

上边代码的意义就是:按照key对value进行求count,数据处理范围是60s内的数据,每隔30s统计一次。

Spark Streaming提供了窗口计算,它允许你对滑动窗口上的数据使用变换(transformations)。下图说明了滑动窗口:

技术分享图片

上图介绍了,两个信息:

1)original DStream:Spark Streaming是把一段时间接收到的流作为一个批数据“也就是图中上边绿色框框示意内容”;

2)windowed DStream:窗口每次滑动就是把“滑动长度(时间)”内的数据合并到一起进行一次运算,另外"‘滑动长度(时间)‘内的数据"受两个因素影响:“窗口时长”、“水印时长”。

上边的例子及图可以充分解释为什么每次窗口触发时参与计算的数据受“窗口时长”的影响。“窗口时长”实际上就是定义每次窗口事件处罚时,参与计算的数据长度(范围)。

 

Kafka:ZK+Kafka+Spark Streaming集群环境搭建(二十二)Spark Streaming接收流数据及使用窗口函数

标签:spark   分享   evel   win   form   example   cond   void   bsp   

原文地址:https://www.cnblogs.com/yy3b2007com/p/9452489.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!