码迷,mamicode.com
首页 > 编程语言 > 详细

Disruptor——一种可替代有界队列完成并发线程间数据交换的高性能解决方案

时间:2017-06-20 00:20:39      阅读:331      评论:0      收藏:0      [点我收藏+]

标签:imp   nload   获取   预取   失效   use   队列的实现   如何   batch   

   本文翻译自LMAX关于Disruptor的论文,同时加上一些自己的理解和标注。Disruptor是一个高效的线程间交换数据的基础组件,它使用栅栏(barrier)+序号(Sequencing)机制协调生产者与消费者,从而避免使用锁和CAS,同时还组合使用预分配内存机制、缓存行机制(cache line)、批处理效应(batch effect)来达到高吞吐量和低时延的目标。目前Disruptor版本已经迭代至3.0,本论文是基于Disruptor1.0写就,在新版本中,相对与1.0版本,其核心设计思想没有变,只是实现细节有所调整和优化,因此,此论文仍然很有研读意义。

  Disruptor论文原文地址:http://disruptor.googlecode.com/files/Disruptor-1.0.pdf

  Disruptor github地址:http://lmax-exchange.github.io/disruptor/

  【翻译过程中,参考了“yfx416的博客”,部分生动的注解直接摘抄自他的blog,已经特别标记出来,他的blog文章链接点击此处。如有侵犯,请及时告知。】

 

*****************************************以下为对Disruptor论文的翻译*****************************************************

摘要

  Lmax计划创建一个高性能的财务交易系统。作为我们工作的一部分,我们评估了多种方案去设计这个系统以求达到高性能目标,最后我们发现在传统的解决方案中我们遇到了基础上的瓶颈。

  许多应用程序通过队列来在不同的处理阶段之间交换数据。我们的性能测试显示,如果按照传统的方式来使用队列,延时代价的量级和磁盘IO操作的延时量级是同一个量级-非常慢。如果在端到端的操作中采用多个队列,又会增加了几百个毫秒的额外开销【注:用于协调这些队列】。这里有很大的优化空间。

  通过进一步研究和调研计算机科学,我们意识到传统方案(如:队列和处理节点)耦合了多个关注点,从而带来了多线程之间的资源争抢,这暗示我们肯定别有妙方。

  结合现代CPU的工作原理,也就是我们常说的“机制共鸣”(mechanical sympathy)【注:意为参考现代cpu的设计思想,顺应底层设计思路,来做上层的应用设计,以便使用底层设计的优势,从而达到一个最佳的设计结果,得到一个‘共鸣’。】,通过隔离关注,我们提出了一个数据结构和基于该数据结构的模式,这就是disruptor。

  测试结果显示,对于一个三阶段的任务管道,Disruptor的平均延时的数量级要小于基于传统队列的方法三个数量级。另外disruptor的吞吐量是传统方法的8倍。

  这些性能改进也意味着对于并发编程我们前进了一大步。对于高吞吐量和低延时的异步事件处理系统,这种新的模式是一个非常理想的基础组件。

  在Lmax中,我们已经建立起一个订单匹配引擎,实时风险管理系统,以及一个高可用性的内存事务处理系统,这些系统都是基于disruptor。这些系统的性能都是很牛的。

  Disruptor不只是专门为财务行业设计的,它具有相当的通用性,他能够解决并发编程中的一个复杂问题:如何最大化性能。这个系统的实现非常简单,尽管这里面的有些概念不是那么直观,但相比于其他机制,基于这种模式的系统往往更加简单。

  相比于其他方法,disruptor的写竞争比较少,并发开销更低,而且更加缓存友好,吞吐量更高,延时抖动更低。对于一个普通时钟频率的处理器,disruptor每秒处理的消息量为2500万,延时低于50纳秒。这个性能指标已经接近于现代处理器在多核之间交换数据的上限。

概述

  Disruptor是Lmax开发的世界上最快的财务交易系统的产物。早期的设计思路主要借鉴SEDA和Actors的实现,希望使用pipeline来提升吞吐量。通过测试各种实现,我们发现管道(pipeline)在不同s阶段(stage)之间,事件排队是性能的主要杀手。我们发现队列带入了剧烈的延时抖动。我们为了达到更好的性能于是花了很多精力来开发一个新的队列实现。然而最终发现队列有其局限性——耦合了生产者、消费者、数据存储等多个关注点。Disruptor的实现很好地隔离上述关注点。

并发的复杂性

  本文遵循计算机科学的通用定义:并发不仅是说有两个或者多个任务同时执行,还意味着对资源的竞争访问。这些竞争的资源可能是数据库,文件,socket或者内存中的某个地址。

  代码的并发执行主要有两个方面:互斥和变化的可见性。互斥主要用来管理对某些资源的竞争更新。变化的可见性主要是用来控制什么时候这些变化对其他线程可见。如果你能够在应用层面上限制并发更新那么你就有可能避免互斥。比如,如果你的算法能够确保任何资源只会被一个线程更新,那么互斥就是不必要的。读或者写要求所有的变化对其他线程可见,但实际上只有竞争写操作才真正需要互斥。

  并发环境中最耗费时间的操作其实就是并发写操作。多线程对同一个资源的写需要复杂昂贵的协调,通常会通过某种锁来实现资源协调。

