编辑
2023-12-25
学习记录
00
请注意,本文编写于 465 天前,最后修改于 465 天前,其中某些信息可能已经过时。

目录

介绍
ForkJoinPool
ForkJoinTask
ForkJoinWorkerThread
使用

介绍

在JDK1.7中引入了一种新的Fork/Join线程池,它可以将一个大的任务拆分成多个小的任务并行执行并汇总执行结果。 Fork/Join采用的是分而治之的基本思想,分而治之就是将一个复杂的任务,按照规定的阈值划分成多个简单的小任务,然后将这些小任务的结果再进行汇总返回,得到最终的任务。

ForkJoinPool

ForkJoinPool是用于运行ForkJoinTasks的线程池,实现了Executor接口。 可以通过new ForkJoinPool()直接创建ForkJoinPool对象。

java
public ForkJoinPool() { this(Math.min(MAX_CAP, Runtime.getRuntime().availableProcessors()), defaultForkJoinWorkerThreadFactory, null, false); } public ForkJoinPool(int parallelism, ForkJoinWorkerThreadFactory factory, UncaughtExceptionHandler handler, boolean asyncMode){ this(checkParallelism(parallelism), checkFactory(factory), handler, asyncMode ? FIFO_QUEUE : LIFO_QUEUE, "ForkJoinPool-" + nextPoolId() + "-worker-"); checkPermission(); }

通过查看构造方法源码我们可以发现,在创建ForkJoinPool时,有以下4个参数:

  • parallelism:期望并发数。默认会使用Runtime.getRuntime().availableProcessors()的值
  • factory:创建ForkJoin工作线程的工厂,默认为defaultForkJoinWorkerThreadFactory
  • handler:执行任务时遇到不可恢复的错误时的处理程序,默认为null
  • asyncMode:工作线程获取任务使用FIFO模式还是LIFO模式,默认为LIFO

ForkJoinTask

ForkJoinTask是一个对于在ForkJoinPool中运行任务的抽象类定义。 可以通过少量的线程处理大量任务和子任务,ForkJoinTask实现了Future接口。主要通过fork()方法安排异步任务执行,通过join()方法等待任务执行的结果。 想要使用ForkJoinTask通过少量的线程处理大量任务,需要接受一些限制。

  • 拆分的任务中避免同步方法或同步代码块;
  • 在细分的任务中避免执行阻塞I/O操作,理想情况下基于完全独立于其他正在运行的任务访问的变量;
  • 不允许在细分任务中抛出受检异常。

因为ForkJoinTask是抽象类不能被实例化,所以在使用时JDK为我们提供了三种特定类型的ForkJoinTask父类供我们自定义时继承使用。

  • RecursiveAction:子任务不返回结果
  • RecursiveTask:子任务返回结果
  • CountedCompleter:在任务完成执行后会触发执行

ForkJoinWorkerThread

ForkJoinPool中用于执行ForkJoinTask的线程。

ForkJoinPool既然实现了Executor接口,那么它和我们常用的ThreadPoolExecutor之前又有什么差异呢?

如果们使用ThreadPoolExecutor来完成分治法的逻辑,那么每个子任务都需要创建一个线程,当子任务的数量很大的情况下,可能会达到上万个,那么使用ThreadPoolExecutor创建出上万个线程,这显然是不可行、不合理的;

而ForkJoinPool在处理任务时,并不会按照任务开启线程,只会按照指定的期望并行数量创建线程。在每个线程工作时,如果需要继续拆分子任务,则会将当前任务放入ForkJoinWorkerThread的任务队列中,递归处理直到最外层的任务。

使用

java
public class ForkJoinMain { public static void main(String[] args) throws ExecutionException, InterruptedException { ForkJoinPool forkJoinPool = new ForkJoinPool(); ForkJoinTask<Long> rootTask = forkJoinPool.submit(new SumForkJoinTask(1L, 10_0000_0000L)); System.out.println("计算结果:" + rootTask.get()); } } class SumForkJoinTask extends RecursiveTask<Long> { private final Long min; private final Long max; private Long threshold = 1000L; public SumForkJoinTask(Long min, Long max) { this.min = min; this.max = max; } @Override protected Long compute() { // 小于阈值时直接计算 if ((max - min) <= threshold) { long sum = 0; for (long i = min; i < max; i++) { sum = sum + i; } return sum; } // 拆分成小任务 long middle = (max + min) >>> 1; SumForkJoinTask leftTask = new SumForkJoinTask(min, middle); leftTask.fork(); SumForkJoinTask rightTask = new SumForkJoinTask(middle, max); rightTask.fork(); // 汇总结果 return leftTask.join() + rightTask.join(); } }

本文作者:Weee

本文链接:

版权声明:本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!