码迷,mamicode.com
首页 > 其他好文 > 详细

Flink输出到Redis

时间:2020-01-10 14:09:06      阅读:210      评论:0      收藏:0      [点我收藏+]

标签:local   key   set   override   mda   rri   long   oca   定义   

   1.代码

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.redis.RedisSink
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig
import org.apache.flink.streaming.connectors.redis.common.mapper.{RedisCommand, RedisCommandDescription, RedisMapper}

//温度传感器读取样例类
case class SensorReading(id: String, timestamp: Long, temperature: Double)

object RedisSinkTest {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)

//source
val inputStream = env.readTextFile("sensor1.txt")

//transform
import org.apache.flink.api.scala._
val dataStream = inputStream.map(x => {
val arr = x.split(",")
SensorReading(arr(0).trim, arr(1).trim.toLong, arr(2).trim.toDouble)
})

//sink
val conf = new FlinkJedisPoolConfig.Builder().setHost("localhost").setPort(6379).build()

dataStream.addSink(new RedisSink[SensorReading](conf, new MyRedisMapper))

env.execute("redis sink test")
}
}

//定义一个redis的mapper类,用于定义保存到redis时调用的命令
class MyRedisMapper extends RedisMapper[SensorReading] {
override def getCommandDescription: RedisCommandDescription = {
//把传感器id和温度值保存成哈希表: HSET key field value
new RedisCommandDescription(RedisCommand.HSET, "sensor_temperature")
}

//相当于是field
override def getKeyFromData(data: SensorReading): String = {
data.id
}

override def getValueFromData(data: SensorReading): String = {
data.temperature.toString
}

}

2.结果
 

技术图片

Flink输出到Redis

标签:local   key   set   override   mda   rri   long   oca   定义   

原文地址:https://www.cnblogs.com/wddqy/p/12175808.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!