标签:execution nbsp RoCE san 产生 and fun class array
/**
 * Demo job: reads comma-separated sensor readings from a socket, routes them
 * through [[FreezingAlert]], and prints both the main output and the
 * "freezing alert" side output.
 */
object SideOutputTest {

  /**
   * Builds and runs the streaming job.
   *
   * Each input line is expected to look like `id, timestamp, temperature`;
   * a line with fewer than three fields or non-numeric values will throw
   * inside the map operator.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    // Tolerates events arriving up to one second out of order. The timestamp
    // field is multiplied by 1000 — presumably seconds -> milliseconds; confirm
    // against the SensorReading definition.
    val watermarkAssigner: BoundedOutOfOrdernessTimestampExtractor[SensorReading] =
      new BoundedOutOfOrdernessTimestampExtractor[SensorReading](Time.seconds(1)) {
        override def extractTimestamp(reading: SensorReading): Long = reading.timestamp * 1000
      }

    // Parse each text line from hadoop102:7777 into a SensorReading.
    val readings: DataStream[SensorReading] = env
      .socketTextStream("hadoop102", 7777)
      .map { line =>
        val fields = line.split(",")
        SensorReading(fields(0).trim, fields(1).trim.toLong, fields(2).toDouble)
      }
      .assignTimestampsAndWatermarks(watermarkAssigner)

    // Low-temperature alert handling.
    val processed = readings.process(new FreezingAlert)

    // Print the main output stream.
    processed.print("process stream")

    // Print the side output stream; it is looked up with a tag carrying the
    // same id and type that FreezingAlert emits to.
    val freezingTag = new OutputTag[String]("freezing alert")
    processed.getSideOutput(freezingTag).print("freezing alert")

    // NOTE(review): the job name says "window test" although this job
    // demonstrates side outputs — likely a copy-paste leftover; confirm before renaming.
    env.execute("window test")
  }
}
/**
 * Splits sensor readings by temperature: readings strictly below
 * `freezingThreshold` produce a text alert on the "freezing alert" side
 * output, all other readings are forwarded unchanged on the main output.
 *
 * @param freezingThreshold temperature below which a reading is reported as
 *                          freezing; defaults to 32.0, the value that was
 *                          previously hard-coded
 */
class FreezingAlert(freezingThreshold: Double = 32.0)
  extends ProcessFunction[SensorReading, SensorReading] {

  /** Tag identifying the side output that carries the alert messages. */
  lazy val tag: OutputTag[String] = new OutputTag[String]("freezing alert")

  /**
   * Routes a single reading to either the side output or the main output.
   *
   * @param value     the incoming sensor reading
   * @param ctx       process-function context, used to emit to the side output
   * @param collector collector for the main output stream
   */
  override def processElement(value: SensorReading,
                              ctx: ProcessFunction[SensorReading, SensorReading]#Context,
                              collector: Collector[SensorReading]): Unit = {
    if (value.temperature < freezingThreshold) {
      // Side output: the reading itself is NOT forwarded to the main stream.
      ctx.output(tag, s"freezing alert for ${value.temperature}")
    } else {
      // Main output: pass the reading through untouched.
      collector.collect(value)
    }
  }
}
端口数据
[atguigu@hadoop102 ~]$ nc -lk 7777
sensor_1, 1547718200, 30
sensor_1, 1547718200, 25
sensor_1, 1547718200, 35
控制台打印
freezing alert> freezing alert for 30.0
freezing alert> freezing alert for 25.0
process stream> SensorReading(sensor_1,1547718200,35.0)
标签:execution nbsp RoCE san 产生 and fun class array
原文地址:https://www.cnblogs.com/noyouth/p/12892078.html