Big Data: Acquiring Data

package com.sjw.flink

import java.util.{Properties, Random}

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011


object SourceTest {

  def main(args: Array[String]): Unit = {
    // Set up the streaming execution environment
    val environment = StreamExecutionEnvironment.getExecutionEnvironment

    // 1. Read data from a collection
    val stream1 = environment
      .fromCollection(List(
        SensorReading("sensor_1", 1547718199, 35.80018327300259),
        SensorReading("sensor_6", 1547718201, 15.402984393403084),
        SensorReading("sensor_7", 1547718202, 6.720945201171228),
        SensorReading("sensor_10", 1547718205, 38.101067604893444)
      ))

    // stream1.print("stream1:").setParallelism(1)

    // 2. Read data from a text file
    val stream2 = environment.readTextFile("src/main/resources/words.txt")
    // stream2.print("stream2").setParallelism(1)

    // 3. Read from a handful of individual elements
    // environment.fromElements(1, 2.0, "string").print("stream3")

    // 4. Read data from Kafka
    val properties = new Properties()
    // Broker host and port
    properties.setProperty("bootstrap.servers", "sunjunwei1.com:9092")
    // Consumer group
    properties.setProperty("group.id", "consumer-group")
    // Key deserializer
    properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    // Value deserializer
    properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    // Where to start reading when no committed offset exists
    properties.setProperty("auto.offset.reset", "latest")

    val stream4 = environment.addSource(new FlinkKafkaConsumer011[String]("flink_kafka", new SimpleStringSchema(), properties))
    stream4.print("stream4").setParallelism(1)
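    // Assumption (not in the original post): the topic "flink_kafka" must already exist on the
    // broker configured above. For a quick manual test, a few lines can be pushed into it with
    // Kafka's console producer, e.g.:
    //   kafka-console-producer.sh --broker-list sunjunwei1.com:9092 --topic flink_kafka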

    // 5. Read data from a user-defined source
    val stream5 = environment.addSource(new SensorSource())
    // stream5.print("stream5").setParallelism(1)

    // Trigger execution of the job
    environment.execute("source test")
  }

}

// Data type for one sensor reading: sensor id, timestamp, temperature
case class SensorReading(id: String, timestamp: Long, temperature: Double)

// Custom source that emits a random walk of temperature readings for ten sensors
class SensorSource() extends SourceFunction[SensorReading] {

  // Flag indicating whether the source should keep producing data;
  // @volatile so that cancel(), called from another thread, is seen by run()
  @volatile var flag: Boolean = true

  // Called by Flink when the source is cancelled
  override def cancel(): Unit = {
    flag = false
  }

  override def run(sourceContext: SourceFunction.SourceContext[SensorReading]): Unit = {
    // Random number generator
    val random = new Random()
    // Initial temperature for each of the ten sensors
    var curTemp = 1.to(10).map(
      i => ("sensor_" + i, 60 + random.nextGaussian() * 20)
    )
    // Keep producing readings until the source is cancelled
    while (flag) {
      // Let every temperature drift by a small random step
      curTemp = curTemp.map(
        t => (t._1, t._2 + random.nextGaussian())
      )
      // Timestamp for this batch of readings
      val curTime = System.currentTimeMillis()
      curTemp.foreach(
        t => sourceContext.collect(SensorReading(t._1, curTime, t._2))
      )
      // Emit one batch every 500 ms
      Thread.sleep(500)
    }
  }
}
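
To build and run the listing above, the Flink Scala API and the Kafka 0.11 connector have to be on the classpath. The sbt snippet below is a minimal sketch; the Flink version (1.7.2) and the Scala binary version (2.11) are assumptions and should be adjusted to your own setup.

// build.sbt -- minimal sketch; version numbers are assumptions, not taken from the original post
ThisBuild / scalaVersion := "2.11.12"

val flinkVersion = "1.7.2" // any release that still ships FlinkKafkaConsumer011

libraryDependencies ++= Seq(
  "org.apache.flink" %% "flink-scala"                % flinkVersion,
  "org.apache.flink" %% "flink-streaming-scala"      % flinkVersion,
  "org.apache.flink" %% "flink-connector-kafka-0.11" % flinkVersion
)

With these dependencies in place, running SourceTest starts the job. Only stream4's print sink is enabled, so uncomment the other print calls to see the remaining streams on the console.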


Original post: https://www.cnblogs.com/whyuan/p/13276896.html
