标签:线程通信 核心 技术 exception term useful current error 维护
这是Java并发包提供的最后一个线程池实现,也是最复杂的一个线程池。针对这一部分的代码太复杂,由于目前理解有限,只做简单介绍。通常大家说的Fork/Join框架其实就是指由ForkJoinPool作为线程池、ForkJoinTask(通常实现其三个抽象子类)为任务、ForkJoinWorkerThread作为执行任务的具体线程实体这三者构成的任务调度机制。通俗的说,ForkJoin框架的作用主要是为了实现将大型复杂任务进行递归的分解,直到任务足够小才直接执行,从而递归的返回各个足够小的任务的结果汇集成一个大任务的结果,依次类推最终得出最初提交的那个大型复杂任务的结果,这和方法的递归调用思想是一样的。当然ForkJoinPool线程池为了提高任务的并行度和吞吐量做了非常多而且复杂的设计实现,其中最著名的就是任务窃取机制。
对照前面介绍的ThreadPoolExecutor执行的任务是Future的实现类FutureTask、执行线程的实体是内部类Worker,ForkJoinPool执行的任务就是Future的实现类ForkJoinTask、执行线程就是ForkJoinWorkerThread。
该类直接继承了Thread,但是仅仅是为了增加一些额外的功能,并没有对线程的调度执行做任何更改。ForkJoinWorkerThread是被ForkJoinPool管理的工作线程,由它来执行ForkJoinTasks。该类主要为了维护创建线程实例时通过ForkJoinPool为其创建的任务队列,与其他两个线程池整个线程池只有一个任务队列不同,ForkJoinPool管理的所有工作线程都拥有自己的工作队列,为了实现任务窃取机制,该队列被设计成一个双端队列,而ForkJoinWorkerThread的首要任务就是执行自己的这个双端任务队列中的任务,其次是窃取其他线程的工作队列,以下是其代码片段:
1 public class ForkJoinWorkerThread extends Thread { 2 3 final ForkJoinPool pool; // 这个线程工作的ForkJoinPool池 4 final ForkJoinPool.WorkQueue workQueue; // 这个线程拥有的工作窃取机制的工作队列 5 6 //创建在给定ForkJoinPool池中执行的ForkJoinWorkerThread。 7 protected ForkJoinWorkerThread(ForkJoinPool pool) { 8 // Use a placeholder until a useful name can be set in registerWorker 9 super("aForkJoinWorkerThread"); 10 this.pool = pool; 11 this.workQueue = pool.registerWorker(this); //向ForkJoinPool执行池注册当前工作线程,ForkJoinPool为其分配一个工作队列 12 } 13 14 //该工作线程的执行内容就是执行工作队列中的任务 15 public void run() { 16 if (workQueue.array == null) { // only run once 17 Throwable exception = null; 18 try { 19 onStart(); 20 pool.runWorker(workQueue); //执行工作队列中的任务 21 } catch (Throwable ex) { 22 exception = ex; //记录异常 23 } finally { 24 try { 25 onTermination(exception); 26 } catch (Throwable ex) { 27 if (exception == null) 28 exception = ex; 29 } finally { 30 pool.deregisterWorker(this, exception); //撤销工作 31 } 32 } 33 } 34 } 35 36 ..... 37 }
与FutureTask一样, ForkJoinTask也是Future的子类,不过它是一个抽象类,其实现过程中与ForkJoinPool相互交叉,因此其源码在不理解ForkJoinPool的情况下很难全部看明白,这里只了解大概,ForkJoinTask的作用就是根据任务的分解实现(exec抽象方法),将任务进行拆分,并等待子任务的执行结果,由此可以组合成父任务的结果,以此类推。
ForkJoinTask有一个int类型的status字段,其高16位存储任务执行状态例如NORMAL、CANCELLED或EXCEPTIONAL,低16位预留用于用户自定义的标记。任务未完成之前status大于等于0,完成之后就是NORMAL、CANCELLED或EXCEPTIONAL这几个小于0的值,这几个值也是按大小顺序的:0(初始状态) > NORMAL > CANCELLED > EXCEPTIONAL.
1 public abstract class ForkJoinTask<V> implements Future<V>, Serializable { 2 3 /** 该任务的执行状态 */ 4 volatile int status; // accessed directly by pool and workers 5 static final int DONE_MASK = 0xf0000000; // mask out non-completion bits 6 static final int NORMAL = 0xf0000000; // must be negative 7 static final int CANCELLED = 0xc0000000; // must be < NORMAL 8 static final int EXCEPTIONAL = 0x80000000; // must be < CANCELLED 9 static final int SIGNAL = 0x00010000; // must be >= 1 << 16 10 static final int SMASK = 0x0000ffff; // short bits for tags 11 12 // 异常哈希表 13 14 //被任务抛出的异常数组,为了报告给调用者。因为异常很少见,所以我们不直接将它们保存在task对象中,而是使用弱引用数组。注意,取消异常不会出现在数组,而是记录在statue字段中 15 //注意这些都是 static 类属性,所有的ForkJoinTask共用的。 16 private static final ExceptionNode[] exceptionTable; //异常哈希链表数组 17 private static final ReentrantLock exceptionTableLock; 18 private static final ReferenceQueue<Object> exceptionTableRefQueue; //在ForkJoinTask被GC回收之后,相应的异常节点对象的引用队列 19 20 /** 21 * 固定容量的exceptionTable. 22 */ 23 private static final int EXCEPTION_MAP_CAPACITY = 32; 24 25 26 //异常数组的键值对节点。 27 //该哈希链表数组使用线程id进行比较,该数组具有固定的容量,因为它只维护任务异常足够长,以便参与者访问它们,所以在持续的时间内不应该变得非常大。但是,由于我们不知道最后一个joiner何时完成,我们必须使用弱引用并删除它们。我们对每个操作都这样做(因此完全锁定)。此外,任何ForkJoinPool池中的一些线程在其池变为isQuiescent时都会调用helpExpungeStaleExceptions 28 static final class ExceptionNode extends WeakReference<ForkJoinTask<?>> { 29 final Throwable ex; 30 ExceptionNode next; 31 final long thrower; // 抛出异常的线程id 32 final int hashCode; // 在弱引用消失之前存储hashCode 33 ExceptionNode(ForkJoinTask<?> task, Throwable ex, ExceptionNode next) { 34 super(task, exceptionTableRefQueue); //在ForkJoinTask被GC回收之后,会将该节点加入队列exceptionTableRefQueue 35 this.ex = ex; 36 this.next = next; 37 this.thrower = Thread.currentThread().getId(); 38 this.hashCode = System.identityHashCode(task); 39 } 40 } 41 42 ................. 43 }
除了status记录任务的执行状态之外,其他字段主要是为了对任务执行的异常的处理,ForkJoinTask采用了哈希数组 + 链表的数据结构(JDK8以前的HashMap实现方法)存放所有(以为这些字段是static)的ForkJoinTask任务的执行异常。
源码很简单,就不贴了,该方法其实就是将任务通过push方法加入到当前工作线程的工作队列或者提交队列(外部非ForkJoinWorkerThread线程通过submit、execute方法提交的任务),等待被线程池调度执行,这是一个非阻塞的立即返回方法。这里需要知道,ForkJoinPool线程池通过哈希数组+双端队列的方式将所有的工作线程拥有的任务队列和从外部提交的任务分别映射到哈希数组的不同槽位上,下一篇会介绍。将新任务始终push到队列一端的方式可以保证比较大的任务在队列的头部,越小的任务越在尾部,这时候拥有该任务队列的线程如果按照先进后出的方式pop弹出任务执行的话(这时候的任务队列就是当着栈来使用),将会是先从小任务开始,逐渐往大任务进行。而窃取任务的其他线程从对列头部开始窃取的话,将会帮助它完成大任务。
1 //当计算完成时返回计算结果。此方法与get()的不同之处在于,异常完成会导致RuntimeException或Error,而不是ExecutionException,调用线程被中断不会通过抛出InterruptedException导致方法突然返回。 2 public final V join() { 3 int s; 4 if ((s = doJoin() & DONE_MASK) != NORMAL) 5 reportException(s); //非正常结束,抛出相关的异常堆栈信息 6 return getRawResult(); //正常结束,返回结果 7 } 8 9 //等待任务执行结束并返回其状态status,该方法实现了join, get, quietlyJoin. 直接处理已经完成的,外部等待和unfork+exec的情况,其它情况转发到ForkJoinPool.awaitJoin 10 //如果 status < 0 则返回s; 11 //否则,若不是ForkJoinWorkerThread ,则等待 externalAwaitDone() 返回 12 //否则,若 (w = (wt = (ForkJoinWorkerThread)t).workQueue).tryUnpush(this) && (s = doExec()) < 0 则 返回s; 13 //否则,返回 wt.pool.awaitJoin(w, this, 0L) 14 private int doJoin() { 15 int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w; 16 return (s = status) < 0 ? s : //status为负数表示任务已经执行结束,直接返回status。 17 ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ? 18 (w = (wt = (ForkJoinWorkerThread)t).workQueue). 19 tryUnpush(this) && (s = doExec()) < 0 ? s : //调用pool的执行逻辑,并等待返回执行结果状态 20 wt.pool.awaitJoin(w, this, 0L) : //调用pool的等待机制 21 externalAwaitDone(); //不是ForkJoinWorkerThread, 22 } 23 24 //抛出与给定状态关联的异常(如果有),被取消是CancellationException。 25 private void reportException(int s) { 26 if (s == CANCELLED) 27 throw new CancellationException(); 28 if (s == EXCEPTIONAL) 29 rethrow(getThrowableException()); 30 } 31 32 public abstract V getRawResult(); 33 34 //返回给定任务的执行异常(如果有的话),为了提供准确的异常堆栈信息,若异常不是由当前线程抛出的,将尝试以记录的异常为原因创建一个与抛出异常类型相同的新异常。 35 //如果没有那样的构造方法将尝试使用无参的构造函数,并通过设置initCause方法以达到同样的效果,尽管它可能包含误导的堆栈跟踪信息。 36 private Throwable getThrowableException() { 37 if ((status & DONE_MASK) != EXCEPTIONAL) 38 return null; 39 40 //1. 通过当前任务对象的哈希值到哈希链表数组中找到相应的异常节点 41 int h = System.identityHashCode(this); //当前任务的hash值 42 ExceptionNode e; 43 final ReentrantLock lock = exceptionTableLock; 44 lock.lock(); //加锁 45 try { 46 expungeStaleExceptions(); //清理被GC回收的任务的异常节点 47 ExceptionNode[] t = exceptionTable; 48 e = t[h & (t.length - 1)]; //通过取模对应得索引获取哈希数组槽位中得节点 49 while (e != null && e.get() != this) 50 e = e.next; //遍历找到当前任务对应的异常节点 51 } finally { 52 lock.unlock(); 53 } 54 Throwable ex; 55 if (e == null || (ex = e.ex) == null) //表示没有出现任何异常 56 return null; 57 if (e.thrower != Thread.currentThread().getId()) { //有异常但是不是由当前线程抛出的 58 Class<? extends Throwable> ec = ex.getClass(); 59 try { 60 Constructor<?> noArgCtor = null; 61 Constructor<?>[] cs = ec.getConstructors();// public ctors only 62 //通过反射找到构造方法,并构造新异常 63 for (int i = 0; i < cs.length; ++i) { 64 Constructor<?> c = cs[i]; 65 Class<?>[] ps = c.getParameterTypes(); 66 if (ps.length == 0) 67 noArgCtor = c; //记录下无参构造方法,以备没有找到期望的构造方法时使用 68 else if (ps.length == 1 && ps[0] == Throwable.class) { 69 Throwable wx = (Throwable)c.newInstance(ex); //发现了我们期望的Throwable类型的参数的构造方法 70 return (wx == null) ? ex : wx; 71 } 72 } 73 if (noArgCtor != null) { //没有找到期望的构造方法,只能通过无参构造方法创建新异常 74 Throwable wx = (Throwable)(noArgCtor.newInstance()); 75 if (wx != null) { 76 wx.initCause(ex); //将原始异常设置进去 77 return wx; 78 } 79 } 80 } catch (Exception ignore) { 81 } 82 } 83 return ex; 84 } 85 86 87 88 //清除哈希链表数组中已经被GC回收掉的任务的异常节点。从exceptionTableRefQueue节点引用队列中获取异常节点并移除哈希链表数组中得对应节点 89 private static void expungeStaleExceptions() { 90 for (Object x; (x = exceptionTableRefQueue.poll()) != null;) { 91 if (x instanceof ExceptionNode) { 92 int hashCode = ((ExceptionNode)x).hashCode; //节点hash 93 ExceptionNode[] t = exceptionTable; 94 int i = hashCode & (t.length - 1); //取模得到哈希表索引 95 ExceptionNode e = t[i]; 96 ExceptionNode pred = null; 97 while (e != null) { 98 ExceptionNode next = e.next; 99 if (e == x) { //找到了目标节点 100 if (pred == null) 101 t[i] = next; 102 else 103 pred.next = next; 104 break; 105 } 106 pred = e; //往后遍历链表 107 e = next; 108 } 109 } 110 } 111 } 112 113 114 //窃取任务的主要执行方法,除非已经完成了,否则调用exec()并记录完成时的状态。 115 final int doExec() { 116 int s; boolean completed; 117 if ((s = status) >= 0) { //任务还未完成 118 try { 119 completed = exec(); 调用exec()并记录完成时的状态。 120 } catch (Throwable rex) { 121 return setExceptionalCompletion(rex); //记录异常并返回相关状态,并唤醒通过join等待此任务的线程。 122 } 123 if (completed) 124 s = setCompletion(NORMAL); //更新状态为正常结束,并唤醒通过join等待此任务的线程。 125 } 126 return s; 127 } 128 129 //立即执行此任务的基本操作。返回true表示该任务已经正常完成,否则返回false表示此任务不一定完成(或不知道是否完成)。 130 //此方法还可能抛出(未捕获的)异常,以指示异常退出。此方法旨在支持扩展,一般不应以其他方式调用。 131 protected abstract boolean exec(); 132 133 //等待未完成的非ForkJoinWorkerThread线程提交的任务执行结束,并返回任务状态status 134 private int externalAwaitDone() { 135 136 //若是CountedCompleter任务,等待ForkJoinPool.common.externalHelpComplete((CountedCompleter<?>)this, 0) 返回 137 //否则,若ForkJoinPool.common.tryExternalUnpush(this),返回 doExec() 结果; 138 //否则,返回0 139 int s = ((this instanceof CountedCompleter) ? // try helping 140 ForkJoinPool.common.externalHelpComplete( 141 (CountedCompleter<?>)this, 0) : //辅助完成外部提交的CountedCompleter任务 142 ForkJoinPool.common.tryExternalUnpush(this) ? doExec() : 0); //辅助完成外部提交的非CountedCompleter任务 143 if (s >= 0 && (s = status) >= 0) { //表示任务还没结束,需要阻塞等待。 144 boolean interrupted = false; 145 do { 146 if (U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) { //标记有线程需要被唤醒 147 synchronized (this) { 148 if (status >= 0) { 149 try { 150 wait(0L); //任务还没结束,无限期阻塞直到被唤醒 151 } catch (InterruptedException ie) { 152 interrupted = true; 153 } 154 } 155 else 156 notifyAll(); //已经结束了唤醒所有阻塞的线程 157 } 158 } 159 } while ((s = status) >= 0); 160 if (interrupted) 161 Thread.currentThread().interrupt(); //恢复中断标识 162 } 163 return s; 164 } 165 166 167 //记录异常,更新status状态,唤醒所有等待线程 168 private int setExceptionalCompletion(Throwable ex) { 169 int s = recordExceptionalCompletion(ex); 170 if ((s & DONE_MASK) == EXCEPTIONAL) 171 internalPropagateException(ex); //调用钩子函数传播异常 172 return s; 173 } 174 175 /** 176 * 对任务异常结束的异常传播支持的钩子函数 177 */ 178 void internalPropagateException(Throwable ex) { 179 } 180 181 //记录异常并设置状态status 182 final int recordExceptionalCompletion(Throwable ex) { 183 int s; 184 if ((s = status) >= 0) { 185 int h = System.identityHashCode(this); //哈希值 186 final ReentrantLock lock = exceptionTableLock; 187 lock.lock(); //加锁 188 try { 189 expungeStaleExceptions(); 190 ExceptionNode[] t = exceptionTable; 191 int i = h & (t.length - 1); 192 for (ExceptionNode e = t[i]; ; e = e.next) { 193 if (e == null) { //遍历完了都没找到,说明哈希链表数组中不存在该任务对于的异常节点 194 t[i] = new ExceptionNode(this, ex, t[i]); //创建一个异常节点用头插法插入哈希链表数组 195 break; 196 } 197 if (e.get() == this) // 哈希链表数组中已经存在相应的异常节点,退出 198 break; 199 } 200 } finally { 201 lock.unlock(); 202 } 203 s = setCompletion(EXCEPTIONAL); 204 } 205 return s; 206 } 207 208 //标记任务完成标志,并唤醒通过join等待此任务的线程。 209 private int setCompletion(int completion) { 210 for (int s;;) { 211 if ((s = status) < 0) 212 return s; 213 if (U.compareAndSwapInt(this, STATUS, s, s | completion)) { //更新状态 214 if ((s >>> 16) != 0) 215 synchronized (this) { notifyAll(); } //唤醒所有等待线程 216 return completion; 217 } 218 } 219 }
join方法就是ForkJoinTask最核心也最复杂的方法,就是等待任务执行结束并返回执行结果,若任务被取消抛出CancellationException异常,若是其他异常导致异常结束则抛出相关堆栈信息,这些异常还可能包括由于内部资源耗尽而导致的RejectedExecutionException,比如分配内部任务队列失败。异常的处理利用了另一个哈希数组 + 链表的结构。该方法不会由于线程被中断而抛出InterruptedException异常,而是会在等到任务执行结束之后再将中断状态复位。
该方法的执行过程中调用了一些未实现的抽象方法:exec方法就是执行任务的入口,任务的逻辑与拆分策略都由该方法实现,只有返回true才表示任务正常完成。该方法可以抛出异常以指示异常结束。getRawResult方法用于返回任务正常结束的执行结果。internalPropagateException方法则是当任务异常的回调钩子函数。一般来讲,我们都会在exec方法里面实现如下的貌似递归的拆分逻辑(伪代码):
1 if 任务足够小 then 2 执行任务; 3 返回结果; 4 else 5 拆分成两个子任务t1、t2 6 t1.fork(); //提交到任务队列 7 t2.fork(); //提交到任务队列 8 Object result = t1.join() + t2.join(); //合并结果,这里的加号仅仅代表合并结果,并不是做加法运行 9 return result; //返回最终结果
我们知道,fork负责将任务推入队列,等待被调度执行,join则是等待执行任务,并返回结果,而join在执行任务的时候最终就是调用的exec,而exec中任务已经足够小就直接执行,否则会拆分任务之后通过fork将拆分出的子任务再次加入队列,其子任务执行的时候依然会执行exec(假设子任务的exec也是这样的实现),到时候又会继续拆分,或者足够小就直接执行,两个子任务合并结果之后是其父任务的结果,两个父任务的结果又合并成祖父任务的结果,以此类推就是递归的完成了整个任务。
既然ForkJoinTask也是Future的子类,那么Future最重要的获取异步任务结果的get方法也必然要实现:
1 //如果需要,等待计算完成,然后检索其结果。 2 public final V get() throws InterruptedException, ExecutionException { 3 int s = (Thread.currentThread() instanceof ForkJoinWorkerThread) ? doJoin() : //是ForkJoinWorkerThread,执行doJoin 4 externalInterruptibleAwaitDone(); //执行externalInterruptibleAwaitDone 5 Throwable ex; 6 if ((s &= DONE_MASK) == CANCELLED) 7 throw new CancellationException(); //被取消的抛出CancellationException 8 if (s == EXCEPTIONAL && (ex = getThrowableException()) != null) 9 throw new ExecutionException(ex); //执行中出现异常的抛出相应的异常 10 return getRawResult(); //返回正常结果 11 } 12 13 //阻塞非ForkJoinWorkerThread线程,直到完成或中断。 14 private int externalInterruptibleAwaitDone() throws InterruptedException { 15 int s; 16 if (Thread.interrupted()) 17 throw new InterruptedException(); 18 if ((s = status) >= 0 && 19 (s = ((this instanceof CountedCompleter) ? 20 ForkJoinPool.common.externalHelpComplete( 21 (CountedCompleter<?>)this, 0) : 22 ForkJoinPool.common.tryExternalUnpush(this) ? doExec() : 23 0)) >= 0) { //根据不同的任务类型 返回执行或暂时等待被执行的状态 24 while ((s = status) >= 0) { //需要阻塞等待 25 if (U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) { 26 synchronized (this) { 27 if (status >= 0) 28 wait(0L); //阻塞等待 29 else 30 notifyAll(); //唤醒所有等待线程 31 } 32 } 33 } 34 } 35 return s; 36 }
get方法也是通过实现join方法的doJoin方法实现的,不同的是,调用get方法的线程如果被中断的话,get方法会立即抛出InterruptedException异常,而join方法则不会。get方法采用的wait/notifyAll这种线程通信机制来实现阻塞与唤醒。另外还有超时版本的get方法这里就不贴代码了,由此可见get支持可中断和/或定时等待完成。
1 //开始执行此任务,如果需要等待其完成,并返回其结果,如果底层执行此任务时出现异常,则抛出相应的(未捕获的)RuntimeException或Error。 2 public final V invoke() { 3 int s; 4 if ((s = doInvoke() & DONE_MASK) != NORMAL) 5 reportException(s); 6 return getRawResult(); 7 } 8 9 // invoke, quietlyInvoke的实现 10 private int doInvoke() { 11 int s; Thread t; ForkJoinWorkerThread wt; 12 return (s = doExec()) < 0 ? s : //执行此任务,完成返回其status 13 ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ? //若未完成或需要等待就根据不同任务类型执行不同的等待逻辑 14 (wt = (ForkJoinWorkerThread)t).pool. 15 awaitJoin(wt.workQueue, this, 0L) : 16 externalAwaitDone(); 17 }
invoke的实现会利用当前调用invoke的线程立即执行exec方法,当然如果exec方法的实现使用了fork/join,其还是会利用ForkJoinPool线程池的递归调度执行策略,等待子任务执行完成,一步步的合并成最终的任务结果,并返回。
1 //执行两个任务 2 public static void invokeAll(ForkJoinTask<?> t1, ForkJoinTask<?> t2) { 3 int s1, s2; 4 t2.fork(); //t2任务交给线程池调度执行 5 if ((s1 = t1.doInvoke() & DONE_MASK) != NORMAL) //t1任务立即由当前线程执行 6 t1.reportException(s1); //若t1异常结束,则抛出异常,包括被取消的CancellationException 7 if ((s2 = t2.doJoin() & DONE_MASK) != NORMAL) //等待t2执行结束 8 t2.reportException(s2); //若t2异常结束,则抛出异常,包括被取消的CancellationException 9 } 10 11 //执行任务数组 12 public static void invokeAll(ForkJoinTask<?>... tasks) { 13 Throwable ex = null; 14 int last = tasks.length - 1; 15 for (int i = last; i >= 0; --i) { 16 ForkJoinTask<?> t = tasks[i]; 17 if (t == null) { 18 if (ex == null) //都不能为null 19 ex = new NullPointerException(); 20 } 21 else if (i != 0) 22 t.fork(); //除了第一个任务都交给线程池调度执行 23 else if (t.doInvoke() < NORMAL && ex == null) //由当前线程执行第一个任务 24 ex = t.getException(); //记录第一个任务的异常 25 } 26 for (int i = 1; i <= last; ++i) { 27 ForkJoinTask<?> t = tasks[i]; 28 if (t != null) { 29 if (ex != null) //第一个任务异常结束,取消其他所有任务 30 t.cancel(false); 31 else if (t.doJoin() < NORMAL) //有任务异常结束,记录异常 32 ex = t.getException(); 33 } 34 } 35 if (ex != null) 36 rethrow(ex); //若有任务异常结束,抛出数组最前面那个异常结束的任务的异常 37 } 38 39 //批量执行任务,返回每个任务对应的ForkJoinTask实例, 40 public static <T extends ForkJoinTask<?>> Collection<T> invokeAll(Collection<T> tasks) { 41 if (!(tasks instanceof RandomAccess) || !(tasks instanceof List<?>)) { 42 invokeAll(tasks.toArray(new ForkJoinTask<?>[tasks.size()])); //将任务封装成ForkJoinTask,调用上面那个方法实现 43 return tasks; 44 } 45 //下面的逻辑与上面那个invokeAll也是一样的。 46 @SuppressWarnings("unchecked") 47 List<? extends ForkJoinTask<?>> ts = (List<? extends ForkJoinTask<?>>) tasks; 48 Throwable ex = null; 49 int last = ts.size() - 1; 50 for (int i = last; i >= 0; --i) { 51 ForkJoinTask<?> t = ts.get(i); 52 if (t == null) { 53 if (ex == null) 54 ex = new NullPointerException(); 55 } 56 else if (i != 0) 57 t.fork(); 58 else if (t.doInvoke() < NORMAL && ex == null) 59 ex = t.getException(); 60 } 61 for (int i = 1; i <= last; ++i) { 62 ForkJoinTask<?> t = ts.get(i); 63 if (t != null) { 64 if (ex != null) 65 t.cancel(false); 66 else if (t.doJoin() < NORMAL) 67 ex = t.getException(); 68 } 69 } 70 if (ex != null) 71 rethrow(ex); 72 return tasks; 73 }
批量任务的执行其实现都是排在前面的任务(只有两个参数是,第一个参数就是排在前面的任务,是数组或者队列时,索引越小的就是排在越前面的)由当前线程执行,后面的任务交给线程池调度执行,如果有多个任务都出现异常,只会抛出排在最前面那个任务的异常。
源码就不贴了,quietlyInvoke(),quietlyJoin()这两个方法就仅仅了是调用了doInvoke和doJoin,然后就没有然后了,它们就是不关心执行结果版本的invoke和Join,当然异常结束的也不会将异常抛出来,当执行一组任务并且需要将结果或异常的处理延迟到全部任务完成时,这可能很有用。
public boolean cancel(boolean mayInterruptIfRunning) { return (setCompletion(CANCELLED) & DONE_MASK) == CANCELLED; }
其主要通过setCompletion标记尚未完成的任务的状态为CANCELLED,并唤醒通过join等待此任务的线程。已经执行完成的任务无法被取消,返回true表示取消成功。注意该方法传入的mayInterruptIfRunning并没有使用,因此,ForkJoinTask不支持在取消任务时中断已经开始执行的任务,当然ForkJoinTask的子类可以重写实现。
1 //取消任务的执行计划。如果此任务是当前线程最近才刚刚通过fork安排执行,并且尚未在另一个线程中开始执行,则此方法通常会成功,但也不是100%保证会成功。 2 public boolean tryUnfork() { 3 Thread t; 4 return (((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ? 5 ((ForkJoinWorkerThread)t).workQueue.tryUnpush(this) : //针对ForkJoinWorkerThread的取消逻辑 6 ForkJoinPool.common.tryExternalUnpush(this)); //针对外部提交任务的取消逻辑 7 }
tryUnfork尝试将该任务从任务队列中弹出,弹出之后线程池自然不会再调度该任务。该方法的实现只会在任务刚刚被推入任务队列,并且还处于任务队列的栈顶时才可能会成功,否则100%失败。
1 public void reinitialize() { 2 if ((status & DONE_MASK) == EXCEPTIONAL) //有异常 3 clearExceptionalCompletion(); //从哈希链表数组中移除当前任务的异常节点,并将status重置为0 4 else 5 status = 0; 6 }
如果任务异常结束,会从异常哈希表中清除该任务的异常记录,该方法仅仅是将任务状态status重置为0,使得该任务可以被重新执行。
任务的执行状态可以在多个详细级别上查询:
adapt方法主要是为了兼容传统的Runnable和Callable任务,通过adapt方法可以将它们封装成ForkJoinTask任务,当将 ForkJoinTask 与其他类型的任务混合执行时,可以使用这些方法。
getPool可以返回执行该任务的线程所在的线程池实例,inForkJonPool可以判定当前任务是否是由ForkJoinWorkerThread线程提交的,一般来说这意味着当前任务是内部拆分之后的子任务。
getQueuedTaskCount方法返回已经通过fork安排给当前工作线程执行,但还没有被执行的任务数量,该值是一个瞬间值。因为工作线程调度执行的任务通过fork提交的任务还是进入的该工作线程的任务队列,因此可以通过该任务得知该值。
其它一些方法:
1 //可能会在承载当前任务的执行池处于静默(空闲)状态时执行任务。这个方法可能在有很多任务都通过fork被安排执行,但是一个显示的join调用都没有,直到它们都被执行完的设计中使用。 2 //其实就是如果有一批任务被安排执行,并且不知道它们什么时候结束,如果希望在这些任务都执行结束之后再安排一个任务,就可以使用helpQuiesce。 3 public static void helpQuiesce() { 4 Thread t; 5 //根据执行线程的不同类型,调用不同的静默执行逻辑 6 if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) { 7 ForkJoinWorkerThread wt = (ForkJoinWorkerThread)t; 8 wt.pool.helpQuiescePool(wt.workQueue); 9 } 10 else 11 ForkJoinPool.quiesceCommonPool(); 12 } 13 14 //返回被当前工作线程持有的任务数a比其它可能窃取其任务的其它工作线程持有的任务数b多多少的估计值,就是 a - b 的差值。若当前工作线程不是在ForkJoinPool中,则返回0 15 //通常该值被恒定在一个很小的值3,若超过这个阈值,则就在本地处理。 16 public static int getSurplusQueuedTaskCount() { 17 return ForkJoinPool.getSurplusQueuedTaskCount(); 18 } 19 20 //获取但不移除(即不取消执行计划)安排给当前线程的可能即将被执行的下一个任务。但不能保证该任务将在接下来实际被立即执行。该方法可能在即使任务存在但因为竞争而不可访问而返回null 21 //该方法主要是为了支持扩展,否则可能不会被使用。 22 protected static ForkJoinTask<?> peekNextLocalTask() { 23 Thread t; ForkJoinPool.WorkQueue q; 24 if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) 25 q = ((ForkJoinWorkerThread)t).workQueue; 26 else 27 q = ForkJoinPool.commonSubmitterQueue(); 28 return (q == null) ? null : q.peek(); 29 } 30 31 //获取并且移除(即取消执行)安排给当前线程的可能即将被执行的下一个任务。 32 //该方法主要是为了支持扩展,否则可能不会被使用。 33 protected static ForkJoinTask<?> pollNextLocalTask() { 34 Thread t; 35 return ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ? 36 ((ForkJoinWorkerThread)t).workQueue.nextLocalTask() : 37 null; 38 } 39 40 //如果当前线程被ForkJoinPool运行,获取并且移除(即取消执行)当前线程即将可能执行的下一个任务。该任务可能是从其它线程中窃取来的。 41 //返回nulll并不一定意味着此任务正在操作的ForkJoinPool处于静止状态。该方法主要是为了支持扩展,否则可能不会被使用。 42 protected static ForkJoinTask<?> pollTask() { 43 Thread t; ForkJoinWorkerThread wt; 44 return ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ? 45 (wt = (ForkJoinWorkerThread)t).pool.nextTaskFor(wt.workQueue) : 46 null; 47 }
通常ForkJoinTask只适用于非循环依赖的纯函数的计算或孤立对象的操作,否则,执行可能会遇到某种形式的死锁,因为任务循环地等待彼此。但是,这个框架支持其他方法和技术(例如使用Phaser、helpQuiesce和complete),这些方法和技术可用于构造解决这种依赖任务的ForkJoinTask子类,为了支持这些用法,可以使用setForkJoinTaskTag或compareAndSetForkJoinTaskTag原子性地标记一个short类型的值,并使用getForkJoinTaskTag进行检查。ForkJoinTask实现没有将这些受保护的方法或标记用于任何目的,但是它们可以用于构造专门的子类,由此可以使用提供的方法来避免重新访问已经处理过的节点/任务。
ForkJoinTask应该执行相对较少的计算,并且应该避免不确定的循环。大任务应该被分解成更小的子任务,通常通过递归分解。如果任务太大,那么并行性就不能提高吞吐量。如果太小,那么内存和内部任务维护开销可能会超过处理开销。
ForkJoinTask是可序列化的,这使它们能够在诸如远程执行框架之类的扩展中使用。只在执行之前或之后序列化任务才是明智的,而不是在执行期间。
通常我们不会直接实现ForkJoinTask,而是实现其三个抽象子类,ForkJoinTask仅仅是为了配合ForkJoinPool实现任务的调度执行,通常我们使用的时候,仅仅只需要提供任务的拆分与执行即可,RecursiveAction 用于大多数不返回结果的计算, RecursiveTask 用于返回结果的计算, CountedCompleter 用于那些操作完成之后触发其他操作的操作。
1 public abstract class RecursiveAction extends ForkJoinTask<Void> { 2 private static final long serialVersionUID = 5232453952276485070L; 3 4 /** 5 * The main computation performed by this task. 6 */ 7 protected abstract void compute(); 8 9 /** 10 * Always returns {@code null}. 11 * 12 * @return {@code null} always 13 */ 14 public final Void getRawResult() { return null; } 15 16 /** 17 * Requires null completion value. 18 */ 19 protected final void setRawResult(Void mustBeNull) { } 20 21 /** 22 * Implements execution conventions for RecursiveActions. 23 */ 24 protected final boolean exec() { 25 compute(); 26 return true; 27 } 28 29 }
RecursiveAction很简单,作为不返回结果的任务,其getRawResult方法永远返回null,setRawResult方法则什么都不做,它增加了一个无返回值的compute抽象方法,用于当ForkJoinTask被调度执行exec方法时调用,exec方法在执行完compute之后直接返回true,表示任务正常结束,而compute方法就是留给我们去实现大任务如何拆小任务,小任务怎么执行的逻辑。
1 public abstract class RecursiveTask<V> extends ForkJoinTask<V> { 2 private static final long serialVersionUID = 5232453952276485270L; 3 4 /** 5 * The result of the computation. 6 */ 7 V result; 8 9 /** 10 * The main computation performed by this task. 11 * @return the result of the computation 12 */ 13 protected abstract V compute(); 14 15 public final V getRawResult() { 16 return result; 17 } 18 19 protected final void setRawResult(V value) { 20 result = value; 21 } 22 23 /** 24 * Implements execution conventions for RecursiveTask. 25 */ 26 protected final boolean exec() { 27 result = compute(); 28 return true; 29 } 30 31 }
RecursiveTask也很简单,既然要返回结果,所以它定义了一个表示执行结果的result字段,getRawResult/setRawResult就用来操作该字段,它增加了一个有返回值的compute抽象方法,用于当ForkJoinTask被调度执行exec方法时调用,exec方法在执行完compute之后,将compute的返回结果作为任务的执行结果赋值给result,并最终返回true表示任务正常结束,同样compute方法也是留给我们去实现大任务如何拆小任务,小任务怎么执行,并且返回任务执行结果的逻辑。
CountedCompleter是Java8才新增的一个ForkJoinTask的抽象子类,比前面两个要复杂的多(反正我目前是没怎么弄明白),它是为了在任务完成之后调用钩子函数,它可以支持返回结果(这时应该重写方法getRawResult()以提供join()、invoke()和相关方法的结果),也可以不返回结果,默认其getRawResult方法永远返回null,setRawResult方法也什么都不做。
它也增加了一个无返回值的compute抽象方法,用于当ForkJoinTask被调度执行exec方法时调用,但它的exec方法在正常执行完compute之后,永远都返回的是false,表示任务没有正常结束,根据ForkJoinTask的doExec方法实现,对于CountedCompleter正常结束但返回false这种实现,将导致不会执行setCompletion即改变任务的状态也不会唤醒等待该任务的线程,这都交给了CountedCompleter自己来完成,而compute若异常结束则还是按原来的逻辑记录异常到哈希链表数组中,然后改变任务的状态为EXCEPTIONAL,因此只有在显式调用complete(T)、ForkJoinTask.cancel(boolean)、ForkJoinTask.completeExceptionally(Throwable)或compute异常完成时,任务状态才会更改。
CountedCompleter在创建实例的时候还可以传入一个CountedCompleter实例,因此可以形成树状的任务结构,树上的所有任务是可以并行执行的,且每一个子任务完成后都可以通过tryComplete辅助其父任务的完成,CountedCompleter的代码量很少区区300行,但其设计理念却很有用,CountedCompleters在存在子任务停滞和阻塞的情况下比其他形式的ForkJoinTasks更健壮,但编程的直观性较差。CountedCompleter的使用类似于其他基于完成的组件(例如java.nio.channel.completionhandler),除了在完成时触发onCompletion(CountedCompleter)可能需要多个挂起任务的完成,而不仅仅是一个。除非进行了其他初始化,否则挂起计数从0开始,但是可以使用setPendingCount、addToPendingCount和compareAndSetPendingCount方法(原子性地)更改挂起计数。在调用tryComplete时,如果挂起的操作计数非零,则递减;否则,将触发onCompletion操作,如果该CountedCompleters本身还有一个CountedCompleters,则继续对其CountedCompleters进行该过程。
一个CountedCompleter实现类必须实现compute方法,在大多数情况下(如下所示),在其返回之前应该调用一次tryComplete()。该类还可以选择性地重写方法onCompletion(CountedCompleter),以便在正常完成时执行想要的操作,以及方法onExceptionalCompletion(Throwable, CountedCompleter),以便在任何异常时执行操作。
CountedCompleters通常不返回结果,在这种情况下,它们通常被声明为CountedCompleter<Void>,并且总是返回null作为结果值。在其他情况下,您应该重写方法getRawResult来提供来自join()、invoke()和相关方法的结果。通常,该方法应该返回CountedCompleter对象的一个字段(或一个或多个字段的函数)的值,该对象在完成时保存结果。方法setRawResult在CountedCompleters中默认什么也不做。重写此方法以维护包含结果数据的其他对象或字段是可能的,但很少适用。
一个没有其他CountedCompleter(例如getCompleter返回null)的CountedCompleter可以作为一个具有附加功能的常规的ForkJoinTask。然而,任何一个有其他CountedCompleter的CountedCompleter,那么它只能作为其他计算的内部辅助,因此它自身的任务状态(例如ForkJoinTask.isDone等方法报告的)将是无意义的,其状态只在显示的调用complete,ForkJoinTask.cancel, ForkJoinTask.completeExceptionally(Throwable) 或者compute方法抛出异常时才会被改变。在任何异常完成之后,如果存在一个并且还没有以其他方式完成的任务,则可能会将异常传播给任务的 CountedCompleter(以及它的CountedCompleter,以此类推)。类似地,取消一个内部CountedCompleter只会对该completer产生局部影响,因此通常不太有用。
CountedCompleters是一个抽象类,要理解它有点难度,其编程实现可以达到的目的也灵活多变,就Java doc提供了几个示例可以看出,CountedCompleters可以将任务递归分解成基于树的形状,基于树的技术通常也比直接fork叶子任务更可取,因为它们减少了线程间的通信并提高了负载平衡。它还可以用于搜索查找过程中,当一个线程搜索到目标就可以让根节点任务完成,其他子节点任务自发终止。还可以用于计算任务时子节点任务合并结果得父节点任务又与其兄弟节点合并结果得到祖父任务结果,以此类推这样的场景等等,总之CountedCompleter类的使用非常灵活,如java并行流中涉及到的运行集拆分,结果合并,运算调度等就有用到它。
Java并发包线程池之ForkJoinPool即ForkJoin框架(一)
标签:线程通信 核心 技术 exception term useful current error 维护
原文地址:https://www.cnblogs.com/txmfz/p/11234119.html