码迷,mamicode.com
首页 > 其他好文 > 详细

ForkJoinPool 源码分析

时间:2018-12-15 11:46:53      阅读:138      评论:0      收藏:0      [点我收藏+]

标签:version   process   rom   pac   进入   异步   单向链表   init   ica   

ForkJoinPool

ForkJoinPool 是一个运行 ForkJoinTask 任务、支持工作窃取和并行计算的线程池

核心参数+创建实例

    // 工作者线程驻留任务队列索引位
    static final int SWIDTH       = 16;            
    // 低 16 位掩码
    static final int SMASK        = 0xffff;        
    // 最大并行度:#workers - 1
    static final int MAX_CAP      = 0x7fff;       
    // 最大工作队列数、提交队列数
    static final int SQMASK       = 0x007e;     
    
    // 工作者线程需要唤醒信号
    static final int UNSIGNALLED  = 1 << 31;       // must be negative
    // 防止 ABA 问题的版本号
    static final int SS_SEQ       = 1 << 16;       
    // 提交队列锁
    static final int QLOCK        = 1;             

    // 任务队列拥有所有者【驻留线程】
    static final int OWNED        = 1;             
    // 任务是先进先出的
    static final int FIFO         = 1 << 16;       
    // 线程池正在关闭
    static final int SHUTDOWN     = 1 << 18;
    // 线程池已经停止
    static final int TERMINATED   = 1 << 19;
    // 线程池正在停止
    static final int STOP         = 1 << 31;       // must be negative
    // 任务队列处于静止状态
    static final int QUIET        = 1 << 30; 
    // 任务队列处于静止状态 && 驻留线程已经阻塞、需要唤醒信号
    static final int DORMANT      = QUIET | UNSIGNALLED;

    /**
     *  工作者线程能在驻留的工作队列中一次性处理的最大任务数
     */
    static final int POLL_LIMIT = 1 << 10;

    /**
     *  默认的工作者线程工厂
     */
    public static final ForkJoinWorkerThreadFactory
    defaultForkJoinWorkerThreadFactory;

    /**
     * Common (static) pool. 通用 ForkJoinPool
     */
    static final ForkJoinPool common;

    /**
     *  通用线程池的并行度
     */
    static final int COMMON_PARALLELISM;

    /**
     *  通用线程池最大补偿的工作线程数
     */
    private static final int COMMON_MAX_SPARES;

    /**
     *  线程池序列号,用于生成工作者线程名称 
     */
    private static int poolNumberSequence;

    /**
     *  默认的工作者线程超时时间,以毫秒为单位,即 60 秒
     */
    private static final long DEFAULT_KEEPALIVE = 60_000L;

    /**
     * Undershoot tolerance for idle timeouts【超时微调】
     */
    private static final long TIMEOUT_SLOP = 20L;

    private static final int DEFAULT_COMMON_MAX_SPARES = 256;

    private static final int SEED_INCREMENT = 0x9e3779b9;

    /**
     * 64 位控制变量的组成,从高到低每 16 位一个单元
     * RC: 正在运行的工作者线程数 - parallelism
     * TC: 正在运行的工作者线程数 + 阻塞等待的工作者线程数 - parallelism
     * SS: 版本号 + 最近阻塞的线程状态
     * ID: 最近阻塞的工作者线程所在的工作队列索引
     *
     * ac 为负数:表示运行中的工作者线程不够
     * tc 为负数:表示总的工作者线程不够
     * sp != 0:表示有工作者线程在阻塞等待
     */
    private static final long SP_MASK    = 0xffffffffL;
    private static final long UC_MASK    = ~SP_MASK;

    private static final int  RC_SHIFT   = 48;
    // 单个工作者线程计数,用于 RC
    private static final long RC_UNIT    = 0x0001L << RC_SHIFT;
    // 控制变量 48-64 位掩码
    private static final long RC_MASK    = 0xffffL << RC_SHIFT;

    private static final int  TC_SHIFT   = 32;
    // 单个工作者线程计数,用于 TC
    private static final long TC_UNIT    = 0x0001L << TC_SHIFT;
    // 控制变量 32-48 位掩码
    private static final long TC_MASK    = 0xffffL << TC_SHIFT;
    // 尝试增加一个工作者线程,工作者线程数 < parallelism 
    private static final long ADD_WORKER = 0x0001L << TC_SHIFT + 15; // sign

    // 工作者线程的窃取任务总数
    volatile long stealCount;            
    // 工作者线程的空闲存活时间
    final long keepAlive;                
    // 下一个工作队列索引
    int indexSeed;                   
    // 最大、最小线程数
    final int bounds;
    /**
     * runstate:第 18、19、30、31 位
     * queue mode:第 16 位
     * parallelism:1-15 位,最大并行度为 1<<15-1
     */
    volatile int mode;                 
    // 已注册的工作队列
    WorkQueue[] workQueues;            
    // 工作者线程名称前缀,同时作为创建工作队列时的 synchronized 锁
    final String workerNamePrefix;       
    // 创建工作者线程的工厂
    final ForkJoinWorkerThreadFactory factory;
    // 异常处理器
    final UncaughtExceptionHandler ueh;
    // 线程池饱和断言
    final Predicate<? super ForkJoinPool> saturate;
    // 核心控制变量
    @jdk.internal.vm.annotation.Contended("fjpctl") 
    volatile long ctl;

    public ForkJoinPool() {
        /**
         * 1)并行度为当前 JVM 的 CPU 总数 Runtime.getRuntime().availableProcessors()
         * 2)使用默认的线程工厂
         * 3)无异常处理器
         * 4)工作队列的任务处理方式为 FILO
         * 提交队列的任务处理方式为 FIFO,工作窃取
         * 5)默认的核心工作者线程数为 0
         * 6)默认的最大工作者线程数为 32767
         * 7)默认工作者线程空闲超时时间为 60 秒
         */
        this(Math.min(MAX_CAP, Runtime.getRuntime().availableProcessors()),
                defaultForkJoinWorkerThreadFactory, null, false,
                0, MAX_CAP, 1, null, DEFAULT_KEEPALIVE, TimeUnit.MILLISECONDS);
    }

    /**
     * @param parallelism 并行度【影响任务队列的长度和工作者线程数】
     */
    public ForkJoinPool(int parallelism) {
        this(parallelism, defaultForkJoinWorkerThreadFactory, null, false,
                0, MAX_CAP, 1, null, DEFAULT_KEEPALIVE, TimeUnit.MILLISECONDS);
    }

    /**
     * @param parallelism 并行度
     * @param factory 工作者线程工厂
     * @param handler 异常处理器
     * @param asyncMode 任务处理模式
     */
    public ForkJoinPool(int parallelism,
            ForkJoinWorkerThreadFactory factory,
            UncaughtExceptionHandler handler,
            boolean asyncMode) {
        this(parallelism, factory, handler, asyncMode,
                0, MAX_CAP, 1, null, DEFAULT_KEEPALIVE, TimeUnit.MILLISECONDS);
    }

    /**
     * @param parallelism ForkJoinPool 的并行度
     * @param factory 工作者线程工厂
     * @param handler 每个工作者线程的异常处理器
     * @param asyncMode 工作队列的任务处理模式,默认是 false【FILO】,
     *  消息模式下可以指定为 FIFO
     * @param corePoolSize 核心工作者线程数,默认等于 parallelism
     * @param maximumPoolSize 最大工作者线程数
     * @param minimumRunnable 最小可用工作者线程数
     * @param saturate 当线程池尝试创建 > maximumPoolSize 的工作者线程时,目标任务将被拒绝,
     *  如果饱和断言返回 true,则该任务将继续执行
     * @param keepAliveTime 工作线程的空闲超时时间
     * @param unit keepAliveTime 的时间单位
     * @since 9
     */
    public ForkJoinPool(int parallelism,
            ForkJoinWorkerThreadFactory factory,
            UncaughtExceptionHandler handler,
            boolean asyncMode,
            int corePoolSize,
            int maximumPoolSize,
            int minimumRunnable,
            Predicate<? super ForkJoinPool> saturate,
            long keepAliveTime,
            TimeUnit unit) {
        // check, encode, pack parameters
        if (parallelism <= 0 || parallelism > MAX_CAP ||
                maximumPoolSize < parallelism || keepAliveTime <= 0L) {
            throw new IllegalArgumentException();
        }
        if (factory == null) {
            throw new NullPointerException();
        }
        // 计算超时时间
        final long ms = Math.max(unit.toMillis(keepAliveTime), TIMEOUT_SLOP);
        // 核心工作者线程数
        final int corep = Math.min(Math.max(corePoolSize, parallelism), MAX_CAP);
        // 计算 ctl
        final long c = (long)-corep << TC_SHIFT & TC_MASK |
                (long)-parallelism << RC_SHIFT & RC_MASK;
        // 计算 mode
        final int m = parallelism | (asyncMode ? FIFO : 0);
        final int maxSpares = Math.min(maximumPoolSize, MAX_CAP) - parallelism;
        // 最小可用工作者线程数
        final int minAvail = Math.min(Math.max(minimumRunnable, 0), MAX_CAP);
        final int b = minAvail - parallelism & SMASK | maxSpares << SWIDTH;
        int n = parallelism > 1 ? parallelism - 1 : 1; // at least 2 slots
        n |= n >>> 1; n |= n >>> 2; n |= n >>> 4; n |= n >>> 8; n |= n >>> 16;
        n = n + 1 << 1; // power of two, including space for submission queues

        workerNamePrefix = "ForkJoinPool-" + nextPoolId() + "-worker-";
        workQueues = new WorkQueue[n];
        this.factory = factory;
        ueh = handler;
        this.saturate = saturate;
        keepAlive = ms;
        bounds = b;
        mode = m;
        ctl = c;
        checkPermission();
    }

