
Writing Flink Output to Elasticsearch

Date: 2020-01-10 15:57:24


1. Code

import java.util
import org.apache.flink.api.common.functions.RuntimeContext
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.elasticsearch.{ElasticsearchSinkFunction, RequestIndexer}
import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink
import org.apache.http.HttpHost
import org.elasticsearch.client.Requests

// Case class for a temperature sensor reading
case class SensorReading(id: String, timestamp: Long, temperature: Double)

object EsSinkTest {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // source: read the input file line by line
    val inputStream = env.readTextFile("sensor1.txt")

    // transform: parse each comma-separated line into a SensorReading
    import org.apache.flink.api.scala._
    val dataStream = inputStream.map(x => {
      val arr = x.split(",")
      SensorReading(arr(0).trim, arr(1).trim.toLong, arr(2).trim.toDouble)
    })

    // sink: the Elasticsearch hosts to connect to
    val httpHosts = new util.ArrayList[HttpHost]()
    httpHosts.add(new HttpHost("localhost", 9200))
    // Create a Builder for the ES sink
    val esSinkBuilder = new ElasticsearchSink.Builder[SensorReading](
      httpHosts,
      new ElasticsearchSinkFunction[SensorReading] {
        override def process(t: SensorReading, runtimeContext: RuntimeContext, requestIndexer: RequestIndexer): Unit = {
          println("saving data " + t)
          // Wrap the record in a Map (a JsonObject would also work)
          val hashMap = new util.HashMap[String, String]()
          hashMap.put("sensor_id", t.id)
          hashMap.put("temperature", t.temperature.toString)
          hashMap.put("timestamp", t.timestamp.toString)
          // Create the index request that carries the data
          val indexRequest = Requests.indexRequest().index("sensor").`type`("readingData").source(hashMap)
          // Submit the request; the sink writes it to Elasticsearch
          requestIndexer.add(indexRequest)
          println("data saved successfully")
        }
      }
    )

    dataStream.addSink(esSinkBuilder.build())

    env.execute("es sink test")
  }
}
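
The transform step and the document built in the sink can be tried without Flink or Elasticsearch. The following is a minimal sketch, assuming input lines of the form id,timestamp,temperature. The sample line and the helper names parseLine and toDocument are hypothetical and do not come from the original post, which does not show the contents of sensor1.txt.

```scala
import java.util

// Same case class as in the job above
case class SensorReading(id: String, timestamp: Long, temperature: Double)

object SensorDocSketch {
  // Hypothetical helper: parse one comma-separated line, as the map step does
  def parseLine(line: String): SensorReading = {
    val arr = line.split(",")
    SensorReading(arr(0).trim, arr(1).trim.toLong, arr(2).trim.toDouble)
  }

  // Hypothetical helper: build the field map that the sink passes to source(...)
  def toDocument(r: SensorReading): util.HashMap[String, String] = {
    val doc = new util.HashMap[String, String]()
    doc.put("sensor_id", r.id)
    doc.put("temperature", r.temperature.toString)
    doc.put("timestamp", r.timestamp.toString)
    doc
  }

  def main(args: Array[String]): Unit = {
    // Assumed sample line; the real file contents are not given in the post
    val reading = parseLine("sensor_1, 1547718199, 35.8")
    println(toDocument(reading))
  }
}
```

Because every value is stored as a string, Elasticsearch's default dynamic mapping would typically treat temperature and timestamp as text rather than numbers. If numeric queries matter, a map with numeric values may be a better fit, though that is a change from the original code.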

2. Start Elasticsearch
3. Visit 127.0.0.1:9200/sensor/_search?pretty ; the result is shown below

  [Image: screenshot of the search result]
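
The same check can be scripted instead of done in a browser. This is a minimal sketch, assuming Elasticsearch is reachable over plain HTTP at 127.0.0.1:9200 without authentication and that the job has already written to the sensor index. The helper names searchUrl and fetch are hypothetical.

```scala
import scala.io.Source

object EsSearchCheck {
  // Hypothetical helper: build the _search URL used in step 3
  def searchUrl(host: String, port: Int, index: String): String =
    s"http://$host:$port/$index/_search?pretty"

  // Hypothetical helper: issue a GET request and return the response body
  def fetch(url: String): String = {
    val src = Source.fromURL(url, "UTF-8")
    try src.mkString finally src.close()
  }

  def main(args: Array[String]): Unit = {
    // Requires a running Elasticsearch instance (assumption: no auth, plain HTTP)
    println(fetch(searchUrl("127.0.0.1", 9200, "sensor")))
  }
}
```

If the job ran successfully, the printed JSON should contain hits with the sensor_id, temperature, and timestamp fields set in the sink.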


Original article: https://www.cnblogs.com/wddqy/p/12176053.html
