码迷,mamicode.com
首页 > 其他好文 > 详细

disruptor的并行用法

时间:2017-05-17 19:19:54      阅读:167      评论:0      收藏:0      [点我收藏+]

标签:spin   使用   produce   art   bsp   缓冲   并行执行   style   返回   

实现EventFactory,在newInstance方法中返回,ringBuffer缓冲区中的对象实例;代码如下:

public class DTaskFactory implements EventFactory<DTask> {
    @Override
    public DTask newInstance() {//disruptor使用环形缓冲区,这是环形缓冲区所承载的对象
        return new DTask();
    }
}

生产消费的对象类型:

public class DTask {
    public String getName1() {
        return name1;
    }

    public void setName1(String name1) {
        this.name1 = name1;
    }

    public String getName2() {
        return name2;
    }

    public void setName2(String name2) {
        this.name2 = name2;
    }

    public String getName3() {
        return name3;
    }

    public void setName3(String name3) {
        this.name3 = name3;
    }

    String name1;
    String name2;
    String name3;

}

disruptor的消费处理事件onEvent为消费调用的方法(下面的代码中包含并行和串行执行的消费事件):

public class DTaskHandle implements EventHandler<DTask> {
    @Override
    public void onEvent(DTask dTask, long l, boolean b) throws Exception {
        System.out.println("开始最后消费");
        System.out.println(dTask.getName1());

        System.out.println(dTask.getName2());
        System.out.println(dTask.getName3());
        System.out.println("结束最后消费");
    }
}

public class DTaskHandle1 implements EventHandler<DTask> {
    @Override
    public void onEvent(DTask dTask, long l, boolean b) throws Exception {
        System.out.println("-----DTaskHandle1-----");
        dTask.setName1("name1");
    }
}

public class DTaskHandle2 implements EventHandler<DTask> {
    @Override
    public void onEvent(DTask dTask, long l, boolean b) throws Exception {
        System.out.println("-----DTaskHandle2-----");
        dTask.setName2("name2");
    }
}

public class DTaskHandle3 implements EventHandler<DTask> {
    @Override
    public void onEvent(DTask dTask, long l, boolean b) throws Exception {
        System.out.println("-----DTaskHandle3-----");
        dTask.setName3("name3");
    }
}

测试执行类:

public class DisruptorTest {

    public void exec() throws Exception {
        ExecutorService executor = Executors.newCachedThreadPool();
        Disruptor<DTask> disruptor = new Disruptor(new DTaskFactory(),
                1024 * 1024,
                executor,
                ProducerType.SINGLE, new BusySpinWaitStrategy());

        DTaskHandle dTaskHandle = new DTaskHandle();
        DTaskHandle1 dTaskHandle1 = new DTaskHandle1();
        DTaskHandle2 dTaskHandle2 = new DTaskHandle2();
        DTaskHandle3 dTaskHandle3 = new DTaskHandle3();
        disruptor.handleEventsWith(dTaskHandle1, dTaskHandle2, dTaskHandle3);//消费生产出的对象,并行执行

        disruptor.after(dTaskHandle1, dTaskHandle2, dTaskHandle3).handleEventsWith(dTaskHandle);//并行执行1 2 3后,串行执行dTaskHandle

//        disruptor.

        disruptor.start();
        CountDownLatch latch = new CountDownLatch(1);
        //生产者准备
        executor.submit(new TradePublisher(latch, disruptor));

        latch.await();//等待生产者完事.

        disruptor.shutdown();
        executor.shutdown();
    }

}

 

disruptor的并行用法

标签:spin   使用   produce   art   bsp   缓冲   并行执行   style   返回   

原文地址:http://www.cnblogs.com/zzq-include/p/6868874.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!