提交任务

    /**
     *  往线程池中提交一个 Runnable 任务
     */
    @Override
    @SuppressWarnings("unchecked")
    public ForkJoinTask<?> submit(Runnable task) {
        if (task == null) {
            throw new NullPointerException();
        }
        // 如果 task 不是 ForkJoinTask 子类实例,则执行适配
        return externalSubmit(task instanceof ForkJoinTask<?>
        ? (ForkJoinTask<Void>) task // avoid re-wrap
                : new ForkJoinTask.AdaptedRunnableAction(task));
    }

    private <T> ForkJoinTask<T> externalSubmit(ForkJoinTask<T> task) {
        Thread t; ForkJoinWorkerThread w; WorkQueue q;
        if (task == null) {
            throw new NullPointerException();
        }
        /**
         * 1)如果任务提交线程是 ForkJoinWorkerThread 实例 && 
         * 工作者线程关联的 ForkJoinPool 就是当前线程池 && 
         * 工作者线程驻留的任务队列不为 null
         * 则优先提交到自己的任务队列
         */
        if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread &&
                (w = (ForkJoinWorkerThread)t).pool == this &&
                (q = w.workQueue) != null) {
            q.push(task);
        // 2)将任务提交到共享提交队列   
        } else {
            externalPush(task);
        }
        return task;
    }



