注意
本文最后更新于 2024-05-09,文中内容可能已过时。
一、ForkJoin 是什么
ForkJoinPool 是 JDK7 引入的分治任务线程池。
核心思想:
- Fork:大任务拆分成若干小任务
- Join:小任务执行完,结果合并汇总
底层采用 工作窃取算法(Work-Stealing):
空闲线程偷偷去别的线程队列偷任务执行,充分利用CPU,适合密集计算、递归拆分场景。
二、适用场景
- 大数据量递归计算:求和、排序、遍历树、批量数据处理
- 适合 CPU密集型 任务
- 不适合 IO阻塞型 任务(阻塞会浪费工作窃取机制)
三、ForkJoin 两大核心类
- RecursiveTask:有返回值(常用)
- RecursiveAction:无返回值
四、完整可运行示例:1~100 累加求和
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
|
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;
// 1. 定义拆分任务:有返回值用 RecursiveTask
class SumTask extends RecursiveTask<Long> {
// 阈值:小于等于10个数就直接计算,不再拆分
private static final int THRESHOLD = 10;
private final long start;
private final long end;
public SumTask(long start, long end) {
this.start = start;
this.end = end;
}
@Override
protected Long compute() {
long sum = 0;
// 区间小,直接计算
if (end - start <= THRESHOLD) {
for (long i = start; i <= end; i++) {
sum += i;
}
return sum;
}
// 2. 拆分:Fork
long mid = (start + end) / 2;
SumTask left = new SumTask(start, mid);
SumTask right = new SumTask(mid + 1, end);
// 异步执行左任务
left.fork();
// 同步执行右任务
Long rightResult = right.compute();
// 获取左任务结果
Long leftResult = left.join();
// 3. 合并结果:Join
return leftResult + rightResult;
}
}
public class ForkJoinDemo {
public static void main(String[] args) {
// 创建 ForkJoin 线程池
ForkJoinPool forkJoinPool = new ForkJoinPool();
// 提交总任务:计算 1~100 累加
SumTask totalTask = new SumTask(1, 100);
Long result = forkJoinPool.invoke(totalTask);
System.out.println("1~100 累加结果:" + result);
// 关闭线程池
forkJoinPool.shutdown();
}
}
|
五、无返回值示例 RecursiveAction
适合只做批量处理、不需要返回结果
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
|
import java.util.concurrent.RecursiveAction;
class PrintTask extends RecursiveAction {
private static final int THRESHOLD = 5;
private final int start;
private final int end;
public PrintTask(int start, int end) {
this.start = start;
this.end = end;
}
@Override
protected void compute() {
if (end - start <= THRESHOLD) {
for (int i = start; i <= end; i++) {
System.out.print(i + " ");
}
return;
}
int mid = (start + end) / 2;
PrintTask left = new PrintTask(start, mid);
PrintTask right = new PrintTask(mid + 1, end);
left.fork();
right.compute();
}
}
|
六、底层工作窃取 Work-Stealing 原理 🔴
- ForkJoinPool 里每个线程都有双端队列
- 线程优先处理自己队列头部任务
- 空闲线程会从其他线程队列尾部偷任务执行
- 充分均衡CPU,减少线程空闲等待,比普通线程池更适合分治计算
七、ForkJoin 缺点 & 注意点
- 不适合IO阻塞任务:阻塞会占住工作线程,窃取机制失效
- 拆分阈值要合理:太小拆分太碎、开销大;太大并行度不够
- 避免在任务里频繁创建对象、加重GC
八、总结
- ForkJoin 是JDK7提供的分治线程池,核心是 Fork拆分、Join合并;
- 基于 工作窃取算法,每个线程有双端队列,空闲线程偷其他线程任务,CPU利用率高;
- 有返回值用 RecursiveTask,无返回值用 RecursiveAction;
- 适合大数据量CPU密集型递归计算,不适合IO阻塞场景。