ForkJoin框架
forkjoin类似于一个单机版的mapreduce,只是把多节点多进程换成了多线程。
分治法(dfs): 把大任务划分成多个子任务,然后单线程执行、合并结果;
mapreduce: 把大任务划分成多个子任务,然后多节点多进程执行、合并结果;
forkjoin: 把大任务划分成多个子任务,然后多线程执行、合并结果。
优化:
mapreduce
: 可以通过配置开启预测执行,如果有任务算得慢,会启动新的attempt,取算的快的结果,kill跑得慢的attempt;
forjoin
: 通过双端队列存储每个线程的任务,如果有线程结束得慢,空闲的线程会进行工作窃取
,从双端队列的尾部拿任务执行(但是不会重复计算同一个任务,这一点与MR不同)。
示例使用代码
范围求和:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41
| public class CountTask extends RecursiveTask<Integer> { private final static int THREDSHOLD = 2; private final int start; private final int end;
public CountTask(int start, int end) { this.start = start; this.end = end; }
@Override protected Integer compute() { int sum = 0; boolean canCompute = (end - start) <= THREDSHOLD; if (canCompute) { for (int i = start; i <= end; i++) { sum += i; } } else { int mid = start + (end - start) / 2; CountTask left = new CountTask(start, mid); CountTask right = new CountTask(mid + 1, end); left.fork(); right.fork(); int leftRes = left.join(); int rightRes = right.join(); sum = leftRes + rightRes; } return sum; }
public static void main(String[] args) throws ExecutionException, InterruptedException { ForkJoinPool forkJoinPool = new ForkJoinPool(); CountTask task = new CountTask(1, 10); Future<Integer> res = forkJoinPool.submit(task); System.out.print(res.get());
} }
|
相关类
可以看到代码里主要是继承RecursiveTask
定义一个计算任务类,定义分治和合并计算结果的操作,然后交给ForkJoinPool
进行计算即可。(类似于归并排序)
实际上forkjoin
框架中涉及到的类大致如下:
1 2 3 4 5 6 7 8 9
|
RecursiveTask => ForkJoinTask implements Future<V>, Serializable
RecursiveAction => ForkJoinTask implements Future<V>, Serializable
ForkJoinPool => extends AbstractExecutorService => implements ExecutorService
|
ForkJoinPool
中使用的双端队列:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| @sun.misc.Contended static final class WorkQueue { volatile int scanState; int stackPred; volatile int qlock; volatile int base; int top; ForkJoinTask<?>[] array; volatile Thread parker; volatile ForkJoinTask<?> currentJoin; volatile ForkJoinTask<?> currentSteal; ... ... }
|