ForkJoinTask#
    /**
     *  将 Runnable 任务适配为 ForkJoinTask 任务
     */
    static final class AdaptedRunnableAction extends ForkJoinTask<Void>
    implements RunnableFuture<Void> {
        /**
         * 实际运行的任务
         */
        final Runnable runnable;
        AdaptedRunnableAction(Runnable runnable) {
            if (runnable == null) {
                throw new NullPointerException();
            }
            this.runnable = runnable;
        }
        @Override
        public Void getRawResult() { return null; }
        @Override
        public void setRawResult(Void v) { }
        // 运行 Runnable 任务
        @Override
        public boolean exec() { runnable.run(); return true; }
        @Override
        public void run() { invoke(); }
        @Override
        public String toString() {
            return super.toString() + "[Wrapped task = " + runnable + "]";
        }
        private static final long serialVersionUID = 5232453952276885070L;
    }

    /**
     *  将一个 ForkJoinTask 提交到一个提交队列中
     */
    final void externalPush(ForkJoinTask<?> task) {
        int r;                                // initialize caller‘s probe
        // 如果调用线程的探测值为 0
        if ((r = ThreadLocalRandom.getProbe()) == 0) {
            // 初始化线程局部随机数的探测值
            ThreadLocalRandom.localInit();
            /**
             * 读取探测值【同一个线程只要不调用 advanceProbe 方法,探测值是不变的,
             * 同一个线程提交任务时只要不出现竞争,就会将任务提交到相同的提交队列中】
             */
            r = ThreadLocalRandom.getProbe();
        }
        for (;;) {
            // 读取 mode
            final int md = mode;
            int n;
            // 读取工作队列
            WorkQueue[] ws = workQueues;
            // 线程池已经设置了 SHUTDOWN 标识 || 工作队列为空,则拒绝提交任务
            if ((md & SHUTDOWN) != 0 || ws == null || (n = ws.length) <= 0) {
                throw new RejectedExecutionException();
            } else {
                WorkQueue q;
                // push 是否添加成功,grow 是否需要进行扩容
                boolean push = false, grow = false;
                // 1)基于探测值掩码定位到指定索引的任务队列,如果该任务队列还未创建
                if ((q = ws[n - 1 & r & SQMASK]) == null) {
                    // 读取工作者名称前缀
                    final Object lock = workerNamePrefix;
                    /**
                     * 基于探测值、队列静止标识、非 FIFO 标识、无所有者标识
                     * 生成队列 ID
                     */
                    final int qid = (r | QUIET) & ~(FIFO | OWNED);
                    // 创建提交队列
                    q = new WorkQueue(this, null);
                    // 写入队列 ID
                    q.id = qid;
                    // 队列设置为静止状态
                    q.source = QUIET;
                    // 锁定队列
                    q.phase = QLOCK;          // lock queue
                    if (lock != null) {
                        // 同步锁定队列
                        synchronized (lock) { // lock pool to install
                            int i;
                            /**
                             * 再次判断指定目标索引下的任务队列是否还为空,
                             * 出现竞争时可能被其他线程提前写入
                             */
                            if ((ws = workQueues) != null &&
                                    (n = ws.length) > 0 &&
                                    ws[i = qid & n - 1 & SQMASK] == null) {
                                // 写入新建队列
                                ws[i] = q;
                                // 新建队列后需要扩容,并提交任务
                                push = grow = true;
                            }
                        }
                    }
                }
                // 2)提交队列已经存在,则尝试获得共享锁
                else if (q.tryLockSharedQueue()) {
                    // 获取锁成功,读取 base 和 top 值
                    final int b = q.base, s = q.top;
                    int al, d; ForkJoinTask<?>[] a;
                    // 1)提交队列的任务数组不为空 && 任务数组未满
                    if ((a = q.array) != null && (al = a.length) > 0 &&
                            al - 1 + (d = b - s) > 0) {
                        // 基于 top 值定位数组索引后写入任务
                        a[al - 1 & s] = task;
                        // 递增 top 值并写入
                        q.top = s + 1;        // relaxed writes OK here
                        // 释放共享锁
                        q.phase = 0;
                        // 队列中有多于 1 个任务
                        if (d < 0 && q.base - s < -1)
                        {
                            break;            // no signal needed
                        }
                    // 数组已满则需要进行扩容  
                    } else {
                        grow = true;
                    }
                    // 添加任务成功
                    push = true;
                }
                if (push) {
                    // 如果需要执行扩容
                    if (grow) {
                        try {
                            // 执行任务队列的扩容
                            q.growArray();
                            // 读取 top 值
                            final int s = q.top;
                            int al; ForkJoinTask<?>[] a;
                            if ((a = q.array) != null && (al = a.length) > 0) {
                                // 写入任务
                                a[al - 1 & s] = task;
                                // 递增 top 值并写入
                                q.top = s + 1;
                            }
                        } finally {
                            // 是否共享锁
                            q.phase = 0;
                        }
                    }
                    // 添加任务成功,则尝试添加或唤醒工作者线程
                    signalWork();
                    break;
                } else {
                    // 出现线程竞争时,重新生成探测值并进行重试
                    r = ThreadLocalRandom.advanceProbe(r);
                }
            }
        }
    }


    static final class WorkQueue {
        /**
         *  任务数组的初始化容量为 8192
         */
        static final int INITIAL_QUEUE_CAPACITY = 1 << 13;
        /**
         *  任务数组的最大容量
         */
        static final int MAXIMUM_QUEUE_CAPACITY = 1 << 26; // 64M
        /**
         *  值为负数:(任务队列的 ID +累计版本号) | UNSIGNALLED
         *  0:队列未锁定
         *  1:队列被锁定
         */
        volatile int phase;      
        // 队列进入静止状态时的控制变量值,可能包含前一个静止的任务队列信息
        int stackPred;            
        // 工作者线程窃取的任务数
        int nsteals;               
        int id;                    // index, mode, tag
        // 上一次窃取的工作队列 ID 或哨兵值
        volatile int source;       
        // 执行 poll 操作的索引
        volatile int base;         
        // 执行 push 或 pop 操作的索引
        int top;                   
        // 底层存储 ForkJoinTask 的数组
        ForkJoinTask<?>[] array;   
        // 任务队列所在的线程池
        final ForkJoinPool pool;   
        // 工作队列驻留的工作者线程,共享提交队列为 null
        final ForkJoinWorkerThread owner; 

        /**
         * 尝试锁定共享提交队列
         */
        boolean tryLockSharedQueue() {
            return PHASE.compareAndSet(this, 0, QLOCK);
        }

        /**
         *  初始化队列或执行双倍扩容
         */
        ForkJoinTask<?>[] growArray() {
            // 读取旧队列
            final ForkJoinTask<?>[] oldA = array;
            // 读取旧 size
            final int oldSize = oldA != null ? oldA.length : 0;
            // 计算新 size,初始化容量为 8192
            final int size = oldSize > 0 ? oldSize << 1 : INITIAL_QUEUE_CAPACITY;
            // size 小于 8192 或大于 64M
            if (size < INITIAL_QUEUE_CAPACITY || size > MAXIMUM_QUEUE_CAPACITY) {
                // 扩容失败则抛出 RejectedExecutionException 异常,任务被拒绝
                throw new RejectedExecutionException("Queue capacity exceeded");
            }
            int oldMask, t, b;
            // 基于新的 size 创建 ForkJoinTask 数组
            final ForkJoinTask<?>[] a = array = new ForkJoinTask<?>[size];
            // 旧数组中有任务存在,则执行迁移操作
            if (oldA != null && (oldMask = oldSize - 1) > 0 &&
                    (t = top) - (b = base) > 0) {
                // 计算新任务数组的 mask
                final int mask = size - 1;
                do { // emulate poll from old array, push to new array
                    // 从 base 开始迁移任务
                    final int index = b & oldMask;
                    // 读取任务
                    final ForkJoinTask<?> x = (ForkJoinTask<?>)
                            QA.getAcquire(oldA, index);
                    // 将旧数组中对应的 slot 置为 null
                    if (x != null &&
                            QA.compareAndSet(oldA, index, x, null)) {
                        // 写入新数组
                        a[b & mask] = x;
                    }
                // 循环迁移 
                } while (++b != t);
                VarHandle.releaseFence();
            }
            // 返回新数组
            return a;
        }
    }

    /**
     *  尝试创建或唤醒一个工作者
     */
    final void signalWork() {
        for (;;) {
            long c; int sp; WorkQueue[] ws; int i; WorkQueue v;
            // 1)活跃工作者线程数已经 >= 并行度
            if ((c = ctl) >= 0L) {
                break;
                // 2)核心工作者线程未满 && 无空闲工作者线程
            } else if ((sp = (int)c) == 0) {            // no idle workers
                if ((c & ADD_WORKER) != 0L) {
                    // 尝试增加一个工作者线程
                    tryAddWorker(c);
                }
                break;
            }
            // 3)workQueues 为 null 表示线程池未启动或已经终止
            else if ((ws = workQueues) == null) {
                break;                                // unstarted/terminated
                // 4)线程池已经终止
            } else if (ws.length <= (i = sp & SMASK)) {
                break;                                // terminated
                // 5)线程池正在终止,目标工作队列已经被回收
            } else if ((v = ws[i]) == null) {
                break;                                // terminating
                // 6)核心工作者线程未满 && 有工作者线程静止或已经阻塞
            } else {
                // 读取最近静止的工作队列 ID 及其工作队列索引
                final int np = sp & ~UNSIGNALLED;
                // 读取工作队列的 phase 值
                final int vp = v.phase;
                /**
                 * 读取在这个工作队列静止或工作者线程阻塞前,
                 * 上一个静止的工作者队列或阻塞的工作者线程所处的控制变量
                 */
                final long nc = v.stackPred & SP_MASK | UC_MASK & c + RC_UNIT;
                // 读取工作队列驻留线程
                final Thread vt = v.owner;
                // 如果当前工作队列是最近静止的或其工作者线程是最近阻塞的,则尝试恢复为静止之前的控制变量
                if (sp == vp && CTL.compareAndSet(this, c, nc)) {
                    // 写入 phase 值
                    v.phase = np;
                    // 如果 source 为 DORMANT【工作者线程阻塞前一刻写入】
                    if (v.source < 0) {
                        // 唤醒阻塞的工作者线程
                        LockSupport.unpark(vt);
                    }
                    break;
                }
            }
        }
    }

    /**
     *  尝试增加一个工作者线程
     * Tries to add one worker, incrementing ctl counts before doing
     * so, relying on createWorker to back out on failure.
     */
    private void tryAddWorker(long c) {
        do {
            // 活跃工作者线程数和总的工作者线程数,递增 1
            final long nc = RC_MASK & c + RC_UNIT |
                    TC_MASK & c + TC_UNIT;
            // 如果控制变量未更新 && 原子更新当前控制变量成功
            if (ctl == c && CTL.compareAndSet(this, c, nc)) {
                // 创建一个新的工作者线程
                createWorker();
                break;
            }
            // 出现竞争,则重新判断并重试
        } while (((c = ctl) & ADD_WORKER) != 0L && (int)c == 0);
    }

    /**
     *  尝试创建并启动一个新的工作者线程
     */
    private boolean createWorker() {
        // 读取工作者线程工厂
        final ForkJoinWorkerThreadFactory fac = factory;
        Throwable ex = null;
        ForkJoinWorkerThread wt = null;
        try {
            // 创建一个新的工作者线程
            if (fac != null && (wt = fac.newThread(this)) != null) {
                // 启动工作线程
                wt.start();
                return true;
            }
        } catch (final Throwable rex) {
            ex = rex;
        }
        // 创建失败或启动失败,则注销当前工作者
        deregisterWorker(wt, ex);
        return false;
    }

    final void deregisterWorker(ForkJoinWorkerThread wt, Throwable ex) {
        WorkQueue w = null;
        int phase = 0;
        // 目标工作者线程不为 null && 任务队列不为 null
        if (wt != null && (w = wt.workQueue) != null) {
            final Object lock = workerNamePrefix;
            // 计算窃取任务数
            final long ns = w.nsteals & 0xffffffffL;
            // 计算工作队列索引
            final int idx = w.id & SMASK;
            if (lock != null) {
                WorkQueue[] ws;                       // remove index from array
                synchronized (lock) {
                    // 将工作者线程驻留的任务队列回收
                    if ((ws = workQueues) != null && ws.length > idx &&
                            ws[idx] == w) {
                        ws[idx] = null;
                    }
                    // 累积总的窃取任务数
                    stealCount += ns;
                }
            }
            // 读取工作队列 phase 值
            phase = w.phase;
        }
        // 工作队列不是静止状态
        if (phase != QUIET) {                         // else pre-adjusted
            long c;                                   // decrement counts
            // 活跃工作者线程数和总工作者线程数递减 1
            do {} while (!CTL.weakCompareAndSet
                    (this, c = ctl, RC_MASK & c - RC_UNIT |
                    TC_MASK & c - TC_UNIT |
                    SP_MASK & c));
        }
        // 如果工作者线程驻留的工作队列不为 null
        if (w != null)
        {
            // 取消所有的任务
            w.cancelAll();                            // cancel remaining tasks
        }
        /**
         * 终止线程池失败 && 工作队列不为 null && 任务数组不为 null
         */
        if (!tryTerminate(false, false) &&            // possibly replace worker
                w != null && w.array != null) {
            // 尝试创建或唤醒一个工作者线程
            signalWork();
        }
        // 1)如果不是异常终止,则删除过时的异常信息
        if (ex == null) {
            ForkJoinTask.helpExpungeStaleExceptions();
        } else {
            // 2)重新抛出异常
            ForkJoinTask.rethrow(ex);
        }
    }

     * @param now
     *  true 表示无条件终止,
     *  false 表示等到线程池无任务或无活跃工作者线程之后终止
     * @param enable true 可能在下次终止
     * @return true if terminating or terminated
     */
    private boolean tryTerminate(boolean now, boolean enable) {
        int md; // 3 phases: try to set SHUTDOWN, then STOP, then TERMINATED

        // 线程池状态不是 SHUTDOWN
        while (((md = mode) & SHUTDOWN) == 0) {
            /**
             *  enable 为 false 和通用线程池不允许终止
             */
            if (!enable || this == common) {
                return false;
            } else {
                // 设置线程池状态为 SHUTDOWN
                MODE.compareAndSet(this, md, md | SHUTDOWN);
            }
        }

        while (((md = mode) & STOP) == 0) {       // try to initiate termination
            // 如果不是立刻终止
            if (!now) {                           // check if quiescent & empty
                for (long oldSum = 0L;;) {        // repeat until stable
                    boolean running = false;
                    long checkSum = ctl;
                    final WorkQueue[] ws = workQueues;
                    // 1)有活跃的工作者线程
                    if ((md & SMASK) + (int)(checkSum >> RC_SHIFT) > 0) {
                        running = true;
                        // 2)工作队列不为 null
                    } else if (ws != null) {
                        WorkQueue w; int b;
                        // 遍历所有的工作队列,看是否有未完成的任务
                        for (int i = 0; i < ws.length; ++i) {
                            if ((w = ws[i]) != null) {
                                checkSum += (b = w.base) + w.id;
                                if (b != w.top ||
                                        (i & 1) == 1 && w.source >= 0) {
                                    // 工作队列中有任务未完成
                                    running = true;
                                    break;
                                }
                            }
                        }
                    }
                    // 1)线程池已经停止,则直接退出
                    if (((md = mode) & STOP) != 0) {
                        break;                 // already triggered
                        // 2)有活跃工作者线程或有任务未完成,则返回 false
                    } else if (running) {
                        return false;
                    } else if (workQueues == ws && oldSum == (oldSum = checkSum)) {
                        break;
                    }
                }
            }
            // 设置线程池状态为 STOP
            if ((md & STOP) == 0) {
                MODE.compareAndSet(this, md, md | STOP);
            }
        }

        // 线程池状态不是 TERMINATED
        while (((md = mode) & TERMINATED) == 0) { // help terminate others
            for (long oldSum = 0L;;) {            // repeat until stable
                WorkQueue[] ws; WorkQueue w;
                long checkSum = ctl;
                // workQueues 不为空
                if ((ws = workQueues) != null) {
                    for (final WorkQueue element : ws) {
                        // 当前任务队列不为 null
                        if ((w = element) != null) {
                            final ForkJoinWorkerThread wt = w.owner;
                            // 取消所有的任务
                            w.cancelAll();        // clear queues
                            // 如果是工作队列
                            if (wt != null) {
                                try {             // unblock join or park
                                    // 中断工作者线程
                                    wt.interrupt();
                                } catch (final Throwable ignore) {
                                }
                            }
                            // 累加校验和
                            checkSum += w.base + w.id;
                        }
                    }
                }
                if (((md = mode) & TERMINATED) != 0 ||
                        workQueues == ws && oldSum == (oldSum = checkSum)) {
                    break;
                }
            }
            // 1)线程池已经终止
            if ((md & TERMINATED) != 0) {
                break;
                // 2)还有工作者线程未中断
            } else if ((md & SMASK) + (short)(ctl >>> TC_SHIFT) > 0) {
                break;
                // 3)设置线程池状态为 TERMINATED
            } else if (MODE.compareAndSet(this, md, md | TERMINATED)) {
                synchronized (this) {
                    notifyAll();                  // for awaitTermination
                }
                break;
            }
        }
        // 终止成功返回 true
        return true;
    }

