跳到主要内容

JEP 485: Stream 收集器

QWen Max 中英对照 JEP 485: Stream Gatherers

概述

增强 Stream API 以支持自定义中间操作。这将允许流管道以现有内置中间操作难以实现的方式转换数据。

历史

Stream Gatherers 在 JDK 22 中由 JEP 461 作为预览功能提出,并在 JDK 23 中由 JEP 473 再次预览。我们在此提议在 JDK 24 中最终确定该 API,不做任何更改。

目标

  • 使流管道更灵活和富有表现力。

  • 在可能的情况下,允许自定义中间操作处理无限大小的流。

非目标

  • 不以更改 Java 编程语言来更好地促进流处理为目标。

  • 不以特例化使用 Stream API 的代码的编译为目标。

动机

Java 8 引入了第一个专门为 lambda 表达式设计的 API:Stream API,java.util.stream。流是一个延迟计算的、潜在无限的值序列。该 API 支持以顺序或并行方式处理流的能力。

流管道由三部分组成:元素源、任意数量的中间操作和一个终端操作。例如:

long numberOfWords =
Stream.of("the", "", "fox", "jumps", "over", "the", "", "dog") // (1)
.filter(Predicate.not(String::isEmpty)) // (2)
.collect(Collectors.counting()); // (3)

这种编程风格既表达清晰又高效。使用构建器风格的 API,每个中间操作都会返回一个新的流;只有当终端操作被调用时,才会开始计算。在这个例子中,第 (1) 行创建了一个流,但并没有计算它,第 (2) 行设置了一个中间的 filter 操作,但仍然没有计算流,最后在第 (3) 行的终端 collect 操作会计算整个流管道。

流 API 提供了一组相当丰富但固定的中间操作和终端操作:映射、过滤、归约、排序等等。它还包括一个可扩展的终端操作 Stream::collect,该操作能够以多种方式对流水线的输出进行汇总。

在 Java 生态系统中,流的使用现在已经非常普遍,并且非常适合许多任务,但是固定的中间操作集意味着一些复杂任务不能轻易地表示为流管道。要么是所需的中间操作不存在,要么是存在但不直接支持该任务。

例如,假设任务是获取字符串流并使其唯一,但唯一性基于字符串长度而不是内容。也就是说,最多应发出一个长度为 1 的字符串,最多应发出一个长度为 2 的字符串,最多应发出一个长度为 3 的字符串,依此类推。理想情况下,代码看起来像这样:

var result = Stream.of("foo", "bar", "baz", "quux")
.distinctBy(String::length) // Hypothetical
.toList();

// result ==> [foo, quux]

不幸的是,distinctBy 并不是内置的中间操作。最接近的内置操作是 distinct,它通过使用对象相等性来比较已经看到的元素来进行跟踪。也就是说,distinct 是有状态的,但在这个情况下使用了错误的状态:我们希望它根据字符串长度的相等性来跟踪元素,而不是字符串内容的相等性。我们可以通过声明一个类来解决这个限制,该类将对象相等性定义为字符串长度,将每个字符串包装在该类的一个实例中,并对这些实例应用 distinct。然而,这种任务表达方式并不直观,并且使得代码难以维护:

record DistinctByLength(String str) {

@Override public boolean equals(Object obj) {
return obj instanceof DistinctByLength(String other)
&& str.length() == other.length();
}

@Override public int hashCode() {
return str == null ? 0 : Integer.hashCode(str.length());
}

}

var result = Stream.of("foo", "bar", "baz", "quux")
.map(DistinctByLength::new)
.distinct()
.map(DistinctByLength::str)
.toList();

// result ==> [foo, quux]

作为另一个示例,假设任务是将元素分组为固定大小的三个一组,但只保留前两组:[0, 1, 2, 3, 4, 5, 6, ...] 应该生成 [[0, 1, 2], [3, 4, 5]]。理想情况下,代码应该如下所示:

var result = Stream.iterate(0, i -> i + 1)
.windowFixed(3) // Hypothetical
.limit(2)
.toList();

// result ==> [[0, 1, 2], [3, 4, 5]]

