码迷,mamicode.com
首页 > 其他好文 > 详细

disruptor

时间:2017-10-03 23:24:10      阅读:239      评论:0      收藏:0      [点我收藏+]

标签:com   queue   wiki   gets   jdk   http   end   ret   生产   

 disruptor提供了线程间消息通信机制,通常会拿它和jdk中的blockingQueue作比较,blockingQueue使用了ReentrantLock。

 

下例的逻辑是,生产500个ValueEvent,先后由toDbHandler和businessHandler消费事件。

normalTest使用的是低级接口,DSLWizardTest使用的是高级接口,后者明显要简洁一些。

技术分享
 1 class ValueEvent {
 2   private long value;
 3 
 4   public long getValue() {
 5     return value;
 6   }
 7 
 8   public void setValue(final long value) {
 9     this.value = value;
10   }
11 
12   @Override
13   public String toString() {
14     return "ValueEvent{" +
15             "value=" + value +
16             ‘}‘;
17   }
18 }
19 // 生产者的线程工厂
20 class SimpleThreadFactory implements java.util.concurrent.ThreadFactory {
21   @Override
22   public Thread newThread(Runnable r) {
23     return new Thread(r, "simpleThread");
24   }
25 };
26 
27 // RingBuffer生产工厂,初始化RingBuffer的时候使用
28 class SimpleEventFactory implements EventFactory<ValueEvent> {
29   @Override
30   public ValueEvent newInstance() {
31     return new ValueEvent();
32   }
33 };
34 
35 class JournalEventHandler implements EventHandler<ValueEvent> {
36   public void onEvent(final ValueEvent event, final long sequence, final boolean endOfBatch) throws Exception {
37     // process a new event.
38     System.out.println("to db: " + event);
39   }
40 };
41 class ReplicationEventHandler implements EventHandler<ValueEvent> {
42   public void onEvent(final ValueEvent event, final long sequence, final boolean endOfBatch) throws Exception {
43     // process a new event.
44     System.out.println("replication: " + event);
45   }
46 };
47 class ApplicationEventHandler implements EventHandler<ValueEvent> {
48   public void onEvent(final ValueEvent event, final long sequence, final boolean endOfBatch) throws Exception {
49       // process a new event.
50       System.out.println(event);
51             // System.out.println("sequence: " + sequence);
52             // System.out.println("end of batch: " + endOfBatch);
53   }
54 };
View Code

 

 1 public class TestDisruptor {
 2 
 3   // 指定RingBuffer的大小
 4   private int bufferSize = 16;
 5   // 阻塞策略
 6   BlockingWaitStrategy strategy = new BlockingWaitStrategy();
 7 
 8   @Test
 9   public void normalTest() throws InterruptedException {
10     RingBuffer<ValueEvent> ringBuffer =
11             RingBuffer.createSingleProducer(new SimpleEventFactory(), bufferSize, strategy);
12 
13     SequenceBarrier barrier = ringBuffer.newBarrier();
14     BatchEventProcessor<ValueEvent> journalEventProcessor = new BatchEventProcessor<>(ringBuffer, barrier, new JournalEventHandler());
15     BatchEventProcessor<ValueEvent> replicationEventProcessor = new BatchEventProcessor<>(ringBuffer, barrier, new ReplicationEventHandler());
16 
17     barrier = ringBuffer.newBarrier(journalEventProcessor.getSequence(), replicationEventProcessor.getSequence());
18     BatchEventProcessor<ValueEvent> businessEventProcessor = new BatchEventProcessor<>(ringBuffer, barrier, new ApplicationEventHandler());
19 
20 
21     ringBuffer.addGatingSequences(businessEventProcessor.getSequence());
22 
23     ExecutorService executor = Executors.newCachedThreadPool(new SimpleThreadFactory());
24     // Each EventProcessor can run on a separate thread
25     executor.submit(businessEventProcessor);
26     executor.submit(journalEventProcessor);
27     executor.submit(replicationEventProcessor);
28 
29     for (int i = 0; i < 500; i++) {
30       // Publishers claim events in sequence
31       long sequence = ringBuffer.next();
32       try {
33         ValueEvent event = ringBuffer.get(sequence);
34         event.setValue(i); // this could be more complex with multiple fields
35       } finally {
36         ringBuffer.publish(sequence);
37       }
38       // make the event available to EventProcessors
39       Thread.sleep(10);
40     }
41     Thread.sleep(100);
42   }
43 
44   /**
45    * DSLWizard
46    *
47    * @throws Exception
48    */
49   @Test
50   public void dslWizardTest() throws Exception {
51     // 创建disruptor,采用单生产者模式
52     Disruptor<ValueEvent> disruptor =
53             new Disruptor<>(new SimpleEventFactory(), bufferSize, new SimpleThreadFactory(), ProducerType.SINGLE, strategy);
54     // 设置EventHandler
55     disruptor.handleEventsWith(new JournalEventHandler(), new ReplicationEventHandler())
56             .then(new ApplicationEventHandler());
57     // 启动disruptor的线程
58     disruptor.start();
59 
60     EventTranslator<ValueEvent> eventTranslator = new EventTranslator<ValueEvent>() {
61       private AtomicInteger value = new AtomicInteger(0);
62 
63       @Override
64       public void translateTo(ValueEvent event, long sequence) {
65         event.setValue(value.getAndIncrement());
66       }
67     };
68     for (int l = 0; l < 500; l++) {
69       disruptor.publishEvent(eventTranslator);
70       Thread.sleep(10);
71     }
72   }
73 }

 

