码迷,mamicode.com
首页 > 其他好文 > 详细

凡客传说4-DelayQueue使用方式,源码分析,以及应用场景

时间:2020-12-25 11:47:01      阅读:0      评论:0      收藏:0      [点我收藏+]

标签:main   图片   使用   png   timeout   出队   key   nose   继承   

DelayQueue 顾名思义,它是一个延时队列

使用方式 :

假设我们生产者提交一个任务,消费者5秒钟之后才可以执行,那么我们可以把任务定义为如下格式,并实现Delayed接口,其中data是任务存储的信息。

/**
 * 具体的任务
 * @author wangshixiang
 */
public class Task implements Delayed {
    /**
     * 数据
     */
    private final String data;
    /**
     * 任务执行时间
     */
    private final long time;

    public Task(String data,TimeUnit timeUnit,long time){
        this.data=data;
        this.time=System.currentTimeMillis()+timeUnit.toMillis(time);
    }

    @Override
    public long getDelay(TimeUnit unit) {
        long res= time-System.currentTimeMillis();
        return unit.convert(res,TimeUnit.MILLISECONDS);
    }

    public String getData() {
        return data;
    }

    @Override
    public int compareTo(Delayed o) {
        if (o instanceof Task ){
            Task task= (Task) o;
            return (int) (this.time-task.time);
        }
        return 0;
    }
}

定义好任务后,我们需要定义一个任务队列 QUEUE_TASK,来存储消息,实现效果为程序运行后 五秒钟后输出Hello...

  private static final DelayQueue<Task> QUEUE_TASK =new DelayQueue<>();

    public static void main(String[] args) throws InterruptedException {
        QUEUE_TASK .add(new Task("Hello ... ", TimeUnit.SECONDS,5));
        System.out.println(QUEUE_TASK .take().getData());
    }

使用详解

  1. Delayed 接口定义:
public interface DeDlayed extends Comparable<Delayed> {

 
    long getDelay(TimeUnit unit);
}

我们发现Delayed接口继承了Comparable接口,并且有一个getDelay方法,在程序运行的过程中,会调用头部任务的这个方法,来返回该任务具体还有多长时间可以执行。当我们任务实现这个接口时 可以存储任务的执行时间,通过执行时间-当前时间 计算出距离执行时间的差值,因此我们Task定义了一个任务的变量,在创建对象时设置任务的执行时间。
2. DelayQueue 延时队列
首先我们看一下DelayQueue类继承实现结构图
技术图片

可以理解为 DelayQueue 是一个带延迟执行功能的阻塞队列

深入理解

  • 为什么Delayed接口继承了Comparable接口 ?
  • DelayQueue是怎么实现只有到预定时间才能取出任务 ?
  • 向队列里放入一个任务时 发生了什么事情 ?

带着这几个问题,我们来看一下DelayQueeu的源码 首先看一下主要的参数:

    //锁
    private final transient ReentrantLock lock = new ReentrantLock();
    //优先级队列 执行时间最早的排在第一个
    private final PriorityQueue<E> q = new PriorityQueue<E>();
    //是否有线程在等待任务到执行时间
    private Thread leader;
    //条件唤醒
    private final Condition available = lock.newCondition();

那么我们先看add(E e)方法 ,任务入队列时做了哪些操作

    public boolean add(E e) {
        return offer(e);
    }
    public boolean offer(E e) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            q.offer(e);
            if (q.peek() == e) {
                leader = null;
                available.signal();
            }
            return true;
        } finally {
            lock.unlock();
        }
    }

入队列时做了一下步骤:

  1. 获取锁
  2. 放入元素 (放入优先级队列)
  3. 如果自己排在第一个 则原来标记的leader线程已经失效 直接设置为null,并唤醒消费者
  4. 释放锁

接下来在看出队列时take()方法做了哪些操作

public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            for (;;) {
                E first = q.peek();
                if (first == null)
                    available.await();
                else {
                    long delay = first.getDelay(NANOSECONDS);
                    if (delay <= 0L)
                        return q.poll();
                    first = null; // don‘t retain ref while waiting
                    if (leader != null)
                        available.await();
                    else {
                        Thread thisThread = Thread.currentThread();
                        leader = thisThread;
                        try {
                            available.awaitNanos(delay);
                        } finally {
                            if (leader == thisThread)
                                leader = null;
                        }
                    }
                }
            }
        } finally {
            if (leader == null && q.peek() != null)
            //我拿到元素了 唤醒其他的线程
                available.signal();
            lock.unlock();
        }
    }

出队列做了如下步骤:

  1. 获取锁(可中断的锁 获取这种锁允许其他线程中断此线程)
  2. 取出第一个元素 如果第一个元素为空 则直接 await(),等待被唤醒(如放队列时的唤醒)
  3. 如果第一个元素不为空,查看是否到执行时间,如果没有到执行时间 查看是否有leader已经注意到这个任务 如果他注意到这个任务 我直接await()。如果没人注意,那么我就把自己设置为leader然后设置带时间的await()。
  4. 睡眠到执行时间后 醒来后查看leader是否还是自己 如果是的话 取消自己的leader身份。然后在尝试获取任务。
  5. 如果我获取到了符合要求的元素,那么我应该唤醒大家 来一块竞争获取下一个元素。

带时间的出队列方法 E poll(long timeout, TimeUnit unit) 的实现逻辑与take()方法的唯一区别就是。只有当自己剩余等待时间大于第一个元素剩余执行时间时 才允许把自己设置为leader

public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            for (;;) {
                E first = q.peek();
                if (first == null) {
                    if (nanos <= 0L)
                        return null;
                    else
                        //睡眠等待时间 有可能提前返回 那么返回的是剩余等待时间
                        nanos = available.awaitNanos(nanos);
                } else {
                    long delay = first.getDelay(NANOSECONDS);
                    if (delay <= 0L)
                        return q.poll();
                    if (nanos <= 0L)
                        return null;
                    first = null; // don‘t retain ref while waiting
                    if (nanos < delay || leader != null)
                     //如果剩余等待时间比第一个元素剩余执行时间还短 那么应该睡剩余等待时间
                        nanos = available.awaitNanos(nanos);
                    else {
                        Thread thisThread = Thread.currentThread();
                        leader = thisThread;
                        try {
                            long timeLeft = available.awaitNanos(delay);
                            //计算剩余等待时间 
                            nanos -= delay - timeLeft;
                        } finally {
                            if (leader == thisThread)
                                leader = null;
                        }
                    }
                }
            }
        } finally {
            if (leader == null && q.peek() != null)
                available.signal(http://www.jintianxuesha.com/);
            lock.unlock();
        }
    }

应用场景:

在大多数业务场景中,我们会利用中间件提供的延时消息的功能。比如利用redis zset实现 ,kafka rabbit mq 的延时队列。我们需要根据我们的业务场景,来选择合适的中间件。

  1. 订单超时未支付取消.
  2. 调用其他系统时失败间隔重试.
  3. 调用第三方接口时,过段时间异步获取结果。

凡客传说4-DelayQueue使用方式,源码分析,以及应用场景

标签:main   图片   使用   png   timeout   出队   key   nose   继承   

原文地址:https://www.cnblogs.com/jiusibuiu/p/14163251.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!