标签:des cWeb c style class blog
为啥写这篇文章?因为我现在做的这套实时计算系统在公司里很难玩下去了。去年年初来到ctrip,主要就是做两个实时应用,一个是实时报警,功能是做出来了,但应用效果不好;一个是XXX(敏感应用,不敢写出来,以XXX代替),也是实现了功能需求,但想继续按自己的思路往下走是不可能了,我捉急的表达能力很难让上头去理解实时计算和传统的request-response方式的应用不同点在哪里,为啥要这么干,也很难明白上头喜欢的系统是什么样的,真是挠破头。我的方式看来是做不下去了,但总觉得还是有点价值,所以写下来,看看有没有也想山寨的同学,可以作为参考,下面一段是扯淡,想看实际内容的请跳到系统结构。
至于为什么起这个标题:
后续系统设计都是以这些为出发点的
storm是目前比较火热的实时处理系统,虽然不能和H系的比,但资料也还是不少,我这就默认大家已经知道storm的概况了,具体的资料就不举了!
国内而言,阿里系对storm的应用比较多,网上有很多的文章;在ctrip,也有另外一个team在用storm做前端的用户行为分析,感觉是蛮成功的,应该算公司里拿得出手的项目之一了。还有很多公司也是用storm在做一些实时的业务开发。
storm本身只是提供了一个实时处理的框架,帮我们解决了大部分的可靠性,数据流向组织,并发性等基本问题,但是实际的数据处理还是要根据业务需求去开发代码适配。因此它只是解决了实时计算的组织和管理,而对计算本身是没有支持的,直接用是达不到我想要的“不写代码只配置”的效果,所以我把重心放在esper上,storm只作为外部的容器,帮我做数据的简单处理和sharding,节点的自动分配和重启,数据源的组织和数据结果的分配等等外围的功能,计算就交给esper了。
esper
绝壁是个好东西,功能强大,但门槛太高。资料的话官网上看下例子,也有一篇很详尽的文档,这是html版,有兴趣的同学可以上官网下pdf版。
简单的介绍的话,esper是一个用于针对数据流进行流式处理的lib库(java和.net都有),他跟传统的数据库不同之处在于:数据库是数据先写进来,再定义一个sql,然后去拉取数据并计算一次得到结果;esper是定义一个计算规则,并作为一个计算节点,然后再把数据不停的推给他,由计算节点不停的做增量计算,并出来一系列的结果。
上图是传统的数据库(也有用nosql的)的方式,目前很多项目应该都是采取这种设计,比如我要做一个5分钟的数据统计,就需要不停的跑sql去拉数据做统计和计算(当然可以提前做一些预聚合,数据库也应该有触发器之类的功能,不过只能作为优化,也很难去掌控)。这种pull的方式容易理解,也有已经成熟到烂街的数据库技术支持,pull的时候靠分库分表来sharding,只是要自己写点聚合的代码,对单个查询来说很快,因为只要一次数据库查询,一跳就完了,数据的查询和存储都由数据库来保证高效。但对于数据量大,然后又要实时更新的场景来说,这种低效的方式是走不通的,图上就可以看到因为数据和计算节点的分离,势必造成冗余的数据被多次拉取(或者要写复杂的代码去优化),而且实时度靠轮询来达到。这种设计要么只能适合于数据量和计算量比较小的场景,要么只能适合于人傻钱多的场景。但它好理解。
也有说拿redis做counter的,但这种方式还是解决不了数据和计算分离的问题;在者,对于功能稍复杂的计算就力不从心了,而且redis的逻辑抽象程度远不如sql,开发工作比sql都要大很多。具体的不展开了
此图中下方每个方框代表一个用esper实现的实时计算引擎,配置好运算规则后,我们主动去把数据喂过去,引擎不停的做增量计算,每次有新结果就通过回调通知我们。这个图是公司内部写ppt时临时画的,不大恰当。因为esper是个lib,单实例的可靠性和性能没法scale,所以我们是架在storm上,由storm去自动部署和分配多个esper进程,并在前端做sharding来达到高扩展性。
select avg(price) from StockTickEvent.win:time(30 sec) select a.id, count(*) from pattern [ every a=Status -> (timer:interval(10 sec) and not Status(id=a.id) ] group by id
esper声称自己是SEP(stream event processing)和cep(complex event processing)。上面从官网抄了两个比较有代表性的例子来分别说明。
这两个例子还只是揭露了esper能力和复杂度冰山一角。对于流式的数据处理和事件间pattern的描述,它提供了很多的底层支持和选项。基本我们的需求都能得到满足,一种功能还能用好多种写法来实现,越用就越是觉得它强。
然而,强大的东西不容易掌控。我是在前公司paypal时听说这个开源软件的,当时老板的另外一个team花了很大的力气和资源在上面,希望做实时大数据,我离开不久听说这个team就散了,大概因为没出好的成果,员工图谱上,我和cto之间那条线上的老板们都陆续黯然离开了;而我自己,是在来到ctrip才接触,因为大概知道前同事的不顺利,所以是带着敬畏心在做,小心翼翼,尽量让它更易用,结果虽然功能是出来了,奖也拿了,还是被骂,结局也离屎差不多了。所以想用它的要当心啊,不祥之物啊:)
不过失败不能白失败,经验教训要总结下的,希望能成为它人的成功之母:
我同事在opentsdb(后端是hbase)的思路上开发了一个叫dashboard的系统,可以对海量metrics进行实时的存储和查询。,同事对整个前后端都进行了重写和改进,细节上有蛮多独到之处,我所有程序的监控,包括storm的监控和想要做的计算平台的监控,都是基于此。这应该是ctrip里非常好的一个系统了。
下面分别从逻辑和物理上描述整个系统的结构。
目前的系统设计,对一个应用,包含了四大部分
所以整套系统是一个典型的输入(part 1) --> 处理(part 2&3) --> 输出(part
4)的结构,每个应用只需给出四大部分的配置,就可以得到一个实时事件处理应用。
这里需要补充说明的是:
OPERATION(input1, input2)-->
output1
的最简单的范式,input和output都是系统里的数据流,用variable的叫法更贴切。对于data source来说,框架部分开发出不同类型数据源的数据抽取驱动,可以对以下数据类型数据源进行数据拉取:
对于每一种数据源,大致只需要定义元数据信息,就可以完成外部的数据拉取并到系统内部数据(一般称为event,包括name、key、timestamp、value四大固有属性和其他属性)格式的转换:
目前大部分数据源都是根据应用写死,但长期希望抽象出特征来,可以通过配置自动完成,只剩下少数特别的通过开发完成。
这一份的结果是我们可以从每个datasource源源不断的得到数据,因此就是一条条数据流,经过alignment后(时间对齐),要求所有事件都以相同的时钟下汇聚成一个逻辑上的总的数据流,这是系统最大的limitation)
对于进入系统的数据流(stream),我们可以对此进行一些操作(包括但不限于split/join/aggreation/filtering),实时形成新的数据流,得到一系列variable(可以对应为BI中的维度,风控模型中的variable),如下图所示
进入这部分系统时有两个数据流DS1, DS2;经过处理后,得到6个Variable(每个Variable主要包括name,key,timestamp,value几个固有属性和其他用户定义的属性),每个variable其实也是随时间变化的数据流,通过加工后的数据会更有利于作进一步的决策;最后在某一个时间点,对所有variable进行切片,可以得到一系列的latest值,这样就可以做为决策规则(模型)的输入依据,这一部分由rule management部分完成。
实际上,从数学的角度看,这部分工作希望以variable(数据流/维度,counter只是其中一种)这种数据流作为基本单元,完成一个带少量操作符的代数系统,从而整个计算过程可以由这样一些基本的操作符去搭建一个DAG,而不是从头到尾全部由程序员编码实现功能。
操作部分基本由esper完成,通过封装,将esper实现相关的部分封装掉,只提供逻辑上的运算符给用户/admin,减轻使用负担。目前只提供少量基本的操作类型:
a.
(已实现)单线聚合/过滤。提供基于单个数据流(variable或data
source)的按时间聚合、按条件过滤。通过此操作可以实现split/aggregate/filter等逻辑操作
b.
(已实现)双线merge。对两条数据流(variable)进行合并操作,实现简单的join(根据key和timestamp严格匹配)。
c.
(未实现)多线merge。将多条数据流(variable)根据key去所有的latest值,进行合并计算。
以上是系统中现在和将要实现的操作符,目前看效果不是很好,比如操作符a带的功能太多,希望一个操作符就能解决多个问题,对用户并不贴切,应该拆分为aggregate,filter(split用多个filter来实现)。
对于每个应用来说,直接拿底层框架的操作符进行配置可以基本满足需求。在资源充足的情况下,每个应用可以在框架的variable体系开发一套更高一级抽象层的操作符,来方便应用的使用。参见实时报警的实现
同时,variable部分会提供dump操作,定期的将variable值dump到hbase中,供后续查询。这个功能主要是为了事后分析和离线查询,目前还没有在生产启用。最近打算先简单写一路进dashboard,以利用其实时查询和展现的能力。至于是否以后要扩展,要看最后项目的发展了
Rule这块会直接对上一部的variable进行操作,通过用户提供的阈值来得出有价值的信息(暂且称为alert),并且根据后续用户配置的action操作分发到外部处理的地方。
目前,规则管理准备实现以下三种:
这里需要注明的事,rule和variable有部分重合,rule的一些功能后续也能扩展到variable里实现,两者的区别在于:
Dispatch引擎会将rule engine产生的alerts进行分发,供用户进行进一步访问。这一快将与data source一块形成相对应的关系,目标是通过配置将alerts推送到制定的地方:
App 管理作为后续计划(nice to have),应该是没可能实现了,本打算实现以下功能。
这一部分主要设计了逻辑上数据流的定义,以及其整个生命周期,下一段讲一下我们简单的物理结构是如何去分别实现各个功能的。
之前已经提到过,我们是storm+esper的形式,esper负责内部绝大部分的计算,storm负责外围的组织。整个系统实现起来非常简单,如图
外部数据过来有两种方式:
storm的多个spout/bolt节点获取外部数据后,成为统一的格式event(包含name,key,timestamp和其它特定属性),并sharding(需要key)到不同esper 节点,作进一步计算。
每一个esper节点,都已经是运行在单个机器的单个进程上了,所以在这里将不同来源的数据流对齐(需要timestamp),并作一些优化处理(比如去除冗余的对象)。最后这些event就分配到esper引擎了。
variable计算引擎一方面将这些外部数据转化为简单变量(一种我自己约定格式的esper数据流),这些简单变量在一些操作符下可以生成其它变量,有最简单的操作符
insert into
outputVar select * from inputVar where $condition
insert into
outputVar select count/sum/avg(*) as value,... from inputVar:time:win(5
min)
insert into outputVar1 select * from
inputVar where $condition;insert into outputVar2 select * from inputVar where
not $condition
insert into outputVar select inputVar1.*,
inputVar2.* from inputVar1.latest(key), inputVar2.latest(key) where
inputVar1.key = inputVar2.key
,这个伪epl表示将两个原始数据流,以其key作为
单位,将最新值拼接起来,这里还可作一些运算。举个例子,有一个变量源数据来源于pc访问,另一个变量数据来源于移动app(可能用户再家里同时用手机和pc访问),按ip/uid去做join,就可以得到这个ip的完整视图这些基本的操作符都非常简单,不会太多,开发起来也比较容易。只是要定义一下配置项,以及运算到底层esper语句的映射。我们目前新加一个基本variable操作的话,后端只用新增一个文件即可,但前端比较难做,还没有想到很好的解决方法。
当然,对于一些特殊的应用,简单的操作符可能抽象度较低,难以直接使用,可以在简单操作之上进一步封装,详细的参考case
study里面实时报警的就可以了
变量出来之后,就可以通过rule来过滤出我们感兴趣的内容了:
insert into rule1
select * from inputVar where value > 5
insert into rule1
select ... from inputVar1.latest(key), inputVar2.latest(key) where
inputVar1.key = inputVar2.key and (inputVar1.value 5 or inputVar2.value
<10)
最后产生的结果,通过各种方式送到系统下游的各个handler。由于storm这种灵活的代码运行框架,这一点很容易做到,不详细叙述了。
ctrip内部用dashboard系统对各个应用的内部状态进行监控,包括物理的/逻辑的,利用hbase的特性,获得了实时聚合和展示的能力。我们的实时报警项目,就希望能自动化的去拉取数据,找出异常点。
公司内部也有其他报警系统,会对订单这些作一些同环比,阈值报警,系统跑的很好很有效。我们这边的报警系统更多的在于系统监控方面,会有一些更大的挑战,一是数量更大,我可能要对每个hostip,每个url,甚至是他们的组合作为单位去检查并报警;二是简单的阈值规则对上层业务来说有价值,对底层来说没太大指导作用,超出定义的阈值是比较常见的情况,结局就是大量的报警邮件发出来了,但没人关心,自己反倒成了公司内部最大的垃圾邮件制造商,所以需要各种更加复杂类型的报警:
这里可以看出,尽管有各种复杂的规则类型,但基本的操作是相同的,所以只要在基本操作上封装一层即可。下面是我的一条规则配置,由于没有太多资源专门管理这些五花八门的规则,我将所有配置项揉到了一起,看着复杂些,但管理成本稍微低些(因为各个部分的配置相互间可以排列组合,分开来成本太高)。
{ "namespace": "ns", "group": "group, "name": "rulename", // 这三项只是名字标识,方便rule管理 "config": { "dashboardUrl": "http://xxxxx", // dashboard url,返回json数据 "timeAdjustment": 300, // 表示取多久之前的数据,同环比就在这个配置项有不同 "dataType": "Single", // 是否拉取到的值直接报警,还是要做同环比,或者是多值计算 "period": 0, // 以下四项配置同环比的参数 "ops": "-", "oldCondition": "", "newCondition": "", "secondUrl": "", // 以下四项配置配置双metrics计算 "secondTimeAdjustment": 0, "dualOps": "-", "firstCondition": "", "secondCondition": "", "valueType": "", // 以下三项配置是对之前的原始值或计算值直接检测,还是要看变化率(差或比值) "formerPointCondition": "", "latterPointCondition": "", "triggerType": "fixed", // 以下四项配置阈值类型,要么直接给阈值,要么给个标识符,用户自己去写复杂的阈值 "lower": 0, "upper": 30000, "thresholdName": "", "conditionWindow": 0, // 以下两项控制规则多次命中才报警,减少误报 "conditionCount": 0, "cooldown": 600, // 报警cooldown,防止短期内重复报警 "to": "wenlu@ctrip.com", // 报警邮件配置 "cc": "", "bcc": "", "mailInterval": 60, "cats": "", "catsEnv": "", "catsLevel": "info", "catsMessage": "", "catsDevid": "", "catsName": "", "catsPriority": "info", "type": "MetricsAlertingRule", // 表示规则类型,不同值会触发不同操作 "desc": "" }, "type": "MetricsAlertingRule", "desc": "", "status": "on", "app_subid": "auto@hotel_product_common_utility_logging_responsetime_30s" // 自动生成底层数据时用给的标识符 }
这算是一条顶层规则,系统会自动生成一系列底层的data
source/variable/rule/action,对顶层的CRUD操作也会自动映射为底层的操作。
这样开发一条新规则只用考虑如何去运用底层的计算平台,可以在更高的抽象层次上去开发,而不是从头到尾重新开发一遍。
总体而言,实时报警的功能是实现了,但运用的不好,而且是给底层人民用的,可视度不高。
XXX应用是敏感项目,会讲的含糊些。
这个项目数据量比较大,基本上对ctrip的每次访问都要促发一系列的运算和规则检验。过滤后,每秒k级数据,目前有几十个变量和规则,意味着每秒上w次的聚合操作和检测。生产上用了三台机器跑storm集群,实时上借助于storm+esper的高效,单台测试机(16g内存+6核cpu),已经能基本扛得住如此量的运行(不过最近随着变量的增加和流量的上升已经比较勉强,需要考虑可能的优化了)。
公司另外一个项目,用的是我之前提的数据库的方式。他们的数据跨度比较大,需要很长时间段的数据,而不局限于当前,总数据量是xxx项目当前时间段数据的几倍,但计算触发频次较低,每秒几十次,如果不算db的话,用了10台服务器,据说cpu使用率20%不到。虽然我很不喜欢拿这两个系统去比较,因为解决的问题和适用的场景不一样,就好像拿乔丹和贝利去比较,但由此得出XXX项目效率比较低这种结论是很难让人心服口服的。esper这种基于本地内存的效率远不是基于db或者是redis这种异地内存的系统可以比拟的,他的滑动窗口的计算效率也不是简单算法就能超过的。如果说可靠性和成熟度倒是可以让人服帖的。
这个项目一方面展示了当前架构能扛得住中等流量的冲击,另一方面本意是尝试让用户能自主的动态的创建变量:
上图的拓扑是根据用户的需求演化而来,其实具有一定的普适性,通过一定的过滤找到具体的感兴趣的数据,进行聚合得到统计特征,在此之上才能建出灵活的规则来。
图中最下方有个join变量,目前还没用到,是为了多数据源(多设备或者多数据中心的数据)的数据整合。有种说法是直接从数据源上进行合并,但这样会增加数据源的复杂度,破坏整体的结构,还不能完全覆盖;另一方面,join过滤后的数据远比join原始数据高效的多得多
这一套之前一半用代码,一半靠实时计算系统的规则,最近才抽象出来,本来打算完全迁移到实时计算系统中,把整个图的掌控交给用户,由用户去控制整个DAG的结构构造和节点配置,这样灵活性比较高,通过统一的监控功能可以让用户掌控每一环节的具体信息。这让我想起去年有家国内公司来推销大数据的,他们学术背景是可视计算。现在想来如果这套能实现的话,是不是也有点可视计算的味道了。
不过现实是这条路已经断掉了,一是我们team,尤其是我的前端能力还比较差,另一方面老板觉得这个太复杂,易用性比较差,不能让人家去管理树(内部介绍还停留在树一层),于是改成了与业务适配,拓扑定死的结构,只让用户配底层的聚合变量,内部称为counter。现在只希望能满足用户需求,能多多的建出变量来,这样才有往下一步走的可能。
以下基本是未实现,只是思考过的部分
insert into
varInfo select count(*),.... from inputVar.win:time_length(1
min)
就可以获得每个变量的运行时统计,直接导入ctrip的dashboard系统,就可以获得监控和展示能力,这样还能导入到实时报警应用里面去,获得实时报警能力(用户提的需求,所以说多让用户参与讨论是很有用的,不要把用户当傻瓜)。对XXX项目而言,这些统计信息甚至能作为业务指标使用,例如如果我需要公司某一业务线的访问量,只要配个变量+监控就ok了。没什么难度,但应该很有用其实为啥要这么折腾,除了人懒想写点一本万利的代码外,主要是从前公司那里产生了完整的生态系统才是最重要的这一个想法。所以力图往这个方向靠。
此图是我在paypal待了一年后最大的感受:别看技术烂,只要生态系统建成了,整个系统能有机的跑起来,就能源源不断的带来收益。
整个系统是一个循环往复的过程:
这这个一套系统才是完整的有机体,本文所述的内容都只是描述了其中最不重要的在线那一块,而建立整个体系和完善离线系统才是更重要的,这个是真正做一个完善有用的实时系统需要解决的。我自己还没做到,就不多说了
想说的都说了,没想到这么长。最后只想说句做实时计算真不容易,越做越觉得能力和经验上的不足益发明显了,已经不是简单搭个开源软件就能搞定的。希望后面能看到别人的做法,有机会能跟风。
什么,你居然看到结束了,请你喝酸梅汤。
如何快速做一个山寨的实时“大数据”处理,布布扣,bubuko.com
标签:des cWeb c style class blog
原文地址:http://www.cnblogs.com/lw02nju/p/3776922.html