上述实现的功能可以使用下图表示,ApplicationConsumer(ApplicationEventHandler)依赖于JournalConsumer(JournalEventHandler)和ReplicationConsumer(ReplicationEventHandler),这种依赖关系体现在ApplicationConsumer的SequenceBarrier依赖ReplicationConsumer和JournalConsumer的Sequence(三者的SequenceBarrier都依赖于RingBuffer的Sequence)。

我们看到Sequencer依赖于ApplicationConsumer的Sequence,ApplicationConsumer作为最后一级消费者,其消费速度如果过慢,就会阻塞Sequencer产生下一个序号的速度(RingBuffer.next()调用了Sequencer.next())。

Sequencer的next使用CAS分配序号,避免了锁开销,当RingBuffer满时,不断循环判断是否可以分配序号,如果不可分配则LockSupport.parkNanos(1)。

每个Consumer都有一个独立的Sequence,Consumers和Sequencer共享一个waitStrategy,Consumer在取事件时,会通过waitStrategy的waitFor获得可用序号。

waitStrategy有不同的策略,比如BlockingWaitStrategy、YieldingWaitStrategy等,这些策略决定了在无可用序号时线程的等待策略,比如BlockingWaitStrategy会阻塞(涉及锁),YieldingWaitStrategy会不断循环并yield。

技术分享

disruptor中的几个重要组件:

RingBuffer:存放元素的环形容器;

Sequence:序号,类似AtomicLong,考虑了缓存行带来的伪共享(false sharing),对value进行了填充,用于并发获取RingBuffer中可存入元素的序号;

Sequencer:有两个实现类,SingleProducerSequencer和MultiProducerSequencer,这是一个非常关键的组件,next()会得到RingBuffer的下一个可分配序号,然后判断该序号是否可以被分配,判断依据就是最低级的Consumer的消费速度是否跟上了生产者的生产速度(通过最低级的Sequence与待分配序号比较即可判断);

SequenceBarrier:可以理解为Sequence的屏障,Sequence不能跨过该屏障取序号;

EventHandler:消费者的消费逻辑;

EventProccessor:消费者,循环通过SequenceBarrier.waitFor得到可用(可消费)序号,回调EventHandler的onEvent消费;

WaitStrategy:Consumer(EventProcessor)的等待策略;

Producer:用户逻辑;

 

参考:https://github.com/LMAX-Exchange/disruptor/wiki/Introduction

  http://lmax-exchange.github.io/disruptor/

disruptor

标签:com   queue   wiki   gets   jdk   http   end   ret   生产   

原文地址:http://www.cnblogs.com/holoyong/p/7622985.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!