锁的代价

  锁提供了互斥,并能够确保变化能够以一个确定的顺序让其它的线程看见。锁其实是很昂贵的,因为他们在竞争的时候需要进行仲裁。这个仲裁会涉及到操作系统的上下文切换,操作系统会挂起所有在等待这把锁的线程,直到锁持有者释放该锁。上下文切换期间,执行线程会丧失对操作系统的控制,导致执行线程的执行上下文丢失之前缓存的数据和指令集,这会给现代处理器带来严重的性能损耗。当然效率更高的用户态锁是另一种选择,但用户锁只有在没有竞争的时候才真正会带来益处。【注:因为用户态的锁往往是通过自旋锁来实现(或者带休眠的自旋锁),而自旋在竞争激烈的时候开销是很大的(一直在消耗CPU资源)。】

  为了探究影响到底有多大,我们写了一个程序,这个程序很简单,就是调用一个循环5亿次递增操作的函数。这个java函数在单线程,2.4G Intel Westmere EP的CPU上只需要300ms。

  一旦引入锁,即使没有发生竞争,程序的执行时间也会发生显著的增加。实验结果如下:

Method

Time (ms)

Single thread

300

Single thread with lock

10,000

Two threads with lock

224,000

Single thread with CAS

5,700

Two threads with CAS

30,000

Single thread with volatile write

4,700

CAS的代价

  除了锁之外,另外一种方法是CAS。CAS依赖于处理器的支持,当然大部分现代处理器都支持。CAS相对于锁是非常高效的,因为它不需要涉及内核上下文切换进行仲裁。但cas并不是免费的,处理器需要对指令pipeline加锁以确保原子性,并且会用到内存栅栏【注:内存栅栏的实现细节还不清楚,后续加强理解。】以确保对其他线程的可见性。JAVA中的-java.util.concurrent.Automaic*类用到了CAS操作。

  CAS的一个问题就是太复杂了,本来用锁进行并发编程就已经很头疼了,用CAS来实现复杂逻辑就更头痛了。

  最理想的算法就是只有一个线程来负责对单个资源的所有写,而其它所有的线程都是读结果。在多核环境中读取结果会要求memory barrier来使得变化对运行在另外的CPU核心上的线程可见。

内存栅栏(Memory Barriers)

  现代处理器为了获得更高的性能会做指令重排,在内存和执行单元中,指令执行、数据的加载和存储都会被进行指令重排。处理器只需要确保程序逻辑能得到相同的结果,它不会关心指令的执行顺序。对于单线程程序,指令重排不会有问题,但对于共享状态的多线程程序而言,内存有序变化就变得非常重要。处理器用内存栅栏来标识对内存更新顺序敏感的代码片段,它们确保确保指令的硬件执行顺序和内存变化在线程间的可见性。编译器可以在代码的合适位置放置额外的软件栅栏来确保被编译代码的执行顺序,这些软件栅栏是附加在处理器自身的硬件栅栏之上的。

  现代CPU相比内存系统来讲速度是非常快的。为了桥接其各个CPU,现代处理器使用了复杂的缓存系统,这些缓存实际上是一些高效的独立的硬件哈希表。不同CPU之间的缓存是通过消息传输协议来保证一致性。另外,处理器还会使用“存储缓冲区”缓解对缓存的写压力,使用“失效队列”确保——在写操作发生时,缓存一致性协议能快速知道失效消息【注:实现细节不是很了解】

  对于此种实现方式,最近写入的数据可能处于任何存储中:在寄存器里,在存储缓冲区中,在各级缓存中,在主存中。如果多个线程要共享这个值,那么这个值必须要按照一定的顺序对其它线程可见,这种可见性是通过交换缓存一致性消息来协调完成的。内存栅栏可以控制这些消息的适时产生。

  读内存栅栏(a read barrier)确保CPU上的加载指令有序,当缓存发生变化时,读内存栅栏会在失效队列上标记一个点。读内存栅栏标记点之前的写操作可以通过内存栅栏提供一致性视图。【注:读内存栅栏会在“失效队列”中标记一个节点,这个节点意味着read barrier之前读取的所有数据都已不可靠(这也就告诉我们可能有变化发生),之后的所有read操作都需要重新从内存中加载,因此之后的操作从而能够看到数据的最新变化,barrier之前的所有线程的写指令和barrier之后的读指令就有了一个先后顺序。Read memory barrier使得排在read memory barrier之前的写操作成为一个整体,在这个整体内部写操作的顺序是不确定的,但这个整体形成了一个完整的视图,整个整体的结果对后面的操作是可见。】

  写内存栅栏(a Write barrier)用来确保CPU的存储指令有序,写内存栅栏会在存储缓冲区(store buffer)中标记一个点,这个标记点之前的数据变化(write操作)会通过缓存flush到主存。写内存栅栏标记点之前发生的存储操作可以通过写内存栅栏提供有序性视图。

  读写(a full barrier)能同时确保加载/存储操作的有序性。

  【注:综上所述,实际上read barrier意味着read操作不能穿越这个read barrier,write barrier意味着write操作不能穿越这个write barrier。执行read barrier的CPU是多线程中的消费者的角色,它通过read barrier能够尽快看到生产者线程的执行结果。而执行write barrier的CPU往往充当生产者的角色,它通过write barrier把自己执行的结果尽可能快得让其它线程看见。】

  【注:内存栅栏的实现原理?】

  一些CPU在上述三个基础栅栏基础上引入了很多变化,但是通过这三个基础栅栏足以理解内存栅栏期望解决问题的复杂性。在Java内存模型中,对volatile域的读取和写入实际上就是对应的read barrier和write barrier。Java内存模型规范对此有明确的定义。

