标签:oca app tap ack 技术 get array 建表 pac
1.wordcount
package dayo7

import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 * Streaming word count over a TCP text source.
 *
 * Reads lines from a socket, splits each line on single spaces, counts the
 * occurrences of every word within each batch, and prints the per-batch
 * result to stdout.
 */
object NewworkWordCount {
  // Only show WARN and above for loggers under "org" so the batch output
  // is not buried in framework logging.
  Logger.getLogger("org").setLevel(Level.WARN)

  /**
   * Entry point: builds the streaming context, wires the word-count pipeline
   * and blocks until the streaming job terminates.
   *
   * @param args command-line arguments (not used)
   */
  def main(args: Array[String]): Unit = {
    // Spark configuration: app name taken from this object, local master.
    val conf = new SparkConf()
      .setAppName(this.getClass.getSimpleName)
      .setMaster("local[*]")

    // The second constructor argument is the batch interval.
    val ssc = new StreamingContext(conf, Seconds(2))

    // Source: text lines received from the socket.
    val lines = ssc.socketTextStream("192.168.186.150", 1234)

    // Process and print. print() is an output operation that returns Unit,
    // so its result is intentionally not bound to a value.
    lines
      .flatMap(_.split(" "))
      .map((_, 1))
      .reduceByKey(_ + _)
      .print()

    // Start the streaming computation.
    ssc.start()
    // Block the calling thread until the streaming job stops.
    ssc.awaitTermination()
  }
}
2.将数据写到redis
开启redis:bin/redis-server etc/redis.conf;查看进程和端口:ps -ef|grep redis
package dayo7

import day08.Jpoods
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 * Streaming word count that accumulates its results in Redis.
 *
 * Reads lines from a socket, counts the words of every batch, and adds each
 * batch's counts to the Redis hash "zi" (field = word, value = running total)
 * via HINCRBY.
 */
object WordCountToRedis1 {
  // Only show WARN and above for loggers under "org".
  Logger.getLogger("org").setLevel(Level.WARN)

  /**
   * Entry point: builds the streaming context, wires the pipeline that writes
   * word counts to Redis, and blocks until the streaming job terminates.
   *
   * @param args command-line arguments (not used)
   */
  def main(args: Array[String]): Unit = {
    // Spark configuration: app name taken from this object, local master.
    val conf = new SparkConf()
      .setAppName(this.getClass.getSimpleName)
      .setMaster("local[*]")

    // Streaming context; the second argument is the batch interval.
    val ssc = new StreamingContext(conf, Seconds(2))

    // Source: text lines received from the socket.
    val lines: ReceiverInputDStream[String] =
      ssc.socketTextStream("192.168.186.150", 1234)

    lines.foreachRDD { rdd =>
      // Word counts for this batch only.
      val counts = rdd
        .flatMap(_.split(" "))
        .map((_, 1))
        .reduceByKey(_ + _)

      // One Redis connection per partition rather than per record.
      counts.foreachPartition { partition =>
        // Skip empty partitions so no connection is acquired for nothing.
        if (partition.nonEmpty) {
          // NOTE(review): Jpoods is a project helper defined elsewhere;
          // presumably it hands out a pooled Jedis connection - confirm.
          val jedis = Jpoods.getJedis()
          try {
            partition.foreach { case (word, count) =>
              // Add this batch's count to the running total kept in hash "zi".
              jedis.hincrBy("zi", word, count.toLong)
            }
          } finally {
            // Always release the connection, even if a Redis call fails;
            // otherwise a failing batch would leak it.
            jedis.close()
          }
        }
      }
    }

    // Start the streaming computation.
    ssc.start()
    // Block the calling thread until the streaming job stops.
    ssc.awaitTermination()
  }
}
标签:oca app tap ack 技术 get array 建表 pac
原文地址:https://www.cnblogs.com/wangshuang123/p/11113347.html