不幸的是,没有内置的中间操作支持此任务。最好的选择是通过使用自定义的 Collector 调用 collect 方法,将固定窗口分组逻辑放在终端操作中。然而,我们必须在 collect 操作之前加上一个固定大小的 limit 操作,因为当新元素不断出现时(无限流的情况),收集器无法向 collect 信号表示它已经完成。此外,该任务本质上是关于有序数据的,因此让收集器并行执行分组是不可行的,并且如果调用其组合器,必须通过抛出异常来表示这一点。最终的代码难以理解:

var result
= Stream.iterate(0, i -> i + 1)
.limit(3 * 2)
.collect(Collector.of(
() -> new ArrayList<ArrayList<Integer>>(),
(groups, element) -> {
if (groups.isEmpty() || groups.getLast().size() == 3) {
var current = new ArrayList<Integer>();
current.add(element);
groups.addLast(current);
} else {
groups.getLast().add(element);
}
},
(left, right) -> {
throw new UnsupportedOperationException("Cannot be parallelized");
}
));

// result ==> [[0, 1, 2], [3, 4, 5]]

多年来,人们为 Stream API 提出了许多新的中间操作。单独来看,其中大多数都是有意义的,但将所有这些操作都添加进去会使(已经很庞大的)Stream API 更加难以学习,因为它的操作会变得更难发现。

Stream API 的设计者们明白,拥有一个扩展点 是可取的,这样任何人都可以定义中间流操作。然而,在当时他们还不清楚这个扩展点应该是什么样子。最终,终端操作的扩展点,即 Stream::collect(Collector),被证明是有效的。现在我们可以对中间操作采取类似的方法。

总之,更多的中间操作创造了更多的场景价值,使流更适合处理更多的任务。我们应该提供一个用于自定义中间操作的 API,允许开发人员以他们喜欢的方式转换有限和无限的流。

描述

Stream::gather(Gatherer) 是一种新的流中间操作,它通过应用一个称为 gatherer 的用户定义实体来处理流中的元素。通过 gather 操作,我们可以构建高效的、支持并行的流,实现几乎任何中间操作。Stream::gather(Gatherer) 对于中间操作来说,就像 Stream::collect(Collector) 对于终端操作一样。

收集器表示流中元素的转换;它是 java.util.stream.Gatherer 接口的一个实例。收集器可以以一对一、一对多、多对一或 多对多的方式转换元素。它们可以跟踪先前看到的元素,以便影响后续元素的转换,它们可以短路以将无限流转换为有限流,并且它们可以启用并行执行。例如,收集器可以将一个输入元素转换为一个输出元素,直到某个条件变为真,此时它开始将一个输入元素转换为两个输出元素。

收集器由四个协同工作的函数定义:

  • 可选的 initializer 函数提供了一个在处理流元素时维护私有状态的对象。例如,收集器可以存储当前元素,以便下次应用时,它可以将新元素与现在的前一个元素进行比较,并且只输出较大的那个。实际上,这样的收集器将两个输入元素转换为一个输出元素。

  • integrator 函数整合来自输入流的新元素,可能会检查私有状态对象,并可能向输出流发射元素。它还可以在到达输入流末尾之前终止处理;例如,搜索整数流中最大值的收集器如果检测到 Integer.MAX_VALUE 就可以终止。

  • 可选的 combiner 函数可以在输入流被标记为并行时用于并行评估收集器。如果收集器不具备并行能力,则它仍然可以是并行流管道的一部分,但会被顺序评估。这对于某些本质上具有顺序性的操作是有用的,因此不能并行化。

  • 可选的 finisher 函数在没有更多的输入元素要消耗时被调用。该函数可以检查私有状态对象,并可能发射额外的输出元素。例如,在其输入元素中搜索特定元素的收集器在其完成器被调用时可以通过抛出异常来报告失败。