缓存行(cache line)

  现代处理器中缓存的使用对高性能操作意义非凡。处理器将数据和指令持有在缓存中以达到高效处理的目的,对应的,如果数据和指令在在缓存中丢死,则会变得无效。

  硬件操作缓存并不是以字节或字为单位,为了效率考虑,缓存通常以缓存行(cache line)的形式进行组织,缓存行通常有32-256字节,最常见的是64字节。缓存行也是缓存一致性协议操作的最小粒度。这就意味着:如果两个变量不幸在同一个缓存行里,而且它们分别由不同的线程写入,那么这两个变量的写入会发生竞争,就好像多线程在竞争写入同一个变量一样。这种现象被称之为“伪共享”(false sharing)。出于高性能的考虑,需要确保独立但被并发写入的变量之间不会共享同一个缓存行,以求将资源竞争降到最低。

  可预测式的CPU访问主存【注:这里的memory译为主存,以便和缓存(cache)区分开来】时,通常会预测接下来将被访问到的主存内容并在后台将它加载到缓存中,从而将主存访问产生的时延降至最低。主存预读取发生的前提是:处理器能够检测到主存访问的模式/规律,主存预读取就像是以一个可预测的‘步幅’在主存中行走。比如说:当对一个数组的内容进行迭代时,‘步幅’是可预测的,这样主存中的内容就能预读取到缓存行中,最大化访问主存的效率。在处理器能感知到的任何方向中,‘步幅’通常要小于2048字节。然而,像链表(linked lists)和树(trees)这样的数据结构在主存中拥有分布广泛的节点,从而没有可预测的访问‘步幅’【注:由于链表或者树各个节点之间分布并不是顺序的,相邻节点的存储地址相隔很远,所以处理器找不到对应的读取规律,无法进行预读取。】,由于主存中缺乏一致的模式限制了系统预取缓存行的能力,导致主存访问的效率可能低了2个数量级。

队列的问题

  典型的队列要么使用链表要么使用数组作为底层的数据存储。如果一个内存队列是无界的,很大情况,他会变得不可控直到消耗大量内存导致严重错误。这通常发生在生产者处理能力好于消费者的场景下。无界队列在生产者能保证不超过消费者消费能力的情况下非常有用,但是仍然有担保失效的风险。为了避免灾难发生,队列通常是有界的,为了确保队列有界就要求它是数组形式或者他的大小是被跟踪的。

  常见的队列形式下,在队列的头(消费者要频繁使用),尾(生产者要频繁使用)以及长度(生产者和消费者都需要频繁更改)会产生资源竞争。当队列在使用时,在大多数情况下队列要么是几乎是满的,要么几乎是空的,这是由于生产者和消费者不同的工作节奏造成。很少能取得平衡以使消费者和生产者的工作节奏完美匹配。几乎为满或者几乎为空的队列会导致严重的资源竞争和为了维护缓存一致性而带来的高昂消耗。【注:以一个几乎为空队列为例,大量的消费者被阻塞,在可消费的entity上产生资源竞争,同时缓存对应的内容迅速变化,导致各个cpu之间维护缓存一致性的成本急剧上升】。即便为队列的头和尾使用不同的同步对象(比如:锁或者CAS变量)也同样存在问题,因为,这些不同的同步对象对应的主存内容通常位于同一个缓存行中,对这些同步对象的频繁操作也会带来资源竞争【注:这里不太确定位于同一个缓存行的是同步对象还是队列的头和尾对象】

  相对于在队列上加单个大粒度(large-grain)的锁,管理分散(生产者关注队列头,消费者关注队列尾和对头尾之间的存储节点的关注)。在put和take上添加单个大粒度锁的使用时非常简单的,但是对吞吐量影响实在太大。如果想要很好得解决队列场景下的并发问题,那么队列的实现就必然很复杂,当然如果你是单生产者-单消费者的场景,那么也许队列实现没有必要那么复杂。  

  Java里使用queue还有一个更大的问题:queue会成为很大的垃圾回收源。首先,队列中的对象需要被分配和替换,再者,当链表重新寻址,对象需要重新分配。当队列中的对象不再被引用时,所有这些对象都需要被回收。

