标签:main 图片 使用 png timeout 出队 key nose 继承
假设我们生产者提交一个任务,消费者5秒钟之后才可以执行,那么我们可以把任务定义为如下格式,并实现Delayed接口,其中data是任务存储的信息。
/**
* 具体的任务
* @author wangshixiang
*/
public class Task implements Delayed {
/**
* 数据
*/
private final String data;
/**
* 任务执行时间
*/
private final long time;
public Task(String data,TimeUnit timeUnit,long time){
this.data=data;
this.time=System.currentTimeMillis()+timeUnit.toMillis(time);
}
@Override
public long getDelay(TimeUnit unit) {
long res= time-System.currentTimeMillis();
return unit.convert(res,TimeUnit.MILLISECONDS);
}
public String getData() {
return data;
}
@Override
public int compareTo(Delayed o) {
if (o instanceof Task ){
Task task= (Task) o;
return (int) (this.time-task.time);
}
return 0;
}
}
定义好任务后,我们需要定义一个任务队列 QUEUE_TASK,来存储消息,实现效果为程序运行后 五秒钟后输出Hello...
private static final DelayQueue<Task> QUEUE_TASK =new DelayQueue<>();
public static void main(String[] args) throws InterruptedException {
QUEUE_TASK .add(new Task("Hello ... ", TimeUnit.SECONDS,5));
System.out.println(QUEUE_TASK .take().getData());
}
public interface DeDlayed extends Comparable<Delayed> {
long getDelay(TimeUnit unit);
}
我们发现Delayed接口继承了Comparable接口,并且有一个getDelay方法,在程序运行的过程中,会调用头部任务的这个方法,来返回该任务具体还有多长时间可以执行。当我们任务实现这个接口时 可以存储任务的执行时间,通过执行时间-当前时间 计算出距离执行时间的差值,因此我们Task定义了一个任务的变量,在创建对象时设置任务的执行时间。
2. DelayQueue 延时队列
首先我们看一下DelayQueue类继承实现结构图
可以理解为 DelayQueue 是一个带延迟执行功能的阻塞队列
带着这几个问题,我们来看一下DelayQueeu的源码 首先看一下主要的参数:
//锁
private final transient ReentrantLock lock = new ReentrantLock();
//优先级队列 执行时间最早的排在第一个
private final PriorityQueue<E> q = new PriorityQueue<E>();
//是否有线程在等待任务到执行时间
private Thread leader;
//条件唤醒
private final Condition available = lock.newCondition();
那么我们先看add(E e)方法 ,任务入队列时做了哪些操作
public boolean add(E e) {
return offer(e);
}
public boolean offer(E e) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
q.offer(e);
if (q.peek() == e) {
leader = null;
available.signal();
}
return true;
} finally {
lock.unlock();
}
}
入队列时做了一下步骤:
接下来在看出队列时take()方法做了哪些操作
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
E first = q.peek();
if (first == null)
available.await();
else {
long delay = first.getDelay(NANOSECONDS);
if (delay <= 0L)
return q.poll();
first = null; // don‘t retain ref while waiting
if (leader != null)
available.await();
else {
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
available.awaitNanos(delay);
} finally {
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
if (leader == null && q.peek() != null)
//我拿到元素了 唤醒其他的线程
available.signal();
lock.unlock();
}
}
出队列做了如下步骤:
带时间的出队列方法 E poll(long timeout, TimeUnit unit) 的实现逻辑与take()方法的唯一区别就是。只有当自己剩余等待时间大于第一个元素剩余执行时间时 才允许把自己设置为leader
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
E first = q.peek();
if (first == null) {
if (nanos <= 0L)
return null;
else
//睡眠等待时间 有可能提前返回 那么返回的是剩余等待时间
nanos = available.awaitNanos(nanos);
} else {
long delay = first.getDelay(NANOSECONDS);
if (delay <= 0L)
return q.poll();
if (nanos <= 0L)
return null;
first = null; // don‘t retain ref while waiting
if (nanos < delay || leader != null)
//如果剩余等待时间比第一个元素剩余执行时间还短 那么应该睡剩余等待时间
nanos = available.awaitNanos(nanos);
else {
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
long timeLeft = available.awaitNanos(delay);
//计算剩余等待时间
nanos -= delay - timeLeft;
} finally {
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
if (leader == null && q.peek() != null)
available.signal(http://www.jintianxuesha.com/);
lock.unlock();
}
}
在大多数业务场景中,我们会利用中间件提供的延时消息的功能。比如利用redis zset实现 ,kafka rabbit mq 的延时队列。我们需要根据我们的业务场景,来选择合适的中间件。
凡客传说4-DelayQueue使用方式,源码分析,以及应用场景
标签:main 图片 使用 png timeout 出队 key nose 继承
原文地址:https://www.cnblogs.com/jiusibuiu/p/14163251.html