标签:unit 技术 eric fail 修改 abstract read his throws
netty Future是基于jdk Future扩展,以监听完成任务触发执行
Promise是对Future修改任务数据
DefaultPromise是重要的模板类,其它不同类型实现基本是一层简单的包装,如DefaultChannelPromise
主要是分析await是如何等侍结果的
public interface Future<V> extends java.util.concurrent.Future<V> { Future<V> addListener(GenericFutureListener<? extends Future<? super V>> listener); } public interface Promise<V> extends Future<V> { Promise<V> setSuccess(V result); boolean trySuccess(V result); Promise<V> setFailure(Throwable cause); boolean tryFailure(Throwable cause); boolean setUncancellable(); } public class DefaultPromise<V> extends AbstractFuture<V> implements Promise<V> { @Override public boolean await(long timeout, TimeUnit unit) throws InterruptedException { return await0(unit.toNanos(timeout), true); } private boolean await0(long timeoutNanos, boolean interruptable) throws InterruptedException { //已完成任务直接忽略 if (isDone()) { return true; } //没有等侍时间返回处理记录 if (timeoutNanos <= 0) { return isDone(); } //已中断抛异常 if (interruptable && Thread.interrupted()) { throw new InterruptedException(toString()); } //checkDeadLock(); //netty 认为是当前线程是死锁状态 EventExecutor e = executor(); if (e != null && e.inEventLoop()) { throw new BlockingOperationException(toString()); } long startTime = System.nanoTime(); long waitTime = timeoutNanos; boolean interrupted = false; try { for (;;) { synchronized (this) { if (isDone()) { return true; } //最大检查次数为 Short.MAX_VALUE //很奇怪的逻辑,处理完后又自减 if (waiters == Short.MAX_VALUE) { throw new IllegalStateException("too many waiters: " + this); } ++waiters; try { //阻塞的代码只是一行参数1是milliseconds,参数2是辅助用的大于0时milliseconds+1,如果是0的话会无限制阻塞 wait(waitTime / 1000000, (int) (waitTime % 1000000)); } catch (InterruptedException e) { if (interruptable) { throw e; } else { interrupted = true; } } finally { waiters--; } } //这里是double check跟并发无影响的逻辑放在synchronized外面 if (isDone()) { return true; } else { waitTime = timeoutNanos - (System.nanoTime() - startTime); if (waitTime <= 0) { return isDone(); } } } } finally { if (interrupted) { Thread.currentThread().interrupt(); } } } } public class DefaultChannelPromise extends DefaultPromise<Void> implements ChannelPromise, FlushCheckpoint { private final Channel channel; public DefaultChannelPromise(Channel channel) { this.channel = channel; } public DefaultChannelPromise(Channel channel, EventExecutor executor) { super(executor); this.channel = channel; } }
[编织消息框架][netty源码分析]9 Promise 实现类DefaultPromise职责与实现
标签:unit 技术 eric fail 修改 abstract read his throws
原文地址:http://www.cnblogs.com/solq111/p/7071177.html