管道和图(Pipelines and Graphs)

  很多问题场景下,需要将多个处理阶段绑定在一起组成管道,这个管道通常以并行地方式组织成图的拓扑结构。各个阶段之间通常使用队列来连接,同时每个阶段会有自己的处理线程。

  这种处理方式并不便宜——每个阶段都会有入队和出队的开销,当路径必须分叉(fork)时,有多少个目标消费者,就会增加多少倍的成本【注:到每个消费者都会有入队操作】;同时,分叉之后还需要合并,这时候会因为不可避免的资源竞争产生额外的成本。

  【注:举个例子(这个例子来自于yfx416的译文,非常浅显易懂,因此贴在下面)

技术分享

  上面图讲的是如何组装一台小汽车,要组装一台汽车,我们第一阶段要有个底座,接着装引擎,装驾驶员座椅,装乘客座椅,装后座座椅等等等等,最后是装四个轮子。这里每个圆圈都代表一个stage。箭头代表依赖关系(或者路径),这很容易理解,只有四个门和引擎盖都装好了(body complete)才能进行喷漆工作。每个箭头都意味着一个队列,箭头两端代表着生产者和消费者。每个消费者和消费者都意味着一个线程。比如对于paint而言,它拥有五个消费者线程来处理队列。

  这么做看起来很朴素,但其实Actor和SDEA等大名鼎鼎的架构都是源自类似的朴素简陋的思想。这么做固然好,但也是有代价的,让我们来分析下代价。首先有队列就意味着有出队和入队的操作,这就意味着开销。对于chassis stage而言,它作为生产者,它的开销 =  路径数 * 入队开销(对于chassis stage,同样的消息我需要重复传播4次到4个不同的目的stage,浪费啊...)。对于paint stage而言,它有五个消费者线程来处理不同的队列,这五个消费者线程互相竞争CPU,内存以及其他资源(比如为了给paint stage准备数据,大家一起往一个对象里填充数据),如果线程数目不是5个,而是500个,这个压力还是很大的。通过分析我们看到,由于各个stage在业务中的作用不同,造成了压力分布的不均衡,比如bonnet stage就比较闲,paint stage就比较忙。而且对于类似chassis这样的stage,同样的消息要重复插入多次,带来了开销浪费。不均衡和浪费正是我们感觉不爽的地方。

  能改进么?可以。同样是这样的依赖关系图,如果我们能把各个stage之间联系的队列去掉,用一种统一的数据结构来代替,这样就避免了重复,这该多么理想。而且基于统一的数据结构,如果我们能够用统一的可控的线程组来控制,就能使得压力比较均匀,这不也是我们求之不得么?实际上disruptor的思想就是来自于此。】

LMAX Disruptor的设计

  为了解决上面提出的问题,该设计严格地实现了关注分离(separation of the concerns)。该设计确保任何数据只被一个线程进行写访问,从而避免写冲突。这个设计就是Disruptor,之所以叫这个名字,是因为它和java7中出现的“Phasers”有很多相似之处。 

  Disruptor的设计初衷就是为了解决上面提到的问题【注:锁代价过大、CAS代价过大、缓存行导致的伪共享、队列存在的问题】,以求最优化内存分配,使用缓存友好的方式来最佳使用现代硬件资源。

  Disruptor的核心机制在于:以RingBuffer的形式预分配有界的数据结构,单个或者多个生产者可以向RingBuffer中写入数据,单个或者多个消费者可以从RingBuffer读取数据。

