JEP 485:流收集器
概括
增强Stream API以支持自定义中间操作。这将允许流管道以现有内置中间操作难以实现的方式转换数据。
历史
流收集器 (Stream Gatherers) 由 JDK 22 中的JEP 461作为预览功能提出,并由 JDK 23 中的JEP 473重新预览。我们在此建议在 JDK 24 中完成 API,不做任何更改。
目标
-
使流水线更加灵活和富有表现力。
-
尽可能地允许自定义中间操作来操纵无限大小的流。
非目标
-
改变 Java 编程语言以更好地促进流处理并不是目标。
-
对使用 Stream API 的代码编译进行特殊处理并不是我们的目标。
动机
Java 8 引入了第一个专为 lambda 表达式设计的 API:Stream API。java.util.stream
流是惰性计算的、可能无限制的值序列。该 API 支持按顺序或并行处理流。
_流管道_由三部分组成:元素源、任意数量的中间操作和终端操作。例如:
long numberOfWords =
Stream.of("the", "", "fox", "jumps", "over", "the", "", "dog") // (1)
.filter(Predicate.not(String::isEmpty)) // (2)
.collect(Collectors.counting()); // (3)
这种编程风格既富有表现力又高效。使用构建器样式的 API,每个中间操作都会返回一个新的流;只有调用终止操作时才会开始求值。在此示例中,行 (1) 创建了一个流,但并未求值;行 (2) 设置了一个中间filter
操作,但仍然未求值该流;最后,行 (3) 上的终止collect
操作求值整个流管道。
Stream API 提供了一组相当丰富(虽然是固定的)中间和终端操作:映射、过滤、缩减、排序等。它还包括一个可扩展的终端操作,Stream::collect
它允许以多种方式汇总管道的输出。
Java 生态系统中流的使用如今已十分普遍,并且是许多任务的理想选择,但中间操作的固定集合意味着某些复杂任务无法轻松表达为流管道。所需的中间操作要么不存在,要么存在但不直接支持该任务。
举个例子,假设任务是获取一个字符串流并使其具有区别性,但区别性基于字符串长度而不是内容。也就是说,最多应发出一个长度为 1 的字符串,最多一个长度为 2 的字符串,最多一个长度为 3 的字符串,依此类推。理想情况下,代码看起来应该像这样:
var result = Stream.of("foo", "bar", "baz", "quux")
.distinctBy(String::length) // Hypothetical
.toList();
// result ==> [foo, quux]
不幸的是,distinctBy
不是内置的中间操作。最接近的内置操作是distinct
使用对象相等性来比较它们,以跟踪它已经看到的元素。也就是说,distinct
是有状态的,但在这种情况下使用了错误的状态:我们希望它根据字符串长度的相等性来跟踪元素,而不是字符串内容。我们可以通过声明一个根据字符串长度定义对象相等性的类来解决这个限制,将每个字符串包装在该类的实例中并应用于distinct
这些实例。然而,这种任务表达并不直观,并且会使代码难以维护:
record DistinctByLength(String str) {
@Override public boolean equals(Object obj) {
return obj instanceof DistinctByLength(String other)
&& str.length() == other.length();
}
@Override public int hashCode() {
return str == null ? 0 : Integer.hashCode(str.length());
}
}
var result = Stream.of("foo", "bar", "baz", "quux")
.map(DistinctByLength::new)
.distinct()
.map(DistinctByLength::str)
.toList();
// result ==> [foo, quux]
再举一个例子,假设任务是将元素分组为三个固定大小的组,但只保留前两组:[0, 1, 2, 3, 4, 5, 6, ...]
应该产生[[0, 1, 2], [3, 4, 5]]
。理想情况下,代码如下所示:
var result = Stream.iterate(0, i -> i + 1)
.windowFixed(3) // Hypothetical
.limit(2)
.toList();
// result ==> [[0, 1, 2], [3, 4, 5]]
不幸的是,没有内置的中间操作支持此任务。最好的选择是将固定窗口分组逻辑放在终端操作中,通过调用collect
自定义的Collector
。但是,我们必须在collect
操作之前进行固定大小的limit
操作,因为收集器无法collect
在出现新元素时发出信号表示它已完成 - 这在无限流中永远发生。此外,该任务本质上是关于有序数据的,因此让收集器并行执行分组是不可行的,并且它必须通过在调用其组合器时抛出异常来发出这一事实的信号。生成的代码很难理解:
var result
= Stream.iterate(0, i -> i + 1)
.limit(3 * 2)
.collect(Collector.of(
() -> new ArrayList<ArrayList<Integer>>(),
(groups, element) -> {
if (groups.isEmpty() || groups.getLast().size() == 3) {
var current = new ArrayList<Integer>();
current.add(element);
groups.addLast(current);
} else {
groups.getLast().add(element);
}
},
(left, right) -> {
throw new UnsupportedOperationException("Cannot be parallelized");
}
));
// result ==> [[0, 1, 2], [3, 4, 5]]
多年来,人们为 Stream API 提出了许多新的中间操作。其中大部分操作单独考虑时都是有意义的,但将它们全部添加会使(已经很大的)Stream API 更难学习,因为其操作更难发现。
Stream API 的设计者知道最好有一个扩展点,以便任何人都可以定义中间流操作。然而,当时他们并不知道这个扩展点应该是什么样子。最终他们发现,终端操作的扩展点,即Stream::collect(Collector)
,是有效的。我们现在可以对中间操作采取类似的方法。
总而言之,更多的中间操作可以创造更多的情境价值,使流更适合更多的任务。我们应该为自定义中间操作提供 API,让开发人员能够以自己喜欢的方式转换有限和无限流。
描述
Stream::gather(Gatherer)
_是一种新的中间流操作,它通过应用用户定义的实体(称为收集器)_来处理流的元素。通过此gather
操作,我们可以构建高效、可并行的流,这些流可实现几乎任何中间操作。Stream::gather(Gatherer)
中间操作与Stream::collect(Collector)
终端操作有何区别?
_收集器_表示对流元素的转换;它是java.util.stream.Gatherer
接口的一个实例。收集器可以以一对一、一对多、多对一或多对多的方式转换元素。它们可以跟踪先前看到的元素以影响后续元素的转换,它们可以短路以将无限流转换为有限流,并且可以启用并行执行。例如,收集器可以将一个输入元素转换为一个输出元素,直到某个条件变为真,此时它开始将一个输入元素转换为两个输出元素。
收集器由四个协同工作的函数定义:
-
可选的_初始化_函数提供了一个在处理流元素时保持私有状态的对象。例如,收集器可以存储当前元素,以便下次应用时,它可以将新元素与现在的上一个元素进行比较,并且只发出两者中较大的一个。实际上,这样的收集器将两个输入元素转换为一个输出元素。
-
集成_器_函数集成来自输入流的新元素,可能检查私有状态对象,也可能将元素发送到输出流。它还可以在到达输入流末尾之前终止处理;例如,搜索整数流中最大值的收集器可以在检测到时终止
Integer.MAX_VALUE
。 -
当输入流标记为并行时,可选的_组合函数可用于并行评估收集器。如果收集器不具备并行能力,那么它仍然可以成为并行流管道的一部分,但会按顺序进行评估。这对于操作本质上是有序的,因此无法并行化的情况很有用。_
-
当没有更多输入元素可供使用时,将调用可选的_完成器_函数。此函数可以检查私有状态对象,并可能发出额外的输出元素。例如,当调用其完成器时,在其输入元素中搜索特定元素的收集器可以报告失败,例如通过抛出异常。
调用时,Stream::gather
执行与以下步骤等效的操作:
-
创建一个
Downstream
对象,当给定收集器输出类型的元素时,将其传递到管道中的下一个阶段。 -
get()
通过调用其初始化方法获取收集器的私有状态对象。 -
通过调用其方法获取收集器的集成器
integrator()
。 -
当有更多输入元素时,调用集成器的integrate(...)方法,向其传递状态对象、下一个元素和下游对象。如果该方法返回,则终止
false
。 -
获取收集器的完成器并使用状态和下游对象调用它。
接口中声明的每个现有中间操作Stream
都可以通过调用实现该操作的收集器来实现gather
。例如,给定一个T
-typed 元素流,通过应用函数Stream::map
将每个T
元素转换为元素,然后将元素传递到下游;这只是一个无状态的一对一收集器。再举一个例子,采用一个谓词来确定是否应将输入元素传递到下游;这只是一个无状态的一对多收集器。事实上,每个流管道在概念上都等同于U``U``Stream::filter
source.gather(...).gather(...).gather(...).collect(...)
内置收集器
我们在java.util.stream.Gatherers类中引入了以下内置收集器:
-
fold
是一个有状态的多对一收集器,它逐步构建一个聚合,并在不再存在输入元素时发出该聚合。 -
mapConcurrent
是一个有状态的一对一收集器,它同时为每个输入元素调用一个提供的函数,直到达到提供的限制。 -
scan
是一个有状态的一对一收集器,它将提供的函数应用于当前状态和当前元素以生成下一个元素,然后将其传递给下游。 -
windowFixed
是一个有状态的多对多收集器,它将输入元素分组到所提供大小的列表中,当列表已满时向下游发射窗口。 -
windowSliding
是一个有状态的多对多收集器,它将输入元素分组到指定大小的列表中。在第一个窗口之后,每个后续窗口都是通过删除第一个元素并从输入流中附加下一个元素,从其前一个窗口的副本创建的。
并行评估
收集器的并行评估分为两种不同的模式。当未提供组合器时,流库仍可以通过并行执行上游和下游操作来提取并行性,类似于可短路parallel().forEachOrdered()
操作。当提供组合器时,并行评估类似于可短路操作parallel().reduce()
。
组合收集器
收集器通过方法支持组合andThen(Gatherer)
,该方法将两个收集器连接起来,第一个收集器生成第二个收集器可以使用的元素。这样可以通过组合更简单的收集器来创建复杂的收集器,就像函数组合一样。从语义上讲,
source.gather(a).gather(b).gather(c).collect(...)
相当于
source.gather(a.andThen(b).andThen(c)).collect(...)
采集者与收藏者
界面设计Gatherer
深受 设计的影响Collector
。主要区别在于:
-
Gatherer
使用Integrator
而不是BiConsumer
进行每个元素的处理,因为它需要对象的额外输入参数Downstream
,并且它需要返回来boolean
指示是否应该继续处理。 -
Gatherer
使用BiConsumer
作为其完成器而不是 ,Function
因为它需要为其Downstream
对象提供额外的输入参数,并且因为它无法返回结果,因此是void
。
示例:拥抱潮流
有时,由于缺乏适当的中间操作,我们不得不将流评估为列表,并循环运行分析逻辑。例如,假设我们有一个按时间顺序排列的温度读数流:
record Reading(Instant obtainedAt, int kelvins) {
Reading(String time, int kelvins) {
this(Instant.parse(time), kelvins);
}
static Stream<Reading> loadRecentReadings() {
// In reality these could be read from a file, a database,
// a service, or otherwise
return Stream.of(
new Reading("2023-09-21T10:15:30.00Z", 310),
new Reading("2023-09-21T10:15:31.00Z", 312),
new Reading("2023-09-21T10:15:32.00Z", 350),
new Reading("2023-09-21T10:15:33.00Z", 310)
);
}
}
进一步假设我们想要检测此流中的可疑变化,定义为在五秒时间窗口内两次连续读数的温度变化超过 30° 开尔文:
boolean isSuspicious(Reading previous, Reading next) {
return next.obtainedAt().isBefore(previous.obtainedAt().plusSeconds(5))
&& (next.kelvins() > previous.kelvins() + 30
|| next.kelvins() < previous.kelvins() - 30);
}
这需要对输入流进行顺序扫描,因此我们必须避免声明式流处理并命令式地实现我们的分析:
List<List<Reading>> findSuspicious(Stream<Reading> source) {
var suspicious = new ArrayList<List<Reading>>();
Reading previous = null;
boolean hasPrevious = false;
for (Reading next : source.toList()) {
if (!hasPrevious) {
hasPrevious = true;
previous = next;
} else {
if (isSuspicious(previous, next))
suspicious.add(List.of(previous, next));
previous = next;
}
}
return suspicious;
}
var result = findSuspicious(Reading.loadRecentReadings());
// result ==> [[Reading[obtainedAt=2023-09-21T10:15:31Z, kelvins=312],
// Reading[obtainedAt=2023-09-21T10:15:32Z, kelvins=350]],
// [Reading[obtainedAt=2023-09-21T10:15:32Z, kelvins=350],
// Reading[obtainedAt=2023-09-21T10:15:33Z, kelvins=310]]]
然而,使用收集器,我们可以更简洁地表达这一点:
List<List<Reading>> findSuspicious(Stream<Reading> source) {
return source.gather(Gatherers.windowSliding(2))
.filter(window -> (window.size() == 2
&& isSuspicious(window.get(0),
window.get(1))))
.toList();
}
示例:定义收集器
windowFixed
类中声明的收集器可以Gatherers
写为Gatherer
接口的直接实现:
record WindowFixed<TR>(int windowSize)
implements Gatherer<TR, ArrayList<TR>, List<TR>>
{
public WindowFixed {
// Validate input
if (windowSize < 1)
throw new IllegalArgumentException("window size must be positive");
}
@Override
public Supplier<ArrayList<TR>> initializer() {
// Create an ArrayList to hold the current open window
return () -> new ArrayList<>(windowSize);
}
@Override
public Integrator<ArrayList<TR>, TR, List<TR>> integrator() {
// The integrator is invoked for each element consumed
return Gatherer.Integrator.ofGreedy((window, element, downstream) -> {
// Add the element to the current open window
window.add(element);
// Until we reach our desired window size,
// return true to signal that more elements are desired
if (window.size() < windowSize)
return true;
// When the window is full, close it by creating a copy
var result = new ArrayList<TR>(window);
// Clear the window so the next can be started
window.clear();
// Send the closed window downstream
return downstream.push(result);
});
}
// The combiner is omitted since this operation is intrinsically sequential,
// and thus cannot be parallelized
@Override
public BiConsumer<ArrayList<TR>, Downstream<? super List<TR>>> finisher() {
// The finisher runs when there are no more elements to pass from
// the upstream
return (window, downstream) -> {
// If the downstream still accepts more elements and the current
// open window is non-empty, then send a copy of it downstream
if(!downstream.isRejecting() && !window.isEmpty()) {
downstream.push(new ArrayList<TR>(window));
window.clear();
}
};
}
}
使用示例:
jshell> Stream.of(1,2,3,4,5,6,7,8,9).gather(new WindowFixed(3)).toList()
$1 ==> [[1, 2, 3], [4, 5, 6], [7, 8, 9]]
示例:临时收集器
windowFixed
另外,也可以通过工厂方法以临时方式编写收集器Gatherer.ofSequential(...)
:
/**
* Gathers elements into fixed-size groups. The last group may contain fewer
* elements.
* @param windowSize the maximum size of the groups
* @return a new gatherer which groups elements into fixed-size groups
* @param <TR> the type of elements the returned gatherer consumes and produces
*/
static <TR> Gatherer<TR, ?, List<TR>> fixedWindow(int windowSize) {
// Validate input
if (windowSize < 1)
throw new IllegalArgumentException("window size must be non-zero");
// This gatherer is inherently order-dependent,
// so it should not be parallelized
return Gatherer.ofSequential(
// The initializer creates an ArrayList which holds the current
// open window
() -> new ArrayList<TR>(windowSize),
// The integrator is invoked for each element consumed
Gatherer.Integrator.ofGreedy((window, element, downstream) -> {
// Add the element to the current open window
window.add(element);
// Until we reach our desired window size,
// return true to signal that more elements are desired
if (window.size() < windowSize)
return true;
// When window is full, close it by creating a copy
var result = new ArrayList<TR>(window);
// Clear the window so the next can be started
window.clear();
// Send the closed window downstream
return downstream.push(result);
}),
// The combiner is omitted since this operation is intrinsically sequential,
// and thus cannot be parallelized
// The finisher runs when there are no more elements to pass from the upstream
(window, downstream) -> {
// If the downstream still accepts more elements and the current
// open window is non-empty then send a copy of it downstream
if(!downstream.isRejecting() && !window.isEmpty()) {
downstream.push(new ArrayList<TR>(window));
window.clear();
}
}
);
}
使用示例:
jshell> Stream.of(1,2,3,4,5,6,7,8,9).gather(fixedWindow(3)).toList()
$1 ==> [[1, 2, 3], [4, 5, 6], [7, 8, 9]]
示例:可并行化的收集器
在并行流中使用时,仅当收集器提供组合器函数时,才会并行评估收集器。例如,此可并行化的收集器根据提供的选择器函数最多发出一个元素:
static <TR> Gatherer<TR, ?, TR> selectOne(BinaryOperator<TR> selector) {
// Validate input
Objects.requireNonNull(selector, "selector must not be null");
// Private state to track information across elements
class State {
TR value; // The current best value
boolean hasValue; // true when value holds a valid value
}
// Use the `of` factory method to construct a gatherer given a set
// of functions for `initializer`, `integrator`, `combiner`, and `finisher`
return Gatherer.of(
// The initializer creates a new State instance
State::new,
// The integrator; in this case we use `ofGreedy` to signal
// that this integerator will never short-circuit
Gatherer.Integrator.ofGreedy((state, element, downstream) -> {
if (!state.hasValue) {
// The first element, just save it
state.value = element;
state.hasValue = true;
} else {
// Select which value of the two to save, and save it
state.value = selector.apply(state.value, element);
}
return true;
}),
// The combiner, used during parallel evaluation
(leftState, rightState) -> {
if (!leftState.hasValue) {
// If no value on the left, return the right
return rightState;
} else if (!rightState.hasValue) {
// If no value on the right, return the left
return leftState;
} else {
// If both sides have values, select one of them to keep
// and store it in the leftState, as that will be returned
leftState.value = selector.apply(leftState.value,
rightState.value);
return leftState;
}
},
// The finisher
(state, downstream) -> {
// Emit the selected value, if there is one, downstream
if (state.hasValue)
downstream.push(state.value);
}
);
}
在随机整数流上的示例用法:
jshell> Stream.generate(() -> ThreadLocalRandom.current().nextInt())
.limit(1000) // Take the first 1000 elements
.gather(selectOne(Math::max)) // Select the largest value seen
.parallel() // Execute in parallel
.findFirst() // Extract the largest value
$1 ==> Optional[99822]
替代方案
我们在单独的设计文档中探讨了替代方案。
风险和假设
-
使用自定义收集器以及类中声明的内置收集器
Gatherers
不会像使用类中声明的内置中间操作那样简洁Stream
。但是,自定义收集器的定义在复杂性上与终端collect
操作的自定义收集器的定义相似。此外,使用自定义和内置收集器在复杂性上与使用自定义收集器和类中声明的内置收集器相似Collectors
。 -
我们可能会在预览此功能的过程中修改内置收集器集,并且我们可能会在未来的版本中修改内置收集器集。
-
我们不会
Stream
为类中定义的每个内置收集器添加新的中间操作Gatherers
,尽管为了统一起见,这样做很诱人。为了保持类的易学性,Stream
我们只会在经验表明它们广泛有用后才考虑向其中添加新的中间操作。我们可能会在以后的预览轮中添加此类方法,甚至在此功能最终确定后。现在公开新的内置收集器并不妨碍以后添加专用Stream
方法。