标签:datatime add 函数 create span select hat creat tac
在Flink去重第一弹:MapState去重中介绍了使用编码方式完成去重,但是这种方式开发周期比较长,我们可能需要针对不同的业务逻辑实现不同的编码,对于业务开发来说也需要熟悉Flink编码,也会增加相应的成本,我们更多希望能够以sql的方式提供给业务开发完成自己的去重逻辑。本篇介绍如何使用sql方式完成去重。
为了与离线分析保持一致的分析语义,Flink SQL 中提供了distinct去重方式,使用方式:
SELECT DISTINCT devId FROM pv
表示对设备ID进行去重,得到一个明细结果,那么我们在使用distinct来统计去重结果通常有两种方式, 仍然以统计每日网站uv为例。
第一种方式
SELECT datatime,count(DISTINCT devId) FROM pv group by datatime
该语义表示计算网页每日的uv数量,其内部核心实现主要依靠DistinctAccumulator与CountAccumulator,DistinctAccumulator 内部包含一个map结构,key 表示的是distinct的字段,value表示重复的计数,CountAccumulator就是一个计数器的作用,这两部分都是作为动态生成聚合函数的中间结果accumulator,透过之前的聚合函数的分析可知中间结果是存储在状态里面的,也就是容错并且具有一致性语义的
其处理流程是:
第二种方式
select count(*),datatime from(
select distinct devId,datatime from pv ) a
group by datatime
内部是一个对devId,datatime 进行distinct的计算,在flink内部会转换为以devId,datatime进行分组的流并且进行聚合操作,在内部会动态生成一个聚合函数,该聚合函数createAccumulators方法生成的是一个Row(0) 的accumulator 对象,其accumulate方法是一个空实现,也就是该聚合函数每次聚合之后返回的结果都是Row(0),通过之前对sql中聚合函数的分析(可查看GroupAggProcessFunction函数源码), 如果聚合函数处理前后得到的值相同那么可能会不发送该条结果也可能发送一条撤回一条新增的结果,但是其最终的效果是不会影响下游计算的,在这里我们简单理解为在处理相同的devId,datatime不会向下游发送数据即可,也就是每一对devId,datatime只会向下游发送一次数据;
外部就是一个简单的按照时间维度的计数计算,由于内部每一组devId,datatime 只会发送一次数据到外部,那么外部对应datatime维度的每一个devId都是唯一的一次计数,得到的结果就是我们需要的去重计数结果。
两种方式对比
标签:datatime add 函数 create span select hat creat tac
原文地址:https://www.cnblogs.com/pucheung/p/12184771.html