内存分配

  Ring buffer的内存是在启动时预先分配的。Ring buffer要么是一个引用数组,每个元素是指向对象的引用【注:针对C#, java这样的语言】,要么是一个结构数组,每个元素(entry)代表的就是对象实体【注:针对C,C++这样的语言】。由于java语言的限制,Java实现的Ring buffer实际上只能是一个引用数组。每个元素只能是一个数据的容器,而不是数据本身。这种预先分配的策略就能够避免Java内存回收引起的一些性能问题,因为这些元素对象(enries)在能够在Disruptor实例中的整个生命周期存活和被复用【注:这些enties一直被RingBuffer对象持有,而RingBuffer实例对象又被Disruptor持有,故只要Disruptor存在,则这些enties便不会被GC掉】。由于这些对象是在开始阶段同时分配的,很大程度上被连续分布在主存中,即使不连续它们在内存中得间隔也有可能是固定的,从而支持缓存步幅,非常有利于缓存的数据预取【注:见前文中‘缓存行’中的描述】。John Rose提出了一个草案希望未来JAVA能够支持所谓的“value type”,就像C语言那样,如果这样的话,disruptor就能够确保对象在内存中一定是连续的,而不仅仅只是有很大可能性了。

  对于JAVA这样的运行环境来讲,垃圾回收对于低延时系统是一个严重的挑战。内存越大,垃圾回收造成的性能压力也就越大。垃圾回收喜欢的对象要么寿命非常短,要么对象干脆是不死的。Ring buffer的预先分配使得我们的对象变成了不死的对象,这是大大减轻垃圾回收的压力。

  在高负载的情况下,基于队列的系统可以back up【注:不清楚这里的back up是什么意思,故使用原文表示】,这会降低处理效率,进而导致被分配的对象比原本存活得更长,在使用分代垃圾收集器的vm中,这些对象将会从年轻代进入老年代。这就意味着:

  • 1) 这些对象被不断地在各代之间【注:年轻代eden gen 和 suviving erea,以及young gen和old gen之间】进行拷贝,造成延时波动;
  • 2) 进入老年代的对象回收成本会更高,GC带来的内存碎片也会更多,引起‘stop the world’的概率会更大;越多的临时对象进入到老年代,也更可能带来性能上的损耗。

  【注:如果对象是永生不死的,1)的代价无法避免,但2)的代价就会大大降低,因为这些对象一直会存在,那么老年代触发major collection时它扫描一下所有对象,发现这些对象都无法释放,什么都不用干,那就直接收工。由于对象不会被释放,那也就不会有碎片,那么stop the world的概率也就大大降低了。永生对象在2)中所做的只不过是进行一次扫描,这个代价非常小。】

   【注:下面通过分析ArrayBlockingQueue和RingBuffer来加深Disruptor内存预分配所带来的好处

  在ArrayBlockingQueue中,数组Object[] items负责存储队列中的所有元素,如下图所示,当消费者消费完items[0]元素,紧接着生产者向items[0]放入新的元素entity1,这时候items[0]存储的是对象entity1的引用,items[0]到entity0对象的引用被切断,entity0等待被GC。生产者不断地向items[0]中写入消息,则老的entity将不断地需要被GC,一旦队列阻塞,items可能熬过多次minor GC,幸存下来,并进入到老年代,带来更严重的性能隐患。

技术分享

  再来看看RingBuffer预分配内存方式的精妙之处。RingBuffer同样使用数组Object[] entries作为存储元素,如下图所示,初始化RingBuffer时,会将所有的entries的每个元素指定为特定的Event,这时候event中的detail属性是null;后面生产者向RingBuffer中写入消息时,RingBuffer不是直接将enties[7]指向其他的event对象,而是先获取event对象,然后更改event对象的detail属性;消费者在消费时,也是从RingBuffer中读取出event,然后取出其detail属性。可以看出,生产/消费过程中,RingBuffer的entities[7]元素并未发生任何变化,未产生临时对象,entities及其元素对象一直存活,知道RingBuffer消亡。故而可以最小化GC的频率,提升性能。

技术分享注:图中对象Entry写错,应当为Event。


  思考一个问题:如果RingBuffer的enties[7]指向的event对象中含有对象属性,会不会有queue类似的GC问题呢?】

 隔离关注(Teasing Apart the Concerns

  如下几个分开的关注点是所有队列实现方式都需要综合实现的,这些也是为队列实现定义的接口:

  •   1) 队列元素的存储;
  •   2) 队列协调生产者声明下一个需要交换的队列元素的序号;
  •   3) 队列协调并告知消费者等待的元素已经就绪。

  在使用带有垃圾回收特性语言来设计金融用的交换机(exchange)时,过多的内存分配会带来麻烦,所以,我们基于链表的队列不是一个好的解决方案。如果用于存储各个阶段交换数据的节点(entries)可以被预先分配内存,那么垃圾回收可以被最小化;更进一步地,如果节点被统一分配为相同大小的块,那么在遍历节点时,将会以一种缓存友好的方式进行,效率会更高。预分配内存的数组满足上述的要求,在创建RingBuffer时,DIsruptor使用抽象工厂模式预分配了所有节点,当一个节点被声明时,生产者只需要将它的数据拷贝到这个预分配的数据空间中即可。【注:可参考上节中的注解分析。】

  对于现代处理器而言,取余操作是一种比较昂贵的操作。但在RingBuffer中取余是一个使用频率很高的操作,因为需要计算某一个序号在RingBuffer中的位置需要用到取余。一个替代的方法是将RingBuffer的长度设置为2的幂,这样通过简单的位操作就可以获取余数。  

  我们前面提到,有界队列会在队列头和队列尾形成激烈的竞争。但是RingBuffer使用的数据结构则没有这种竞争和并发源于,因为RingBuffer将这些竞争的焦点(concerns)转移到了生产者/消费者栅栏(barriers)上去,接下来我们将详细阐述这一逻辑。

  Disruptor的典型应用场景通常只有一个生产者,典型的生产者是文件读取或者网络侦听。如果只有一个生产者,那么队列元素分配或者序号分配不会存在竞争。

  但在一些特别的场景,Disruptor会有多个生产者,这种情况下生产者们可能会彼此竞争来获取RingBuffer中下一个可用的位置,这里的竞争问题可以通过CAS操作来处理。

  当生产者将相关的数据拷贝到RingBuffer的位置(entry)中后,生产者提交这个序号,告知消费者这个位置的数据可以消费了。这里可以不使用CAS操作,而是用简单地自旋直到等待的其他生产者都到达了这个序号便可提交。【注:RingBuffer通过一个cursor来告知消费者当前那些位置可以被消费,多个生产者时,需要确保cursor游标之前的seq都已经提交,故而这里需要协调各个生产者】。为了避免覆盖情况发生【注:覆盖是指生产者熟读快于消费者速度,导致生产者写入消费者还未来得及消费的位置】,生产者在写入前会检查所有消费者最小的seq,确保写入的seq不会大于这个最小的消费者seq。

  消费者在读取元素之前需要等待一个序号,该序号指示了有效的可以读取的元素。怎么样等待这个序号,有很多种策略。如果CPU资源比较宝贵,那么消费者可以等待某一个锁的条件变量,由生产者来唤醒消费者。这种方式明显会带来竞争,只适用于CPU资源的稀缺性比系统的延时/吞吐量重要的场景。另外一种策略是所有消费者线程循环检查游标(cursor),该游标表示RingBuffer中当前有效的可供读取的元素的位置,这种策略使用更多消耗CPU资源来换取低时延。这种方法由于没有用锁和条件变量,因此打破了生产者和消费者之间的竞争依赖关系。如果你想支持多生产者-多消费者的场景,你就不得不采用很多CAS操作,这些CAS操作作用在头,尾,队列长度等等,这就带来了复杂性。但disruptor避免了这种复杂的CAS竞争。

