编辑
2022-12-15
学习记录
00
请注意,本文编写于 840 天前,最后修改于 840 天前,其中某些信息可能已经过时。

目录

什么是并行流?
Fork/Join 框架
Fork/Join 框架与传统线程池有啥区别?
Fork/Join框架实例
在Java8中如何优雅的切换并行流和串行流呢?
并行流优势如此大,那么我们应该始终使用它吗?

什么是并行流?

简单来说,并行流就是把一个内容分成多个数据块,并用不同的线程分别处理每个数据块的流。 Java 8 中将并行进行了优化,我们可以很容易的对数据进行并行操作。 Stream API 可以声明性地通过 parallel() 与sequential() 在并行流与顺序流之间进行切换 。

Fork/Join 框架

Fork/Join 框架: 就是在必要的情况下,将一个大任务,进行拆分(fork)成若干个小任务(拆到不可再拆时),再将一个个的小任务运算的结果进行 join 汇总 。

Fork/Join 框架与传统线程池有啥区别?

采用 “工作窃取”模式(work-stealing):

当执行新的任务时它可以将其拆分成更小的任务执行,并将小任务加到线程队列中,然后再从一个随机线程的队列中偷一个并把它放在自己的队列中。

相对于一般的线程池实现,fork/join框架的优势体现在对其中包含的任务的处理方式上。在一般的线程池中,如果一个线程正在执行的任务由于某些原因无法继续运行,那么该线程会处于等待状态。而在fork/join框架的实现中,如果某个子任务由于等待另外一个子任务的完成而无法继续运行。那么处理该子问题的线程会主动寻找其他尚未运行的子任务来执行。这种方式减少了线程的等待时间,提高了程序的性能。

Fork/Join框架实例

了解了ForJoin框架的原理之后,我们就来手动写一个使用Fork/Join框架实现累加和的示例程序,以帮助读者更好的理解Fork/Join框架。好了,不废话了,上代码,大家通过下面的代码好好体会下Fork/Join框架的强大。

package io.binghe.concurrency.example.aqs; import lombok.extern.slf4j.Slf4j; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.Future; import java.util.concurrent.RecursiveTask; @Slf4j public class ForkJoinTaskExample extends RecursiveTask<Integer> { public static final int threshold = 2; private int start; private int end; public ForkJoinTaskExample(int start, int end) { this.start = start; this.end = end; } @Override protected Integer compute() { int sum = 0; //如果任务足够小就计算任务 boolean canCompute = (end - start) <= threshold; if (canCompute) { for (int i = start; i <= end; i++) { sum += i; } } else { // 如果任务大于阈值,就分裂成两个子任务计算 int middle = (start + end) / 2; ForkJoinTaskExample leftTask = new ForkJoinTaskExample(start, middle); ForkJoinTaskExample rightTask = new ForkJoinTaskExample(middle + 1, end); // 执行子任务 leftTask.fork(); rightTask.fork(); // 等待任务执行结束合并其结果 int leftResult = leftTask.join(); int rightResult = rightTask.join(); // 合并子任务 sum = leftResult + rightResult; } return sum; } public static void main(String[] args) { ForkJoinPool forkjoinPool = new ForkJoinPool(); //生成一个计算任务,计算1+2+3+4 ForkJoinTaskExample task = new ForkJoinTaskExample(1, 100); //执行一个任务 Future<Integer> result = forkjoinPool.submit(task); try { log.info("result:{}", result.get()); } catch (Exception e) { log.error("exception", e); } } }

Java8中的并行流实例 Java8对并行流进行了大量的优化,并且在开发上也极大的简化了程序员的工作量,我们只需要使用类似如下的代码就可以使用Java8中的并行流来处理我们的数据。

LongStream.rangeClosed(0, 10000000L).parallel().reduce(0, Long::sum);

在Java8中如何优雅的切换并行流和串行流呢?

Stream API 可以声明性地通过 parallel() 与sequential() 在并行流与串行流之间进行切换 。

并行流优势如此大,那么我们应该始终使用它吗?

答案是否定的, 通过上述代码我们可以看到仅通过添加parallel()即可轻松将顺序流转换为并行流,但是这并不意味着我们应该始终使用它。 在使用并行流时我们需要考虑很多因素,否则我们将会遭受并行流所带来的负面影响。 并行流比顺序流具有更高的开销,并且在线程之间的上下文进行协调切换需要花费大量时间。 仅在以下情况下,才需要考虑是否使用并行流:

  • 需要处理大量数据集。
  • 我们知道 Java 使用ForkJoinPool实现并行性,ForkJoinPool派生源流并提交执行,因此源数据流应该是可拆分的。例如:ArrayList非常容易拆分,因为我们可以通过索引找到中间元素并将其拆分,但是LinkedList很难拆分,并且在大多数情况下表现不佳。在这种情况下就不适合用并行流。
  • 我们在处理问题的时候确实遇到流性能问题,否则请不要为了并行而并行。
  • 我们需要确保线程之间的所有共享资源都是正确同步,否则可能会产生数据不一致问题。

本文作者:Weee

本文链接:

版权声明:本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!