标签:使用方法 col 组合模式 实现 ssi rom stream get cto
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-cep_2.11</artifactId>
<version>1.9.1</version>
</dependency>
场景: 用户登录, 在整个模式匹配的规则在5秒内,如果连续两次登录失败,则发出警告。。
代码:
LoginEvent
package com.ronnie.flink.demo.cep;
public class LoginEvent {
private String userId;//用户ID
private String ip;//登录IP
private String type;//登录类型
public LoginEvent() {
}
public LoginEvent(String userId, String ip, String type) {
this.userId = userId;
this.ip = ip;
this.type = type;
}
public String getUserId() {
return userId;
}
public void setUserId(String userId) {
this.userId = userId;
}
public String getIp() {
return ip;
}
public void setIp(String ip) {
this.ip = ip;
}
public String getType() {
return type;
}
public void setType(String type) {
this.type = type;
}
@Override
public String toString() {
return "LoginEvent{" +
"userId='" + userId + '\'' +
", ip='" + ip + '\'' +
", type='" + type + '\'' +
'}';
}
}
LoginWarning
package com.ronnie.flink.demo.cep;
public class LoginWarning {
private String userId;
private String type;
private String ip;
public LoginWarning() {
}
public LoginWarning(String userId, String type, String ip) {
this.userId = userId;
this.type = type;
this.ip = ip;
}
@Override
public String toString() {
return "LoginWarning{" +
"userId='" + userId + '\'' +
", type='" + type + '\'' +
", ip='" + ip + '\'' +
'}';
}
}
LoginWarningDemo
package com.ronnie.flink.demo.cep;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.IterativeCondition;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
/**
* 整个模式匹配的规则在5秒内,如果连续两次登录失败,则发出警告。。
*/
public class LoginFailWarningDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStreamSource loginEventStream = env.fromCollection(Arrays.asList(
new LoginEvent("1", "192.168.0.1", "fail"),
new LoginEvent("1", "192.168.0.2", "fail"),
new LoginEvent("1", "192.168.0.3", "fail"),
new LoginEvent("2", "192.168.10,10", "fail"),
new LoginEvent("2", "192.168.10,10", "success")
));
// 开启一个模式匹配规则
Pattern<LoginEvent, LoginEvent> begin = Pattern.begin("begin");
// 模式匹配的条件
Pattern<LoginEvent, LoginEvent> p1 = begin.where(new IterativeCondition<LoginEvent>() {
@Override
public boolean filter(LoginEvent loginEvent, Context<LoginEvent> context) throws Exception {
// 过滤掉非fail的, 只返回fail的
return loginEvent.getType().equals("fail");
}
});
// 追加一个新的模式。 匹配事件必须直接跟着先前的匹配事件(严格连续性)
Pattern<LoginEvent, LoginEvent> next = p1.next("next");
// 新模式的匹配条件
Pattern<LoginEvent, LoginEvent> p2 = next.where(new IterativeCondition<LoginEvent>() {
@Override
public boolean filter(LoginEvent loginEvent, Context<LoginEvent> context) throws Exception {
// 第二次匹配
return loginEvent.getType().equals("fail");
}
});
// 定义事件序列进行模式匹配的最大间隔。如果未完成的事件序列超过此时间, 则将其丢弃:
Pattern<LoginEvent, LoginEvent> p3 = p2.within(Time.seconds(5)); // 注意导的是窗口的时间
PatternStream patternStream = CEP.pattern(loginEventStream.keyBy("userId"), p3);
patternStream.select(new PatternSelectFunction<LoginEvent, LoginWarning>() {
@Override
public LoginWarning select(Map<String, List<LoginEvent>> pattern) throws Exception {
List<LoginEvent> begin01 = pattern.get("begin");
System.out.println("------- Begin List ------");
begin01.stream().forEach(System.out::println);
System.out.println("------- next list ------");
List<LoginEvent> next01 = pattern.get("next");
next01.stream().forEach(System.out::println);
return new LoginWarning(next01.get(0).getUserId(), next01.get(0).getType(), next01.get(0).getIp());
}
});
loginEventStream.printToErr();
env.execute();
}
}
单个模式
Pattern可以是单单个,也可以是循环模式。
单个模式接受单个事件,而循环模式可以接受多个事件。
在模式匹配符号中,模式“a b + c?d”(或“a”,后跟一个或多个“b”,可选地后跟“c”,后跟“d”),a,c ?,和d是单例模式,而b +是循环模式。 [吐槽一下: 搞得跟正则似的]
默认情况下,模式是单个模式,可以使用Quantifiers将其转换为循环模式。
每个模式可以有一个或多个条件,基于它接受事件。
1.1. Quantifiers
在FlinkCEP中,您可以使用以下方法指定循环模式:pattern.oneOrMore(),用于期望一个或多个事件发生的模式(例如之前提到的b +);
pattern.times(#ofTimes), 用于期望给定类型事件的特定出现次数的模式
patterntimes(#fromTimes,#toTimes),用于期望给定类型事件的最小出现次数和最大出现次数的模式
可以使用pattern.greedy()方法使循环模式变得贪婪,但是还不能使组模式变得贪婪。
可以使用pattern.optional()方法使得所有模式,循环与否,变为可选。
```java
// expecting 4 occurrences 出现4次
start.times(4);
// expecting 0 or 4 occurrences 出现 0 或 4 次
start.times(4).optional();
// expecting 2, 3 or 4 occurrences 出现 2, 3, 4 次
start.times(2, 4);
// expecting 2, 3 or 4 occurrences and repeating as many as possible
// 出现 2, 3, 4次 并重复尽可能多次
start.times(2, 4).greedy();
// expecting 0, 2, 3 or 4 occurrences 出现0, 2, 3, 4次
start.times(2, 4).optional();
// expecting 0, 2, 3 or 4 occurrences and repeating as many as possible
//
start.times(2, 4).optional().greedy();
// expecting 1 or more occurrences 出现一次或多次
start.oneOrMore();
// expecting 1 or more occurrences and repeating as many as possible
// 出现一次或多次 并 重复多次
start.oneOrMore().greedy();
// expecting 0 or more occurrences 出现0或多次
start.oneOrMore().optional();
// expecting 0 or more occurrences and repeating as many as possible
// 出现 0 或 多次并重复尽可能多次
start.oneOrMore().optional().greedy();
// expecting 2 or more occurrences 出现两次或多次
start.timesOrMore(2);
// expecting 2 or more occurrences and repeating as many as possible
// 出现两次或多次并尽可能重复多次
start.timesOrMore(2).greedy();
// expecting 0, 2 or more occurrences and repeating as many as possible
//出现0, 2或多次并重复尽可能多次
start.timesOrMore(2).optional().greedy();
```
1.2. Conditions
在每个模式中,从一个模式转到下一个模式,可以指定其他条件。您可以将使用下面这些条件:
传入事件的属性,例如其值应大于5,或大于先前接受的事件的平均值。
匹配事件的连续性,例如检测模式a,b,c,序列中间不能有任何非匹配事件。
1.3. Conditions on Properties
1.4 API 简介
- 定义当前模式的条件。 为了匹配模式,事件必须满足条件。 多个连续的where(),其条件为AND:
or(condition)
until(condition)
subtype(subClass)
pattern.subtype(SubEvent.class);
oneOrMore()
指定此模式至少发生一次匹配事件。
默认情况下, 使用宽松的内部连续性。
注意点: 建议使用until() 或 within() 来启用状态清除
pattern.oneOrMore().until(new IterativeCondition<Event>() {
@Override
public boolean filter(Event value, Context ctx) throws Exception {
return ... // alternative condition 可替代的条件
}
});
timesOrMore(#times)
指定此模式至少需要 #times 次出现匹配事件。
默认情况下, 使用宽松的内部连续性(在后续事件之间)。
pattern.timesOrMore(2);
pattern.time(#ofTimes)
指定此模式需要匹配事件的确切出现次数。
默认情况下, 使用宽松的内部连续性 (在后续事件之间)。
pattern.times(2);
times(#fromTimes, #toTimes)
指定此模式期望在匹配事件的#fromTimes 和 #toTimes次之间出现。
默认情况下, 使用宽松的内部连续性。
pattern.times(2, 4);
optional()
指定此模式是可选的,即有可能根本不会发生。 这适用于所有上述量词。
pattern.oneOrMore().optional();
greedy()
指定此模式是贪婪的,即它将尽可能多地重复。 这仅适用于quantifiers,目前不支持组模式。
pattern.oneOrMore().greedy();
consecutive()
与oneOrMore()和times()一起使用并在匹配事件之间强加严格的连续性,即任何不匹配的元素都会中断匹配。
如果不使用,则使用宽松的连续性(如followBy())。
例如以下模式:
Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
@Override
public boolean filter(Event value) throws Exception {
return value.getName().equals("c");
}
})
.followedBy("middle").where(new SimpleCondition<Event>() {
@Override
public boolean filter(Event value) throws Exception {
return value.getName().equals("a");
}
}).oneOrMore().consecutive()
.followedBy("end1").where(new SimpleCondition<Event>() {
@Override
public boolean filter(Event value) throws Exception {
return value.getName().equals("b");
}
});
allowCombinations()
与oneOrMore()和times()一起使用,并在匹配事件之间强加非确定性宽松连续性(如 followedByAny()
如果不应用,则使用宽松的连续性(如followBy())。
Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
@Override
public boolean filter(Event value) throws Exception {
return value.getName().equals("c");
}
})
.followedBy("middle").where(new SimpleCondition<Event>() {
@Override
public boolean filter(Event value) throws Exception {
return value.getName().equals("a");
}
}).oneOrMore().allowCombinations()
.followedBy("end1").where(new SimpleCondition<Event>() {
@Override
public boolean filter(Event value) throws Exception {
return value.getName().equals("b");
}
});
组合模式
模式序列必须以初始模式开始,如下所示:
Pattern<Event, ?> start = Pattern.<Event>begin("start");
可以通过指定它们之间所需的连续条件,为模式序列添加更多模式。
可以使用:
next() 对应 严格, followedBy() 对应 宽松连续性 followedByAny() 对应 非确定性宽松连续性
或:
notNext() 如果不希望一个事件类型紧接着另一个类型出现。 notFollowedBy() 不希望两个事件之间任何地方出现该事件。
注意 模式序列不能以notFollowedBy()结束。
注意 NOT模式前面不能有可选模式
// strict contiguity 强连续性
Pattern<Event, ?> strict = start.next("middle").where(...);
// relaxed contiguity 松连续性
Pattern<Event, ?> relaxed = start.followedBy("middle").where(...);
// non-deterministic relaxed contiguity 不可确认的松连续性
Pattern<Event, ?> nonDetermin = start.followedByAny("middle").where(...);
// NOT pattern with strict contiguity 非模式强连续性
Pattern<Event, ?> strictNot = start.notNext("not").where(...);
// NOT pattern with relaxed contiguity 非模式松连续性
Pattern<Event, ?> relaxedNot = start.notFollowedBy("not").where(...);
宽松连续性指的是仅第一个成功匹配的事件会被匹配到,然而非确定性宽松连续性,相同的开始会有多个匹配结果发出。距离,如果一个模式是"a b",给定输入序列是"a c b1 b2"。对于不同连续性会有不同输出。
a和b之间严格连续性,将会返回{},也即是没有匹配。因为c的出现导致a,被抛弃了。
a和b之间宽松连续性,返回的是{a,b1},因为宽松连续性将会抛弃为匹配成功的元素,直至匹配到下一个要匹配的事件
a和b之间非确定性宽松连续性,返回的是{a,b1},{a,b2}。
也可以为模式定义时间约束。 例如,可以通过pattern.within()方法定义模式应在10秒内发生。 时间模式支持处理和事件时间。
注意: 模式序列只能有一个时间约束。 如果在不同的单独模式上定义了多个这样的约束,则应用最小的约束。
next.within(Time.seconds(10));
PatternPatte <Event, ?> start = Pattern.begin(
Pattern.<Event>begin("start").where(...).followedBy("start_middle").where(...)
);
// strict contiguity
Pattern<Event, ?> strict = start.next(
Pattern.<Event>begin("next_start").where(...).followedBy("next_middle").where(...)
).times(3);
// relaxed contiguity
Pattern<Event, ?> relaxed = start.followedBy(
Pattern.<Event>begin("followedby_start").where(...).followedBy("followedby_middle").where(...)
).oneOrMore();
// non-deterministic relaxed contiguity
Pattern<Event, ?> nonDetermin = start.followedByAny(
Pattern.<Event>begin("followedbyany_start").where(...).followedBy("followedbyany_middle").where(...)
).optional();
API
begin(#name)
Pattern<Event, ?> start = Pattern.<Event>begin("start");
begin(#pattern_sequence)
Pattern<Event, ?> start = Pattern.<Event>begin(
Pattern.<Event>begin("start").where(...).followedBy("middle").where(...)
);
next(#name)
Pattern<Event, ?> next = start.next("middle");
next(#pattern_sequence)
Pattern<Event, ?> next = start.next(
Pattern.<Event>begin("start").where(...).followedBy("middle").where(...)
);
followedBy(#name)
Pattern<Event, ?> followedBy = start.followedBy("middle")
Pattern<Event, ?> followedBy = start.followedBy(
Pattern.<Event>begin("start").where(...).followedBy("middle").where(...)
);
Pattern<Event, ?> followedByAny = start.followedByAny("middle");
Pattern<Event, ?> followedByAny = start.followedByAny(
Pattern.<Event>begin("start").where(...).followedBy("middle").where(...)
);
Pattern<Event, ?> notNext = start.notNext("not");
Pattern<Event, ?> notFollowedBy = start.notFollowedBy("not");
pattern.within(Time.seconds(10));
匹配后的跳过策略
对于给定模式,可以将同一事件分配给多个成功匹配。 要控制将分配事件的匹配数,需要指定名为AfterMatchSkipStrategy的跳过策略。 跳过策略有四种类型,如下所示:
SKIP_TO_LAST:丢弃包含PatternName最后一个匹配事件之前的每个部分匹配。
注意点:
使用SKIP_TO_FIRST和SKIP_TO_LAST跳过策略时,还应指定有效的PatternName。
例如,对于给定模式a b {2}和数据流ab1,ab2,ab3,ab4,ab5,ab6,这四种跳过策略之间的差异如下:
要指定要使用的跳过策略,只需调用以下命令创建AfterMatchSkipStrategy:
使用方法:
AfterMatchSkipStrategy skipStrategy = ...
Pattern.begin("patternName", skipStrategy);
检测模式 - Detecting Patterns
指定要查找的模式序列后,就可以将其应用于输入流以检测潜在匹配。 要针对模式序列运行事件流,必须创建PatternStream。
给定输入流 input,模式 pattern 和可选的比较器 comparator,用于在EventTime的情况下对具有相同时间戳的事件进行排序或在同一时刻到达,通过调用以下命令创建PatternStream
DataStream<Event> input = ...
Pattern<Event, ?> pattern = ...
EventComparator<Event> comparator = ... // optional
PatternStream<Event> patternStream = CEP.pattern(input, pattern, comparator);
根据实际情况,创建的流可以是有key,也可以是无key的。
注意点: 在无key的流上使用模式,将导致job的并行度为1。
Selecting from Patterns
获得PatternStream后,您可以通过select或flatSelect方法从检测到的事件序列中进行查询。
select()方法需要PatternSelectFunction的实现。 PatternSelectFunction具有为每个匹配事件序列调用的select方法。
它以Map <String,List >的形式接收匹配,其中key是模式序列中每个模式的名称,值是该模式的所有已接受事件的列表(IN是输入元素的类型)。
给定模式的事件按时间戳排序。 返回每个模式的接受事件列表的原因是当使用循环模式(例如oneToMany()和times())时,对于给定模式可以接受多个事件。
选择函数只返回一个结果。
class MyPatternSelectFunction<IN, OUT> implements PatternSelectFunction<IN, OUT> {
@Override
public OUT select(Map<String, List<IN>> pattern) {
IN startEvent = pattern.get("start").get(0);
IN endEvent = pattern.get("end").get(0);
return new OUT(startEvent, endEvent);
}
}
class MyPatternFlatSelectFunction<IN, OUT> implements PatternFlatSelectFunction<IN, OUT> {
@Override
public void flatSelect(Map<String, List<IN>> pattern, Collector<OUT> collector) {
IN startEvent = pattern.get("start").get(0);
IN endEvent = pattern.get("end").get(0);
for (int i = 0; i < startEvent.getValue(); i++ ) {
collector.collect(new OUT(startEvent, endEvent));
}
}
}
处理超时部分模式
每当模式具有通过within关键字附加的时间窗口长度时,部分事件序列可能因为超出时间窗口长度而被丢弃。
为了对这些超时的部分匹配作出相应的处理,select和flatSelect API调用允许指定超时处理程序。
超时处理程序接收到目前为止由模式匹配的所有事件,以及检测到超时时的时间戳。
为了处理部分模式,select和flatSelect API提供了一个带参数的重载版本
PatternTimeoutFunction/ PatternFlatTimeoutFunction。
OutputTag 超时的匹配将会在其中返回。
PatternSelectFunction / PatternFlatSelectFunction。
PatternStreamPatte <Event> patternStream = CEP.pattern(input, pattern);
OutputTag<String> outputTag = new OutputTag<String>("side-output"){};
SingleOutputStreamOperator<ComplexEvent> result = patternStream.select(
new PatternTimeoutFunction<Event, TimeoutEvent>() {...},
outputTag,
new PatternSelectFunction<Event, ComplexEvent>() {...}
);
DataStream<TimeoutEvent> timeoutResult = result.getSideOutput(outputTag);
SingleOutputStreamOperator<ComplexEvent> flatResult = patternStream.flatSelect(
new PatternFlatTimeoutFunction<Event, TimeoutEvent>() {...},
outputTag,
new PatternFlatSelectFunction<Event, ComplexEvent>() {...}
);
DataStream<TimeoutEvent> timeoutFlatResult = flatResult.getSideOutput(outputTag);
时间时间模式下处理滞后数据
在CEP中,元素处理的顺序很重要。
为了保证在采用事件事件时以正确的顺序处理事件,最初将传入的事件放入缓冲区,其中事件基于它们的时间戳以升序排序
并且当watermark到达时,处理该缓冲区中时间戳小于watermark时间的所有元素。这意味着watermark之间的事件按事件时间顺序处理。
注意点:
在采用事件时间时,CEP library会假设watermark是正确的。
为了保证跨watermark的记录按照事件事件顺序处理,Flink的CEP库假定watermark是正确的,并将时间戳小于上次可见watermark的时间视为滞后事件。滞后事件不会被进一步处理。
标签:使用方法 col 组合模式 实现 ssi rom stream get cto
原文地址:https://www.cnblogs.com/ronnieyuan/p/11869607.html