序列化(Sequencing)

  顺序化是disruptor管理并发的核心概念。每个生产者和消费者都维护自己的序号,这个序号用来和RingBuffer交互。当一个生产者希望在RingBuffer中添加一个元素时,它首先要做的是声明一个序号【注:这个序号被称之为生产者的声明序号,该序号用来指示下一个空闲的slot的位置,一旦序号被声明,那么该序号就不会被其它生产者重复操作,生产者就可以操作该声明序号的slot数据】。单生产者的场景下,这个声明序号可以是一个简单的整数;多生产者的场景,这个序号必须是一个支持CAS的原子变量。当一个生产者序号被声明,这个序号对应的位置就可以被声明该序号的生产者写入了;当生产者完成更新元素后,它就通过更新一个单独的序号来提交变化,这个单独的序号是一个游标(cursor),用来指示消费者可以消费的最新的元素。生产者可以通过一种自旋的方式来读取和更新RingBuffer的游标,这里只需要使用内存栅栏而不需要使用CAS操作,如下:

        long expectedSequence = claimedSequence – 1;
        while (cursor != expectedSequence)  //this is a memory Barrier
        {
            // busy spin
        }

        cursor = claimedSequence;

  消费者等待指定的消费者序号变得可用,它通过内存栅栏(memory barrier)来读取游标(cursor),一旦游标的值被更新,内存栅栏会确保RingBuffer中的这一变化会被所有在游标上等待的消费者可见。【注:通过这种机制,消费者能够及时知道生产者提交了新的消息,并尝试进行消费】

  每个消费者都各自维护一个序号来表示自己最新消费的位置序号。生产者通过跟踪这些序号来确保不会覆盖消费者还未来得及消费的位置。同时这些序号也可以用于协调消费者之间的执行顺序【注:Disruptor中的多消费者实际上是指对于同一个event,有多个处理阶段,每个阶段被认为是一个独立的消费者,各个阶段的执行通常是有顺序要求的】

  单生产者场景下,不管消费者有多么复杂的依赖,Disruptor都无需使用锁和CAS操作,它通过多个序号(Sequences)上的内存栅栏就可以协调整个并发场景。【注:Disruptor中有两类Sequence——生产者序号(又叫cursor)和消费者序号,通过在这两个序号上建立内存栅栏,达到协调并发的目的】

批量效应(Batching Effect)

  当消费者等待RingBuffer中可用的前进游标序号时,如果消费者发现RingBuffer游标自上次检查以来已经前进了多个序号,消费者可以直接处理所有可用的多个序号,而不用引入并发机制, 这样滞后的消费者能够迅速跟上生产者的步伐,从而平衡系统,这一特性是队列(queues)不具有的。这种类型的批处理增加了吞吐量,同时减少和平滑了延迟。 根据我们的观察结果,无论负载如何,时延都会维持在一个常数时间值上,直到存储子系统饱和,然后根据小定律(Little’s Law),该曲线是线性的。 这与在负载增加时观察队列所得到时延“J”曲线效应非常不同。