WorkQueue#

        /**
         *  工作者线程将任务提交到自己驻留的工作队列中
         */
        void push(ForkJoinTask<?> task) {
            // 读取 top 索引
            final int s = top; ForkJoinTask<?>[] a; int al, d;
            /**
             * 1)保存任务的 ForkJoinTask 数组不为 null && 长度 > 0
             */
            if ((a = array) != null && (al = a.length) > 0) {
                // 基于旧的 top 索引计算任务存放的索引
                final int index = al - 1 & s;
                final ForkJoinPool p = pool;
                // 递增 top 索引
                top = s + 1;
                // 将任务存储到 ForkJoinTask 数组中
                QA.setRelease(a, index, task);
                // 1)如果是工作队列的第一个任务 && pool 不为 null
                if ((d = base - s) == 0 && p != null) {
                    VarHandle.fullFence();
                    // 尝试新增或唤醒工作者线程
                    p.signalWork();
                }
                // 2)如果任务队列已满,则执行扩容
                else if (d + al == 1) {
                    growArray();
                }
            }
        }

工作者线程的执行逻辑

public class ForkJoinWorkerThread extends Thread {
    // 工作者线程所属的线程池
    final ForkJoinPool pool;                
    // 工作者驻留的任务队列
    final ForkJoinPool.WorkQueue workQueue; 

