标签:
1、storm事务性topology的提出
对于容错机制,Storm通过一个系统级别的组件acker,结合xor校验机制判断一个msg是否发送成功,进而spout可以重发该msg,保证一个msg在出错的情况下至少被重发一次。但是在一些事务性要求比较高的场景中,需要保障一次只有一次的语义,比如需要精确统计tuple的数量等等。Storm 0.7.0引入了Transactional Topology, 它可以保证每个tuple”被且仅被处理一次”, 这样你就可以实现一种非常准确,非常可扩展,并且高度容错方式来实现计数类应用。
2、API介绍
IBatchBolt有三个方法
execute(Tuple tuple)
finishBatch()
prepare (java.util.Map conf, TopologyContext context, BatchOutputCollector collector,T id)
ITransactionalSpout有以下几个主要方法:
ITransactionalSpout.Coordinator<T> getCoordinator(java.util.Map conf,
TopologyContext context)
ITransactionalSpout.Emitter<T> getEmitter(java.util.Map conf,
TopologyContext context)
3、事务机制原理分析
1) 对于一次只有一次的语义,从原理上来讲,需要在发送tuple的时候带上xid,在需要事务处理的时候,根据该xid是否以前已经处理成功来决定是否进行处理,当然需要把xid和处理结果一起做保存。并且需要保障顺序性,在当前请求xid提交前,所有比自己低xid请求都已经提交。
在事务处理时单个处理tuple效率比较低,因此storm中引入batch处理,一批tuple赋予一个xid,为了提高batch之间处理的并行度,storm采用了pipeline 处理的模型。参见下图pipeline模型,多个事务可以并行执行,但是commit的是严格按照顺序的。
对应到storm中的具体实现中,把一个batch的计算分成了两个阶段processing和commit阶段:
Processing阶段:多个batch可以并行计算,上面例子中bolt2是普通的batchbolt(实现BaseBatchBolt),那么多个batch在bolt2的task之间可以并行执行,比如对batch3和batch4并行执行execute或finishbatch(什么时候调用该操作,后面会介绍)方法。
Commiting阶段:batch之间强制按照顺序进行提交,上图中Bolt3实现BaseBatchBolt并且标记需要事务处理的(实现了ICommitter接口或者通过TransactionalTopologyBuilder的setCommitterBolt方法把BatchBolt添加到topology里面),那么在Storm认为可以提交(至于什么时候可以提交,后面会介绍)batch的时候调用finishbatch,在finishBatch做xid的比较以及状态保存工作。例子中batch2必须等待batch1提交后,才可以进行提交。
Storm事务性的拓扑看起来比较复杂,需要对batch的commit进行管理,错误的发现,batch的发射以及处理等等,其内部实现完全基于storm的相关底层操作进行抽象。
当使用Transactional Topologies的时候, storm为你做下面这些事情:
事务性topology从实现上来讲,包括事务性的spout,以及事务性的bolt。
2) 事务性的spout需要实现ITransactionalSpout,这个接口包含两个内部类Coordinator和Emitter。在topology运行的时候,事务性的spout内部包含一个子的topology,类似下面这个结构:
其中coordinator是spout,emitter是bolt。
这里面有两种类型的tuple,一种是事务性的tuple,一种是真实batch中的tuple;
coordinator为事务性batch发射tuple,Emitter负责为每个batch实际发射tuple。
具体如下:
*****说明******
TransactionalTopology里发送的tuple都必须以TransactionAttempt作为第一个field,storm根据这个field来判断tuple属于哪一个batch。
TransactionAttempt包含两个值:一个transaction id,一个attempt id。transaction id的作用就是我们上面介绍的对于每个batch中的tuple是唯一的
,而且不管这个batch replay多少次都是一样的。attempt id是对于每个batch唯一的一个id, 但是对于同一个batch,它replay之后的attempt id跟replay之前就不一样了,
我们可以把attempt id理解成replay-times, storm利用这个id来区别一个batch发射的tuple的不同版本
metadata(元数据)中包含当前事务可以从哪个point进行重放数据,存放在zookeeper中的,spout可以通过Kryo从zookeeper中序列化和反序列化该元数据。
**************
3) 事务性的Bolt继承BaseTransactionalBolt,处理batch在一起的tuples,对于每一个tuple调用调用execute方法,而在整个batch处理(processing)完成的时候调用finishBatch方法。如果BatchBolt被标记成Committer,则只能在commit阶段调用finishBolt方法。一个batch的commit阶 段由storm保证只在前一个batch成功提交之后才会执行。并且它会重试直到topology里面的所有bolt在commit完成提交。那么如何知道batch的processing完成了,也就是bolt是否接收处理了batch里面所有的tuple;在bolt内部,有一个CoordinatedBolt的模型。
CoordinateBolt具体原理如下:
CoordinateBolt具体原理如下:
事务性的拓扑在storm中的一个应用是Trident,它是在storm的原语和事务性的基础上做更高层次的抽象,做到一致性和恰好一次的语义。
标签:
原文地址:http://www.cnblogs.com/thinkpad/p/4702830.html