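In real-world stream processing, the order in which data arrives is critical to the correctness of the results.
For example, some records from a source may be delayed by up to 2 seconds for some reason (network problems, or issues in the external storage itself). A record produced at second 1 may therefore arrive after a record produced at second 3.
Suppose that, with a 5-second tumbling window, a record whose event time is second 9 arrives at second 11. By event time it belongs to the window [5 s, 10 s), but by arrival time it would be counted in the window [10 s, 15 s).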
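The scenario above can be sketched in plain Java, without any Flink dependency. The class and method names here are made up for illustration, and the sketch assumes non-negative timestamps and windows aligned to time zero with no offset. It only shows that the same record lands in different tumbling windows depending on which timestamp is used.

```java
// Minimal sketch (plain Java, no Flink dependency) of tumbling-window assignment.
// TumblingWindowSketch, windowStart and describe are hypothetical names.
class TumblingWindowSketch {

    /** Start (ms) of the tumbling window that contains the given non-negative timestamp. */
    static long windowStart(long timestampMs, long windowSizeMs) {
        return timestampMs - (timestampMs % windowSizeMs);
    }

    /** Human-readable window in seconds, for example "[5s, 10s)". */
    static String describe(long timestampMs, long windowSizeMs) {
        long start = windowStart(timestampMs, windowSizeMs);
        return "[" + start / 1000 + "s, " + (start + windowSizeMs) / 1000 + "s)";
    }

    public static void main(String[] args) {
        long windowSizeMs = 5_000;    // 5-second tumbling window
        long eventTimeMs = 9_000;     // when the record was produced
        long arrivalTimeMs = 11_000;  // when the record reached the operator

        System.out.println("event-time window:      " + describe(eventTimeMs, windowSizeMs));
        System.out.println("processing-time window: " + describe(arrivalTimeMs, windowSizeMs));
    }
}
```

With event time the record is assigned to [5s, 10s); with processing time it is assigned to [10s, 15s), which is why the choice of time characteristic matters for correctness.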
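Flink supports different notions of time:
Processing Time
Event Time
Ingestion Time
Processing Time vs. Event Time:
Setting the time characteristic
The first thing a Flink program usually does is set the time characteristic. This setting defines which notion of time the data sources use and which notion of time the time-window operations are based on.
Code:
// Set up the execution environment, similar to initializing a SparkContext in Spark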
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
// env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
// env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
Why watermarks are needed
What a watermark is
How watermarks are generated
Code:
package com.ronnie.flink.stream.test;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import javax.annotation.Nullable;
import java.text.ParseException;
import java.text.SimpleDateFormat;
/**
 * Sample input, one record per line:
 *
 * hello,2019-09-17 11:34:05.890
 * hello,2019-09-17 11:34:07.890
 * hello,2019-09-17 11:34:13.890
 * hello,2019-09-17 11:34:08.890
 * hello,2019-09-17 11:34:16.890
 * hello,2019-09-17 11:34:19.890
 * hello,2019-09-17 11:34:21.890
 */
public class WaterMarkTest {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.setParallelism(1);
        // How often (in ms) the current watermark is queried; the default is 200 ms
        env.getConfig().setAutoWatermarkInterval(10000);
        System.err.println("interval : " + env.getConfig().getAutoWatermarkInterval());
        DataStreamSource<String> streamSource = env.socketTextStream("ronnie01", 9999);
        SingleOutputStreamOperator<String> watermarks = streamSource.assignTimestampsAndWatermarks(new MyWaterMark());
        watermarks.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String value) throws Exception {
                String[] split = value.split(",");
                String key = split[0];
                return new Tuple2<String, Integer>(key, 1);
            }
        }).keyBy(0)
                .timeWindow(Time.seconds(10))
                // Custom window computation
                .apply(new MyWindowFunction())
                .printToErr();
        try {
            env.execute();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

class MyWaterMark implements AssignerWithPeriodicWatermarks<String> {
    // Largest event time seen so far across all records
    long currentMaxTimeStamp = 0;
    // Allow records to be up to 5 s late
    long maxLateTime = 5000;
    // Last watermark returned; stays null until the first periodic callback
    Watermark wm = null;
    SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");

    // Called periodically to obtain the current watermark (every 200 ms by default)
    @Nullable
    @Override
    public Watermark getCurrentWatermark() {
        // Variant that does not handle late / out-of-order records:
        // wm = new Watermark(currentMaxTimeStamp);
        // Variant that tolerates late / out-of-order records:
        wm = new Watermark(currentMaxTimeStamp - maxLateTime);
        System.out.println(format.format(System.currentTimeMillis()) + " current watermark: " + wm + "," + format.format(wm.getTimestamp()));
        return wm;
    }

    @Override
    public long extractTimestamp(String element, long previousElementTimestamp) {
        String[] split = element.split(",");
        String key = split[0];
        long timestamp = 0;
        try {
            // Convert a time such as 2019-09-17 10:24:50.958 into an epoch timestamp in ms
            timestamp = format.parse(split[1]).getTime();
        } catch (ParseException e) {
            e.printStackTrace();
        }
        // Keep the larger of the new record's timestamp and the current maximum
        currentMaxTimeStamp = Math.max(timestamp, currentMaxTimeStamp);
        // wm is still null if a record arrives before the first periodic callback
        String wmText = (wm == null) ? "not emitted yet" : wm + "," + format.format(wm.getTimestamp());
        System.err.println(key + ", timestamp of this record: " + timestamp + "," + format.format(timestamp)
                + "|max timestamp so far: " + currentMaxTimeStamp + "," + format.format(currentMaxTimeStamp)
                + "|watermark timestamp: " + wmText);
        return timestamp;
    }
}

class MyWindowFunction implements WindowFunction<Tuple2<String, Integer>, String, Tuple, TimeWindow> {
    @Override
    public void apply(Tuple tuple, TimeWindow window, Iterable<Tuple2<String, Integer>> input, Collector<String> out) throws Exception {
        SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
        int sum = 0;
        for (Tuple2<String, Integer> tuple2 : input) {
            sum += tuple2.f1;
        }
        long start = window.getStart();
        long end = window.getEnd();
        out.collect("key:" + tuple.getField(0) + " value: " + sum + "| window_start :"
                + format.format(start) + " window_end :" + format.format(end)
        );
    }
}
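To make the effect of maxLateTime easier to follow, here is a rough plain-Java simulation of the logic above, with no Flink dependency. It is a sketch under assumptions, not how Flink is implemented: the class WatermarkSimulation and its run method are hypothetical, timestamps are given in ms relative to 11:34:00.000, the watermark is recomputed after every record (the real job only queries it on a periodic timer), and a window is considered complete once the watermark reaches its last millisecond.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical simulation of "watermark = max event time - allowed lateness"
// driving 10-second tumbling event-time windows that count their records.
class WatermarkSimulation {
    static final long WINDOW_SIZE_MS = 10_000;

    /** Feeds event timestamps in arrival order; returns one line per fired window. */
    static List<String> run(long[] eventTimesMs, long maxLateMs) {
        TreeMap<Long, Integer> openWindows = new TreeMap<>(); // window start -> record count
        List<String> fired = new ArrayList<>();
        long maxTs = 0;
        long watermark = maxTs - maxLateMs;
        for (long ts : eventTimesMs) {
            long start = ts - (ts % WINDOW_SIZE_MS);
            if (start + WINDOW_SIZE_MS - 1 <= watermark) {
                continue; // its window has already fired: the record is late and dropped
            }
            openWindows.merge(start, 1, Integer::sum);
            maxTs = Math.max(maxTs, ts);
            watermark = maxTs - maxLateMs;
            // Fire every open window whose last millisecond the watermark has reached
            Iterator<Map.Entry<Long, Integer>> it = openWindows.entrySet().iterator();
            while (it.hasNext()) {
                Map.Entry<Long, Integer> w = it.next();
                if (w.getKey() + WINDOW_SIZE_MS - 1 > watermark) {
                    break;
                }
                fired.add("window [" + w.getKey() + ", " + (w.getKey() + WINDOW_SIZE_MS)
                        + ") count=" + w.getValue() + " watermark=" + watermark);
                it.remove();
            }
        }
        return fired;
    }

    public static void main(String[] args) {
        // The sample records from the class comment, as offsets from 11:34:00.000
        long[] sample = {5_890, 7_890, 13_890, 8_890, 16_890, 19_890, 21_890};
        System.out.println("maxLate = 5 s: " + run(sample, 5_000));
        System.out.println("maxLate = 0 s: " + run(sample, 0));
    }
}
```

With a 5 s allowance, the out-of-order record at 08.890 is still counted: the first window fires with count=3 once the record at 16.890 pushes the watermark to 11.890. With no allowance, the record at 13.890 already closes the first window with count=2, and the record at 08.890 is dropped as late.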
Event Time and Watermarks
Original article: https://www.cnblogs.com/ronnieyuan/p/11848725.html