标签:注意 匹配 and 泛型方法 目标 env rac mes 通过
一个或多个由简单事件构成的事件流通过一定的规则匹配,然后输出用户想得到的数据,满足规则的复杂事件。
目标:从有序的简单事件流中发现一些高阶特征
输入:一个或多个由简单事件构成的事件流
处理:识别简单事件之间的内在联系,多个符合一定规则的简单事件构成复杂事件
输出:满足规则的复杂事件
CEP用于分析低延迟、频繁产生的不同来源的事件流。CEP可以帮助在复杂的、不相关的事件流中找出有意义的模式和复杂的关系,以接近实时或准实时的获得通知并阻止一些行为。
CEP支持在流上进行模式匹配,根据模式的条件不同,分为连续的条件或不连续的条件;模式的条件允许有时间的限制,当在条件范围内没有达到满足的条件时,会导致模式匹配超时。
CEP的工作流图:
看起来很简单,但是它有很多不同的功能:
输入的流数据,尽快产生结果
在2个event流上,基于时间进行聚合类的计算
提供实时/准实时的警告和通知
在多样的数据源中产生关联并分析模式
高吞吐、低延迟的处理
市场上有多种CEP的解决方案,例如Spark、Samza、Beam等,但他们都没有提供专门的library支持。但是Flink提供了专门的CEP library。
Flink为CEP提供了专门的Flink CEP library,它包含如下组件:
Event Stream
pattern定义
pattern检测
生成Alert
首先,开发人员要在DataStream流上定义出模式条件,之后Flink CEP引擎进行模式检测,必要时生成告警。
为了使用Flink CEP,我们需要导入依赖:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-cep_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
我们首先需要为Stream Event设计java pojo,但是注意,由于要对event对象进行对比,所以我们需要重写hashCode()方法和equals()方法。下面进行监控温度事件流。
创建抽象类MonitoringEvent,重写hashCode()和equals()方法;再创建POJO:TemperatureEvent,同样重写hashCode()和equals()方法:
MonitoringEvent:
TemperatureEvent:
创建env,创建source:
每个Pattern都应该包含几个步骤,或者叫做state。从一个state到另一个state,通常我们需要定义一些条件,例如下列的代码:
DataStream<Event> input = ... Pattern<Event, ?> pattern = Pattern.begin("start").where(evt -> evt.getId() == 42) // next表示"middle"事件紧跟着"start"事件发生 .next("middle").subtype(SubEvent.class).where(subEvt -> subEvt.getVolume() >= 10.0) // followedBy表示"end"事件不一定紧跟着"middle"事件发生 .followedBy("end").where(evt -> evt.getName().equals("end")); PatternStream<Event> patternStream = CEP.pattern(input, pattern); DataStream<Alert> result = patternStream.select(pattern -> { return createAlertFrom(pattern); });
需求:同一个机箱连续两次温度超标,报警
拓展需求:锅炉房温度检测;信用卡反欺诈:连续大额消费;反作弊:同一个用户短时间内连续登陆失败
flink cep
pattern定义
pattern匹配
select选出匹配到的事件,报警
public class CEPExample { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStream<TemperatureEvent> inputEventStream = env.fromElements( //不是DataStreamSource类型 new TemperatureEvent("xyz", 22.0), new TemperatureEvent("xyz", 20.1), new TemperatureEvent("xyz", 21.1), new TemperatureEvent("xyz", 22.2), new TemperatureEvent("xyz", 22.1), new TemperatureEvent("xyz", 22.3), new TemperatureEvent("xyz", 22.1), new TemperatureEvent("xyz", 22.4), new TemperatureEvent("xyz", 22.7), new TemperatureEvent("xyz", 27.0)); // 定义Pattern,检查10秒钟内温度是否高于26度 Pattern<TemperatureEvent,?> warningPattern = Pattern.<TemperatureEvent>begin("start") //加泛型 .subtype(TemperatureEvent.class) .where(new SimpleCondition<TemperatureEvent>() { private static final long serialVersionUID = 1L; @Override public boolean filter(TemperatureEvent value) throws Exception { if (value.getTemperature() >= 26) { return true; } return false; } }).within(Time.seconds(10)); //匹配pattern并select事件,符合条件的发生警告,即输出 //Alert类属性message,表示在满足一定的pattern条件后,需要告警的内容: DataStream<Alert> patternStream = CEP.pattern(inputEventStream, warningPattern) //DataStream类型的 .select(new PatternSelectFunction<TemperatureEvent, Alert>() { @Override public Alert select(Map<String, List<TemperatureEvent>> pattern) throws Exception { return new Alert("Temperature Rise Detected: " + pattern.get("start").get(0).getTemperature() + " on machine name: " + pattern.get("start").get(0).getMachineName()); } }); patternStream.print(); env.execute(); } }
异常检测:同一个用户连续两次登陆失败,报警
flink cep
pattern定义
pattern匹配
select输出报警事件
//需求: 如果同一个userid在三秒之内连续两次登陆失败,报警。 public class FlinkLoginFail { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); // 这里mock了事件流,这个事件流一般从Kafka过来 DataStream<LoginEvent> loginEventStream = env.fromCollection(Arrays.asList( //自定义一个pojo类:userId、ip、type new LoginEvent("1", "192.168.0.1", "fail"), new LoginEvent("1", "192.168.0.2", "fail"), new LoginEvent("1", "192.168.0.3", "fail"), new LoginEvent("2", "192.168.10.10", "success") ));//不用DataStreamSource,使用DataStream Pattern<LoginEvent, LoginEvent> loginFailPattern = Pattern.<LoginEvent>begin("start") //泛型类或泛型接口上的泛型形参是不能用于静态成员的,那么当静态方法需要用到泛型时,只能用泛型方法。 .where(new IterativeCondition<LoginEvent>() { // 模式开始事件的匹配条件为事件类型为fail, 为迭代条件 @Override public boolean filter(LoginEvent loginEvent, Context<LoginEvent> context) throws Exception { return loginEvent.getType().equals("fail"); } }).next("next") .where(new IterativeCondition<LoginEvent>() { // 事件的匹配条件为事件类型为fail @Override public boolean filter(LoginEvent loginEvent, Context<LoginEvent> context) throws Exception { return loginEvent.getType().equals("fail"); } }).within(Time.seconds(3));// 要求紧邻的两个事件发生的时间间隔不能超过3秒钟 // 以userid分组,形成keyedStream,然后进行模式匹配 ::方法引用 PatternStream<LoginEvent> patternStream = CEP.pattern(loginEventStream.keyBy(LoginEvent::getUserId), loginFailPattern); DataStream<LoginWarning> loginFailDataStream = patternStream.select((Map<String, List<LoginEvent>> pattern) -> { List<LoginEvent> first = pattern.get("start"); List<LoginEvent> second = pattern.get("next"); return new LoginWarning(first.get(0).getUserId(), first.get(0).getIp(), first.get(0).getType()); });//不用SingleOutputStreamOperator类型的,使用 loginFailDataStream.print(); env.execute(); } }
标签:注意 匹配 and 泛型方法 目标 env rac mes 通过
原文地址:https://www.cnblogs.com/shengyang17/p/10858981.html