标签:
1 Disruptor disruptor = new Disruptor<ValueEvent>(ValueEvent.EVENT_FACTORY, ringBufferSize, exec,
ProducerType.MULTI, waitStrategy);
/**
 * Creates a Disruptor on top of a freshly created ring buffer.
 *
 * <p>The ring buffer is built with {@code RingBuffer.create(...)} from the producer type,
 * event factory, size and wait strategy, and is then passed together with the executor to
 * the {@code (RingBuffer, Executor)} constructor.
 *
 * @param eventFactory   factory handed to the ring buffer for creating its entries
 * @param ringBufferSize size of the ring buffer to create
 * @param executor       executor passed on to the delegated constructor
 * @param producerType   producer mode that selects which ring buffer variant is created
 * @param waitStrategy   wait strategy handed to the ring buffer
 */
public Disruptor(
    final EventFactory<T> eventFactory,
    final int ringBufferSize,
    final Executor executor,
    final ProducerType producerType,
    final WaitStrategy waitStrategy)
{
    this(RingBuffer.create(producerType, eventFactory, ringBufferSize, waitStrategy), executor);
}
/**
 * Creates a ring buffer for the requested producer type.
 *
 * @param producerType {@code SINGLE} or {@code MULTI}; selects the factory method to call
 * @param factory      factory used to create the ring buffer entries
 * @param bufferSize   number of slots in the ring buffer
 * @param waitStrategy wait strategy passed to the selected factory method
 * @return the ring buffer produced by {@code createSingleProducer} or {@code createMultiProducer}
 * @throws IllegalStateException if {@code producerType} is neither {@code SINGLE} nor {@code MULTI}
 */
