码迷,mamicode.com
首页 > 其他好文 > 详细

Storm批处理事务原理详解

时间:2015-12-26 13:15:32      阅读:331      评论:0      收藏:0      [点我收藏+]

标签:

1、事务-批处理

对于容错机制,Storm通过一个系统级别的组件acker,结合xor校验机制判断一个tuple是否发送成功,进而spout可以重发该tuple ,保证一个tuple在k\出错的情况下至少被重发一次。

但是在需要精确统计tuple的数量如销售金额场景时,希望每个tuple”被且仅被处理一次” 。Storm 0.7.0引入了Transactional Topology, 它可以保证每个tuple”被且仅被处理一次”, 这样我们就可以实现一种非常准确,且高度容错方式来实现计数类应用。

  

逐个处理单个tuple,增加很多开销,如写库、输出结果频率过高

事务处理单个tuple效率比较低,因此storm中引入batch处理,

批处理是一次性处理一批(batch)tuple,事务可确保该批次要么全部处理成功,如果有处理失败的则全部不计,Storm会对失败的批次重新发送,且确保每个batch被且仅被处理一次。

2、API介绍

技术分享

 

IBatchBolt有三个方法

execute(Tuple tuple)

finishBatch()   会在所有的tuple处理完成之后,对整个批次的结果进行处理,执行commit的时候执行该方法

prepare (java.util.Map conf, TopologyContext context, BatchOutputCollector collector,T id)

技术分享

 

ITransactionalSpout有以下几个主要方法:

ITransactionalSpout.Coordinator<T> getCoordinator(java.util.Map conf,

                                                 TopologyContext context)

ITransactionalSpout.Emitter<T> getEmitter(java.util.Map conf,

                                          TopologyContext context)

 

3.事务机制原理

1) 对于只处理一次的需要,从原理上讲,需要在发送tuple(单个或者一个批次)的时候带上相同的事务id(txid)

在处理的时候就会根据这个txid判断是否已经处理过了。处理过了就会把处理结果和txid保存起来,以便以后比对,而且需要保障顺序性,在当前请求txid提交前,所有比自己低的txid请求都需要提交。

    在事务批处理的时候,一批tuple赋予一个体txid,为了提高batch之间处理的并行度

Storm采用了pipline(管道)处理模型,这样多个事务可以并行执行,但是提交的时候是严格按照顺序的

 

 

2. Storm事务处理中,把一个batch的计算分成两个阶段,Processing和commit阶段

      Process阶段:多个batch可以并行计算

      Commiting 阶段:batch之间强制按照顺序进行提交

技术分享

技术分享

 

技术分享

 

Processing阶段:多个batch可以并行计算,上面例子中bolt2是普通的batchbolt(实现BaseBatchBolt),那么多个batch在bolt2的task之间可以并行执行,比如对batch3和batch4并行执行execute或finishbatch(什么时候调用该操作,后面会介绍)方法。

Commiting阶段:batch之间强制按照顺序进行提交,上图中Bolt3实现BaseBatchBolt并且标记需要事务处理的(实现了ICommitter接口或者通过TransactionalTopologyBuilder的setCommitterBolt方法把BatchBolt添加到topology里面),那么在Storm认为可以提交(至于什么时候可以提交,后面会介绍)batch的时候调用finishbatch,在finishBatch做xid的比较以及状态保存工作。例子中batch2必须等待batch1提交后,才可以进行提交。

 

Storm事务性的拓扑看起来比较复杂,需要对batch的commit进行管理,错误的发现,batch的发射以及处理等等,其内部实现完全基于storm的相关底层操作进行抽象。

当使用Transactional Topologies的时候, storm为你做下面这些事情:

 

  • 管理状态: Storm把所有实现Transactional Topologies所必须的状态保存在zookeeper里面。 这包括当前transaction id以及定义每个batch的一些元数据。
  • 协调事务: Storm帮你管理所有事情, 以帮你决定在任何一个时间点是该proccessing还是该committing。
  • 错误检测: Storm利用acking框架来高效地检测什么时候一个batch被成功处理了,被成功提交了,或者失败了。Storm然后会相应地replay对应的batch。你不需要自己手动做任何acking或者anchoring(emit发生的动作) — storm帮你搞定所有事情。
  • 内置的批处理API: Storm在普通bolt之上包装了一层API来提供对tuple的批处理支持。Storm管理所有的协调工作,包括决定什么时候一个bolt接收到一个特定transaction的所有tuple。Storm同时也会自动清理每个transaction所产生的中间数据。
  • 最后,需要注意的一点是Transactional Topologies需要一个可以完全重发(replay)一个特定batch的消息的队列系统(Message Queue)。storm-contrib里面的storm-kafka实现了这个。

 