    /**
     *  创建一个在目标 pool 中执行任务的 ForkJoinWorkerThread 实例
     */
    protected ForkJoinWorkerThread(ForkJoinPool pool) {
        super("aForkJoinWorkerThread");
        this.pool = pool;
        // 将当前工作者线程注册到线程池中
        this.workQueue = pool.registerWorker(this);
    }

    /**
     *  工作线程启动前的钩子函数
     */
    protected void onStart() {
    }

    /**
     *  工作者线程退出后的钩子函数
     */
    protected void onTermination(Throwable exception) {
    }

    /**
     *  运行工作者线程
     */
    public void run() {
        if (workQueue.array == null) { // only run once
            Throwable exception = null;
            try {
                // 前置钩子
                onStart();
                // 运行工作队列
                pool.runWorker(workQueue);
            } catch (Throwable ex) {
                exception = ex;
            } finally {
                try {
                    // 后置工资
                    onTermination(exception);
                } catch (Throwable ex) {
                    if (exception == null)
                        exception = ex;
                } finally {
                    // 注销工作者
                    pool.deregisterWorker(this, exception);
                }
            }
        }
    }

    /**
     *  工作者线程执行完驻留队列的任务后,会触发一次 afterTopLevelExec 回调
     */
    void afterTopLevelExec() {
    }
}
ForkJoinPool#
    /**
     *  工作者线程的核心运行逻辑
     */
    final void runWorker(WorkQueue w) {
        WorkQueue[] ws;
        // 执行任务数组的初始化
        w.growArray();                                  // allocate queue
        // 计算随机窃取任务的任务队列索引
        int r = w.id ^ ThreadLocalRandom.nextSecondarySeed();
        if (r == 0) {
            r = 1;
        }
        int lastSignalId = 0;                           // avoid unneeded signals
        // 循环处理
        while ((ws = workQueues) != null) {
            boolean nonempty = false;                   // scan
            /**
             * n:length
             * m:mask
             * b:base
             * i:index
             * a:array
             * q:WorkQueue
             * al:array length
             */
            for (int n = ws.length, j = n, m = n - 1; j > 0; --j) {
                WorkQueue q; int i, b, al; ForkJoinTask<?>[] a;
                /**
                 * 1)基于随机索引定位的任务队列不为 null &&
                 * 目标任务队列中有任务需要处理 &&
                 * 目标任务队列的任务数组不为空
                 */
                if ((i = r & m) >= 0 && i < n &&        // always true
                        (q = ws[i]) != null && (b = q.base) - q.top < 0 &&
                        (a = q.array) != null && (al = a.length) > 0) {
                    // 被窃取任务队列的 ID
                    final int qid = q.id;                     // (never zero)
                    // 计算任务索引,窃取任务是从 base 开始的
                    final int index = al - 1 & b;
                    // 读取目标任务
                    final ForkJoinTask<?> t = (ForkJoinTask<?>)
                            QA.getAcquire(a, index);
                    // 任务被当前线程获取,未出现竞争
                    if (t != null && b++ == q.base &&
                            QA.compareAndSet(a, index, t, null)) {
                        /**
                         * 当前任务队列里还有任务待处理 &&
                         * 第一次窃取该任务队列里的任务
                         */
                        if ((q.base = b) - q.top < 0 && qid != lastSignalId)
                        {
                            // 尝试增加或唤醒一个工作者线程来帮忙处理任务
                            signalWork();               // propagate signal
                        }
                        // 写入上一次窃取的任务队列 ID
                        w.source = lastSignalId = qid;
                        // 执行目标任务
                        t.doExec();
                        // 1)当前工作队列如果是 FIFO 模式
                        if ((w.id & FIFO) != 0) {
                            // 则从 base 位置开始处理自己的任务
                            w.localPollAndExec(POLL_LIMIT);
                        } else {
                            // 则从 top 位置开始处理自己的任务
                            w.localPopAndExec(POLL_LIMIT);
                        }
                        // 读取工作队列驻留线程
                        final ForkJoinWorkerThread thread = w.owner;
                        // 递增窃取任务数
                        ++w.nsteals;
                        // 重置窃取任务队列 ID
                        w.source = 0;                   // now idle
                        if (thread != null) {
                            // 驻留线程不为 null,则执行 afterTopLevelExec 回调
                            thread.afterTopLevelExec();
                        }
                    }
                    // 处理完一个任务之后,再次尝试窃取该任务队列里的任务
                    nonempty = true;
                }
                // 2)如果目标任务队列里的任务已经处理完毕,则退出此次扫描【一次只处理一个任务队列】
                else if (nonempty) {
                    break;
                // 3)定位到的任务队列无任务可处理,则扫描下一个任务队列
                } else {
                    ++r;
                }
            }
            
            // 1)如果成功处理完一个任务队列里的任务,则重新进行定位
            if (nonempty) {                             // move (xorshift)
                r ^= r << 13; r ^= r >>> 17; r ^= r << 5;
            }
            // 2)扫描了所有的任务队列都没有任务可处理
            else {
                int phase;
                // 重置 lastSignalId
                lastSignalId = 0;                       // clear for next scan
                // 1)如果队列还未进入静止状态
                if ((phase = w.phase) >= 0) {           // enqueue
                    // 写入队列 ID 及其工作队列索引
                    final int np = w.phase = phase + SS_SEQ | UNSIGNALLED;
                    long c, nc;
                    do {
                        // 记录先前的控制变量到 stackPred 中
                        w.stackPred = (int)(c = ctl);
                        // 活跃工作者数递减 1,同时写入 np
                        nc = c - RC_UNIT & UC_MASK | SP_MASK & np;
                    // 将当前队列的状态写入控制变量中,写入成功后尝试执行最后一次扫描  
                    } while (!CTL.weakCompareAndSet(this, c, nc));
                }
                else {                                  // already queued
                    // 读取 stackPred
                    final int pred = w.stackPred;
                    // 工作队列置为静止 && 需要唤醒信号
                    w.source = DORMANT;                 // enable signal
                    for (int steps = 0;;) {
                        int md, rc; long c;
                        if (w.phase >= 0) {
                            w.source = 0;
                            break;
                        }
                        // 2)线程池正在停止,则当前工作者需要退出
                        else if ((md = mode) < 0) {
                            return;
                        // 3)线程池正在关闭,则当前工作者需要退出 
                        } else if ((rc = (md & SMASK) +  // possibly quiescent
                                (int)((c = ctl) >> RC_SHIFT)) <= 0 &&
                                (md & SHUTDOWN) != 0 &&
                                tryTerminate(false, false)) {
                            return;                     // help terminate
                        // 4)在多次阻塞之间清空中断状态  
                        } else if ((++steps & 1) == 0) {
                            Thread.interrupted();       // clear between parks
                        /**
                         * 5)当前已经无活跃工作者线程 && 
                         * 已经有工作队列静止 && 
                         * 当前队列是最近静止的工作队列   
                         */
                        } else if (rc <= 0 && pred != 0 && phase == (int)c) {
                            // 计算截止时间
                            final long d = keepAlive + System.currentTimeMillis();
                            // 阻塞到截止时间
                            LockSupport.parkUntil(this, d);
                            // 阻塞过程中一直无任务提交
                            if (ctl == c &&
                                    d - System.currentTimeMillis() <= TIMEOUT_SLOP) {
                                // 递减总工作者线程数
                                final long nc = UC_MASK & c - TC_UNIT |
                                        SP_MASK & pred;
                                // 更新控制变量
                                if (CTL.compareAndSet(this, c, nc)) {
                                    // 将工作队列设置为静止状态
                                    w.phase = QUIET;
                                    // 当前工作者退出工作
                                    return;             // drop on timeout
                                }
                            }
                        // 6)阻塞当前工作者等待唤醒,ForkJoinPool 保证至少会有一个工作者线程不会退出
                        } else {
                            LockSupport.park(this);
                        }
                    }
                }
            }
        }
    }

    /**
     *  将工作者线程 wt 注册到当前线程池中
     */
    final WorkQueue registerWorker(ForkJoinWorkerThread wt) {
        UncaughtExceptionHandler handler;
        // 设置为守护线程
        wt.setDaemon(true);                             
        // 如果存在异常处理器
        if ((handler = ueh) != null) {
            // 写入异常处理器
            wt.setUncaughtExceptionHandler(handler);
        }
        // 为工作者线程创建一个工作队列
        final WorkQueue w = new WorkQueue(this, wt);
        int tid = 0;                                    // for thread name
        // 线程池是否为 FIFO 模式
        final int fifo = mode & FIFO;
        // 读取工作者线程名称前缀
        final String prefix = workerNamePrefix;
        if (prefix != null) {
            synchronized (prefix) {
                final WorkQueue[] ws = workQueues; int n;
                // 计算索引种子
                final int s = indexSeed += SEED_INCREMENT;
                if (ws != null && (n = ws.length) > 1) {
                    // 计算掩码
                    final int m = n - 1;
                    // 基于索引种子、掩码计算队列 ID
                    tid = s & m;
                    // 计算奇数索引值
                    int i = m & (s << 1 | 1);         // odd-numbered indices
                    // 查找空闲的 slot
                    for (int probes = n >>> 1;;) {      // find empty slot
                        WorkQueue q;
                        // 1)当前索引定位的任务队列为 null || 任务队列为静止状态
                        if ((q = ws[i]) == null || q.phase == QUIET) {
                            break;
                        // 2)所有的奇数索引位都已被占用,则需要进行扩容
                        } else if (--probes == 0) {
                            i = n | 1;                  // resize below
                            break;
                        // 3)计算下一个奇数索引位 
                        } else {
                            i = i + 2 & m;
                        }
                    }
                    
                    // 写入工作队列索引、模式等
                    final int id = i | fifo | s & ~(SMASK | FIFO | DORMANT);
                    // 写入队列 ID
                    w.phase = w.id = id;                // now publishable
                    // 如果索引 i 所在的 slot 为空或工作队列为静止状态
                    if (i < n) {
                        // 写入工作队列
                        ws[i] = w;
                    } else {                              // expand array
                        // 执行 WorkQueue 的扩容
                        final int an = n << 1;
                        // 双倍扩容
                        final WorkQueue[] as = new WorkQueue[an];
                        // 写入工作队列
                        as[i] = w;
                        final int am = an - 1;
                        for (int j = 0; j < n; ++j) {
                            WorkQueue v;                // copy external queue
                            // 迁移旧 workQueues 中的任务队列
                            if ((v = ws[j]) != null) {
                                as[v.id & am & SQMASK] = v;
                            }
                            if (++j >= n) {
                                break;
                            }
                            as[j] = ws[j];              // copy worker
                        }
                        // 写入 workQueues
                        workQueues = as;
                    }
                }
            }
            // 设置工作者线程名称
            wt.setName(prefix.concat(Integer.toString(tid)));
        }
        return w;
    }