依赖图(Dependency Graphs)

  队列从本质上来讲表示一个简单的消费者和生产者之间的只具有一步的管道(pipeline)。如果消费者形成了一个链条,或者一个图状的依赖关系,那么图中的每个阶段之间都会需要一个队列。大量的队列就带来了开销。在设计LMAX财务交易系统的过程中,我们发现基于队列的设计方法会导致大量的队列开销,而这些为数众多的队列所带来的开销耗费了事务处理的大部分时间。

  在Disruptor设计模式中,生产者和消费者的竞争被很好得隔离开了,因此通过使用一个简单的RingBuffer也可以在消费者之间构造复杂的依赖关系。这样降低了执行时延,从而提高了吞吐量。

  一个RingBuffer可以用来处理一个具有复杂的依赖关系图的流程。设计RingBuffer的时候需要特别注意,需要避免消费者之间造成的jvm伪共享。

Disruptor类图

  下图是Disruptor框架的核心类图。这个图里遗漏了一些简化编程模型的类。一旦业务流程的依赖关系图构建完毕,那么编程模型就变简单了。生产者通过ProducerBarrier来顺序申请entry,同时将数据变化写入entry中,然后再通过ProducerBarrier来提交数据变化并使得这些变化对消费者可见。作为一个消费者,它所需要做的只不过是提供一个BatchHandler实现,当一个新的entry可见时,这个回调会被触发。这使得Disruptor编程模型是一个基于事件的模型,和Actor模型类似。

  为了更灵活的设计,队列通常将关注点组合起来考虑。RingBuffer是Disruptor模式的核心,它为数据交换提供了存储,同时又避免了竞争。通过RingBuffer,生产者和消费者之间的并发问题被隔离开了。ProducerBarrier就是用来管理RingBuffer中的位置槽(slot)声明,同时跟踪相关的消费者从而避免冲突覆盖。而ConsumerBarrier在有新的元素有效时会负责通知消费者。通过这些barrier,消费者之间就构造成了一个依赖关系图,这个依赖关系关系图实际上代表了流程处理过程中的各个阶段。

  【注:一下类图是Disruptor1.0的实现类图,现在已经更新到3.0版本,类图也有了很大的改变,后面会给出最新的3.0版本的类图】

技术分享

代码示例

  下面的代码示例是一个单生产者和单消费者的场景,它通过BatchHandler来实现消费者。消费者运行在一个单独的线程上,当元素可用时,它被用来接收元素。

// Callback handler which can be implemented by consumers
final BatchHandler<ValueEntry> batchHandler = new BatchHandler<ValueEntry>()
{
    public void onAvailable(final ValueEntry entry) throws Exception
    {
        // process a new entry as it becomes available.
    }

    public void onEndOfBatch() throws Exception
    {
        // useful for flushing results to an IO device if necessary.
    }

    public void onCompletion()
    {
        // do any necessary clean up before shutdown
    }
};

RingBuffer<ValueEntry> ringBuffer =
    new RingBuffer<ValueEntry>(ValueEntry.ENTRY_FACTORY, SIZE,
                               ClaimStrategy.Option.SINGLE_THREADED,
                               WaitStrategy.Option.YIELDING);
ConsumerBarrier<ValueEntry> consumerBarrier = ringBuffer.createConsumerBarrier();       
BatchConsumer<ValueEntry> batchConsumer = 
    new BatchConsumer<ValueEntry>(consumerBarrier, batchHandler);
ProducerBarrier<ValueEntry> producerBarrier = ringBuffer.createProducerBarrier(batchConsumer);   

// Each consumer can run on a separate thread
EXECUTOR.submit(batchConsumer);

// Producers claim entries in sequence
ValueEntry entry = producerBarrier.nextEntry();

// copy data into the entry container

// make the entry available to consumers
producerBarrier.commit(entry);  

性能测试——吞吐量

 我们选取了Doug Lea的ArrayBlockingQueue的实现作为参考目标进行测试,ArrayBlockingQueue是所有有界队列中性能最好的,测试是按照阻塞的方式进行的。

技术分享

技术分享

技术分享

技术分享

技术分享

  上述配置中,ArrayBlockingQueue被用于每一个数据流箭头的位置,相当于Disruptor中的栅栏所处的位置。下表展示了总共处理5亿条消息时每秒吞吐量的性能测试结果,测试环境为:没有HT的1.6.0_25 64-bit Sun JVM, Windows 7, Intel Core i7 860 @ 2.8 GHz ,以及Intel Core i7-2720QM, Ubuntu 11.04。 我们取了最好的前三条结果,这个结果使用于任何JVM运行环境,表中显示的结果并不是我们发现最好的结果。

 

Nehalem 2.8Ghz – Windows 7 SP1 64-bit

Sandy Bridge 2.2Ghz – Linux 2.6.38 64-bit

 

ABQ

Disruptor

ABQ

Disruptor

Unicast: 1P – 1C

