标签:
长度窗口实现原理图
上图长度窗口为5,事件W1至W5进入引擎后属于NewEvents队列,事件W6进入引擎后,W2至W6就属于NewEvents队列,而事件W1就属于OldEvents队列了。NewEvents为先进先出队列,队列长度为EPL语句中制定的长度窗口大小,OldEvent队列为过期数据的存放队列。
EPL长度窗口示例
1 | select count(*) as result from orderEvent.win:time_batch(3 sec) |
时间窗口实现原理图
说明:
从此图可以看出,随着时间推移,每个进入到引擎的W事件都是newEvents,即Insert Stream。W后括号里的值为属性值(可忽略),超过EPL语句设置的时间窗口值的事件将进入OldEvent队列.
时间周期格式
year-part : (number|variable_name) ("years" | "year")
month-part : (number|variable_name) ("months" | "month")
week-part : (number|variable_name) ("weeks" | "week")
day-part : (number|variable_name) ("days" | "day")
hour-part : (number|variable_name) ("hours" | "hour")
minute-part : (number|variable_name) ("minutes" | "minute" | "min")
seconds-part : (number|variable_name) ("seconds" | "second" | "sec")
milliseconds-part : (number|variable_name) ("milliseconds" | "millisecond" | "msec")
EPL时间窗口示例
1 | // 统计最近三秒内获取的事件中salary的平均值 |
2 | String epsql = "select avg(salary) as result from orderEvent.win:time(3 sec)"; |
数据类型转换(cast)
示例:
1 | // avg(salary)默认返回为Double类型,cast(avg(salary),int)转换为int类型; |
2 | // 30秒内 salary 的平均值 |
3 | String epsql = "select bean,name,cast(avg(salary),int) from orderEvent.win:time(3 msec)"; |
Annotation(注解)
示例:
1 | String epsql = "@Name(\"EsperEvent\")select avg(salary) from OrderEventMap.win:length_batch(2)"; |
2 | |
3 | EPStatement epstate = epAdmin.createEPL(epsql); |
4 | epstate.addListener(new orderListener()); |
5 | System.out.println("Name is ["+epstate.getName()+"]"); |
Expression(自定义函数)
格式:
Expression expression_name { expression_body }
expression_name为自定义的Expression名,expression_body为Expression的具体内容。
expression_body表现形式为:(input_param[,…] ]) => expression
input_param为事件流别名(不能和事件流同名)
示例:
1 | // 创建转换函数 add |
2 | String expSql = "create expression add { x => x.salary+500 }"; |
3 | // 将oe(orderEvent)事件流中的salary属性值加500 |
4 | String epsql = "select add(oe) as s from orderEvent.win:length_batch(1) as oe"; |
5 | |
6 | EPStatement expstate = epAdmin.createEPL(expSql); |
7 | expstate.addListener(new orderListener()); |
8 | EPStatement epstate = epAdmin.createEPL(epsql); |
9 | epstate.addListener(new orderListener()); |
自定义静态方法的应用
示例(事件流过滤)
1 | // 判断总数是否等于0 |
2 | public class IsZero |
3 | { |
4 | public static boolean isZero(int sum) |
5 | { |
6 | return sum==0; |
7 | } |
8 | } |
9 | |
10 | // 加载 |
11 | epService.getEPAdministrator().getConfiguration().addImport(IsZero.class); |
12 | |
13 | // 查询没有钱的用户的name值(User包含name和money属性) |
14 | select name from orderEvent(IsZero.isZero(salary)) |
注意:
1、要过滤的属性只能是数字和字符串。
2、过滤表达式中不能使用聚合函数。
示例(事件流转换输出)
通过自定义类的静态方法转换事件流的输出属性。
BaseUntil.java(静态方法实现类)
1 | public class BaseUntil { |
2 | |
3 | public static int Add(int n){ |
4 | return n+100; |
5 | } |
6 | |
7 | public static String UpdataText(String str){ |
8 | return str+",你好!"; |
9 | } |
10 | } |
11 | |
12 | // 字节码加载 |
13 | epService.getEPAdministrator().getConfiguration().addImport(BaseUntil.class); |
14 | |
15 | String epsql = "select BaseUntil.UpdataText(name) as result from orderEvent "; |
Esper系列(二)时间窗口、长度窗口、cast、注解、自定义函数、静态方法
标签:
原文地址:http://www.cnblogs.com/jianyuan/p/4827771.html