并发编程 Fork/Join

本文将介绍并发编程中的 Fork/Join,它们是 JUC 对分治任务模型的支持。

一、分治

分治,分而治之,是一种解决复杂问题的思维方法。

具体来说,就是将一个复杂的问题分解成多个更小的子问题,直到子问题简单至可以直接求解,再将这些结果汇总为最终结果。

二、分治任务模型

分治任务模型可以分为两个阶段:

  • 任务分解:将任务迭代地分解为子任务,直至可以直接计算
  • 结果合并:逐层合并子任务地执行结果,直至获得最终结果。

三、Fork/Join

1. 什么是 Fork/Join?

Fork/Join 是 Java 提供的支持分治任务模型的并发计算框架,Fork 对应任务分解,Join 对应结果合并。

2. ForkJoinPool

(1) ForkJoinPool

分治任务的线程池,可以类比 ThreadPoolExecutor。

(2) ForkJoinPool 与 ThreadPoolExecutor 的异同点

  • ForkJoinPool 和 ThreadPoolExecutor 一样,都是 “生产者 - 消费者” 的实现
  • ForkJoinPool 内部有多个任务队列,ThreadPoolExecutor 内部只有一个任务队列

(3) 工作原理

当通过 invoke()sumbit() 提交任务至 ForkJoinPool 后,ForkJoinPool会根据一定的路由规则将任务提交到某个任务队列中。

当任务在执行过程中创建子任务时,子任务会提交到对应的任务队列中。

ForkJoinPool 还支持 “任务窃取” 机制,如果工作线程对应的任务队列空闲,它可以 “窃取” 其它任务队列中的任务。

ForkJoinPool 通过双端队列避免 “窃取” 时的不必要竞争,正常获取任务和 “窃取” 任务是从任务队列的不同端进行消费。

3. ForkJoinTask

(1) ForkJoinTask

分治任务,可以类比 Runnable。

ForkJoinTask 是一个抽象类,其中最核心的方法有以下两个:

  • fork():异步执行子任务
  • join():阻塞当前线程,以获取子任务的执行结果

(2) RecursiveAction / RecursiveTask

ForkJoinTask 有两个子类:

  • RecursiveAction:以递归方式处理分治任务;抽象类;需要实现抽象方法 compute(),该方法没有返回值
  • RecursiveTask:以递归方式处理分治任务;抽象类;需要实现抽象方法 compute()

因此,RecursiveAction 和 RecursiveTask 共有三个核心方法:

  • fork():异步执行子任务
  • join():阻塞当前线程,以获取子任务的执行结果
  • compute():执行子任务,获取子任务执行结果

四、示例

通过 Fork/Join 计算斐波那契数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public class Test {

public static void main(String[] args) {
//创建分治任务线程池
ForkJoinPool fjp = new ForkJoinPool(4);

//创建分治任务
Fibonacci fib = new Fibonacci(30);

//启动分治任务
Integer result = fjp.invoke(fib);

//输出结果
System.out.println(result);
}

static class Fibonacci extends RecursiveTask<Integer> {
final int n;
Fibonacci(int n) {
this.n = n;
}

@Override
protected Integer compute(){
if (n <= 1) {
return n;
}
//创建子任务
Fibonacci f1 = new Fibonacci(n - 1);
Fibonacci f2 = new Fibonacci(n - 2);
// 异步执行子任务1
f1.fork();
//等待子任务结果,并合并结果
return f2.compute() + f1.join();
}
}
}

参考

  • Java 并发编程实战