ForkJoinTask.fork/join/invoke

  • fork:将任务提交到 ForkJoinPool 中异步执行
    /**
     *  1)如果当前线程是 ForkJoinPool 工作者线程,则将其提交到驻留的工作队列中。
     *  2)否则将当前 ForkJoinTask 任务提交到 common 池中
     */
    public final ForkJoinTask<V> fork() {
        Thread t;
        if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) {
            ((ForkJoinWorkerThread)t).workQueue.push(this);
        } else {
            ForkJoinPool.common.externalPush(this);
        }
        return this;
    }
  • join
    /**
     *  阻塞等待当前 ForkJoinTask 执行完成并返回结果,
     *  任务执行过程中可以抛出 RuntimeException 或 Error 异常。
     *  任务执行线程可以是当前线程或其他工作者线程。
     */
    public final V join() {
        int s;
        // 1)阻塞等待任务执行完成,如果是异常完成,则将抛出 RuntimeException 或 Error。
        if (((s = doJoin()) & ABNORMAL) != 0) {
            reportException(s);
        }
        // 2)执行成功则返回原始结果
        return getRawResult();
    }

    private int doJoin() {
        int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w;
        /**
         * 1)如果任务已经完成,则返回其状态
         * 2)如果当前线程是 ForkJoinWorkerThread &&
         *  则尝试从驻留工作队列顶部拉取此任务 &&
         *  在当前线程中执行此任务 &&
         *  执行成功则返回任务状态
         * 3)如果当前线程是 ForkJoinWorkerThread,但是拉取任务失败,
         *  则表示【目标任务不在顶部、或其他的工作者线程窃取了此任务在执行】,则等待任务完成。
         * 4)如果当前线程不是 ForkJoinWorkerThread,则阻塞等待任务完成。
         */
        return (s = status) < 0 ? s :
            (t = Thread.currentThread()) instanceof ForkJoinWorkerThread ?
                    (w = (wt = (ForkJoinWorkerThread)t).workQueue).
                    tryUnpush(this) && (s = doExec()) < 0 ? s :
                        wt.pool.awaitJoin(w, this, 0L) :
                            externalAwaitDone();
    }

