码迷,mamicode.com
首页 > 其他好文 > 详细

SparkStreaming

时间:2019-07-01 13:58:53      阅读:125      评论:0      收藏:0      [点我收藏+]

标签:oca   app   tap   ack   技术   get   array   建表   pac   

1.workcount

技术图片

package dayo7

import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object NewworkWordCount {
Logger.getLogger("org").setLevel(Level.WARN)

  def main(args: Array[String]): Unit = {
    //new Conf
    val conf = new SparkConf ().setAppName ( this.getClass.getSimpleName ).setMaster ( "local[*]" )

    //创建ssc  第二个参数是时间间隔
    val ssc = new StreamingContext ( conf, Seconds ( 2 ) )

    //获取数据
    val result = ssc.socketTextStream ( "192.168.186.150", 1234 )

    //处理数据,输出打印
    val result2 = result.flatMap ( _.split ( " " ) ).map ( (_, 1) ).reduceByKey ( _ + _ ).print ()

    //开启sparkStreaming
    ssc.start ()

    //创建阻塞线程
    ssc.awaitTermination ()
  }
  }

2.将数据写到redis

开启redis bin/redis-server  etc/redis.conf  查看端口 ps -ef|grep redis

package dayo7

import day08.Jpoods
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.{Seconds, StreamingContext}

object WordCountToRedis1 {
Logger.getLogger("org").setLevel(Level.WARN)

  def main(args: Array[String]): Unit = {
    //new xonf
    val conf=new SparkConf().setAppName(this.getClass.getSimpleName).setMaster("local[*]")

    //创建SparkStreaming
    val ssc=new StreamingContext(conf,Seconds(2))

    //读取数据
    val result: ReceiverInputDStream[String] =ssc.socketTextStream("192.168.186.150",1234)


    result.foreachRDD(rdd=>{
      //处理数据
      val result2=rdd.flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_)

   //写入到redis
      result2.foreachPartition(filter=>{
        val jedis=Jpoods.getJedis()

        //创建表
        filter.foreach(tp=>{
          jedis.hincrBy("zi",tp._1,tp._2)
        })

        //关闭jedis
        jedis.close()

      })

    })

    //开启SparkStraming
    ssc.start()

    //创建阻塞线程
    ssc.awaitTermination()
  }
}

 

SparkStreaming

标签:oca   app   tap   ack   技术   get   array   建表   pac   

原文地址:https://www.cnblogs.com/wangshuang123/p/11113347.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!