当被调用时,Stream::gather 执行相当于以下步骤的操作:

  • 创建一个Downstream对象,当给定收集器的输出类型的一个元素时,将其传递到管道的下一个阶段。

  • 通过调用其初始化器get() 方法来获取收集器的私有状态对象。

  • 通过调用其 integrator() 方法来获取收集器的集成器

  • 当有更多的输入元素时,调用集成器的 integrate(...) 方法,将状态对象、下一个元素和下游对象传递给它。如果该方法返回 false,则终止。

  • 获取收集器的完成器并使用状态对象和下游对象调用它。

Stream 接口中声明的每一个现有的中间操作都可以通过使用实现该操作的收集器来调用 gather 来实现。例如,给定一个 T 类型元素的流,Stream::map 通过对每个 T 元素应用一个函数将其转换为 U 元素,然后将 U 元素传递到下游;这只是一个无状态的一对一收集器。另一个例子是,Stream::filter 使用一个谓词来确定输入元素是否应该传递到下游;这只是一个无状态的一对多收集器。事实上,从概念上讲,每一个流管道都等价于

source.gather(...).gather(...).gather(...).collect(...)

内置收集器

我们在java.util.stream.Gatherers类中引入了以下内置收集器:

  • fold 是一个有状态的多对一收集器,它逐步构建一个聚合,并在没有更多输入元素时发出该聚合。

  • mapConcurrent 是一个有状态的一对一收集器,它为每个输入元素并行调用提供的函数,最多达到提供的限制。

  • scan 是一个有状态的一对一收集器,它将提供的函数应用于当前状态和当前元素以生成下一个元素,并将其传递给下游。

  • windowFixed 是一个有状态的多对多收集器,它将输入元素分组为指定大小的列表,并在窗口满时向下游发出这些窗口。

  • windowSliding 是一个有状态的多对多收集器,它将输入元素分组为指定大小的列表。在第一个窗口之后,每个后续窗口都是通过复制其前驱窗口并丢弃第一个元素以及追加输入流中的下一个元素来创建的。

并行计算

收集器的并行评估分为两种不同的模式。当未提供组合器时,流库仍然可以通过并行执行上游和下游操作来提取并行性,这类似于一个可短路的 parallel().forEachOrdered() 操作。当提供了组合器时,并行评估类似于一个可短路的 parallel().reduce() 操作。

组合收集器

收集器通过 andThen(Gatherer) 方法支持组合,该方法将两个收集器连接在一起,其中第一个收集器生成的元素可以被第二个收集器消费。这使得可以通过组合更简单的收集器来创建复杂的收集器,就像函数组合 一样。从语义上讲,

source.gather(a).gather(b).gather(c).collect(...)

等同于

source.gather(a.andThen(b).andThen(c)).collect(...)

采集者与收集者

Gatherer 接口的设计很大程度上受到了 Collector 设计的影响。主要的区别在于:

  • Gatherer 使用 Integrator 而不是 BiConsumer 来处理每个元素,因为它需要一个额外的输入参数用于 Downstream 对象,并且因为它需要返回一个 boolean 值来指示是否应继续处理。

  • Gatherer 使用 BiConsumer 作为其终结器,而不是使用 Function,因为它需要一个额外的输入参数用于其 Downstream 对象,并且因为它不能返回结果,因此是 void

示例:拥抱流

有时,由于缺少合适的中间操作,我们不得不将一个流评估为列表,并在循环中运行我们的分析逻辑。例如,假设我们有一个按时间顺序排列的温度读数流:

record Reading(Instant obtainedAt, int kelvins) {

Reading(String time, int kelvins) {
this(Instant.parse(time), kelvins);
}

static Stream<Reading> loadRecentReadings() {
// In reality these could be read from a file, a database,
// a service, or otherwise
return Stream.of(
new Reading("2023-09-21T10:15:30.00Z", 310),
new Reading("2023-09-21T10:15:31.00Z", 312),
new Reading("2023-09-21T10:15:32.00Z", 350),
new Reading("2023-09-21T10:15:33.00Z", 310)
);
}

}

进一步假设,我们想要检测此数据流中的可疑变化,定义为在五秒时间窗口内连续两次读数之间的温度变化超过 30 开尔文:

boolean isSuspicious(Reading previous, Reading next) {
return next.obtainedAt().isBefore(previous.obtainedAt().plusSeconds(5))
&& (next.kelvins() > previous.kelvins() + 30
|| next.kelvins() < previous.kelvins() - 30);
}