ForkJoinPool#WorkQueue#
        /**
         *  只有当目标任务 task 是工作队列顶部的第一个任务时,才将此任务移除,并返回 true,
         *  否则返回 false。
         */
        boolean tryUnpush(ForkJoinTask<?> task) {
            // 读取 base
            final int b = base;
            // 读取 top
            int s = top, al; ForkJoinTask<?>[] a;
            // 任务数组不为空 && 有任务待处理
            if ((a = array) != null && b != s && (al = a.length) > 0) {
                // 计算读取索引
                final int index = al - 1 & --s;
                // 如果顶部任务就是当前任务 task,则将 slot 置为 null
                if (QA.compareAndSet(a, index, task, null)) {
                    // 更新 top 值,并返回 true
                    top = s;
                    VarHandle.releaseFence();
                    return true;
                }
            }
            return false;
        }

    /**
     *  窃取任务的主要执行方法
     */
    final int doExec() {
        int s; boolean completed;
        // 任务未完成
        if ((s = status) >= 0) {
            try {
                // 立即执行任务
                completed = exec();
            } catch (final Throwable rex) {
                completed = false;
                // 设置异常状态
                s = setExceptionalCompletion(rex);
            }
            // 如果正常完成
            if (completed) {
                // 设置完成状态
                s = setDone();
            }
        }
        return s;
    }

    /**
     *  记录异常信息,触发 internalPropagateException 钩子函数
     */
    private int setExceptionalCompletion(Throwable ex) {
        final int s = recordExceptionalCompletion(ex);
        if ((s & THROWN) != 0) {
            internalPropagateException(ex);
        }
        return s;
    }

    final int recordExceptionalCompletion(Throwable ex) {
        int s;
        // 任务未完成
        if ((s = status) >= 0) {
            // 计算此 ForkJoinTask 的哈希值
            final int h = System.identityHashCode(this);
            // 读取异常表的锁
            final ReentrantLock lock = exceptionTableLock;
            lock.lock();
            try {
                expungeStaleExceptions();
                // 读取异常表
                final ExceptionNode[] t = exceptionTable;
                // 计算索引
                final int i = h & t.length - 1;
                // 遍历单向链表
                for (ExceptionNode e = t[i]; ; e = e.next) {
                    /**
                     * 1)目标 slot 为 null
                     * 2)已经到达链表尾部
                     */
                    if (e == null) {
                        // 则将此异常加入异常表
                        t[i] = new ExceptionNode(this, ex, t[i],
                                exceptionTableRefQueue);
                        break;
                    }
                    // 如果已经加入了,则直接退出
                    if (e.get() == this) {
                        break;
                    }
                }
            } finally {
                lock.unlock();
            }
            // 设置任务状态
            s = abnormalCompletion(DONE | ABNORMAL | THROWN);
        }
        return s;
    }

    /**
     *  尝试将当前 ForkJoinTask 标记为由于取消或异常而完成
     */
    private int abnormalCompletion(int completion) {
        for (int s, ns;;) {
            // 任务已经完成,则返回
            if ((s = status) < 0) {
                return s;
                // 更新任务状态
            } else if (STATUS.weakCompareAndSet(this, s, ns = s | completion)) {
                // 如果有线程阻塞依赖该任务完成,则唤醒所有的阻塞线程
                if ((s & SIGNAL) != 0) {
                    synchronized (this) { notifyAll(); }
                }
                return ns;
            }
        }
    }

    /**
     *  将任务状态设置为 DONE,如果有其他线程在阻塞等待该任务完成,则唤醒所有阻塞的线程
     */
    private int setDone() {
        int s;
        /**
         *  1)将任务状态设置为 DONE
         *  2)如果有其他线程在阻塞等待该任务完成,则唤醒所有阻塞的线程
         */
        if (((s = (int)STATUS.getAndBitwiseOr(this, DONE)) & SIGNAL) != 0) {
            synchronized (this) { notifyAll(); }
        }
        return s | DONE;
    }

