码迷,mamicode.com
首页 > 其他好文 > 详细

disruptor架构三 使用场景 使用WorkHandler和BatchEventProcessor辅助创建消费者

时间:2017-10-12 14:35:07      阅读:3735      评论:0      收藏:0      [点我收藏+]

标签:executors   wait   new   turn   batch   ted   sleep   实体类   结束   

 

在helloWorld的实例中,我们创建Disruptor实例,然后调用getRingBuffer方法去获取RingBuffer,其实在很多时候,我们可以直接使用RingBuffer,以及其他的API操作。我们一起熟悉下示例:

使用EventProcessor消息处理器。

BatchEventProcessor 多线程并发执行,不同线程执行不同是不同的event 

EventProcessor有3个实现类

BatchEventProcessor 多线程并发执行,不同线程执行不同是不同的event

使用BatchEventProcessor 消费者需要实现EventHandler接口

我们来看下面的代码:

需要处理的实体类

package bhz.generate1;

import java.util.concurrent.atomic.AtomicInteger;

public class Trade {  
    
    private String id;//ID  
    private String name;
    private double price;//金额  
    private AtomicInteger count = new AtomicInteger(0);
    
    public String getId() {
        return id;
    }
    public void setId(String id) {
        this.id = id;
    }
    public String getName() {
        return name;
    }
    public void setName(String name) {
        this.name = name;
    }
    public double getPrice() {
        return price;
    }
    public void setPrice(double price) {
        this.price = price;
    }
    public AtomicInteger getCount() {
        return count;
    }
    public void setCount(AtomicInteger count) {
        this.count = count;
    } 
      
      
}  

消费者类:

package bhz.generate1;

import java.util.UUID;

import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.WorkHandler;

public class TradeHandler implements EventHandler<Trade>, WorkHandler<Trade> {  
      
    @Override  
    public void onEvent(Trade event, long sequence, boolean endOfBatch) throws Exception {  
        this.onEvent(event);  
    }  
  
    @Override  
    public void onEvent(Trade event) throws Exception {  
        //杩欓噷鍋氬叿浣撶殑娑堣垂閫昏緫  
        event.setId(UUID.randomUUID().toString());//绠?崟鐢熸垚涓婭D  
        System.out.println(event.getId());  
    }  
}  

消费者除了实现EventHandler接口之外,还实现了WorkHandler接口,为啥了,因为后面我们要使用了WokerPool来发送该实体类,所以这里就让该实体类实现两个接口

我们来看看main方法

package bhz.generate1;

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import com.lmax.disruptor.BatchEventProcessor;
import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.EventProcessor;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.SequenceBarrier;
import com.lmax.disruptor.YieldingWaitStrategy;

public class Main1 {  
   
    public static void main(String[] args) throws Exception {  
        int BUFFER_SIZE=1024;  
        int THREAD_NUMBERS=4;  
        /* 
         * createSingleProducer创建一个单生产者的RingBuffer, 
         * 第一个参数叫EventFactory,从名字上理解就是"事件工厂",其实它的职责就是产生数据填充RingBuffer的区块。 
         * 第二个参数是RingBuffer的大小,它必须是2的指数倍 目的是为了将求模运算转为&运算提高效率 
         * 第三个参数是RingBuffer的生产都在没有可用区块的时候(可能是消费者(或者说是事件处理器) 太慢了)的等待策略 
         */  
        final RingBuffer<Trade> ringBuffer = RingBuffer.createSingleProducer(new EventFactory<Trade>() {  
            @Override  
            public Trade newInstance() {  
                return new Trade();  
            }  
        }, BUFFER_SIZE, new YieldingWaitStrategy());  
        
        //创建线程池  
        ExecutorService executors = Executors.newFixedThreadPool(THREAD_NUMBERS);  
        
        //创建SequenceBarrier  ,用于平衡生产者和消费者速率,用障碍来处理
        SequenceBarrier sequenceBarrier = ringBuffer.newBarrier();  
          
        //创建消息处理器  
        BatchEventProcessor<Trade> transProcessor = new BatchEventProcessor<Trade>(  
                ringBuffer, sequenceBarrier, new TradeHandler());  
          
        //这一步的目的就是把消费者的位置信息引用注入到生产者    如果只有一个消费者的情况可以省略 ,将生产者和消费者关联起来
        ringBuffer.addGatingSequences(transProcessor.getSequence());  
          
        //把消息处理器提交到线程池  
        executors.submit(transProcessor);  
        
        //如果存在多个消费者 那重复执行上面3行代码 把TradeHandler换成其它消费者类  
          
        Future<?> future= executors.submit(new Callable<Void>() {  
            @Override  
            public Void call() throws Exception {  
                long seq;  
                for(int i=0;i<10;i++){  
                    seq = ringBuffer.next();//占个坑 --ringBuffer一个可用区块  
                    ringBuffer.get(seq).setPrice(Math.random()*9999);//给这个区块放入 数据 
                    ringBuffer.publish(seq);//发布这个区块的数据使handler(consumer)可见  
                }  
                return null;  
            }  
        }); 
        
        future.get();//等待生产者结束  
        Thread.sleep(1000);//等上1秒,等消费都处理完成  
        transProcessor.halt();//通知事件(或者说消息)处理器 可以结束了(并不是马上结束!!!)  
        executors.shutdown();//终止线程  
    }  
}  

