码迷,mamicode.com
首页 > 其他好文 > 详细

Flink 侧输出流 SideOutput

时间:2020-05-15 00:38:29      阅读:167      评论:0      收藏:0      [点我收藏+]

标签:execution   nbsp   RoCE   san   产生   and   fun   class   array   

  大部分的 DataStream API 的算子的输出是单一输出,也就是某种数据类型的流。除了 split 算子,可以将一条流分成多条流,这些流的数据类型也都相同。processfunction 的 side outputs 功能可以产生多条流,并且这些流的数据类型可以不一样。一个 sideoutput 可以定义为 OutputTag[X]对象,X 是输出流的数据类型。processfunction 可以通过 Context 对象发射一个事件到一个或者多个 side outputs。
  下面的代码演示了低于32F的温度信息进入到测输出流"freezing alert"中。
object SideOutputTest {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val socketStream = env.socketTextStream("hadoop102", 7777)

    val dataStream: DataStream[SensorReading] = socketStream.map(d => {
      val arr = d.split(",")
      SensorReading(arr(0).trim, arr(1).trim.toLong, arr(2).toDouble)
    })
      .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[SensorReading](Time.seconds(1)) {
        override def extractTimestamp(t: SensorReading): Long = t.timestamp * 1000
      })

    //低温报警处理
    val processStream = dataStream.process(new FreezingAlert)

    //打印主输出流
    processStream.print("process stream")

    //打印侧输出流。先得到某个测输出流。
    processStream.getSideOutput(new OutputTag[String]("freezing alert")).print("freezing alert")

    env.execute("window test")
  }
}

class FreezingAlert extends ProcessFunction[SensorReading, SensorReading] {

  lazy val tag = new OutputTag[String]("freezing alert")

  override def processElement(value: SensorReading,
                              ctx: ProcessFunction[SensorReading, SensorReading]#Context,
                              collector: Collector[SensorReading]): Unit = {
    if (value.temperature<32){
      //侧输出流
      ctx.output(tag,"freezing alert for " + value.temperature)
    }else{
      //主输出流
      collector.collect(value)
    }
  }

}

端口数据

[atguigu@hadoop102 ~]$ nc -lk 7777
sensor_1, 1547718200, 30
sensor_1, 1547718200, 25
sensor_1, 1547718200, 35

控制台打印

freezing alert> freezing alert for 30.0
freezing alert> freezing alert for 25.0
process stream> SensorReading(sensor_1,1547718200,35.0)

 

  

 

Flink 侧输出流 SideOutput

标签:execution   nbsp   RoCE   san   产生   and   fun   class   array   

原文地址:https://www.cnblogs.com/noyouth/p/12892078.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!