标签:you link schema 客户 span alt doc last second
// Obtain the execution environment for this job ...
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// ... and make it interpret time as event time (timestamps carried by the records themselves).
final TimeCharacteristic timeMode = TimeCharacteristic.EventTime;
env.setStreamTimeCharacteristic(timeMode);
/**
 * Source loop: emits every element together with its own event timestamp and, whenever an
 * element carries a watermark time, also emits a watermark for that time.
 *
 * <p>{@code /* condition *}{@code /} is a placeholder from the documentation and must be
 * replaced with the real loop-termination condition.
 *
 * @param ctx the source context used to emit records and watermarks
 */
@Override
public void run(SourceContext<MyType> ctx) throws Exception {
    while (/* condition */) {
        final MyType element = getNext();
        final long eventTime = element.getEventTimestamp();
        ctx.collectWithTimestamp(element, eventTime);

        // Only elements that explicitly carry a watermark time advance the watermark.
        if (!element.hasWatermarkTime()) {
            continue;
        }
        ctx.emitWatermark(new Watermark(element.getWatermarkTime()));
    }
}
// Event-time job: timestamps/watermarks are assigned after a filter step.
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

// Continuously monitor myFilePath (PROCESS_CONTINUOUSLY). 100 is the monitoring interval
// argument of readFile -- presumably milliseconds; confirm against the readFile API docs.
DataStream<MyEvent> stream = env.readFile(
        myFormat,
        myFilePath,
        FileProcessingMode.PROCESS_CONTINUOUSLY,
        100,
        FilePathFilter.createDefaultFilter(),
        typeInfo);

// Keep only WARNING events, then attach timestamps and watermarks to what remains.
DataStream<MyEvent> warnings = stream.filter(event -> event.severity() == WARNING);
DataStream<MyEvent> withTimestampsAndWatermarks =
        warnings.assignTimestampsAndWatermarks(new MyTimestampsAndWatermarks());

// Per group, reduce events over 10-second windows; the sink is left elided as in the docs.
withTimestampsAndWatermarks
        .keyBy(event -> event.getGroup())
        .timeWindow(Time.seconds(10))
        .reduce((a, b) -> a.add(b))
        .addSink(...);
/** * This generator generates watermarks assuming that elements arrive out of order, * but only to a certain degree. The latest elements for a certain timestamp t will arrive * at most n milliseconds after the earliest elements for timestamp t. */ public class BoundedOutOfOrdernessGenerator implements AssignerWithPeriodicWatermarks<MyEvent> { private final long maxOutOfOrderness = 3500; // 3.5 seconds private long currentMaxTimestamp; @Override public long extractTimestamp(MyEvent element, long previousElementTimestamp) { long timestamp = element.getCreationTime(); currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp); return timestamp; } @Override public Watermark getCurrentWatermark() { // return the watermark as current highest timestamp minus the out-of-orderness bound return new Watermark(currentMaxTimestamp - maxOutOfOrderness); } } /** * This generator generates watermarks that are lagging behind processing time by a fixed amount. * It assumes that elements arrive in Flink after a bounded delay. */ public class TimeLagWatermarkGenerator implements AssignerWithPeriodicWatermarks<MyEvent> { private final long maxTimeLag = 5000; // 5 seconds @Override public long extractTimestamp(MyEvent element, long previousElementTimestamp) { return element.getCreationTime(); } @Override public Watermark getCurrentWatermark() { // return the watermark as current time minus the maximum time lag return new Watermark(System.currentTimeMillis() - maxTimeLag); } }
/**
 * Punctuated watermark assigner: every element is stamped with its creation time, and a
 * watermark at that time is emitted only for elements that carry a watermark marker.
 */
public class PunctuatedAssigner implements AssignerWithPunctuatedWatermarks<MyEvent> {

    @Override
    public long extractTimestamp(MyEvent element, long previousElementTimestamp) {
        return element.getCreationTime();
    }

    /**
     * Returns a watermark at {@code extractedTimestamp} when {@code lastElement} carries a
     * watermark marker, or {@code null} (no watermark) otherwise.
     */
    @Override
    public Watermark checkAndGetNextWatermark(MyEvent lastElement, long extractedTimestamp) {
        if (!lastElement.hasWatermarkMarker()) {
            return null;
        }
        return new Watermark(extractedTimestamp);
    }
}
// Kafka source whose elements are stamped with eventTimestamp(), assumed to be ascending.
FlinkKafkaConsumer09<MyType> kafkaSource = new FlinkKafkaConsumer09<>("myTopic", schema, props);

// Extractor for ascending timestamps. It is attached to the consumer itself (before addSource)
// rather than to the resulting DataStream -- per the Flink docs this enables per-partition
// watermarking; confirm for the connector version in use.
AscendingTimestampExtractor<MyType> ascendingExtractor = new AscendingTimestampExtractor<MyType>() {
    @Override
    public long extractAscendingTimestamp(MyType element) {
        return element.eventTimestamp();
    }
};
kafkaSource.assignTimestampsAndWatermarks(ascendingExtractor);

DataStream<MyType> stream = env.addSource(kafkaSource);
Flink 1.8: Generating Timestamps / Watermarks（生成时间戳 / 水印）
标签:you link schema 客户 span alt doc last second
原文地址:https://www.cnblogs.com/sxpujs/p/11385678.html