标签:ring parallel style reading val conf extends over mes
输入
实现 SourceFunction[...]
object SourceFunctionExample { def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment val numbers = env.addSource(new CountSource) numbers.print() env.execute() } } class CountSource extends SourceFunction[Long] { var isRunning: Boolean = true override def run(ctx: SourceFunction.SourceContext[Long]): Unit = { var cnt: Long = -1 while (isRunning && cnt < Long.MaxValue) { cnt += 1 ctx.collect(cnt) } } override def cancel(): Unit = isRunning = false }
输出
实现 RichSinkFunction[...]
object SinkFunctionExample { def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment env.getConfig.setAutoWatermarkInterval(1000L) val readings = env .addSource(new SensorSource) .assignTimestampsAndWatermarks(new SensorTimeAssigner) readings.addSink(new SimpleSocketSink("localhost", 9191)) .setParallelism(1) env.execute() } } class SimpleSocketSink(val host: String, val port: Int) extends RichSinkFunction[SensorReading] { var socket: Socket = _ var writer: PrintStream = _ override def open(parameters: Configuration): Unit = { socket = new Socket(host, port) writer = new PrintStream(socket.getOutputStream) } override def close(): Unit = { writer.close() socket.close() } override def invoke(value: SensorReading, context: SinkFunction.Context[_]): Unit = { writer.println(value.toString) writer.flush() } }
233
标签:ring parallel style reading val conf extends over mes
原文地址:https://www.cnblogs.com/lemos/p/12657673.html