标签:场景 opened this port run ges 延迟 性能 方式
参考:http://www.cnblogs.com/haiq/p/4112689.html
Disruptor 是线程内通信框架,用于线程里共享数据。LMAX 创建Disruptor作为可靠消息架构的一部分并将它设计成一种在不同组件中共享数据非常快的方法。
先从了解 Disruptor 的核心概念开始,来了解它是如何运作的。下面介绍的概念模型,既是领域对象,也是映射到代码实现上的核心对象。
参考:http://ifeve.com/disruptor-writing-ringbuffer/
Ring Buffer是基于数组,而不是链表,不会产生内存回收的性能消耗。
1)Disruptor 由消费者负责通知ProducerBarrier,处理到了哪个序列号,而不是 Ring Buffer。所以,如果我们想确定我们没有让 Ring Buffer 重叠,需要检查所有的消费者们都读到了哪里。
2)现在生产者想要写入 Ring Buffer 中序号 3 占据的节点,因为它是 Ring Buffer 当前游标的下一个节点。但是 ProducerBarrier 明白现在不能写入,因为有一个消费者正在占用它。所以,ProducerBarrier 停下来自旋 (spins),等待,直到那个消费者离开。
提交数据及多生产者
package com.disruptor; /** * 定义事件 * 事件event就是通过disruptor进行交换的数据类型 * @author gaojiay * */ public class LongEvent { private long value; public void set(long value){ this.value = value; } public long get() { // TODO Auto-generated method stub return value; } }
package com.disruptor; import com.lmax.disruptor.EventFactory; /** * 事件工厂(Event Factory)定义了实例化定义的事件(Event); * Disruptor 通过 EventFactory 在 RingBuffer 中预创建 Event 的实例; * * 个 Event 实例实际上被用作一个“数据槽”,发布者发布前, * 先从 RingBuffer 获得一个 Event 的实例,然后往 Event 实例中填充数据, * 之后再发布到 RingBuffer 中,之后由 Consumer 获得该 Event * 实例并从中读取数据。 * */ public class LongEventFactory implements EventFactory<LongEvent>{ @Override public LongEvent newInstance() { // TODO Auto-generated method stub return new LongEvent(); } }
package com.disruptor; import com.lmax.disruptor.EventHandler; /** * 定义事件处理的具体实现 消费者,也就是事件处理器 * * @author gaojiay * */ public class LongEventHandler implements EventHandler<LongEvent> { private String name; @Override public void onEvent(LongEvent event, long sequence, boolean endofBatch) throws Exception { System.out.println("CoustmerEvent_"+name+":" + event.get()); } public LongEventHandler() { super(); } public LongEventHandler(String name) { this(); this.name = name; } }
main
package com.disruptor; /** * ringbuffer的写入 http://ifeve.com/disruptor-writing-ringbuffer/ */ import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLongArray; import com.lmax.disruptor.BlockingWaitStrategy; import com.lmax.disruptor.EventFactory; import com.lmax.disruptor.EventHandler; import com.lmax.disruptor.RingBuffer; import com.lmax.disruptor.SleepingWaitStrategy; import com.lmax.disruptor.WaitStrategy; import com.lmax.disruptor.YieldingWaitStrategy; import com.lmax.disruptor.dsl.Disruptor; import com.lmax.disruptor.dsl.ProducerType; //http://www.cnblogs.com/haiq/p/4112689.html public class main { //static AtomicLong data = new AtomicLong(1000); static long count = 100; public static void main(String[] args) { //Disruptor 通过 java.util.concurrent.ExecutorService 提供的线程来触发 Consumer 的事件处理。例如: ExecutorService executor = Executors.newCachedThreadPool(); /*BlockingWaitStrategy 是最低效的策略,但其对CPU的消耗最小并且在各种不同部署环境中能提供更加一致的性能表现; SleepingWaitStrategy 的性能表现跟 BlockingWaitStrategy 差不多,对 CPU 的消耗也类似,但其对生产者线程的影响最小,适合用于异步日志类似的场景; YieldingWaitStrategy 的性能是最好的,适合用于低延迟的系统。在要求极高性能且事件处理线数小于 CPU 逻辑核心数的场景中,推荐使用此策略;例如,CPU开启超线程的特性。*/ WaitStrategy BLOCKING_WAIT = new BlockingWaitStrategy(); WaitStrategy SLEEPING_WAIT = new SleepingWaitStrategy(); WaitStrategy YIELDING_WAIT = new YieldingWaitStrategy(); //启动 EventFactory<LongEvent> eventFactory = new LongEventFactory(); int ringBufferSize = 1024 * 1024; // RingBuffer 大小,必须是 2 的 N 次方; Disruptor<LongEvent> disruptor = new Disruptor<LongEvent>(eventFactory, ringBufferSize, executor, ProducerType.SINGLE, new YieldingWaitStrategy()); EventHandler<LongEvent> eventHandler1 = new LongEventHandler("1"); //消费者1 EventHandler<LongEvent> eventHandler2 = new LongEventHandler("2"); //消费者2 disruptor.handleEventsWith(eventHandler1,eventHandler2); disruptor.start(); // 发布事件; /* 发布方式一 RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer(); for(int i= 0;i<100;i++){ long sequence = ringBuffer.next();//请求下一个事件序号; try { LongEvent event = ringBuffer.get(sequence);//获取该序号对应的事件对象; long data = getData();//获取要通过事件传递的业务数据; event.set(data); } finally{ //注意,最后的 ringBuffer.publish 方法必须包含在 finally 中以确保必须得到调用; 如果某个请求的 sequence 未被提交,将会堵塞后续的发布操作或者其它的 producer。 Disruptor 还提供另外一种形式的调用来简化以上操作,并确保 publish 总是得到调用。 ringBuffer.publish(sequence);//发布事件; } } disruptor.shutdown();//关闭 disruptor,方法会堵塞,直至所有的事件都得到处理; executor.shutdown();//关闭 disruptor 使用的线程池;如果需要的话,必须手动关闭, disruptor 在 shutdown 时不会自动关闭; */ //发布方式二 :一些复杂的操作放在Ring Buffer RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer(); LongEventProducerWithTranslator translator = new LongEventProducerWithTranslator(ringBuffer); translator.onData(getData()); } private static long getData(){ // return data.getAndIncrement(); return ++count; } }
package com.disruptor; import java.nio.ByteBuffer; import com.lmax.disruptor.RingBuffer; public class LongEventProducer { private final RingBuffer<LongEvent> ringBuffer; public LongEventProducer(RingBuffer<LongEvent> ringBuffer) { this.ringBuffer = ringBuffer; } /** * onData用来发布事件,每调用一次就发布一次事件事件 * 它的参数会通过事件传递给消费者 * * @param bb */public void onData(ByteBuffer bb) { //可以把ringBuffer看做一个事件队列,那么next就是得到下面一个事件槽 long sequence = ringBuffer.next(); try { //用上面的索引取出一个空的事件用于填充 LongEvent event = ringBuffer.get(sequence);// for the sequence event.set(bb.getLong(0)); } finally { //发布事件 只ringBuffer ringBuffer.publish(sequence); } } }
package com.disruptor; import java.nio.ByteBuffer; import com.lmax.disruptor.EventTranslatorOneArg; import com.lmax.disruptor.RingBuffer; /** * Disruptor 3.0提供了lambda式的API。这样可以把一些复杂的操作放在Ring Buffer * ,所以在Disruptor3.0以后的版本最好使用Event Publisher或者Event Translator来发布事件。 * @author gaojiay * */ public class LongEventProducerWithTranslator { //一个translator可以看做一个事件初始化器,publicEvent方法会调用它 //填充Event private static final EventTranslatorOneArg<LongEvent, Long> TRANSLATOR = //Translator中方法的参数是通过RingBuffer来传递的。 new EventTranslatorOneArg<LongEvent, Long>() { public void translateTo(LongEvent event, long sequence, Long bb) { event.set(bb); } }; private final RingBuffer<LongEvent> ringBuffer; public LongEventProducerWithTranslator(RingBuffer<LongEvent> ringBuffer) { this.ringBuffer = ringBuffer; } public void onData(long bb) { ringBuffer.publishEvent(TRANSLATOR, bb); } }
测试第两种:
package com.disruptor; import java.nio.ByteBuffer; import java.util.concurrent.Executor; import java.util.concurrent.Executors; import com.lmax.disruptor.RingBuffer; import com.lmax.disruptor.dsl.Disruptor; public class LongEventMain { public static void main(String[] args) throws InterruptedException { // Executor that will be used to construct new threads for consumers Executor executor = Executors.newCachedThreadPool(); // Specify the size of the ring buffer, must be power of 2. int bufferSize = 1024;// Construct the Disruptor Disruptor<LongEvent> disruptor = new Disruptor<>(LongEvent::new, bufferSize, executor); // 可以使用lambda来注册一个EventHandler disruptor.handleEventsWith((event, sequence, endOfBatch) -> System.out.println("Event: " + event.get())); // Start the Disruptor, starts all threads running disruptor.start(); // Get the ring buffer from the Disruptor to be used for publishing. RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer(); LongEventProducer producer = new LongEventProducer(ringBuffer); ByteBuffer bb = ByteBuffer.allocate(8); for (long l = 0; true; l++) { bb.putLong(0, l); ringBuffer.publishEvent((event, sequence, buffer) -> event.set(buffer.getLong(0)), bb); Thread.sleep(1000); } } }
标签:场景 opened this port run ges 延迟 性能 方式
原文地址:http://www.cnblogs.com/gaojy/p/7216321.html