标签:frequency div 介绍 空格 use 扩展 park dsd first
??Spark Streaming是Spark 核心API的扩展,可实现实时数据流的可扩展,高吞吐量,容错流处理。 数据可以从诸如Kafka,Flume,Kinesis或TCP套接字的许多来源中获取,并且可以使用由高级功能(如map,reduce,join和window)表达的复杂算法进行处理。 最后,处理后的数据可以推送到文件系统,数据库和实时仪表板。 事实上,您可以在数据流上应用Spark的机器学习和图形处理算法。
??在内部,它的工作原理如下。 Spark Streaming接收实时输入数据流,并将数据分成批,然后由Spark引擎对其进行处理,以批量生成最终的结果流。
??Spark Streaming提供称为离散流或DStream的高级抽象,它表示连续的数据流。 可以从诸如Kafka,Flume和Kinesis等来源的输入数据流中创建DStream,或者通过对其他DStream应用高级操作来创建。 在内部,DStream表示为一系列RDD。
??本指南介绍如何开始使用DStreams编写Spark Streaming程序。 您可以在Scala,Java或Python(在Spark 1.2中引入)中编写Spark Streaming程序。
??DStream是一个抽象的概念, 表示一系列的RDD
//使用两个工作线程和1秒的批量间隔创建本地StreamingContext
SparkConf conf = new SparkConf().setAppName("Spark Streaming WordCount").setMaster("local[2]");
// 创建该对象就类似于Spark Core中的JavaSparkContext,类似于Spark SQL中的SQLContext
// 该对象除了接受SparkConf对象,还要接受一个Batch Interval参数,就是说,每收集多长时间数据划分一个batch去进行处理
// 这里我们看Durations里面可以设置分钟、毫秒、秒,这里设置10秒
JavaStreamingContext jsc = new JavaStreamingContext(conf, Durations.seconds(1));
//首先创建输入的DStream, 代表一个数据源如:从Scoket或Kafka来持续不断的获取实时的数据流
//此处创建一个监听端口的Scoket的数据流, 这里面就会每10秒生成一个RDD,RDD的元素类型为String,就是一行行的文本
JavaDStream<String> lines = jsc.socketTextStream("192.168.1.224", 9999);
lines
的DStream表示将从数据服务器接收的数据流。 此流中的每条记录都是一行文本。 然后,我们要将空格划分为单词。 //使用Spark Core提供的算子直接作用于DStreams上, 算子底层会应用在里面的每个RDD上面,RDD转换后的新RDD会作为新DStream中RDD
JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
private static final long serialVersionUID = 1L;
@Override
public Iterable<String> call(String line) throws Exception {
return Arrays.asList(line.split(" "));
}
});
//Count each word in each batch
JavaPairDStream<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() {
private static final long serialVersionUID = 1L;
@Override
public Tuple2<String, Integer> call(String word) throws Exception {
// TODO Auto-generated method stub
return new Tuple2<String, Integer>(word, 1);
}
});
JavaPairDStream<String, Integer> wcs = pairs.reduceByKey(new Function2<Integer, Integer, Integer>() {
private static final long serialVersionUID = 1L;
@Override
public Integer call(Integer v1, Integer v2) throws Exception {
return v1 + v2;
}
});
//将此DStream中生成的每个RDD的前十个元素打印到控制台
wcs.print();
The words DStream is further mapped (one-to-one transformation) to a DStream of
(word, 1) pairs, using a PairFunction object. Then, it is reduced to get the
frequency of words in each batch of data, using a Function2 object. Finally,
wcs.print() will print a few of the counts generated every second.
words
DStream进一步映射(one-to-one transformation)到(word,1)pairs
的DStream。 然后,使用Function2对象减少每批数据中的单词的频率。 最后,wcs.print()
将打印每秒产生的几个计数。jsc.start(); // Start the computation
jsc.awaitTermination(); // Wait for the computation to terminate
yum install nc
nc -lk 9999
在控制台写入数据
package com.chb.spark.streaming;
import java.util.Arrays;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import scala.Tuple2;
public class WordCount {
public static void main(String[] args) {
SparkConf conf = new SparkConf().setAppName("Spark Streaming WordCount").setMaster("local[2]");
// 创建该对象就类似于Spark Core中的JavaSparkContext,类似于Spark SQL中的SQLContext
// 该对象除了接受SparkConf对象,还要接受一个Batch Interval参数,就是说,每收集多长时间数据划分一个batch去进行处理
// 这里我们看Durations里面可以设置分钟、毫秒、秒,这里设置10秒
JavaStreamingContext jsc = new JavaStreamingContext(conf, Durations.seconds(10));
//首先创建输入的DStream, 代表一个数据源如:从Scoket或Kafka来持续不断的获取实时的数据流
//此处创建一个监听端口的Scoket的数据流, 这里面就会每10秒生成一个RDD,RDD的元素类型为String,就是一行行的文本
JavaDStream<String> lines = jsc.socketTextStream("192.168.1.224", 9999);
//使用Spark Core提供的算子直接作用于DStreams上, 算子底层会应用在里面的每个RDD上面,RDD转换后的新RDD会作为新DStream中RDD
JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
private static final long serialVersionUID = 1L;
@Override
public Iterable<String> call(String line) throws Exception {
return Arrays.asList(line.split(" "));
}
});
JavaPairDStream<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() {
private static final long serialVersionUID = 1L;
@Override
public Tuple2<String, Integer> call(String word) throws Exception {
// TODO Auto-generated method stub
return new Tuple2<String, Integer>(word, 1);
}
});
JavaPairDStream<String, Integer> wcs = pairs.reduceByKey(new Function2<Integer, Integer, Integer>() {
private static final long serialVersionUID = 1L;
@Override
public Integer call(Integer v1, Integer v2) throws Exception {
return v1 + v2;
}
});
// 最后每次计算完,都打印一下这10秒钟的单词计数情况,并休眠5秒钟,以便于我们测试和观察
wcs.print();
jsc.start();
jsc.awaitTermination();
jsc.close();
}
}
package com.chb.scala
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
/**
* Counts words in UTF8 encoded, ‘\n‘ delimited text received from the network every second.
*
* Usage: NetworkWordCount <hostname> <port>
* <hostname> and <port> describe the TCP server that Spark Streaming would connect to receive data.
*
* To run this on your local machine, you need to first run a Netcat server
* `$ nc -lk 9999`
* and then run the example
* `$ bin/run-example org.apache.spark.examples.streaming.NetworkWordCount localhost 9999`
*/
object NetworkWordCount {
def main(args: Array[String]) {
if (args.length < 2) {
System.err.println("Usage: NetworkWordCount <hostname> <port>")
System.exit(1)
}
// Create the context with a 1 second batch size
val sparkConf = new SparkConf().setAppName("NetworkWordCount")
val ssc = new StreamingContext(sparkConf, Seconds(1))
// Create a socket stream on target ip:port and count the
// words in input stream of \n delimited text (eg. generated by ‘nc‘)
// Note that no duplication in storage level only for running locally.
// Replication necessary in distributed scenario for fault tolerance.
val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_AND_DISK_SER)
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
wordCounts.print()
ssc.start()
ssc.awaitTermination()
}
}
JavaDStream<String> lines = jsc.textFileStream("hdfs://192.168.1.224:9000/user/root/");
标签:frequency div 介绍 空格 use 扩展 park dsd first
原文地址:http://blog.csdn.net/wuxintdrh/article/details/71077098