码迷,mamicode.com
首页 > 其他好文 > 详细

Disruptor源码分析

时间:2017-11-05 17:29:52      阅读:219      评论:0      收藏:0      [点我收藏+]

标签:public   cep   cached   策略   poi   cas   err   cache   生产者与消费者   

本文将介绍Disruptor的工作机制,并分析Disruptor的主要源码

基于的版本是3.3.7(发布于2017.09.28)

水平有限,如有谬误请留言指正

 

0. 什么是Disruptor?

Disruptor是一个开源的并发框架,提供了类似于Java中有界队列的功能,主要用于生产消费者场景。

与Java原生并发队列不同的是,Disruptor高度优化,在单机上可以轻松跑到千万级别的tps

 

1. Disruptor的关键想法

a. 使用环形队列作为底层存储(存储空间连续,可以充分利用cache)

b. 生产者使用两阶段提交的方式来发布事件(第一阶段是先在环形队列中预占一个空位,第二阶段是向这个空位中写入数据,竞争只发生在第一阶段),并使用CAS操作来解决冲突,而不是使用昂贵的Lock

c. 用cache line padding(缓冲区填充)的思想来解决false sharing(伪共享)的问题

d. 使用了Java底层的Unsafe操作

 

2. Disruptor的核心组件

RingBuffer

环形缓冲区,本质是一个定长Object数组(后续称里面的格子为slot),为了避免伪共享,在这个数组的两端额外填充了若干空位(这也导致访问RingBuffer数据的方式比较崎岖,具体请自行参见源码)

Sequence

类似于AtomicLong,用于标记事件id

所有生产者共用一个Sequence,用于不冲突的将事件放到RingBuffer上

每个消费者自己维护一个Sequence,用于标记自己当前正在处理的事件的id

Sequencer

生产者访问RingBuffer时的控制器,主要实现有两种:SingleProducerSequencer与MultiProducerSequencer,分别用于单生产者和多生产者的场景

SequenceBarrier

只有一个实现类为ProcessingSequenceBarrier,用于协调生产者与消费者(如果某个slot中的事件还没有被所有消费者消费完毕,那么这个slot是不能被复用的,需要等待)

WaitStrategy

消费者等待下一个可用事件的策略,Disruptor自带了多种WaitStrategy的实现,可以根据场景自行选择。

 

3. 生产者发布事件到RingBuffer

示例代码如下:

        long sequence = ringBuffer.next();  // 第一阶段,获取RingBuffer上下一个可用的slot的序列号,这里可能会有争用
        try {
            Event event = ringBuffer.get(sequence); // 根据序列号直接去RingBuffer上获取对应的slot上存储的事件
            event.setData(data);  // 写入数据
        } finally {
            ringBuffer.publish(sequence); // 第二阶段,将这个事件正式发布到RingBuffer中
        }

需要重点关注的是next()与publish()方法

RingBuffer的next方法直接调用关联的Sequencer的next方法,Sequencer的实现又分为SingleProducerSequencer与MultiProducerSequencer这两种

先从相对简单的SingleProducerSequencer看起:

SingleProducerSequencer.next()
    @Override
    public long next()
    {
        return next(1);
    }

    @Override
    public long next(int n)
    {
        if (n < 1)//参数检验
        {
            throw new IllegalArgumentException("n must be > 0");
        }

        long nextValue = this.nextValue;//上一次返回的seq

        long nextSequence = nextValue + n;//这次应该返回的序列值,这个序列值还未被产生,对应的slot里的元素的seq需要减去RingBuffer的大小
        long wrapPoint = nextSequence - bufferSize;//这个序列值对应的slot上正在存储的元素的seq,这个slot可能已经被消费了,也可能没有
        long cachedGatingSequence = this.cachedValue;//获取消费者未消费的元素的seq最小值,这个值不是实时的

        //wrapPoint > cachedGatingSequence,检查将要被放入元素的slot是否已经没有消费者占用了
        //cachedGatingSequence > nextValue,用于来应对seq发生溢出的情况
        if (wrapPoint > cachedGatingSequence || cachedGatingSequence > nextValue)
        {
            cursor.setVolatile(nextValue);  // StoreLoad fence,更新RingBuffer的游标,用到了Unsafe方法

            long minSequence;
            //Util.getMinimumSequence可以获得所有消费者未消费事件的seq最小值,在比这个值更小的slot里发布元素是安全的
            //如果这个判断成立,说明生产者正在试图将元素放到消费者未消费完毕的slot里,这个操作是不安全的,生产者需要在这里被阻塞
            while (wrapPoint > (minSequence = Util.getMinimumSequence(gatingSequences, nextValue)))
            {
                waitStrategy.signalAllWhenBlocking();//激活所有的消费者(避免有的消费者睡死过去了)
                LockSupport.parkNanos(1L); // TODO: Use waitStrategy to spin? 自旋等待
            }

            this.cachedValue = minSequence;//更新cachedValue
        }

        this.nextValue = nextSequence;

        return nextSequence;
    }

逻辑比较难懂,关键之处如下:

a. 返回的seq对应的slot必须已经被所有消费者消费完毕

b. Util.getMinimumSequence会遍历所有消费者使用的Sequence,并获取其最小值,这是一个比较昂贵的操作,所以将其缓存在本地的cachedValue变量中

c. 如果

 

 

 

 

 

 

参考资料

并发编程网上关于Disruptor的系列文章

 

Disruptor源码分析

标签:public   cep   cached   策略   poi   cas   err   cache   生产者与消费者   

原文地址:http://www.cnblogs.com/stevenczp/p/7783977.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!