标签:返回 当前时间 thread splay mtime collect roc red for
Flink中的时间类型和窗口是非常重要概念,是学习Flink必须要掌握的两个知识点。
Flink流式处理中支持不同类型的时间。分为以下几种:
Flink支持的这几种时间刚好和我们上一篇播客中的内容相对应。
https://www.cnblogs.com/ilovezihan/p/12254479.html
应用一张Flink官网的图。
通常,我们在Flink初始化流式运行环境时,就会设置流处理时间特性。这个设置很重要,它决定了数据流的行为方式。(例如:是否需要给事件分配时间戳),以及窗口操作应该使用什么样的时间类型。例如:KeyedStream.timeWindow(Time.seconds(30))。
我们接下来通过实现一个每5秒中进行一次单词计数的案例,来说明Flink中如何指定时间类型。
public class WordCountWindow { public static void main(String[] args) throws Exception { // 1. 初始化流式运行环境 Configuration conf = new Configuration(); StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf); // 2. 设置时间处理类型,这里设置的方式处理时间 env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); // 3. 定义数据源,每秒发送一个hadoop单词 DataStreamSource<String> wordDS = env.addSource(new RichSourceFunction<String>() { private boolean isCanaled = false; @Override public void run(SourceContext<String> ctx) throws Exception { while (!isCanaled) { ctx.collect("hadooop"); Thread.sleep(1000); } } @Override public void cancel() { isCanaled = true; } }); // 4. 每5秒进行一次,分组统计 // 4.1 转换为元组 wordDS.map(word -> Tuple2.of(word, 1)) // 指定返回类型 .returns(Types.TUPLE(Types.STRING, Types.INT)) // 按照单词进行分组 .keyBy(t -> t.f0) // 滚动窗口,3秒计算一次 .timeWindow(Time.seconds(3)) .reduce(new ReduceFunction<Tuple2<String, Integer>>() { @Override public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) throws Exception { return Tuple2.of(value1.f0, value1.f1 + value2.f1); } }, new RichWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String, TimeWindow>() { @Override public void apply(String word, TimeWindow window, Iterable<Tuple2<String, Integer>> input, Collector<Tuple2<String, Integer>> out) throws Exception { // 打印窗口开始、结束时间 SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); System.out.println("窗口开始时间:" + sdf.format(window.getStart()) + " 窗口结束时间:" + sdf.format(window.getEnd()) + " 窗口计算时间:" + sdf.format(System.currentTimeMillis())); int sum = 0; Iterator<Tuple2<String, Integer>> iterator = input.iterator(); while(iterator.hasNext()) { Integer count = iterator.next().f1; sum += count; } out.collect(Tuple2.of(word, sum)); } }).print(); env.execute("app"); } }
窗口开始时间:2020-02-05 00:22:21 窗口结束时间:2020-02-05 00:22:24 窗口计算时间:2020-02-05 00:22:24
4> (hadooop,2)
窗口开始时间:2020-02-05 00:22:24 窗口结束时间:2020-02-05 00:22:27 窗口计算时间:2020-02-05 00:22:27
4> (hadooop,3)
窗口开始时间:2020-02-05 00:22:27 窗口结束时间:2020-02-05 00:22:30 窗口计算时间:2020-02-05 00:22:30
4> (hadooop,3)
窗口开始时间:2020-02-05 00:22:30 窗口结束时间:2020-02-05 00:22:33 窗口计算时间:2020-02-05 00:22:33
4> (hadooop,3)
窗口开始时间:2020-02-05 00:22:33 窗口结束时间:2020-02-05 00:22:36 窗口计算时间:2020-02-05 00:22:36
4> (hadooop,3)
窗口开始时间:2020-02-05 00:22:36 窗口结束时间:2020-02-05 00:22:39 窗口计算时间:2020-02-05 00:22:39
我们可以看到,这个滚动窗口,每3秒计算一次,是按照系统时间来计算的。
我们再把时间窗口设置为1分钟,再试试。
窗口开始时间:2020-02-05 00:27:00 窗口结束时间:2020-02-05 00:28:00 窗口计算时间:2020-02-05 00:28:00
4> (hadooop,32)窗口开始时间:2020-02-05 00:28:00 窗口结束时间:2020-02-05 00:29:00 窗口计算时间:2020-02-05 00:29:00
4> (hadooop,60)
刚好在 00:27:00 – 00:28:00之间。
参考文件:
https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/event_time.html
标签:返回 当前时间 thread splay mtime collect roc red for
原文地址:https://www.cnblogs.com/ilovezihan/p/12262245.html