/** * Forks the given tasks, returning when {@code isDone} holds for * each task or an (unchecked) exception is encountered, in which * case the exception is rethrown. If more than one task * encounters an exception, then this method throws any one of * these exceptions. If any task encounters an exception, others * may be cancelled. However, the execution status of individual * tasks is not guaranteed upon exceptional return. The status of * each task may be obtained using {@link #getException()} and * related methods to check if they have been cancelled, completed * normally or exceptionally, or left unprocessed. * * @param tasks the tasks * @throws NullPointerException if any task is null */ public static void invokeAll(ForkJoinTask<?>... tasks) { Throwable ex = null; int last = tasks.length - 1; for (int i = last; i >= 0; --i) {//从last到1依次fork,而0则直接invork。 ForkJoinTask<?> t = tasks[i]; if (t == null) { if (ex == null) ex = new NullPointerException(); } else if (i != 0)//不为0则fork t.fork(); else if (t.doInvoke() < NORMAL && ex == null)//为0则invork ex = t.getException(); } for (int i = 1; i <= last; ++i) {//注意此处是从1开始, 依次join ForkJoinTask<?> t = tasks[i]; if (t != null) { if (ex != null) t.cancel(false); else if (t.doJoin() < NORMAL && ex == null) ex = t.getException(); } } if (ex != null) UNSAFE.throwException(ex); }
/** * Forks the given tasks, returning when {@code isDone} holds for * each task or an (unchecked) exception is encountered, in which * case the exception is rethrown. If more than one task * encounters an exception, then this method throws any one of * these exceptions. If any task encounters an exception, the * other may be cancelled. However, the execution status of * individual tasks is not guaranteed upon exceptional return. The * status of each task may be obtained using {@link * #getException()} and related methods to check if they have been * cancelled, completed normally or exceptionally, or left * unprocessed. * * @param t1 the first task * @param t2 the second task * @throws NullPointerException if any task is null */ publicstaticvoidinvokeAll(ForkJoinTask<?> t1, ForkJoinTask<?> t2){ int s1, s2; t2.fork(); if ((s1 = t1.doInvoke() & DONE_MASK) != NORMAL) t1.reportException(s1); if ((s2 = t2.doJoin() & DONE_MASK) != NORMAL) t2.reportException(s2); }
/**
 * Recursive fork/join action that sums the squares of the elements of a
 * {@code double[]} over the half-open index range {@code [lo, hi)}.
 *
 * <p>Each task repeatedly forks off the right half of its range and keeps
 * the left half for itself. The forked tasks are chained through
 * {@link #next} so that the task can later collect their results.
 */
public class Applyer extends RecursiveAction {
    /** Array whose elements are squared and summed. */
    final double[] array;
    /** Inclusive lower and exclusive upper bound of this task's range. */
    final int lo, hi;
    /** Sum of squares for this task's range; written at the end of {@link #compute()}. */
    double result;
    /** Previously forked right-hand task, or {@code null} if there is none. */
    Applyer next;

    /**
     * Creates a task covering {@code [lo, hi)} of {@code array}.
     *
     * @param array the input values
     * @param lo inclusive lower bound
     * @param hi exclusive upper bound
     * @param next the right-hand task forked just before this one, or {@code null}
     */
    Applyer(double[] array, int lo, int hi, Applyer next) {
        this.array = array;
        this.lo = lo;
        this.hi = hi;
        this.next = next;
    }

    /**
     * Computes the sum of squares of all elements of {@code array} by
     * running a root task over the whole array in the given pool.
     *
     * @param pool the pool that executes the root task
     * @param array the input values
     * @return the sum of {@code array[i] * array[i]} over all indices
     */
    static double sumOfSquares(ForkJoinPool pool, double[] array) {
        int n = array.length;
        Applyer a = new Applyer(array, 0, n, null);
        pool.invoke(a);
        return a.result;
    }

    /**
     * Sequentially sums the squares of {@code array[l..h)}.
     *
     * @param l inclusive lower bound
     * @param h exclusive upper bound
     * @return the sum of squares over the range (0 for an empty range)
     */
    double atLeaf(int l, int h) {
        double sum = 0;
        for (int i = l; i < h; ++i) // perform leftmost base step
            sum += array[i] * array[i];
        return sum;
    }

    /**
     * Splits off right halves while the range holds more than one element
     * and the surplus queued task count is at most 3, computes the
     * remaining leftmost range directly, then accumulates the results of
     * the forked tasks into {@link #result}.
     */
    @Override
    protected void compute() {
        int l = lo;
        int h = hi;
        Applyer right = null;
        while (h - l > 1 && getSurplusQueuedTaskCount() <= 3) {
            // Unsigned right shift halves the sum; unlike a signed
            // division, it still yields a non-negative midpoint if
            // l + h overflows into a negative int.
            int mid = (l + h) >>> 1;
            right = new Applyer(array, mid, h, right);
            right.fork();
            h = mid;
        }
        double sum = atLeaf(l, h);
        // Walk the chain of forked tasks, most recently forked first.
        while (right != null) {
            if (right.tryUnfork()) // directly calculate if not stolen
                sum += right.atLeaf(right.lo, right.hi);
            else {
                // The task could not be unforked: wait for it, then
                // use the result it stored.
                right.join();
                sum += right.result;
            }
            right = right.next;
        }
        result = sum;
    }
}
Applyer 作为一个 RecursiveAction,从最初的一个 Applyer(array, 0, n, null) 开始,分裂为多个任务,放置于任务队列中;空闲的工作线程从队列中取出任务并运行其 compute 方法。线程计算完一个叶节点后,优先再去处理右边的叶节点:若右侧任务尚未被窃取(tryUnfork 成功)则直接计算,否则 join 等待其结果。若靠右的叶节点们先结束运算,线程再去任务队列中窃取任务进行计算。