标签:
业务的日志ETL拉取框架一直存在很多问题,每次出现故障就导致手忙大乱,因此这次决心要对其进行大改造。这个ETL系统是基于Storm实现的,主要是依靠Spout拉取原始日志,Bolt进行处理再入库,为了提高吞吐量,采用了12个Bolt进行并行处理。旧算法由于没有使用Storm的ack特性,而且还是根据发送Tuple的hash值发送到对应的Bolt中,完全没有考虑好负载均衡问题,在崩溃重启后也需要重新处理当天日志,可用性极低。
为了能够达到bolt的负载平衡,想到了采用动态平衡的算法去适应,另外也需要提供一个可靠的checkpoint机制,能够使崩溃重启后从当前时间点继续处理。
这个动态平衡的算法,可以参考TCP的流控制算法。TCP是可靠报文传输协议,具有可靠性,高效性,能够避免网络拥堵等特点。这里列出几个主要的TCP流控制算法的特点:
当然TCP远远不止以上6点的特点,这里只列出部分的对于新算法有参考性的特点。具体可以参考这里以及IETF的标准规范rfc2001。
由于ETL系统是依赖Storm进行开发,因此有一定的限制。不过可以根据Storm的特性类比TCP算法。Spout,也就是发送端,每个发送单元被称为Tuple,可以类比为TCP的数据报。Bolt接收端收到Tuple后,根据需要ack或者fail掉这个tuple,然后Spout会重发fail的tuple,同样地,Storm内部会探测到超时没响应的Tuple,然后发送fail到Spout。
虽然Storm拥有这些发送特性,但Storm除了ack和fail以外,没办法发送更多的反馈信息到Spout处,特别是Bolt的接收窗口大小(也就是缓存队列剩下的容量),这样Spout就没法根据Bolt进行发送窗口调整。另外Bolt每次也只能够ack或者fail对应的Tuple,不能使用cumulative ack算法。另外Spout和Bolt的关系,目前是一对多,也就是一个发送端发送到多个接收端进行并行处理,因此负载平衡显得尤为重要。
对于这个新算法,希望达到的目标是高吞吐量,高可用性以及简单性。高吞吐量是指多个Bolt的并发处理量,为了提高吞吐量,关键是实现动态的负载平衡。对于每个Bolt,发送窗口越大并不一定是越好,因为即使发送过去,Bolt也未必能够马上处理,相反,应该是把更多的Tuple发送到处理更快的Bolt上去,因此最恰当的做法是根据Bolt的处理速度,也就是ack的往返时间来衡量发送窗口大小。对于高可用性,要达到joiner崩溃后,重新启动后能够崩溃前的处理点重新处理,这就需要一种类似checkpoint的机制。要达到这个要求,并且保证简单性,可以通过保存已经ack的rt值。但由于Bolt只能够做到ack或者fail接收到的tuple,因此必须在Spout端统计采用累计的算法统计rt值,也就是如果ack了1,3,4的Tuple,只能记录第1个tuple的rt值,如果再收到ack 2的tuple,才记录第4个Tuple的rt值。简单性是为了避免算法和实际业务的代码的耦合,可以采用类似消费——生产者模式。
在rfc2001规范中,指出的TCP的拥塞处理和避免算法采用的是一种启发式的动态流控制算法。由于网络拥堵没办法可以使用数字精确测定,但可以简单地根据接收端的ack情况和丢包情况进行判断拥堵情况,然后线性或者指数式增加/减小发送窗口大小,从而避免增加网络拥堵压力。基于此算法,结合joiner的目标,可以延伸出一种类似的自适应动态流控制算法:
Spout把多条数据库记录组合成一个Tuple进行发送,这样可以减轻ack/fail时的压力,提高发送效率
Spout针对不同的Bolt拥有不同的发送窗口,每个发送窗口都会根据情况动态地增加或减小,并且都会记录Tuple的发送时间
Spout收到Bolt的ack时,根据之前记录的发送时间计算处理时间值,如果处于正常范围,则为NORMAL状态,线性增加发送窗口大小,如果超出阈值,则转为SLOW状态,线性减小发送窗口大小,另外Spout通过累积的ack计算出需要记录的tuple的rt值。
当Spout收到fail,也就是Storm判断Tuple处理时间超时(目前不允许Bolt主动发送fail),则转为BUSY状态,马上减小1个发送窗口大小,并把Tuple重新放回发送缓冲区,等待发送到下一个Bolt,减轻当前BUSY的Bolt的压力。
这个新算法相比较TCP的拥堵控制算法,减小了一些特性,如Slow Start、Fast Retransmit等,有些是由于没有必要(Slow Start是避免一开始就遇到拥堵,但Bolt的初始化状态肯定为空闲),有些则是为了适用多个接收端的情况,(和Fast Retransmit相比较起来,发送给下一个空闲的Bolt更有优势),另外,新算法没有了ssthresh,慢开始阈值,用以区分指数增加发送窗口和线性增加发送窗口的阈值,因为指数增加发送窗口的时候很容易造成SLOW状态,另外盲目地增加发送窗口大小也并不能提高吞吐量,因此只选择线性增加。还有的就是Bolt并没有强调Tuple的处理顺序性,也就是只要收到ack,对应的Tuple就从发送窗口移出,就可以发送一个新的Tuple,这和TCP的发送窗口的必须等待处理完毕才能右移有区别。
标签:
原文地址:http://blog.csdn.net/pun_c/article/details/51338057