标签:ring parallel style reading val conf extends over mes
输入(自定义数据源)
通过实现 SourceFunction[...] 接口来自定义数据源
/** Example job that attaches a [[CountSource]] to a streaming environment and prints its output. */
object SourceFunctionExample {

  /**
   * Builds the pipeline (custom source -> print) and submits it for execution.
   *
   * @param args command-line arguments; not read by this example
   */
  def main(args: Array[String]): Unit = {
    val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment

    // Register the custom source and print every element it emits.
    streamEnv
      .addSource(new CountSource)
      .print()

    streamEnv.execute()
  }
}
/**
 * A source that emits consecutive `Long` values starting at 0 until it is
 * cancelled or the counter reaches `Long.MaxValue`.
 */
class CountSource extends SourceFunction[Long] {

  /**
   * Loop flag read by `run` and cleared by `cancel`.
   *
   * Marked `@volatile` because Flink's `SourceFunction` contract allows
   * `cancel()` to be invoked from a different thread than the one executing
   * `run()`; without it the emitting loop is not guaranteed to observe the
   * update and could keep running after cancellation.
   */
  @volatile var isRunning: Boolean = true

  /**
   * Emits 0, 1, 2, ... through `ctx` while the source is running.
   *
   * @param ctx context used to hand each counter value to the stream
   */
  override def run(ctx: SourceFunction.SourceContext[Long]): Unit = {
    // Start at -1 so that the first emitted value is 0.
    var cnt: Long = -1
    // The upper-bound check stops the loop before the counter can overflow.
    while (isRunning && cnt < Long.MaxValue) {
      cnt += 1
      ctx.collect(cnt)
    }
  }

  /** Requests termination of the emitting loop in `run`. */
  override def cancel(): Unit = isRunning = false
}
输出(自定义数据汇)
通过继承 RichSinkFunction[...] 来自定义数据汇
/** Example job that writes sensor readings to a TCP socket through [[SimpleSocketSink]]. */
object SinkFunctionExample {

  /**
   * Builds the pipeline (sensor source -> timestamp/watermark assignment ->
   * socket sink) and submits it for execution.
   *
   * @param args command-line arguments; not read by this example
   */
  def main(args: Array[String]): Unit = {
    val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
    streamEnv.getConfig.setAutoWatermarkInterval(1000L)

    // Source of sensor readings with timestamps and watermarks assigned.
    val sensorStream = streamEnv
      .addSource(new SensorSource)
      .assignTimestampsAndWatermarks(new SensorTimeAssigner)

    // Attach the socket sink and pin it to a single parallel instance.
    val socketSink = new SimpleSocketSink("localhost", 9191)
    sensorStream
      .addSink(socketSink)
      .setParallelism(1)

    streamEnv.execute()
  }
}
/**
 * Sink that writes each [[SensorReading]] as one text line to a TCP socket.
 *
 * @param host host name of the socket endpoint to connect to
 * @param port port of the socket endpoint to connect to
 */
class SimpleSocketSink(val host: String, val port: Int) extends RichSinkFunction[SensorReading] {

  /** Connection to the endpoint; stays `null` until `open` succeeds. */
  var socket: Socket = _

  /** Text writer over the socket's output stream; stays `null` until `open` succeeds. */
  var writer: PrintStream = _

  /**
   * Connects to `host:port` and wraps the socket's output stream in a `PrintStream`.
   *
   * @param parameters configuration passed to `open`; not read here
   */
  override def open(parameters: Configuration): Unit = {
    socket = new Socket(host, port)
    writer = new PrintStream(socket.getOutputStream)
  }

  /**
   * Releases the writer and the socket.
   *
   * Either field may still be `null` if `open` failed part-way or was never
   * called, so both are guarded to avoid a `NullPointerException` that would
   * hide the original failure. The socket is closed in `finally` so it is
   * released even if closing the writer throws.
   */
  override def close(): Unit = {
    try {
      Option(writer).foreach(_.close())
    } finally {
      Option(socket).foreach(_.close())
    }
  }

  /**
   * Writes the reading's `toString` form followed by a line break and flushes immediately.
   *
   * @param value   the reading to write
   * @param context sink context; not read here
   */
  override def invoke(value: SensorReading, context: SinkFunction.Context[_]): Unit = {
    writer.println(value.toString)
    // Flush per record so each reading is pushed to the socket right away.
    writer.flush()
  }
}
233
标签:ring parallel style reading val conf extends over mes
原文地址:https://www.cnblogs.com/lemos/p/12657673.html