标签:
先看下Future的整个继承体系,还有一个ChannelFuture不在里面;
private final EventExecutor executor; //任务执行器private volatile Object result; //不仅仅是结果,也有可能是异常* 一个或多个监听器,可能是GenericFutureListener或者DefaultFutureListeners。如果是NULL有两种可能
* 1:没有添加触发器
* 2:已经出发了private Object listeners;private LateListeners lateListeners;private short waiters;
private static boolean isDone0(Object result) {return result != null && result != UNCANCELLABLE;}
public boolean isSuccess() {Object result = this.result;if (result == null || result == UNCANCELLABLE) {return false;}return !(result instanceof CauseHolder);}
public V getNow() {Object result = this.result;if (result instanceof CauseHolder || result == SUCCESS) {return null;}return (V) result;}
@Overridepublic Promise<V> sync() throws InterruptedException {await();rethrowIfFailed();return this;}
@Overridepublic Promise<V> await() throws InterruptedException {if (isDone()) {return this;}if (Thread.interrupted()) {throw new InterruptedException(toString());}synchronized (this) {while (!isDone()) {checkDeadLock(); //判断当前线程是否是执行线程。如果是抛出异常。incWaiters(); //添加等待个数try {wait(); //释放锁,等待唤醒,阻塞该线程} finally {decWaiters();}}}return this;}
@Overridepublic boolean cancel(boolean mayInterruptIfRunning) {Object result = this.result;if (isDone0(result) || result == UNCANCELLABLE) {return false;}synchronized (this) {// Allow only once.result = this.result;if (isDone0(result) || result == UNCANCELLABLE) {return false;}this.result = CANCELLATION_CAUSE_HOLDER;if (hasWaiters()) {notifyAll();}}notifyListeners();return true;}
/*** 该方法不需要异步,为啥呢* 1:这个方法在同步代码块里面调用,因此任何监听器列表的改变都happens-before该方法* 2:该方法只有isDone==true的时候调用,一但 isDone==true 那么监听器列表将不会改变*/private void notifyListeners() {Object listeners = this.listeners;if (listeners == null) {return;}EventExecutor executor = executor();if (executor.inEventLoop()) {final InternalThreadLocalMap threadLocals = InternalThreadLocalMap.get();final int stackDepth = threadLocals.futureListenerStackDepth();if (stackDepth < MAX_LISTENER_STACK_DEPTH) {threadLocals.setFutureListenerStackDepth(stackDepth + 1);try {if (listeners instanceof DefaultFutureListeners) {notifyListeners0(this, (DefaultFutureListeners) listeners);} else {final GenericFutureListener<? extends Future<V>> l =(GenericFutureListener<? extends Future<V>>) listeners;notifyListener0(this, l);}} finally {this.listeners = null;threadLocals.setFutureListenerStackDepth(stackDepth);}return;}}if (listeners instanceof DefaultFutureListeners) {final DefaultFutureListeners dfl = (DefaultFutureListeners) listeners;execute(executor, new Runnable() {@Overridepublic void run() {notifyListeners0(DefaultPromise.this, dfl);DefaultPromise.this.listeners = null;}});} else {final GenericFutureListener<? extends Future<V>> l =(GenericFutureListener<? extends Future<V>>) listeners;execute(executor, new Runnable() {@Overridepublic void run() {notifyListener0(DefaultPromise.this, l);DefaultPromise.this.listeners = null;}});}}
@Overridepublic boolean setUncancellable() {Object result = this.result;if (isDone0(result)) {return !isCancelled0(result);}synchronized (this) {// Allow only once.result = this.result;if (isDone0(result)) {return !isCancelled0(result);}this.result = UNCANCELLABLE;}return true;}private boolean setFailure0(Throwable cause) {if (cause == null) {throw new NullPointerException("cause");}if (isDone()) {return false;}synchronized (this) {// Allow only once.if (isDone()) {return false;}result = new CauseHolder(cause);if (hasWaiters()) {notifyAll();}}return true;}private boolean setSuccess0(V result) {if (isDone()) {return false;}synchronized (this) {// Allow only once.if (isDone()) {return false;}if (result == null) {this.result = SUCCESS;} else {this.result = result;}if (hasWaiters()) {notifyAll();}}return true;}
CompleteFuture的几个子类是状态Promise
PromiseTask:该类继承了RunnableFuture接口,该类表示异步操作的结果也可以异步获得,类似JDK中的FutureTask,实例化该对象时候需要传一个Callable的对象,如果没有该对象可以传递一个Runnable和一个Result构造一个Callable对象。
private static final class RunnableAdapter<T> implements Callable<T> {final Runnable task;final T result;RunnableAdapter(Runnable task, T result) {this.task = task;this.result = result;}@Overridepublic T call() {task.run();return result;}@Overridepublic String toString() {return "Callable(task: " + task + ", result: " + result + ‘)‘;}}看下他的run方法。
@Overridepublic void run() {try {if (setUncancellableInternal()) {V result = task.call();setSuccessInternal(result);}} catch (Throwable e) {setFailureInternal(e);}该类的setFailure,setSuccess等方法都会抛出一异常,而加了internal的方法会成功执行,他们是protected,子类或者同一个package中可以调用。ScheduledFutureTask:该类是定时任务返回的ChannelFuture是这个结构中最重要的类,从名称可以知道是通道异步执行的结果:在netty中所有的IO操作都是异步的。这意味这所有的IO调用都会立即返回,且不保证IO操作完成。IO调用会返回一个ChannelFuture的实例,通过该实例可以查看IO操作的结果和状态,ChannelFuture有完成和未完成两种状态,当IO操作开始,就会创建一个ChannelFuture的实例,该实例初始是未完成状态,它不是成功,失败,或者取消,因为IO操作还没有完成,如果IO操作完成了那么将会有成功,失败,和取消状态,* +---------------------------+
* | Completed successfully |
* +---------------------------+
* +----> isDone() = <b>true</b> |
* +--------------------------+ | | isSuccess() = <b>true</b> |
* | Uncompleted | | +===========================+
* +--------------------------+ | | Completed with failure |
* | isDone() = <b>false</b> | | +---------------------------+
* | isSuccess() = false |----+----> isDone() = <b>true</b> |
* | isCancelled() = false | | | cause() = <b>non-null</b> |
* | cause() = null | | +===========================+
* +--------------------------+ | | Completed by cancellation |
* | +---------------------------+
* +----> isDone() = <b>true</b> |
* | isCancelled() = <b>true</b> |
* +---------------------------+该类提供了很多方法用来检查IO操作是否完成,等待完成,和接受IO操作的结果。还可以添加ChannelFutureListener的监听器,这样IO操作完成时就可以得到提醒 * 强烈建议使用addListener而不是await。 * addListener是非阻塞的,它简单的添加指定的ChannelFutureListener到ChannelFuture中, * IO线程将在当绑定在这个future的IO操作完成时,触发这个触发器,优点是提高效率和资源的利用率 * await()是一个阻塞方法,一旦调用,调用线程将会阻塞直到IO操作完成。优点是容易实现顺序逻辑
标签:
原文地址:http://www.cnblogs.com/gaoxing/p/4401794.html