ForkJoin示例及原理

注意
本文最后更新于 2024-05-09,文中内容可能已过时。

一、ForkJoin 是什么

ForkJoinPool 是 JDK7 引入的分治任务线程池。 核心思想:

  • Fork:大任务拆分成若干小任务
  • Join:小任务执行完,结果合并汇总

底层采用 工作窃取算法(Work-Stealing): 空闲线程偷偷去别的线程队列偷任务执行,充分利用CPU,适合密集计算、递归拆分场景。

二、适用场景

  1. 大数据量递归计算:求和、排序、遍历树、批量数据处理
  2. 适合 CPU密集型 任务
  3. 不适合 IO阻塞型 任务(阻塞会浪费工作窃取机制)

三、ForkJoin 两大核心类

  1. RecursiveTask:有返回值(常用)
  2. RecursiveAction:无返回值

四、完整可运行示例:1~100 累加求和

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// 1. 定义拆分任务:有返回值用 RecursiveTask
class SumTask extends RecursiveTask<Long> {

    // 阈值:小于等于10个数就直接计算,不再拆分
    private static final int THRESHOLD = 10;
    private final long start;
    private final long end;

    public SumTask(long start, long end) {
        this.start = start;
        this.end = end;
    }

    @Override
    protected Long compute() {
        long sum = 0;
        // 区间小,直接计算
        if (end - start <= THRESHOLD) {
            for (long i = start; i <= end; i++) {
                sum += i;
            }
            return sum;
        }

        // 2. 拆分:Fork
        long mid = (start + end) / 2;
        SumTask left = new SumTask(start, mid);
        SumTask right = new SumTask(mid + 1, end);

        // 异步执行左任务
        left.fork();
        // 同步执行右任务
        Long rightResult = right.compute();
        // 获取左任务结果
        Long leftResult = left.join();

        // 3. 合并结果:Join
        return leftResult + rightResult;
    }
}

public class ForkJoinDemo {
    public static void main(String[] args) {
        // 创建 ForkJoin 线程池
        ForkJoinPool forkJoinPool = new ForkJoinPool();
        // 提交总任务:计算 1~100 累加
        SumTask totalTask = new SumTask(1, 100);

        Long result = forkJoinPool.invoke(totalTask);
        System.out.println("1~100 累加结果:" + result);
        // 关闭线程池
        forkJoinPool.shutdown();
    }
}

五、无返回值示例 RecursiveAction

适合只做批量处理、不需要返回结果

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
import java.util.concurrent.RecursiveAction;

class PrintTask extends RecursiveAction {
    private static final int THRESHOLD = 5;
    private final int start;
    private final int end;

    public PrintTask(int start, int end) {
        this.start = start;
        this.end = end;
    }

    @Override
    protected void compute() {
        if (end - start <= THRESHOLD) {
            for (int i = start; i <= end; i++) {
                System.out.print(i + " ");
            }
            return;
        }
        int mid = (start + end) / 2;
        PrintTask left = new PrintTask(start, mid);
        PrintTask right = new PrintTask(mid + 1, end);
        left.fork();
        right.compute();
    }
}

六、底层工作窃取 Work-Stealing 原理 🔴

  1. ForkJoinPool 里每个线程都有双端队列
  2. 线程优先处理自己队列头部任务
  3. 空闲线程会从其他线程队列尾部偷任务执行
  4. 充分均衡CPU,减少线程空闲等待,比普通线程池更适合分治计算

七、ForkJoin 缺点 & 注意点

  1. 不适合IO阻塞任务:阻塞会占住工作线程,窃取机制失效
  2. 拆分阈值要合理:太小拆分太碎、开销大;太大并行度不够
  3. 避免在任务里频繁创建对象、加重GC

八、总结

  • ForkJoin 是JDK7提供的分治线程池,核心是 Fork拆分、Join合并
  • 基于 工作窃取算法,每个线程有双端队列,空闲线程偷其他线程任务,CPU利用率高;
  • 有返回值用 RecursiveTask,无返回值用 RecursiveAction;
  • 适合大数据量CPU密集型递归计算,不适合IO阻塞场景。