5,339,256

25,998,336

4,057,453

22,381,378

Pipeline: 1P – 3C

2,128,918

16,806,157

2,006,903

15,857,913

Sequencer: 3P – 1C

5,539,531

13,403,268

2,056,118

14,540,519

Multicast: 1P – 3C

1,077,384

9,377,871

260,733

10,860,121

Diamond: 1P – 3C

2,113,941

16,143,613

2,082,725

15,295,197

 

 

 

 

 

 

 

 

 

 

 

 

性能测试——时延

  为了测量延时,我们采用3个阶段的pipeline作为测试场景,为了能够测出系统的最佳状态,我们让吞吐量压力维持在一个合适的水准,这个压力不至于耗尽队列资源。这个压力是通过每插入一个事件就等待1ms的方式来实现的,然后一直这样重复5000万次。为了精确测量延时,我们需要精确考量CPU的时间戳计数器(TSC)。我们采用了那些TSC恒定的CPU来作为测试机器,因为老的CPU为了节省功耗,往往会自动调节TSC。Intel Nehalem之后的CPU都支持恒定的TSC。可以使用运行于Ubuntu 11.04上的Oracle最新版本的JVM进行测试,本测试未做CPU绑定。

  我们依然采用ArrayBlockingQueue用来对比,而没有选取ConcurrentLinkedQueue,原因在于我们期望使用一个有界队列来确保生产者不会超过消费者,从而避免产生过载,对测试产生影响。【注:有人可能会问:为什么不用ConcurrentLinkedQueue来做对比呢?ConcurrentLinkedQueue的性能不是更好么?的确是这样,但是由于Disruptor模式的ringbuffer是一个长度固定的队列系统,而ConcurrentLinkedQueue是一个长度没有限制的队列,两者对于长度无限制的队列,生产者如果效率过高,且又无法阻塞,这样会抢占消费者的CPU资源,从而影响最后的测试结果。】

  Disruptor每一轮【注:一次完整的流水线】的平均延时为52纳秒,相比之下ArrayBlockQueue每一轮的平均延时为32757纳秒。跟踪显示ArrayBlockQueue性能损失主要是由条件变量的加锁/通知引起的。下表中测试结果是在配置为2.2Ghz Core i7-2720QM 的Ubuntu 11.04操作系统上运行版本为Java 1.6.0_25 64-bit 的jvm虚拟机得到的。

 

Array Blocking Queue (ns)

Disruptor (ns)

Min Latency

145

29

Mean Latency

32,757

52

99% observations less than

2,097,152

128

99.99% observations less than

4,194,304

8,192

Max Latency

5,069,086

175,567

 

 

 

 

 

 

 

 

 

 

技术分享

结论

  Disruptor在"提高吞吐量-降低时延"的领域里迈出了重要一步,这在许多应用中都是非常重要的考虑。我们的测试显示,与其它线程间交换数据的方法相比,disruptor的性能是最好的。我们相信disruptor应该是所有数据交换方法中最好的。通过隔离关注,通过限制写竞争,通过最小化读竞争,通过代码的缓存友好化(cache-friendly),我们创造了一种在线程之间交换数据的高效的方法。【注:这里是指线程间交换数据,并未包含具体业务逻辑处理。】

  批处理效应(Batch Effect)允许消费者能够一次性没有竞争的处理大量的元素,这为高性能系统提供了一个新的特性。对于大多数系统而言,随着吞吐量压力的增加,延时也会呈指数级增加,形成一个类似J的曲线。然而在Disruptor系统中,随着吞吐量压力的增加,延时依然会很平滑,直到内存被耗尽。

  我们相信Disruptor建立了高性能计算的新基准,并且非常适合继续利用处理器和计算机设计的发展趋势。

 参考文献

[1] Staged Event Driven Architecture – http://www.eecs.harvard.edu/~mdw/proj/seda/

[2] Actor model – http://dspace.mit.edu/handle/1721.1/6952

[3] Java Memory Model - http://www.ibm.com/developerworks/library/j-jtp02244/index.html

[4] Phasers - http://gee.cs.oswego.edu/dl/jsr166/dist/jsr166ydocs/jsr166y/Phaser.html

[5] Value Types - http://blogs.oracle.com/jrose/entry/tuples_in_the_vm

[6] Little’s Law - http://en.wikipedia.org/wiki/Little%27s_law

[7] ArrayBlockingQueue - http://download.oracle.com/javase/1.5.0/docs/api/java/util/concurrent/ArrayBlockingQueue.html

[8] ConcurrentLinkedQueue - http://download.oracle.com/javase/1.5.0/docs/api/java/util/concurrent/ConcurrentLinkedQueue.html

Disruptor——一种可替代有界队列完成并发线程间数据交换的高性能解决方案

标签:imp   nload   获取   预取   失效   use   队列的实现   如何   batch   

原文地址:http://www.cnblogs.com/daoqidelv/p/7043696.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!