码迷,mamicode.com
首页 > 其他好文 > 详细

FLINK SIDDHI ADDON 学习笔记

时间:2018-02-25 11:11:12      阅读:447      评论:0      收藏:0      [点我收藏+]

标签:siddhi   flink   

SIDDHI 是一款功能强大的CEP 引擎,具有自己的DSL,丰富的模式匹配功能和可扩展性, 感谢陈浩同学提供了SIDDHI和FLINK的整合功能 https://github.com/haoch/flink-siddhi 本文主要介绍了这个ADDON的一些实现思路

  1. 将FLINK STREAM 转化为 SIDDHI STREAM 定义

??用法: SiddhiCEP.registerStream(streamName, FlinkDataStream, fieldNames)

??通过 FlinkDataStream.getType 获得流对象的类型定义.registerStream方法会构造一个 SiddhiStreamSchema 对象,根据流对象的类型定义,自动获取每个field对应的数据类型存在内部的fieldTypes数组中.

??SiddhiStreamSchema 内部会创建一个Siddhi StreamDefinition对象, StreamDefinition的attribute的定义根据fieldNames + fieldTypes 来添加.SiddhiTypeFactory.getAttributeType 负责将Flink 的数据类型映射为Siddhi的数据类型, 并可自动生成一段Define Stream的定义(见 SiddhiStreamSchema.getStreamDefinitionExpression 方法) define stream [streamName] ([fieldName 1] [fieldType 1], ...[fieldName n] [fieldType n])

??SiddhiStreamSchema 包括一个StreamSerializer: 用于将DataStream 中的对象转化为 Siddhi Stream 的输入(Object Array):
????如果流对象是一个简单类型 Atomic Type 直接将流对象放到 ARRAY中
????如果流对象是Tuple 类型,直接将Tuple 中前N个值放到ARRAY中
????如果流对象是Pojo或者CaseClass类型,直接根据每个fieldName 获取Class对应的属性放到ARRAY中

  1. 串联FLINK STREAM 和 SIDDHI STREAM

??SiddhiStream: 抽象的Stream基类

??convertDataStream 将原始的FLINK流转化为Tuple类型的流,Tuple的第一个元素为StreamId, 第二个元素为原来流中的数据,支持普通Stream 和 KeyedStream

??ExecutionSiddhiStream: 构建 SiddhiOperatorContext 并调用SiddhiStreamFactory.createDataStream 创建了集成Siddhi的 DataStream. DataStream的类型为Tuple的子类.SiddhiTypeFactory.getTupleTypeInformation: 其核心思路是通过Siddhi输出Stream的StreamDefinition获得其Attribute的定义,再通过 TypeInfoParser.parse构造Flink Tuple 类型的定义

??ExecutableStream 根据Siddhi query 创建ExecutionSiddhiStream对象
??SingleSiddhiStream, UnionSiddhiStream: ExecutableStream 的子类,支持Fluent Style的链式调用. UnionSiddhiStream 调用了DataStream.union 方法

??SiddhiStreamFactory.createDataStream 通过 FLINK DataStream的transform方法使用了自定义的StreamOperator: SiddhiStreamOperator. 在 AbstractSiddhiOperator 的 setup 方法中创建SiddhiManager 和 SiddhiAppRuntime 并注册了 InputHandler 和 OutputCallback (StreamOutputHandler)

??SiddhiStreamOperator.processElement 需要处理两种场景:
????Flink TimeCharacteristic = ProcessingTime: 先调用StreamSerializer将数据转化为Object Array, 再直接调用InputHandler.send将数据发送给Siddhi处理
????Flink TimeCharacteristic = EventTime: 缓存接收到的StreamRecord 到内部的priorityQueue中,直到收到Watermark, 将priorityQueue中小于watermark的StreamRecord一次发送给Siddhi处理

  StreamOutputHandler:根据Output的TypeInfo将Siddhi Event 转化为 Flink StreamRecord. 再转发到SiddhiStreamOperator的Output

  1. CHECKPOINT

??SiddhiStreamOperator中保留了两种State信息,一种是priorityQueue中保存的由于watermark未发送给Siddhi的消息. 另一种是Siddhi本身的State, 通过SiddhiAppRuntime.snapshot() 获得

FLINK SIDDHI ADDON 学习笔记

标签:siddhi   flink   

原文地址:http://blog.51cto.com/shadowisper/2072801

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!