ForkJoinPool#WorkQueue#
        /**
         *  从工作队列的 top 位置开始循环扫描目标任务 task,如果找到则将其移除并执行
         */
        void tryRemoveAndExec(ForkJoinTask<?> task) {
            ForkJoinTask<?>[] wa; int s, wal;
            // 此工作队列中有任务待处理
            if (base - (s = top) < 0 && // traverse from top
                    (wa = array) != null && (wal = wa.length) > 0) {
                // 从工作队列的 top 位置开始循环扫描目标任务 task,如果找到则将其移除并执行
                for (int m = wal - 1, ns = s - 1, i = ns; ; --i) {
                    final int index = i & m;
                    final ForkJoinTask<?> t = (ForkJoinTask<?>)
                            QA.get(wa, index);
                    // 1)已经没有更多的任务待扫描
                    if (t == null) {
                        break;
                    // 2)当前任务就是目标任务 task
                    } else if (t == task) {
                        // 移除该任务
                        if (QA.compareAndSet(wa, index, t, null)) {
                            top = ns;   // 将已扫描的任务集体下移一个位置
                            for (int j = i; j != ns; ++j) {
                                ForkJoinTask<?> f;
                                final int pindex = j + 1 & m;
                                f = (ForkJoinTask<?>)QA.get(wa, pindex);
                                QA.setVolatile(wa, pindex, null);
                                final int jindex = j & m;
                                QA.setRelease(wa, jindex, f);
                            }
                            VarHandle.releaseFence();
                            // 执行目标任务
                            t.doExec();
                        }
                        break;
                    }
                }
            }
        }

    final int awaitJoin(WorkQueue w, ForkJoinTask<?> task, long deadline) {
        int s = 0;
        /**
         *  工作队列不为 null && 任务不为 null
         *  1)task 不是 CountedCompleter 任务
         *  2)task 是 CountedCompleter,则尝试窃取和执行目标计算中的任务,直到其完成或无法找到任务为止
         */
        if (w != null && task != null &&
                (!(task instanceof CountedCompleter) ||
                        (s = w.localHelpCC((CountedCompleter<?>)task, 0)) >= 0)) {
            // 尝试从工作队列中移除并运行此任务
            w.tryRemoveAndExec(task);
            // 读取上次窃取的任务队列ID和当前队列的ID
            final int src = w.source, id = w.id;
            // 读取任务状态
            s = task.status;
            // 任务未完成
            while (s >= 0) {
                WorkQueue[] ws;
                boolean nonempty = false;
                // 获取随机奇数索引
                final int r = ThreadLocalRandom.nextSecondarySeed() | 1; // odd indices
                if ((ws = workQueues) != null) {       // scan for matching id
                    for (int n = ws.length, m = n - 1, j = -n; j < n; j += 2) {
                        WorkQueue q; int i, b, al; ForkJoinTask<?>[] a;
                        /**
                         *  目标索引 i 定位到的工作队列不为 null   &&
                         *  此工作队列最近窃取了当前工作队列的任务 &&
                         *  此工作队列有任务待处理 &&
                         *  则帮助其处理任务
                         */
                        if ((i = r + j & m) >= 0 && i < n &&
                                (q = ws[i]) != null && q.source == id &&
                                (b = q.base) - q.top < 0 &&
                                (a = q.array) != null && (al = a.length) > 0) {
                            // 窃取任务的队列ID
                            final int qid = q.id;
                            // 从 base 位置开始窃取
                            final int index = al - 1 & b;
                            // 读取任务
                            final ForkJoinTask<?> t = (ForkJoinTask<?>)
                                    QA.getAcquire(a, index);
                            /**
                             *  目标任务不为 null && 
                             *  没有其他队列并发窃取此任务 &&
                             *  则尝试将此任务从目标工作队列中移除
                             */
                            if (t != null && b++ == q.base && id == q.source &&
                                    QA.compareAndSet(a, index, t, null)) {
                                // 窃取成功,则更新 base 值
                                q.base = b;
                                // 记录最近窃取任务的任务队列 ID
                                w.source = qid;
                                // 执行目标任务
                                t.doExec();
                                // 回写上次窃取任务的队列ID
                                w.source = src;
                            }
                            nonempty = true;
                            // 窃取并处理完一个任务,则退出循环
                            break;
                        }
                    }
                }
                // 1)目标任务已经完成,则返回
                if ((s = task.status) < 0) {
                    break;
                // 2)一个任务都没有窃取到
                } else if (!nonempty) {
                    long ms, ns; int block;
                    // 1)非超时模式
                    if (deadline == 0L) {
                        ms = 0L;                       // untimed
                    // 2)如果已经超时,则返回 
                    } else if ((ns = deadline - System.nanoTime()) <= 0L) {
                        break;                         // timeout
                    // 3)未超时,但是剩余时间 < 1 毫秒,则将其设置为 1 毫秒
                    } else if ((ms = TimeUnit.NANOSECONDS.toMillis(ns)) <= 0L)
                    {
                        ms = 1L;                       // avoid 0 for timed wait
                    }
                    // 尝试增加一个补偿工作者来处理任务
                    if ((block = tryCompensate(w)) != 0) {
                        // 阻塞等待
                        task.internalWait(ms);
                        // 如果添加成功,则递增活跃工作者数
                        CTL.getAndAdd(this, block > 0 ? RC_UNIT : 0L);
                    }
                    s = task.status;
                }
            }
        }
        return s;
    }

    /**
     *  如果任务未完成,则阻塞等待
     */
    final void internalWait(long timeout) {
        /**
         * 将(旧 status 值 | SIGNAL) 的值写入 status 中,并返回旧值
         * 如果任务未完成
         */
        if ((int)STATUS.getAndBitwiseOr(this, SIGNAL) >= 0) {
            synchronized (this) {
                // 1)如果任务未完成,则阻塞等待
                if (status >= 0) {
                    try { wait(timeout); } catch (final InterruptedException ie) { }
                // 2)如果任务已经完成,则唤醒阻塞在此任务上的所有线程   
                } else {
                    notifyAll();
                }
            }
        }
    }

    /**
     *  阻塞一个非工作者线程,直到任务完成
     */
    private int externalAwaitDone() {
        int s = tryExternalHelp();
        // 任务未完成 && 写入唤醒标记
        if (s >= 0 && (s = (int)STATUS.getAndBitwiseOr(this, SIGNAL)) >= 0) {
            boolean interrupted = false;
            synchronized (this) {
                for (;;) {
                    // 1)任务未完成
                    if ((s = status) >= 0) {
                        try {
                            // 阻塞等待任务完成
                            wait(0L);
                        // 工作者线程被中断,可能是线程池终止    
                        } catch (final InterruptedException ie) {
                            interrupted = true;
                        }
                    }
                    // 2)任务完成则唤醒在此任务上阻塞等待的线程
                    else {
                        notifyAll();
                        break;
                    }
                }
            }
            // 工作者线程被设置了中断标记
            if (interrupted) {
                // 则中断此工作者线程
                Thread.currentThread().interrupt();
            }
        }
        // 返回任务状态
        return s;
    }

    private int tryExternalHelp() {
        int s;
        /**
         * 1)当前任务已经完成,则返回其状态
         * 2)任务未完成,此任务是 CountedCompleter,则执行 externalHelpComplete
         * 3)任务未完成,此任务是 ForkJoinTask,则执行 tryExternalUnpush && 拉取任务成功,则执行它
         */
        return (s = status) < 0 ? s:
            this instanceof CountedCompleter ?
                    ForkJoinPool.common.externalHelpComplete(
                            (CountedCompleter<?>)this, 0) :
                                ForkJoinPool.common.tryExternalUnpush(this) ?
                                        doExec() : 0;
    }

    final boolean tryExternalUnpush(ForkJoinTask<?> task) {
        // 读取线程测试值
        final int r = ThreadLocalRandom.getProbe();
        WorkQueue[] ws; WorkQueue w; int n;
        // 定位到的共享队列不为 null,则尝试从目标共享队列中移除此任务
        return (ws = workQueues) != null &&
                (n = ws.length) > 0 &&
                (w = ws[n - 1 & r & SQMASK]) != null &&
                w.trySharedUnpush(task);
    }

ForkJoinPool#WorkQueue
        boolean trySharedUnpush(ForkJoinTask<?> task) {
            boolean popped = false;
            final int s = top - 1;
            int al; ForkJoinTask<?>[] a;
            // 任务数组不为空
            if ((a = array) != null && (al = a.length) > 0) {
                // 计算目标索引
                final int index = al - 1 & s;
                // 读取任务
                final ForkJoinTask<?> t = (ForkJoinTask<?>) QA.get(a, index);
                // 读取的任务就是目标任务 task && 尝试锁定共享队列
                if (t == task &&
                        PHASE.compareAndSet(this, 0, QLOCK)) {
                    // 锁定成功 && 确认没有出现竞争 && 将目标 slot 置为 null
                    if (top == s + 1 && array == a &&
                            QA.compareAndSet(a, index, task, null)) {
                        // 成功弹出任务
                        popped = true;
                        // 更新 top 值
                        top = s;
                    }
                    // 释放共享锁
                    PHASE.setRelease(this, 0);
                }
            }
            return popped;
        }
  • invoke
    /**
     *  立即在当前线程中执行此任务,等待任务执行完毕并返回结果,
     *  或抛出 RuntimeException 或 Error 异常。
     */
    public final V invoke() {
        int s;
        if (((s = doInvoke()) & ABNORMAL) != 0) {
            reportException(s);
        }
        return getRawResult();
    }

    private int doInvoke() {
        int s; Thread t; ForkJoinWorkerThread wt;
        return (s = doExec()) < 0 ? s :
            (t = Thread.currentThread()) instanceof ForkJoinWorkerThread ?
                    (wt = (ForkJoinWorkerThread)t).pool.
                    awaitJoin(wt.workQueue, this, 0L) :
                        externalAwaitDone();
    }

ForkJoinPool 源码分析

标签:version   process   rom   pac   进入   异步   单向链表   init   ica   

原文地址:https://www.cnblogs.com/zhuxudong/p/10122688.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!