这需要对输入流进行顺序扫描,因此我们必须避免使用声明式流处理,而是通过命令式方法来实现我们的分析:

List<List<Reading>> findSuspicious(Stream<Reading> source) {
var suspicious = new ArrayList<List<Reading>>();
Reading previous = null;
boolean hasPrevious = false;
for (Reading next : source.toList()) {
if (!hasPrevious) {
hasPrevious = true;
previous = next;
} else {
if (isSuspicious(previous, next))
suspicious.add(List.of(previous, next));
previous = next;
}
}
return suspicious;
}

var result = findSuspicious(Reading.loadRecentReadings());

// result ==> [[Reading[obtainedAt=2023-09-21T10:15:31Z, kelvins=312],
// Reading[obtainedAt=2023-09-21T10:15:32Z, kelvins=350]],
// [Reading[obtainedAt=2023-09-21T10:15:32Z, kelvins=350],
// Reading[obtainedAt=2023-09-21T10:15:33Z, kelvins=310]]]

然而,使用收集器,我们可以更简洁地表达这一点:

List<List<Reading>> findSuspicious(Stream<Reading> source) {
return source.gather(Gatherers.windowSliding(2))
.filter(window -> (window.size() == 2
&& isSuspicious(window.get(0),
window.get(1))))
.toList();
}

示例:定义收集器

Gatherers 类中声明的 windowFixed 收集器可以被写成 Gatherer 接口的直接实现:

record WindowFixed<TR>(int windowSize)
implements Gatherer<TR, ArrayList<TR>, List<TR>>
{

public WindowFixed {
// Validate input
if (windowSize < 1)
throw new IllegalArgumentException("window size must be positive");
}

@Override
public Supplier<ArrayList<TR>> initializer() {
// Create an ArrayList to hold the current open window
return () -> new ArrayList<>(windowSize);
}

@Override
public Integrator<ArrayList<TR>, TR, List<TR>> integrator() {
// The integrator is invoked for each element consumed
return Gatherer.Integrator.ofGreedy((window, element, downstream) -> {

// Add the element to the current open window
window.add(element);

// Until we reach our desired window size,
// return true to signal that more elements are desired
if (window.size() < windowSize)
return true;

// When the window is full, close it by creating a copy
var result = new ArrayList<TR>(window);

// Clear the window so the next can be started
window.clear();

// Send the closed window downstream
return downstream.push(result);

});
}

// The combiner is omitted since this operation is intrinsically sequential,
// and thus cannot be parallelized

@Override
public BiConsumer<ArrayList<TR>, Downstream<? super List<TR>>> finisher() {
// The finisher runs when there are no more elements to pass from
// the upstream
return (window, downstream) -> {
// If the downstream still accepts more elements and the current
// open window is non-empty, then send a copy of it downstream
if(!downstream.isRejecting() && !window.isEmpty()) {
downstream.push(new ArrayList<TR>(window));
window.clear();
}
};
}
}

示例用法:

jshell> Stream.of(1,2,3,4,5,6,7,8,9).gather(new WindowFixed(3)).toList()
$1 ==> [[1, 2, 3], [4, 5, 6], [7, 8, 9]]

示例:一个临时收集器

可以通过 Gatherer.ofSequential(...) 工厂方法以特设方式编写 windowFixed 收集器:

/**
* Gathers elements into fixed-size groups. The last group may contain fewer
* elements.
* @param windowSize the maximum size of the groups
* @return a new gatherer which groups elements into fixed-size groups
* @param <TR> the type of elements the returned gatherer consumes and produces
*/
static <TR> Gatherer<TR, ?, List<TR>> fixedWindow(int windowSize) {

// Validate input
if (windowSize < 1)
throw new IllegalArgumentException("window size must be non-zero");

// This gatherer is inherently order-dependent,
// so it should not be parallelized
return Gatherer.ofSequential(

// The initializer creates an ArrayList which holds the current
// open window
() -> new ArrayList<TR>(windowSize),

// The integrator is invoked for each element consumed
Gatherer.Integrator.ofGreedy((window, element, downstream) -> {

// Add the element to the current open window
window.add(element);

// Until we reach our desired window size,
// return true to signal that more elements are desired
if (window.size() < windowSize)
return true;

// When window is full, close it by creating a copy
var result = new ArrayList<TR>(window);

// Clear the window so the next can be started
window.clear();

// Send the closed window downstream
return downstream.push(result);

}),

// The combiner is omitted since this operation is intrinsically sequential,
// and thus cannot be parallelized

// The finisher runs when there are no more elements to pass from the upstream
(window, downstream) -> {
// If the downstream still accepts more elements and the current
// open window is non-empty then send a copy of it downstream
if(!downstream.isRejecting() && !window.isEmpty()) {
downstream.push(new ArrayList<TR>(window));
window.clear();
}
}

);
}

示例用法:

jshell> Stream.of(1,2,3,4,5,6,7,8,9).gather(fixedWindow(3)).toList()
$1 ==> [[1, 2, 3], [4, 5, 6], [7, 8, 9]]

示例:可并行化的收集器

当在并行流中使用时,只有提供了合并函数的收集器才会被并行评估。例如,这个可并行化的收集器根据提供的选择器函数最多发出一个元素:

static <TR> Gatherer<TR, ?, TR> selectOne(BinaryOperator<TR> selector) {

// Validate input
Objects.requireNonNull(selector, "selector must not be null");

// Private state to track information across elements
class State {
TR value; // The current best value
boolean hasValue; // true when value holds a valid value
}

// Use the `of` factory method to construct a gatherer given a set
// of functions for `initializer`, `integrator`, `combiner`, and `finisher`
return Gatherer.of(

// The initializer creates a new State instance
State::new,

// The integrator; in this case we use `ofGreedy` to signal
// that this integerator will never short-circuit
Gatherer.Integrator.ofGreedy((state, element, downstream) -> {
if (!state.hasValue) {
// The first element, just save it
state.value = element;
state.hasValue = true;
} else {
// Select which value of the two to save, and save it
state.value = selector.apply(state.value, element);
}
return true;
}),

// The combiner, used during parallel evaluation
(leftState, rightState) -> {
if (!leftState.hasValue) {
// If no value on the left, return the right
return rightState;
} else if (!rightState.hasValue) {
// If no value on the right, return the left
return leftState;
} else {
// If both sides have values, select one of them to keep
// and store it in the leftState, as that will be returned
leftState.value = selector.apply(leftState.value,
rightState.value);
return leftState;
}
},

// The finisher
(state, downstream) -> {
// Emit the selected value, if there is one, downstream
if (state.hasValue)
downstream.push(state.value);
}

);
}

示例用法,用于随机整数流:

jshell> Stream.generate(() -> ThreadLocalRandom.current().nextInt())
.limit(1000) // Take the first 1000 elements
.gather(selectOne(Math::max)) // Select the largest value seen
.parallel() // Execute in parallel
.findFirst() // Extract the largest value
$1 ==> Optional[99822]

替代方案

我们在另一份设计文档中探讨了替代方案。

风险和假设

  • 使用自定义收集器以及在 Gatherers 类中声明的内置收集器,将不会像使用在 Stream 类中声明的内置中间操作那样简洁。然而,自定义收集器的定义在复杂度上将类似于为终端 collect 操作定义的自定义收集器。此外,使用自定义和内置收集器的复杂度将类似于使用自定义收集器以及在 Collectors 类中声明的内置收集器。

  • 在预览此功能的过程中,我们可能会修订内置收集器的集合,并且在未来版本中也可能会对其进行修订。

  • 即便为了保持一致性而有这种诱惑,我们也不会为 Gatherers 类中定义的每个内置收集器都在 Stream 类中添加一个新的中间操作。为了保持 Stream 类的学习性,我们只会在经验表明它们广泛有用后才会考虑向其中添加新的中间操作。我们可能会在后续的预览轮次中,甚至在此功能最终确定之后,再添加这样的方法。现在暴露新的内置收集器并不排除以后添加专用的 Stream 方法。