标签:soc contex 统计 生成 context 处理 打印 出现 line
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
// 创建一个本地模式的StreamingContext, 两个工作线程, 1s的批处理间隔
//Master要求2个核,以防出现饥饿情况
object Socket {
def main(args: Array[String]): Unit = {
// Spark配置项
val conf = new SparkConf().setAppName("Socket").setMaster("local[*]")
// 创建流式上下文,1s批处理间隔
val ssc = new StreamingContext(conf, Seconds(1))
// 创建一个DStream,链接指定的hostname: prot, 比如localhost: 9999
val lines = ssc.socketTextStream("localhost", 9999)
// 将收到的每条信息分割成词语
val words = lines.flatMap(_.split(" "))
// 统计每个batch的词频
val pairs = words.map(word => (word, 1))
// 词频汇总
val WordCounts = pairs.reduceByKey(_+_)
// 打印从Dstream中生成的RDD的前10个元素到控制台
WordCounts.print()
ssc.start() //开始计算
ssc.awaitTermination() //等待计算结束
}
}
标签:soc contex 统计 生成 context 处理 打印 出现 line
原文地址:https://www.cnblogs.com/tan2022/p/14962772.html