// Tags: asi ati ble streaming rod res package ext text
package com.sjw.flink
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011
/**
 * Flink streaming job that reads lines from a text file, converts each line
 * into the string form of a `SensorReading`, and writes the result to a Kafka
 * topic through `FlinkKafkaProducer011`.
 */
object KafkaSink {

  /**
   * Converts one comma-separated input line into `SensorReading(...).toString`.
   *
   * The first three fields are trimmed and passed to `SensorReading` as a
   * String, a Long and a Double respectively (presumably sensor id, timestamp
   * and measured value -- confirm against the `SensorReading` definition).
   * Any additional fields are ignored, exactly as the indexed access did before.
   *
   * @param line a non-blank line of input
   * @return the `toString` rendering of the parsed reading
   * @throws IllegalArgumentException if the line has fewer than three fields
   * @throws NumberFormatException    if the second or third field is not numeric
   */
  private def formatReading(line: String): String =
    line.split(",") match {
      case Array(sensorId, timestamp, reading, _*) =>
        SensorReading(sensorId.trim, timestamp.trim.toLong, reading.trim.toDouble).toString
      case _ =>
        // Fail loudly, but say which line was bad instead of surfacing a bare
        // ArrayIndexOutOfBoundsException from deep inside the operator.
        throw new IllegalArgumentException(
          s"Expected at least 3 comma-separated fields but got: '$line'")
    }

  /**
   * Builds and runs the pipeline: text file source -> parse/format -> Kafka sink.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    val lines: DataStream[String] = env.readTextFile("src/main/resources/sensor.txt")

    // Blank lines carry no reading; drop them so a stray empty line in the
    // file does not abort the whole job.
    val readings: DataStream[String] = lines
      .filter(_.trim.nonEmpty)
      .map(line => formatReading(line))

    readings.addSink(
      new FlinkKafkaProducer011[String]("sunjunwei1.com:9092", "sensor", new SimpleStringSchema()))

    env.execute("kafka sink")
  }
}
// Tags: asi ati ble streaming rod res package ext text
// Original article: https://www.cnblogs.com/whyuan/p/13276888.html