接口 | 描述 |
---|---|
BaseStream<T,S extends BaseStream<T,S>> |
用于流的基本接口,它是支持顺序和并行聚合操作的元素的序列。
|
Collector<T,A,R> |
一个
mutable reduction operation积累输入元素到一个可变的结果的容器,可随意变换积累的结果到最后表示,毕竟输入元素已被处理。
|
DoubleStream |
支持序列和并行聚合操作的原始双值元素序列。
|
DoubleStream.Builder |
一
DoubleStream 易变的建设者。
|
IntStream |
原始序列的int值元素支持串行和并行的聚合操作。
|
IntStream.Builder |
一个
IntStream 易变的建设者。
|
LongStream |
支持序列和并行聚合操作的原始长值元素序列。
|
LongStream.Builder |
一
LongStream 易变的建设者。
|
Stream<T> |
支持顺序和并行聚合操作的元素序列。
|
Stream.Builder<T> |
一
Stream 易变的建设者。
|
类 | 描述 |
---|---|
Collectors |
实现
Collector 实现各种有用的还原操作,如积累元素的集合,总结元素根据不同的标准,等等。
|
StreamSupport |
用于创建和操作流的低电平实用方法。
|
Enum | 描述 |
---|---|
Collector.Characteristics |
特征表明一个
Collector 性能,可用于优化还原的实现。
|
int sum = widgets.stream()
.filter(b -> b.getColor() == RED)
.mapToInt(b -> b.getWeight())
.sum();
这里我们使用widgets
,一Collection<Widget>
,作为一个流源,然后进行过滤图减少流获得的红色小部件的重量的总和。(求和是一个reduction操作。例)
在这个包中引入的关键抽象是流。这类Stream
,IntStream
,LongStream
,和DoubleStream
是流对象和原始int
,long
和double
类型。流从不同的集合在几个方面:
Stream
产生一个新的Stream
没有过滤的元素,而不是从源集合中移除元素。String
连续三个元音”不需要检查所有输入的字符串。流的操作分为中级(Stream
-producing)业务和终端(价值或副作用产生)的操作。中间操作总是懒惰的。limit(n)
或findFirst()
可以允许无限流计算在有限的时间内完成。Iterator
,一个新的流必须产生重温源相同的元素。Collection
通过stream()
和parallelStream()
方法;Arrays.stream(Object[])
;Stream.of(Object[])
,IntStream.range(int, int)
或Stream.iterate(Object, UnaryOperator)
;BufferedReader.lines()
;Files
获得方法;Random.ints()
;BitSet.stream()
,Pattern.splitAsStream(java.lang.CharSequence)
,和JarFile.stream()
。额外的流源可通过第三方库使用these techniques提供。
流操作分为中间和终端操作,并组合成流管道。蒸汽管道由一个源(如Collection
、数组、发电功能,或I/O通道);其次是零个或多个如Stream.filter
或Stream.map
中间业务;和终端操作如Stream.forEach
或Stream.reduce
。
中间业务返回一个新的流。他们总是懒洋洋的;执行中间操作如filter()
实际上并没有进行任何的过滤,而是创建一个新的流,当走过,包含的元素的初始流匹配给定的谓词。直到执行管道的终端操作时,管道源的遍历才开始。
终端操作,如Stream.forEach
或IntStream.sum
,可以遍历流产生的结果或副作用。在执行终端操作后,将被视为消耗流管道,并不能使用;如果您需要再次遍历同一数据源,则必须返回数据源以获得新的流。在几乎所有的情况下,终端操作都渴望,完成他们的遍历的数据源和处理的管道,然后返回。只有终端操作iterator()
和spliterator()
不是;这些是作为一个“逃生舱”使任意客户端的事件,现有业务不足以控制管道遍历任务。
处理流懒洋洋地允许显著功效;在管道如过滤图和上面的例子,过滤、映射和总结可以融合到单个数据传递,以最小的中间状态。懒惰也允许避免检查所有的数据,当它是没有必要的操作,如“找到第一个字符串超过1000个字符”,它是唯一必要的检查只是足够的字符串,找到一个具有所需的特性,而不检查所有的字符串从源提供。(当输入流是无限的,而不仅仅是大的,这种行为变得更加重要。)
中间业务进一步分为有状态和无状态的操作。无状态的操作,如filter
和map
,不保留状态从以前见过的元素时,处理的一种新的元素,每个元素可以对其他元素的独立的业务处理。有状态的操作,如distinct
和sorted
,可能包含先前看到的新的元素时,元素的处理状态。
有状态的操作可能需要处理整个输入之前产生的结果。例如,一个不能产生任何排序一个流的结果,直到一个已经看到流的所有元素。因此,在并行计算,一些管道状态中间操作可能需要多个传递数据或可能需要缓冲的重要数据。包含完全无状态中间操作的管道可以在一个单一的通中处理,无论是连续的或并行的,以最小的数据缓冲。
此外,一些业务被视为短路操作。中间业务是短路的话,当面对无限的输入,它可能会产生一个有限流的结果。终端短路的话,当面对无限的输入,它可以在有限的时间内终止。在管道具有短路手术是必要的,但不是充分的,对于一个无限流的正常终止,在有限的时间内处理条件。
一个明确的for-
loop处理元素本质上是串行的。流便于并行执行重新计算为总运营的管道,而不是势在必行的操作单元。所有的流操作可以在串行或并行执行。在JDK的流实现创建并行串行流除非明确要求。例如,Collection
方法Collection.stream()
和Collection.parallelStream()
,产生串行和并行流分别;其他流承载方法如IntStream.range(int, int)
产生顺序流但这些流可以有效地并行化方法通过调用它们的BaseStream.parallel()
。要执行以前的“小部件”查询的权重的总和,我们会做:
int sumOfWeights = widgets.
parallelStream()
.filter(b -> b.getColor() == RED) .mapToInt(b -> b.getWeight()) .sum();
串行和并行版本的这个例子的差别仅为初始流的创作,用“parallelStream()
”而不是“stream()
”。当终端操作开始时,流管道按顺序或并行执行,取决于它被调用的流的方向。流是否将串行或并行执行可与isParallel()
法确定,和一个流的方向可以用行动BaseStream.sequential()
和BaseStream.parallel()
修饰。当发起终端操作时,流管道按顺序或并行执行,取决于调用的流的模式。
除了确定为明确的不确定性的操作,如findAny()
,流是否执行顺序或并行,不应改变计算的结果。
大多数流操作接受描述用户指定的行为参数,这往往是lambda表达式。为了保持正确的行为,这些行为参数必须是不干扰的,在大多数情况下,必须是无状态的。这样的参数一直是一个functional interface如Function
实例,经常是lambda表达式或方法引用。
ArrayList
。这是可能的,只有当我们可以防止在一个流管道的执行过程中的数据源的干扰。除了逃生舱口操作
iterator()
和
spliterator()
,开始执行,当终端操作调用,和两端的终端操作完成时。对于大多数数据源,防止干扰意味着确保在执行过程中的所有过程中的数据源不被修改。这是一个显着的例外是流,其来源是并发集合,这是专门设计来处理并发修改。并发流的来源是那些
Spliterator
报告
CONCURRENT
特性。
因此,在源可能不并发的流管道中的行为参数永远不应该修改流的数据源。一个行为参数被称为干扰一个非并发的数据源,如果它修改,或将要修改的原因,该流的数据源。不干涉的需要适用于所有的管道,而不仅仅是平行的管道。除非流源是并行的,一个流管道执行期间修改一个流的数据源可能导致异常,不正确的答案,或不符合标准的行为。乖巧的流源,源可以修改终端操作开始之前,这些修改将反映在覆盖的元素。例如,考虑下面的代码:
List<String> l = new ArrayList(Arrays.asList("one", "two"));
Stream<String> sl = l.stream();
l.add("three");
String s = sl.collect(joining(" "));
第一个列表创建由两个字符串:“一”和“二”。然后从该列表中创建一个流。下一个列表是通过添加三分之一个字符串修改:“三”。最后,流的元素被收集并结合在一起。由于名单的修改在终端操作的结果
collect
开始将一个字符串“一二三”。所有的河流从JDK集合返回,和大多数其他JDK的类,在这种方式中表现得很好;被其他库生成的流,看到建筑乖流要求
Low-level stream construction。
map()
在参数:
Set<Integer> seen = Collections.synchronizedSet(new HashSet<>());
stream.parallel().map(e -> { if (seen.add(e)) return 0; else return e; })...
这里,如果映射操作的并行执行,对于相同的输入的结果可能会有所不同从跑来跑去,由于线程调度的差异,而与一个无国籍的lambda表达式的结果都是一样的。
还请注意,试图从行为参数访问可变状态呈现给你关于安全和性能的一个不错的选择;如果你不同步访问的状态,你有一个数据竞争和因此你的代码了,但是如果你同步访问的状态,你将有可能破坏你所寻求的竞争受益于并行。最好的方法是避免状态行为参数流操作完全;通常有一种方法来调整流管道,避免有状态。
如果行为参数也有副作用,除非明确声明,没有保证对这些副作用的visibility其他线程,也没有任何保证相同的流管道内的“同一”元素的不同业务在同一个线程中执行。此外,这些影响的顺序可能是令人惊讶的。即使管道约束产生的结果是随着流源遇到的顺序一致(例如,IntStream.range(0,5).parallel().map(x -> x*2).toArray()
必须产生[0, 2, 4, 6, 8]
),不保证所作的映射函数应用于单个元素的顺序,或是线程的任何行为参数是给定的元素执行。
很多人可能会使用的副作用能更安全有效地表达无副作用的计算,如使用reduction代替可变蓄电池。然而,副作用,如使用println()
用于调试目的通常是无害的。少量的流操作,如forEach()
和peek()
,只能通过副作用;这些应谨慎使用。
作为一个例子,如何将一个流管道,不当使用的副作用之一,不,下面的代码搜索字符串的流匹配给定的正则表达式,并将匹配列表中的。
ArrayList<String> results = new ArrayList<>();
stream.filter(s -> pattern.matcher(s).matches())
.forEach(s -> results.add(s)); // Unnecessary use of side-effects!
这个代码不必要使用的副作用。如果并行执行,对
ArrayList
非线程安全会导致不正确的结果,并添加需要同步会造成冲突,破坏并行效益。此外,用在这里是完全不必要的副作用;
forEach()
可以简单地用还原操作,更安全,更有效的替代,更适合并行化:
List<String>results =
stream.filter(s -> pattern.matcher(s).matches())
.collect(Collectors.toList()); // No side-effects!
流可能或可能没有定义的相遇顺序。是否一个流有一个遇到顺序取决于源和中间操作。一定的流源(如List
或数组)本质上是有序的,而其他人(如HashSet
)不。一些中间业务,如sorted()
,可能在一个无序流实施遇到的顺序,以及其他可能呈现有序流无序,如BaseStream.unordered()
。此外,一些终端操作可以忽略遇到的顺序,如forEach()
。
如果流是有序的,大多数的操作限制在他们遇到的元素排序操作;如果一个流的源是一个List
含[1, 2, 3]
,然后执行map(x -> x*2)
结果必须[2, 4, 6]
。然而,如果源没有定义遭遇订单,然后任何排列的值将是一个有效的结果[2, 4, 6]
。
对于连续的流,遇到顺序的存在或不存在不影响性能,只有决定论。如果一个流是有序的,相同的源上的相同的流管道的重复执行将产生一个相同的结果,如果它不被订购,重复执行可能会产生不同的结果。
对于并行流,放宽排序约束有时可以实现更高效的执行。一定的聚合操作,如过滤重复(distinct()
)或减少(Collectors.groupingBy()
)可以更有效地实现如果元素的顺序是不相关的。同样的,本质上是在遇到订单操作,如limit()
可能需要缓冲,以确保适当的排序,破坏并行效益。如果流遇到了一个订单,但用户并不特别在意,遇到秩序,明确的命令流unordered()
可能提高某些状态或终端操作的并行性能。然而,大多数流管道,如“笔重块”上面的例子,还是并行效率甚至在序约束。
reduce()
和
collect()
,以及多个专业的还原形式如
sum()
,
max()
,或
count()
当然,这样的操作可以很容易地实现简单的顺序循环,如:
int sum = 0;
for (int x : numbers) {
sum += x;
}
。然而,我们有充分的理由希望降低操作过变积累如上面。不仅是减少“抽象”--它在流作为一个整体,而不是单独的元素,但构建适当的降低操作本质上是并行的,只要功能(S)用于处理元素
associative和
stateless。例如,因为我们想找的总和的数字流,我们可以写:
int sum = numbers.stream().reduce(0, (x,y) -> x+y);
或:
int sum = numbers.stream().reduce(0, Integer::sum);
这些减少操作可以安全地运行,几乎没有修改:
int sum = numbers.parallelStream().reduce(0, Integer::sum);
还原parallellizes因为实现对并行数据子集的操作,然后将中间结果得到最终的正确答案。(即使语言有一个“分别平行”的结构,突变积累的方法,仍需要开发者对共享变量sum
积累,以及所需的同步提供线程安全的更新将可能消除并行。任何性能增益)而不是使用reduce()
删除所有并行还原操作的负担,和图书馆提供一个高效的并行执行,没有额外的同步要求。
前面所显示的“小部件”示例演示了如何减少与其他操作结合起来,以替换批量操作的循环。如果widgets
是一家集Widget
对象,其中有一个getWeight
方法,我们可以找到最重的小工具:
OptionalInt heaviest = widgets.parallelStream()
.mapToInt(Widget::getWeight)
.max();
在更一般的形式,在类型<T>
产生的结果<U>
型元素reduce
操作需要三个参数:
<U> U reduce(U identity,
BiFunction<U, ? super T, U> accumulator,
BinaryOperator<U> combiner);
这里的单位元是初始种子值的降低和默认的结果如果没有输入元素。蓄电池的功能需要一个部分的结果和下一个元素,并产生一个新的部分结果。组合功能结合两部分的结果产生一个新的部分结果。(组合是必要的平行减少,其中输入的分区,每个分区的一部分积累,计算后的部分结果相结合以产生一个最终的结果。)
更正式的identity
值必须为组合功能的身份。这意味着,所有u
,combiner.apply(identity, u)
等于u
。此外,该combiner
功能必须associative必须用accumulator
功能兼容:所有u
和t
,combiner.apply(u, accumulator.apply(identity, t))
必须equals()
到accumulator.apply(u, t)
。
三个参数形式是两个参数形式的一个推广,将一个映射步骤合并成一个积累步骤。我们可以重新铸造简单的加权总和的例子,使用更一般的形式如下:
int sumOfWeights = widgets.stream()
.reduce(0,
(sum, b) -> sum + b.getWeight())
Integer::sum);
虽然明确的Map-Reduce形式更具可读性和因此通常应优先。广义的形式提供的情况下,显着的工作可以被优化,通过组合映射和减少到一个单一的功能。
Collection
或
StringBuilder
,正如处理的元素在流。
如果我们想把一个流的字符串拼接成一个长字符串,我们可以达到与普通还原:
String concatenated = strings.reduce("", String::concat)
我们会得到预期的结果,甚至会在平行工作。然而,我们可能不高兴的表现!这样一个实现将做大量的字符串复制,并且运行时间将是O(n(2))的字符数。一个更高效的方式将积累的结果为StringBuilder
,这是一个积累的字符串可变容器。我们可以用同样的技术来并行可变减少因为我们做普通的还原。
可变减速操作称为collect()
,它收集在一起的结果到结果的容器如Collection
。一collect
操作需要三个功能:供应商功能构建结果容器的新实例,累加器函数将输入元件为结果的容器,和结合作用的合并之一结果在另一个容器的内容。这种形式非常类似于普通还原的一般形式:
<R> R collect(Supplier<R> supplier,
BiConsumer<R, ? super T> accumulator,
BiConsumer<R, R> combiner);
与reduce()
,在这个抽象的方式表达collect
的一个好处是,它是适合并行化:我们可以积累部分结果并结合他们,只要积累和结合功能满足相应的要求。例如,收集流中的元素的字符串表示一个ArrayList
,我们可以写出每形成明显的顺序:
ArrayList<String> strings = new ArrayList<>();
for (T element : stream) {
strings.add(element.toString());
}
或者我们可以使用一个并行收集形式:
ArrayList<String> strings = stream.collect(() -> new ArrayList<>(),
(c, e) -> c.add(e.toString()),
(c1, c2) -> c1.addAll(c2));
或拉映射操作了蓄能器的功能,我们可以更简洁的表达:
List<String> strings = stream.map(Object::toString)
.collect(ArrayList::new, ArrayList::add, ArrayList::addAll);
这里,我们的供应商是
ArrayList constructor
,蓄能器添加字符串化元素的
ArrayList
,和组合只是利用
addAll
来复制ST从一个容器到另一个容器的环。
三个方面collect
--供应商、蓄能器、合路--紧密结合。我们可以用一个Collector
抽象捕获所有三个方面。收集串成List
上面的例子可以改写为使用标准的Collector
:
List<String> strings = stream.map(Object::toString)
.collect(Collectors.toList());
包装可减少到集热器的另一个优势:可组合性。这类Collectors
包含收藏家许多预定义的工厂,包括组合器,变换器进入另一个。例如,假设我们有一个计算员工工资总额的收集器,如下所示:
Collector<Employee, ?, Integer> summingSalaries
= Collectors.summingInt(Employee::getSalary);
(第二类型参数的
?
仅仅表明我们不在乎这个收藏家。中间用表示)如果我们想创造一个集按工资部门的总和,我们可以利用
summingSalaries
使用
groupingBy
:
Map<Department, Integer> salariesByDept
= employees.stream().collect(Collectors.groupingBy(Employee::getDepartment,
summingSalaries));
与常规的还原操作,collect()
操作只能并行如果合适的条件都满足了。对于任何部分积累的结果,将它与一个空的结果容器相结合,必须产生一个等效的结果。那是,一部分积累的结果,是p
因蓄电池系列和组合调用,p
必须相当于combiner.apply(p, supplier.get())
。
此外,计算是分裂的,它必须产生一个等效的结果。对于任何输入元素t1
和t2
,在下面的计算结果r1
和r2
必须等价:
A a1 = supplier.get();
accumulator.accept(a1, t1);
accumulator.accept(a1, t2);
R r1 = finisher.apply(a1); // result without splitting
A a2 = supplier.get();
accumulator.accept(a2, t1);
A a3 = supplier.get();
accumulator.accept(a3, t2);
R r2 = finisher.apply(combiner.apply(a2, a3)); // result with splitting
在这里,等价性一般是指根据Object.equals(Object)
。但在某些情况下等价可放宽考虑差异。
collect()
产生
Map
,如:
Map<Buyer, List<Transaction>> salesByBuyer
= txns.parallelStream()
.collect(Collectors.groupingBy(Transaction::getBuyer));
它实际上可能是并行执行的操作会适得其反。这是因为步骤的结合(合并一
Map
到另一个键)可以是昂贵的一些
Map
实现。
假设,然而,用于减少的结果是一个同时可收集容器,如ConcurrentHashMap
。在这种情况下,蓄能器的并行调用实际上可以把结果同时到相同的结果的容器,消除合并器合并结果容器需要。这可能提供了一个升压到并行执行性能。我们称之为并发还原。
一个支持并发的减少是有Collector.Characteristics.CONCURRENT
特征Collector
。然而,并发集合也有一个缺点。如果多个线程将结果存放在一个共享的容器中,则沉积结果的顺序是不确定的。因此,并发减少是唯一可能的,如果排序是不重要的流正在处理。的Stream.collect(Collector)
实施只会执行一个同步还原如果
Collector.Characteristics.CONCURRENT
特性,和;Collector.Characteristics.UNORDERED
。BaseStream.unordered()
方法。例如:
Map<Buyer, List<Transaction>> salesByBuyer
= txns.parallelStream()
.unordered()
.collect(groupingByConcurrent(Transaction::getBuyer));
(其中
Collectors.groupingByConcurrent(java.util.function.Function<? super T, ? extends K>)
是
groupingBy
并行等效)。
请注意,如果它是重要的是一个给定的键的元素出现在他们出现在源中的顺序,那么我们不能使用并发减少,因为顺序是并发插入的人员伤亡之一。然后,我们将被约束,以实现无论是一个连续的减少或合并为基础的并行还原。
op
是联想如果以下认为:
(a op b) op c == a op (b op c)
这平行的重要性评价可以看出,如果我们扩大这四个方面:
a op b op c op d == (a op b) op (c op d)
所以我们可以评估
(a op b)
平行
(c op d)
,然后调用
op
结果。
联想业务的例子包括数字之外,最小,最大,和字符串连接。
Collection.stream()
或
Arrays.stream(Object[])
获得流。这些流轴承方法如何实现?
StreamSupport
类创建一个流有一些低级的方法,都使用某种形式的一个Spliterator
。一spliterator是一个Iterator
并行模拟;它描述了一个(可能是无限的)元素的集合,与顺序推进,支持批量的遍历,并剥离部分输入到另一个spliterator这可以并行处理。在最低水平,所有的数据流是由一个spliterator驱动。
有一些实现的选择实施spliterator,几乎都是权衡实施简单和运行性能的流使用spliterator。最简单的,但至少高性能的方法来创建一个spliterator是使用Spliterators.spliteratorUnknownSize(java.util.Iterator, int)
迭代器创建一个。而这样的spliterator会工作,它可能会提供并行性能差,因为我们已经失去了大小的信息(多大是底层的数据集),以及被约束到一个简单的分裂算法。
一个高质量的spliterator将提供平衡与已知大小的分裂,准确的尺寸信息,和其他一些characteristics
的spliterator或数据,可实现优化执行。
对于可变数据源spliterators有额外的挑战;结合数据时,由于数据的时间spliterator创建时间和蒸汽管道之间执行的变化。理想情况下,一个流的spliterator会报告IMMUTABLE
或CONCURRENT
特征;如果不应该late-binding。如果源不能直接提供推荐spliterator,可以间接地提供了一spliterator使用Supplier
,构建流通过Supplier
-accepting版本的stream()
。是的spliterator从供应商只有在蒸汽管道的终端操作开始了。
这些要求显着减少的流源和执行的流管道的突变之间的潜在干扰的范围。基于期望的特性spliterators溪流,或那些使用基于工厂形式的供应商,是免疫的终端操作开始之前的数据源(提供修改行为参数的操作流满足互不干涉和无国籍要求的标准)。更多细节见Non-Interference。
Submit a bug or feature
For further API reference and developer documentation, see Java SE Documentation. That documentation contains more detailed, developer-targeted descriptions, with conceptual overviews, definitions of terms, workarounds, and working code examples.
Copyright © 1993, 2014, Oracle and/or its affiliates. All rights reserved.