public static <E> RingBuffer<E> create(
    ProducerType producerType,
    EventFactory<E> factory,
    int bufferSize,
    WaitStrategy waitStrategy)
{
    final RingBuffer<E> created;
    switch (producerType)
    {
        case SINGLE:
            created = createSingleProducer(factory, bufferSize, waitStrategy);
            break;
        case MULTI:
            created = createMultiProducer(factory, bufferSize, waitStrategy);
            break;
        default:
            throw new IllegalStateException(producerType.toString());
    }
    return created;
}
createMultiProducer:
public static <E> RingBuffer<E> createMultiProducer(EventFactory<E> factory, int bufferSize, WaitStrategy waitStrategy) { MultiProducerSequencer sequencer = new MultiProducerSequencer(bufferSize, waitStrategy);//Sequencer
//public final class MultiProducerSequencer extends AbstractSequencer
//public abstract class AbstractSequencer implements Sequencer return new RingBuffer<E>(factory, sequencer); }
/**
 * Builds a ring buffer whose size is taken from the sequencer and pre-populates every slot.
 *
 * @param eventFactory factory used to create the object stored in each slot
 * @param sequencer    sequencer kept by this buffer; also supplies the buffer size
 * @throws IllegalArgumentException if the buffer size is less than 1 or is not a power of 2
 */
RingBuffer(EventFactory<E> eventFactory, Sequencer sequencer)
{
    this.sequencer = sequencer;
    this.bufferSize = sequencer.getBufferSize();

    if (bufferSize < 1)
    {
        throw new IllegalArgumentException("bufferSize must not be less than 1");
    }

    // A positive int with exactly one bit set is a power of two.
    if (Integer.bitCount(bufferSize) != 1)
    {
        throw new IllegalArgumentException("bufferSize must be a power of 2");
    }

    // For a power-of-two size, size - 1 has all lower bits set.
    this.indexMask = bufferSize - 1;
    this.entries = new Object[sequencer.getBufferSize()];

    fill(eventFactory);
}
/**
 * Stores a new instance from the factory in every slot of {@code entries}, in index order.
 *
 * @param eventFactory factory whose {@code newInstance()} result is stored in each slot
 */
private void fill(EventFactory<E> eventFactory)
{
    int slot = 0;
    while (slot < entries.length)
    {
        entries[slot] = eventFactory.newInstance();
        slot++;
    }
}
2 disruptor.handleEventsWith(eventHandlers);
/**
 * Registers the given handlers with an empty set of barrier sequences.
 *
 * @param handlers the event handlers to register
 * @return the group returned by {@code createEventProcessors} for these handlers
 */
public EventHandlerGroup<T> handleEventsWith(final EventHandler<T>... handlers)
{
    // No barrier sequences are supplied for handlers registered through this method.
    final Sequence[] noBarrierSequences = new Sequence[0];
    return createEventProcessors(noBarrierSequences, handlers);
}
EventHandlerGroup<T> createEventProcessors(final Sequence[] barrierSequences, final EventHandler<T>[] eventHandlers) { checkNotStarted(); final Sequence[] processorSequences = new Sequence[eventHandlers.length]; final SequenceBarrier barrier = ringBuffer.newBarrier(barrierSequences);//
//ProcessingSequenceBarrier ProcessingSequenceBarrier implements SequenceBarrier for (int i = 0, eventHandlersLength = eventHandlers.length; i < eventHandlersLength; i++) { final EventHandler<T> eventHandler = eventHandlers[i]; final BatchEventProcessor<T> batchEventProcessor = new BatchEventProcessor<T>(ringBuffer, barrier, eventHandler); if (exceptionHandler != null) { batchEventProcessor.setExceptionHandler(exceptionHandler); } consumerRepository.add(batchEventProcessor, eventHandler, barrier); processorSequences[i] = batchEventProcessor.getSequence(); } if (processorSequences.length > 0) { consumerRepository.unMarkEventProcessorsAsEndOfChain(barrierSequences); } return new EventHandlerGroup<T>(this, consumerRepository, processorSequences); }
consumerRepository 与 batchEventProcessor、eventHandler、SequenceBarrier 关联起来。
EventHandlerGroup 与 disruptor、consumerRepository、Sequence 关联起来。
3 disruptor.start();
public RingBuffer<T> start() { Sequence[] gatingSequences = consumerRepository.getLastSequenceInChain(true); ringBuffer.addGatingSequences(gatingSequences); checkOnlyStartedOnce(); for (ConsumerInfo consumerInfo : consumerRepository) { consumerInfo.start(executor);//自定义的线程池执行EventProcessorInfo } return ringBuffer; }
class EventProcessorInfo<T> implements ConsumerInfo
/**
 * Submits this consumer's event processor to the given executor.
 *
 * @param executor executor on which the event processor is run
 */
@Override
public void start(final Executor executor)
{
    // Author's note: eventprocessor is the BatchEventProcessor.
    executor.execute(eventprocessor);
}
BatchEventProcessor 的 run() 方法:
/**
 * Main loop of the event processor.
 *
 * <p>Marks the processor as running, then repeatedly waits on the sequence barrier for the
 * next sequence, passes every available event to the event handler, and records the last
 * processed sequence. The loop ends only when an AlertException arrives while the running
 * flag is false. On exit, shutdown is notified and the running flag is cleared.
 *
 * @throws IllegalStateException if the running flag was already set when this method was called
 */
@Override
public void run()
{
// Atomically flip running from false to true; a failed swap means run() is already active.
if (!running.compareAndSet(false, true))
{
throw new IllegalStateException("Thread is already running");
}
sequenceBarrier.clearAlert();
notifyStart();
// Last event fetched; kept outside the loop so the Throwable handler below can report it.
T event = null;
// Quoted description, apparently of the Sequence class: a concurrent sequence class used
// for tracking the progress of the ring buffer and event processors; supports a number
// of concurrent operations including CAS and ordered writes.
long nextSequence = sequence.get() + 1L;// next sequence to process = current sequence + 1 (original note says CAS; only a get() is visible here)
try
{
while (true)
{
try
{
final long availableSequence = sequenceBarrier.waitFor(nextSequence);// wait for an available sequence (via the wait strategy, per the original note)
if (nextSequence > availableSequence)// nothing available yet for nextSequence
{
Thread.yield();
}
// Process every sequence from nextSequence up to and including availableSequence.
while (nextSequence <= availableSequence)
{
event = dataProvider.get(nextSequence);// dataProvider is the ring buffer (per the original note)
eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence);// invoke the user-defined handler; the last argument is true only for the final sequence of this batch
nextSequence++;
}
// Record progress once per batch, after all of its events were handled.
sequence.set(availableSequence);
}
catch (final TimeoutException e)
{
notifyTimeout(sequence.get());
}
catch (final AlertException ex)
{
// An alert ends the loop only when the running flag has been cleared; otherwise keep going.
if (!running.get())
{
break;
}
}
catch (final Throwable ex)
{
// Report the failure, then mark the failing sequence as processed and move past it.
exceptionHandler.handleEventException(ex, nextSequence, event);
sequence.set(nextSequence);
nextSequence++;
}
}
}
finally
{
notifyShutdown();
running.set(false);
}
}
4 介绍等待策略:final long availableSequence = sequenceBarrier.waitFor(nextSequence); // 等待可用序列号
共有 6 种等待策略。
标签:
原文地址:http://www.cnblogs.com/clds/p/5570137.html