码迷,mamicode.com
首页 > 其他好文 > 详细

大数据之发送到卡夫卡

时间:2020-07-10 00:36:15      阅读:78      评论:0      收藏:0      [点我收藏+]

标签:asi   ati   ble   streaming   rod   res   package   ext   text   

package com.sjw.flink

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011

object KafkaSink {

def main(args: Array[String]): Unit = {

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)

val stream: DataStream[String] = env.readTextFile("src/main/resources/sensor.txt")

val dataDS: DataStream[String] = stream.map(data => {
val arr: Array[String] = data.split(",")
SensorReading(arr(0).trim, arr(1).trim.toLong, arr(2).trim.toDouble).toString
})
dataDS.addSink(new FlinkKafkaProducer011[String]("sunjunwei1.com:9092","sensor",new SimpleStringSchema()))

 

env.execute("kafka sink")
}
}

大数据之发送到卡夫卡

标签:asi   ati   ble   streaming   rod   res   package   ext   text   

原文地址:https://www.cnblogs.com/whyuan/p/13276888.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!