JEP 473: 流收集器(第二预览版)
总结
增强 Stream API,以支持自定义的中间操作。这将允许流管道以现有内置中间操作不易实现的方式转换数据。这是一个 预览 API。
历史
我们在 JEP 461 中提出了流收集器(Stream Gatherers)作为预览功能,并在 JDK 22 中交付了该功能。现在我们提议在 JDK 23 中重新预览此 API,且不做任何更改,以获取更多的经验与反馈。
目标
-
使流管道更灵活、更富有表现力。
-
只要可能,允许自定义中间操作来处理无限大小的流。
非目标
-
目标并非是改变 Java 编程语言以更好地促进流处理。
-
目标并非是对使用 Stream API 的代码进行特殊编译。
动机
Java 8 引入了第一个专门为 lambda 表达式设计的 API:Stream API,java.util.stream
。流是一个惰性计算的、可能无界的值序列。该 API 支持以顺序或并行方式处理流的能力。
一个 流管道 由三部分组成:元素的来源、任意数量的中间操作和一个终端操作。例如:
long numberOfWords =
Stream.of("the", "", "fox", "jumps", "over", "the", "", "dog") // (1)
.filter(Predicate.not(String::isEmpty)) // (2)
.collect(Collectors.counting()); // (3)
这种编程风格既富有表现力又高效。使用构建器风格的 API,每个中间操作都会返回一个新的流;只有在调用终端操作时才会开始评估。在此示例中,第 (1) 行创建了一个流,但并未对其进行评估,第 (2) 行设置了一个中间的 filter
操作,但仍未评估该流,最后,第 (3) 行上的终端 collect
操作评估了整个流管道。
Stream API 提供了一组相当丰富(尽管是固定的)的中间操作和终端操作:映射、过滤、归约、排序等等。它还包括一个可扩展的终端操作 Stream::collect
,该操作能够以多种方式汇总管道的输出。
在 Java 生态系统中,流的使用现在已经非常普遍,并且非常适合许多任务,但中间操作的固定集合意味着某些复杂任务无法轻松地表示为流管道。要么是所需的中间操作不存在,要么是存在但不直接支持该任务。
例如,假设任务是获取字符串流并使其具有唯一性,但这种唯一性是基于字符串长度而不是内容。也就是说,最多应输出一个长度为 1 的字符串,最多一个长度为 2 的字符串,最多一个长度为 3 的字符串,依此类推。理想情况下,代码看起来应该像这样:
var result = Stream.of("foo", "bar", "baz", "quux")
.distinctBy(String::length) // Hypothetical
.toList();
// result ==> [foo, quux]
遗憾的是,distinctBy
并不是一个内置的中间操作。最接近的内置操作 distinct
会通过对象相等性来比较元素,从而跟踪它已经见过的元素。也就是说,distinct
是有状态的,但在这种情况下使用了错误的状态:我们希望它根据字符串长度的相等性而不是字符串内容来跟踪元素。我们可以通过声明一个类来解决这个限制,该类根据字符串长度定义对象相等性,将每个字符串包装在该类的一个实例中,并对这些实例应用 distinct
。然而,这种任务表达方式并不直观,而且会导致代码难以维护:
record DistinctByLength(String str) {
@Override public boolean equals(Object obj) {
return obj instanceof DistinctByLength(String other)
&& str.length() == other.length();
}
@Override public int hashCode() {
return str == null ? 0 : Integer.hashCode(str.length());
}
}
var result = Stream.of("foo", "bar", "baz", "quux")
.map(DistinctByLength::new)
.distinct()
.map(DistinctByLength::str)
.toList();
// result ==> [foo, quux]
再比如,假设任务是将元素分组为固定大小的三元组,但只保留前两组:[0, 1, 2, 3, 4, 5, 6, ...]
应该生成 [[0, 1, 2], [3, 4, 5]]
。理想情况下,代码看起来应该像这样:
var result = Stream.iterate(0, i -> i + 1)
.windowFixed(3) // Hypothetical
.limit(2)
.toList();
// result ==> [[0, 1, 2], [3, 4, 5]]
不幸的是,没有内置的中间操作支持此任务。最好的选择是通过使用自定义的 Collector
调用 collect
,将固定窗口分组逻辑放入终端操作中。然而,我们必须在 collect
操作之前加上一个固定大小的 limit
操作,因为收集器无法在新元素不断出现时向 collect
发出完成信号 —— 这在无限流中会永远发生。此外,该任务本质上与有序数据相关,因此让收集器并行执行分组是不可行的,并且如果调用了它的组合器(combiner),它必须通过抛出异常来表明这一事实。最终的代码难以理解:
var result
= Stream.iterate(0, i -> i + 1)
.limit(3 * 2)
.collect(Collector.of(
() -> new ArrayList<ArrayList<Integer>>(),
(groups, element) -> {
if (groups.isEmpty() || groups.getLast().size() == 3) {
var current = new ArrayList<Integer>();
current.add(element);
groups.addLast(current);
} else {
groups.getLast().add(element);
}
},
(left, right) -> {
throw new UnsupportedOperationException("Cannot be parallelized");
}
));
// result ==> [[0, 1, 2], [3, 4, 5]]
多年来,人们为 Stream API 提出了许多新的中间操作。单独考虑时,其中大多数操作是有意义的,但如果全部添加进去,就会使(已经很庞大的)Stream API 变得更难学习,因为其操作的可发现性会降低。
Stream API 的设计者们明白,拥有一个扩展点 是很有意义的,这样任何人都可以定义中间流操作。然而,当时他们并不知道这个扩展点应该是什么样子。最终人们发现,终端操作的扩展点,即 Stream::collect(Collector)
是有效的。现在,我们可以对中间操作采取类似的方法。
总之,更多的中间操作创造了更多的情境价值,使流更适合更多任务。我们应该提供一个用于自定义中间操作的 API,允许开发人员以他们偏好的方式转换有限和无限流。
描述
Stream::gather(Gatherer)
是一个新的中间流操作,它通过应用一个称为 gatherer 的用户定义实体来处理流的元素。通过 gather
操作,我们可以构建高效、支持并行的流,以实现几乎任何中间操作。Stream::gather(Gatherer)
之于中间操作,正如 Stream::collect(Collector)
之于终端操作。
收集器 表示对流元素的一种转换;它是 java.util.stream.Gatherer
接口的一个实例。收集器可以以一对一、一对多、多对一或多对多的方式转换元素。它们可以跟踪先前见过的元素,以影响后续元素的转换,它们可以短路以将无限流转换为有限流,并且可以启用并行执行。例如,一个收集器可以在某个条件变为真之前将一个输入元素转换为一个输出元素,此时它开始将一个输入元素转换为两个输出元素。
收集器由四个协同工作的函数定义:
-
可选的 initializer 函数提供了一个在处理流元素时维护私有状态的对象。例如,一个收集器可以存储当前元素,这样当下次应用时,它可以将新元素与前一个元素进行比较,并仅发出较大的那个。实际上,这样的收集器将两个输入元素转换为一个输出元素。
-
integrator 函数从输入流中整合一个新的元素,可能会检查私有状态对象,并可能向输出流发出元素。它还可以在到达输入流的末尾之前终止处理;例如,一个在整数流中寻找最大值的收集器如果检测到
Integer.MAX_VALUE
,就可以终止。 -
可选的 combiner 函数可以在输入流被标记为并行时用于并行评估收集器。如果收集器不支持并行能力,那么它仍然可以作为并行流管道的一部分,但会按顺序进行评估。这在某些操作本质上是有序的情况下非常有用,因此无法并行化。
-
可选的 finisher 函数在没有更多输入元素可供消费时被调用。此函数可以检查私有状态对象,并可能发出额外的输出元素。例如,一个在其输入元素中搜索特定元素的收集器可以在其 finisher 被调用时报告失败,比如通过抛出异常。
当调用 Stream::gather
时,它执行的步骤相当于以下内容:
-
创建一个
Downstream
对象,当给定收集器输出类型的一个元素时,将其传递到流水线的下一阶段。 -
通过调用其 初始化器 的
get()
方法,获取收集器的私有状态对象。 -
通过调用其
integrator()
方法,获取收集器的 整合器。 -
当还有更多输入元素时,调用整合器的 integrate(...) 方法,并传入状态对象、下一个元素和下游对象。如果该方法返回
false
,则终止操作。 -
获取收集器的 完成器,并使用状态对象和下游对象调用它。
在 Stream
接口中声明的每个现有的中间操作,都可以通过使用实现了该操作的收集器调用 gather
来实现。例如,给定一个 T
类型元素的流,Stream::map
通过应用一个函数将每个 T
元素转换为 U
元素,然后将 U
元素传递到下游;这仅仅是一个无状态的一对一收集器。再比如,Stream::filter
使用一个谓词来确定输入元素是否应被传递到下游;这仅仅是一个无状态的一对多收集器。事实上,每个流管道在概念上都等价于:
source.gather(...).gather(...).gather(...).collect(...)
内置收集器
我们在 java.util.stream.Gatherers 类中引入了以下内置的收集器:
-
fold
是一个有状态的多对一收集器,它逐步构建聚合,并在没有更多输入元素时发出该聚合。 -
mapConcurrent
是一个有状态的一对一收集器,它为每个输入元素并发调用提供的函数,直到达到提供的限制。 -
scan
是一个有状态的一对一收集器,它将提供的函数应用于当前状态和当前元素以生成下一个元素,并将其传递到下游。 -
windowFixed
是一个有状态的多对多收集器,它将输入元素分组为指定大小的列表,并在窗口满时将其发送到下游。 -
windowSliding
是一个有状态的多对多收集器,它将输入元素分组为指定大小的列表。第一个窗口之后,每个后续窗口都是通过删除第一个元素并追加来自输入流的下一个元素从其前一个窗口的副本创建的。
并行评估
收集器的并行评估分为两种不同的模式。当未提供合并器时,流库仍然可以通过并行执行上游和下游操作来提取并行性,类似于可短路的 parallel().forEachOrdered()
操作。当提供了合并器时,平行评估类似于可短路的 parallel().reduce()
操作。
组合收集器
收集器通过 andThen(Gatherer)
方法支持组合,该方法将两个收集器连接起来,其中第一个收集器生成的元素可以被第二个收集器消费。这使得可以通过组合较简单的收集器来创建复杂的收集器,就像 函数组合 一样。从语义上来说,
source.gather(a).gather(b).gather(c).collect(...)
等价于
source.gather(a.andThen(b).andThen(c)).collect(...)
采集者与收集者
Gatherer
接口的设计深受 Collector
设计的影响。主要区别在于:
-
Gatherer
使用Integrator
而不是BiConsumer
来进行每个元素的处理,因为它需要一个额外的输入参数用于Downstream
对象,并且需要返回一个boolean
值以指示是否应继续处理。 -
Gatherer
使用BiConsumer
作为其完成器而不是Function
,因为它需要一个额外的输入参数用于其Downstream
对象,并且无法返回结果,因此是void
。
示例:拥抱流
有时,由于缺少适当的中间操作,我们被迫将流计算为列表,并在循环中运行分析逻辑。例如,假设我们有一个时间顺序排列的温度读数流:
record Reading(Instant obtainedAt, int kelvins) {
Reading(String time, int kelvins) {
this(Instant.parse(time), kelvins);
}
static Stream<Reading> loadRecentReadings() {
// In reality these could be read from a file, a database,
// a service, or otherwise
return Stream.of(
new Reading("2023-09-21T10:15:30.00Z", 310),
new Reading("2023-09-21T10:15:31.00Z", 312),
new Reading("2023-09-21T10:15:32.00Z", 350),
new Reading("2023-09-21T10:15:33.00Z", 310)
);
}
}
进一步假设,我们希望检测此流中的可疑变化,定义为在 5 秒时间窗口内连续两次读数之间的温度变化超过 30° 开尔文:
boolean isSuspicious(Reading previous, Reading next) {
return next.obtainedAt().isBefore(previous.obtainedAt().plusSeconds(5))
&& (next.kelvins() > previous.kelvins() + 30
|| next.kelvins() < previous.kelvins() - 30);
}
这需要对输入流进行顺序扫描,因此我们必须避免使用声明式流处理,而是命令式地实现我们的分析:
List<List<Reading>> findSuspicious(Stream<Reading> source) {
var suspicious = new ArrayList<List<Reading>>();
Reading previous = null;
boolean hasPrevious = false;
for (Reading next : source.toList()) {
if (!hasPrevious) {
hasPrevious = true;
previous = next;
} else {
if (isSuspicious(previous, next))
suspicious.add(List.of(previous, next));
previous = next;
}
}
return suspicious;
}
var result = findSuspicious(Reading.loadRecentReadings());
// result ==> [[Reading[obtainedAt=2023-09-21T10:15:31Z, kelvins=312],
// Reading[obtainedAt=2023-09-21T10:15:32Z, kelvins=350]],
// [Reading[obtainedAt=2023-09-21T10:15:32Z, kelvins=350],
// Reading[obtainedAt=2023-09-21T10:15:33Z, kelvins=310]]]
然而,使用收集器,我们可以更简洁地表达这一点:
List<List<Reading>> findSuspicious(Stream<Reading> source) {
return source.gather(Gatherers.windowSliding(2))
.filter(window -> (window.size() == 2
&& isSuspicious(window.get(0),
window.get(1))))
.toList();
}
示例:定义收集器
在 Gatherers
类中声明的 windowFixed
聚合器可以作为 Gatherer
接口的直接实现来编写:
record WindowFixed<TR>(int windowSize)
implements Gatherer<TR, ArrayList<TR>, List<TR>>
{
public WindowFixed {
// Validate input
if (windowSize < 1)
throw new IllegalArgumentException("window size must be positive");
}
@Override
public Supplier<ArrayList<TR>> initializer() {
// Create an ArrayList to hold the current open window
return () -> new ArrayList<>(windowSize);
}
@Override
public Integrator<ArrayList<TR>, TR, List<TR>> integrator() {
// The integrator is invoked for each element consumed
return Gatherer.Integrator.ofGreedy((window, element, downstream) -> {
// Add the element to the current open window
window.add(element);
// Until we reach our desired window size,
// return true to signal that more elements are desired
if (window.size() < windowSize)
return true;
// When the window is full, close it by creating a copy
var result = new ArrayList<TR>(window);
// Clear the window so the next can be started
window.clear();
// Send the closed window downstream
return downstream.push(result);
});
}
// The combiner is omitted since this operation is intrinsically sequential,
// and thus cannot be parallelized
@Override
public BiConsumer<ArrayList<TR>, Downstream<? super List<TR>>> finisher() {
// The finisher runs when there are no more elements to pass from
// the upstream
return (window, downstream) -> {
// If the downstream still accepts more elements and the current
// open window is non-empty, then send a copy of it downstream
if(!downstream.isRejecting() && !window.isEmpty()) {
downstream.push(new ArrayList<TR>(window));
window.clear();
}
};
}
}
用法示例:
jshell> Stream.of(1,2,3,4,5,6,7,8,9).gather(new WindowFixed(3)).toList()
$1 ==> [[1, 2, 3], [4, 5, 6], [7, 8, 9]]
示例:一个特别的收集器
windowFixed
gatherer 也可以通过 Gatherer.ofSequential(...)
工厂方法以即席的方式编写:
/**
* Gathers elements into fixed-size groups. The last group may contain fewer
* elements.
* @param windowSize the maximum size of the groups
* @return a new gatherer which groups elements into fixed-size groups
* @param <TR> the type of elements the returned gatherer consumes and produces
*/
static <TR> Gatherer<TR, ?, List<TR>> fixedWindow(int windowSize) {
// Validate input
if (windowSize < 1)
throw new IllegalArgumentException("window size must be non-zero");
// This gatherer is inherently order-dependent,
// so it should not be parallelized
return Gatherer.ofSequential(
// The initializer creates an ArrayList which holds the current
// open window
() -> new ArrayList<TR>(windowSize),
// The integrator is invoked for each element consumed
Gatherer.Integrator.ofGreedy((window, element, downstream) -> {
// Add the element to the current open window
window.add(element);
// Until we reach our desired window size,
// return true to signal that more elements are desired
if (window.size() < windowSize)
return true;
// When window is full, close it by creating a copy
var result = new ArrayList<TR>(window);
// Clear the window so the next can be started
window.clear();
// Send the closed window downstream
return downstream.push(result);
}),
// The combiner is omitted since this operation is intrinsically sequential,
// and thus cannot be parallelized
// The finisher runs when there are no more elements to pass from the upstream
(window, downstream) -> {
// If the downstream still accepts more elements and the current
// open window is non-empty then send a copy of it downstream
if(!downstream.isRejecting() && !window.isEmpty()) {
downstream.push(new ArrayList<TR>(window));
window.clear();
}
}
);
}
用法示例:
jshell> Stream.of(1,2,3,4,5,6,7,8,9).gather(fixedWindow(3)).toList()
$1 ==> [[1, 2, 3], [4, 5, 6], [7, 8, 9]]
示例:可并行化的收集器
当在并行流中使用时,收集器只有在提供合并函数时才会并行求值。例如,这个可并行化的收集器根据提供的选择器函数最多发出一个元素:
static <TR> Gatherer<TR, ?, TR> selectOne(BinaryOperator<TR> selector) {
// Validate input
Objects.requireNonNull(selector, "selector must not be null");
// Private state to track information across elements
class State {
TR value; // The current best value
boolean hasValue; // true when value holds a valid value
}
// Use the `of` factory method to construct a gatherer given a set
// of functions for `initializer`, `integrator`, `combiner`, and `finisher`
return Gatherer.of(
// The initializer creates a new State instance
State::new,
// The integrator; in this case we use `ofGreedy` to signal
// that this integerator will never short-circuit
Gatherer.Integrator.ofGreedy((state, element, downstream) -> {
if (!state.hasValue) {
// The first element, just save it
state.value = element;
state.hasValue = true;
} else {
// Select which value of the two to save, and save it
state.value = selector.apply(state.value, element);
}
return true;
}),
// The combiner, used during parallel evaluation
(leftState, rightState) -> {
if (!leftState.hasValue) {
// If no value on the left, return the right
return rightState;
} else if (!rightState.hasValue) {
// If no value on the right, return the left
return leftState;
} else {
// If both sides have values, select one of them to keep
// and store it in the leftState, as that will be returned
leftState.value = selector.apply(leftState.value,
rightState.value);
return leftState;
}
},
// The finisher
(state, downstream) -> {
// Emit the selected value, if there is one, downstream
if (state.hasValue)
downstream.push(state.value);
}
);
}
用法示例,针对随机整数流:
jshell> Stream.generate(() -> ThreadLocalRandom.current().nextInt())
.limit(1000) // Take the first 1000 elements
.gather(selectOne(Math::max)) // Select the largest value seen
.parallel() // Execute in parallel
.findFirst() // Extract the largest value
$1 ==> Optional[99822]
替代方案
我们在单独的设计文档中探讨了替代方案。
风险与假设
-
使用自定义收集器以及在
Gatherers
类中声明的内置收集器,不会像使用在Stream
类中声明的内置中间操作那样简洁。然而,自定义收集器的定义复杂度将类似于为终端collect
操作定义自定义收集器的复杂度。此外,使用自定义和内置收集器的复杂度也将类似于使用自定义收集器和在Collectors
类中声明的内置收集器的复杂度。 -
在预览此功能的过程中,我们可能会修改内置收集器的集合,并且在未来版本中也可能会对内置收集器进行修改。
-
尽管为了保持一致性,很想为
Gatherers
类中定义的每个内置收集器在Stream
类中添加一个新的中间操作,但我们不会这样做。为了保持Stream
类的易学性,我们将仅在经验表明某些方法具有广泛适用性时,才考虑向其添加新的中间操作。我们可能会在后续的预览阶段,甚至在此功能最终确定后,再添加这样的方法。现在公开新的内置收集器并不排除以后添加专用的Stream
方法。