//创建消息处理器
BatchEventProcessor<Trade> transProcessor = new BatchEventProcessor<Trade>(
ringBuffer, sequenceBarrier, new TradeHandler());

它主要有三个成员RingBuffer、SequenceBarrier和EventHandler

上面对应对应的是一个生产者,一个消费者的情况

我们来看看程序运行的效果

1a7226d0-e212-4183-b109-cab5e5c41545
3e1da0fa-686d-4361-bea2-600c2c5d26b9
bf31874a-3405-4008-80e7-03caf9f16ae4
080a05ef-0052-4271-a2ee-ee50038a5a77
71e1a5a8-24ba-4175-b53a-f8b71e99464a
99670de9-6aa5-48fa-8fa2-a490250e25ba
7a44b351-0caa-4ac3-b344-97cf72c9dd5f
10a7fe52-eef1-453c-80a2-126fd8bac948
c78f2ed5-3c3e-4481-9062-dd96ff7ba051
49f51ad6-2ee5-4c36-a0d0-96bc0e17fba9

如果是一个生产者,对应多个消费者,那么

//创建消息处理器
BatchEventProcessor<Trade> transProcessor = new BatchEventProcessor<Trade>(
ringBuffer, sequenceBarrier, new TradeHandler());

//这一步的目的就是把消费者的位置信息引用注入到生产者 如果只有一个消费者的情况可以省略 ,将生产者和消费者关联起来
ringBuffer.addGatingSequences(transProcessor.getSequence());

//把消息处理器提交到线程池
executors.submit(transProcessor);

//如果存在多个消费者 那重复执行上面3行代码 把TradeHandler换成其它消费者类

所以:BatchEventProcessor 多线程并发执行,不同线程执行不同是不同的event 

2、使用WorkerPool消息处理器。

消费者需要实现:WorkHandler接口

我们来看看主程序的代码:

package bhz.generate1;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.IgnoreExceptionHandler;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.SequenceBarrier;
import com.lmax.disruptor.WorkHandler;
import com.lmax.disruptor.WorkerPool;

public class Main2 {  
    public static void main(String[] args) throws InterruptedException {  
        int BUFFER_SIZE=1024;  
        int THREAD_NUMBERS=4;  
        
        EventFactory<Trade> eventFactory = new EventFactory<Trade>() {  
            public Trade newInstance() {  
                return new Trade();  
            }  
        };  
        
        RingBuffer<Trade> ringBuffer = RingBuffer.createSingleProducer(eventFactory, BUFFER_SIZE);  
          
        SequenceBarrier sequenceBarrier = ringBuffer.newBarrier();  
          
        ExecutorService executor = Executors.newFixedThreadPool(THREAD_NUMBERS);  
          
        WorkHandler<Trade> handler = new TradeHandler();  

        WorkerPool<Trade> workerPool = new WorkerPool<Trade>(ringBuffer, sequenceBarrier, new IgnoreExceptionHandler(), handler);  
          
        workerPool.start(executor);  
          
        //下面这个生产8个数据
        for(int i=0;i<8;i++){  
            long seq=ringBuffer.next();  
            ringBuffer.get(seq).setPrice(Math.random()*9999);  
            ringBuffer.publish(seq);  
        }           
        Thread.sleep(1000);  
        workerPool.halt();  
        executor.shutdown();  
    }  
}  

程序运行的效果:

4bbffa55-b19f-44a4-bfa7-100affc63323
121a0ee8-7e8e-4637-b659-ca78ae9aaa20
0fc1cdb8-8186-44fc-a3a5-4bf5fea66086
afb70a80-e1ce-46f9-bfc1-4e0d81be96b4
0e0b3690-830b-4d38-b78b-e0930b499515
f5b4e23f-10c8-45ea-b064-32ae40f54912
4a172494-480a-4509-99d0-d416b5e2c5c9
902c0669-6196-423e-9924-31cb9633bbb5

 

disruptor架构三 使用场景 使用WorkHandler和BatchEventProcessor辅助创建消费者

标签:executors   wait   new   turn   batch   ted   sleep   实体类   结束   

原文地址:http://www.cnblogs.com/kebibuluan/p/7655876.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!