事务性topology从实现上来讲,包括事务性的spout,以及事务性的bolt。

2) 事务性的spout需要实现ITransactionalSpout,这个接口包含两个内部类Coordinator和Emitter。在topology运行的时候,事务性的spout内部包含一个子的topology,类似下面这个结构:

技术分享

 

 

Interface ITransactionalSpout.Coordinator<X>

Method Summary
 void close() 
 X initializeTransaction(java.math.BigInteger txid, X prevMetadata)   初始化启动事务,prevMetadata:元数据
 boolean isReady()   返回时true,可以继续启动下一个事务

 其中coordinator是spout,emitter是bolt。

这里面有两种类型的tuple,一种是事务性的tuple,一种是真实batch中的tuple;

coordinator为事务性batch发射tuple,Emitter负责为每个batch实际发射tuple。

具体如下:

  • coordinator只有一个,emitter根据并行度可以有多个实例
  • emitter以all grouping(广播)的方式订阅coordinator的”batch emit”流
  • coordinator (其实是是一个内部的spout)开启一个事务准备发射一个batch时候,进入一个事务的processing阶段,会发射一个事务性tuple(transactionAttempt & metadata)到”batch emit”流

        *****说明******

  

       TransactionalTopology里发送的tuple都必须以TransactionAttempt作为第一个field,storm根据这个field来判断tuple属于哪一个batch。

       TransactionAttempt包含两个值:一个transaction id,一个attempt id。transaction id的作用就是我们上面介绍的对于每个batch中的tuple是唯一的

       ,而且不管这个batch    replay多少次都是一样的。attempt id是对于每个batch唯一的一个id, 但是对于同一个batch,它replay之后的attempt id跟replay之前就不一样了,

       我们可以把attempt id理解成replay-times, storm利用这个id来区别一个batch发射的tuple的不同版本

        metadata(元数据)中包含当前事务可以从哪个point进行重放数据,存放在zookeeper中的,spout可以通过Kryo从zookeeper中序列化和反序列化该元数据。

 

       **************

  • Emiter接收到这个tuple后,会进行batch tuple的发送
  • Storm通过anchoring/acking机制来检测事务是否已经完成了processing 阶段;
  • Processing阶段完成后,并且之前的transactions都已经提交了,coordinator发射一个tuble到” commit”流,进入commit阶段。
  • commiting bolts通过all grouping方式订阅该”commit”流,事务提交后,coordinator同样通过anchoring/acking机制确认已经完成了commit阶段,接收到ack后,在zookeeper上把该transaction标记为完成。

  coordinator只有一个,emitter根据并行度可以有多个实例

  事务内部处理流程图

技术分享

  

 

 

 

3) 事务性的Bolt继承BaseTransactionalBolt,

处理batch在一起的tuples,对于每一个tuple调用调用execute方法,而在整个batch处理(processing)完成的时候调用finishBatch方法。如果BatchBolt被标记成Committer,则只能在commit阶段调用finishBolt方法。一个batch的commit阶 段由storm保证只在前一个batch成功提交之后才会执行。并且它会重试直到topology里面的所有bolt在commit完成提交。那么如何知道batch的processing完成了,也就是bolt是否接收处理了batch里面所有的tuple;在bolt内部,有一个CoordinatedBolt的模型。

 

CoordinateBolt具体原理如下:

技术分享

 

CoordinateBolt具体原理如下:

 

  • 真正执行计算的bolt外面封装了一个CoordinateBolt。真正执行任务的bolt我们称为real bolt。
  • 每个CoordinateBolt记录两个值:有哪些task给我发送了tuple(根据topology的grouping信息);我要给哪些tuple发送信息(同样根据groping信息)
  •  Real bolt发出一个tuple后,其外层的CoordinateBolt会记录下这个tuple发送给哪个task了。
  • 等所有的tuple都发送完了之后,CoordinateBolt通过另外一个特殊的stream以emitDirect的方式告诉所有它发送过 tuple的task,它发送了多少tuple给这个task。下游task会将这个数字和自己已经接收到的tuple数量做对比,如果相等,则说明处理 完了所有的tuple。
  • 下游CoordinateBolt会重复上面的步骤,通知其下游。

事务性的拓扑在storm中的一个应用是Trident,它是在storm的原语和事务性的基础上做更高层次的抽象,做到一致性和恰好一次的语义,后续章节会对trident做分析。

Storm批处理事务原理详解

标签:

原文地址:http://www.cnblogs.com/thinkpad/p/5077799.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!