跳到主要内容

JEP 461:流收集器(预览)

概括

增强Stream API以支持自定义中间操作。这将允许流管道以现有内置中间操作不易实现的方式转换数据。这是一个预览 API

目标

  • 使流管道更加灵活和富有表现力。

  • 尽可能允许自定义中间操作来操作无限大小的流。

非目标

  • 改变 Java 编程语言以更好地促进流处理并不是目标。

  • 使用 Stream API 的代码编译的特殊情况并不是我们的目标。

动机

Java 8 引入了第一个专门为 lambda 表达式设计的 API:Stream API java.util.stream、.流是一个延迟计算的、可能无限的值序列。 API 支持顺序或并行处理流的能力。

_流管道_由三部分组成:元素源、任意数量的中间操作和终端操作。例如:

long numberOfWords =
Stream.of("the", "", "fox", "jumps", "over", "the", "", "dog") // (1)
.filter(Predicate.not(String::isEmpty)) // (2)
.collect(Collectors.counting()); // (3)

这种编程风格既富有表现力又高效。使用构建器风格的API,每个中间操作都会返回一个新的流;仅当调用终端操作时才开始评估。在此示例中,第 (1) 行创建了一个流,但不评估它,第 (2) 行设置了一个中间filter操作,但仍然不评估流,最后第collect(3) 行的终端操作评估整个流管道。

Stream API 提供了相当丰富(尽管是固定的)的中间和终端操作集:映射、过滤、缩减、排序等。它还包括一个可扩展的终端操作,Stream::collect它允许以多种方式汇总管道的输出。

目前,Java 生态系统中流的使用非常普遍,并且对于许多任务来说都是理想的选择,但是固定的中间操作集意味着一些复杂的任务无法轻松地表示为流管道。所需的中间操作不存在,或者存在但不直接支持该任务。

举个例子,假设任务是获取字符串流并使其不同,但不同之处是基于字符串长度而不是内容。也就是说,最多应发出长度为 1 的字符串,最多应发出长度为 2 的字符串,最多应发出长度为 3 的字符串,依此类推。理想情况下,代码看起来像这样:

var result = Stream.of("foo", "bar", "baz", "quux")
.distinctBy(String::length) // Hypothetical
.toList();

// result ==> [foo, quux]

不幸的是,distinctBy这不是一个内置的中间操作。最接近的内置操作 ,distinct通过使用对象相等性来比较它们来跟踪它已经看到的元素。也就是说,distinct是有状态的,但在这种情况下使用了错误的状态:我们希望它根据字符串长度的相等性而不是字符串内容来跟踪元素。我们可以通过声明一个类来解决这个限制,该类根据字符串长度定义对象相等性,将每个字符串包装在该类的实例中并应用于distinct这些实例。然而,这种任务的表达并不直观,并且导致代码难以维护:

record DistinctByLength(String str) {

@Override public boolean equals(Object obj) {
return obj instanceof DistinctByLength(String other)
&& str.length() == other.length();
}

@Override public int hashCode() {
return str == null ? 0 : Integer.hashCode(str.length());
}

}

var result = Stream.of("foo", "bar", "baz", "quux")
.map(DistinctByLength::new)
.distinct()
.map(DistinctByLength::str)
.toList();

// result ==> [foo, quux]

再举一个例子,假设任务是将元素分组为固定大小的三组,但仅保留前两组:[0, 1, 2, 3, 4, 5, 6, ...]应该产生[[0, 1, 2], [3, 4, 5]].理想情况下,代码如下所示:

var result = Stream.iterate(0, i -> i + 1)
.windowFixed(3) // Hypothetical
.limit(2)
.toList();

// result ==> [[0, 1, 2], [3, 4, 5]]

不幸的是,没有内置的中间操作支持此任务。最好的选择是通过使用collect自定义Collector.然而,我们必须在该collect操作之前进行固定大小的操作,因为当新元素出现时,limit收集器无法发出信号表明它已经完成——这种情况在无限流中永远发生。collect此外,该任务本质上是关于有序数据的,因此让收集器并行执行分组是不可行的,并且如果调用其组合器,则必须通过抛出异常来表明这一事实。结果代码很难理解:

var result
= Stream.iterate(0, i -> i + 1)
.limit(3 * 2)
.collect(Collector.of(
() -> new ArrayList<ArrayList<Integer>>(),
(groups, element) -> {
if (groups.isEmpty() || groups.getLast().size() == 3) {
var current = new ArrayList<Integer>();
current.add(element);
groups.addLast(current);
} else {
groups.getLast().add(element);
}
},
(left, right) -> {
throw new UnsupportedOperationException("Cannot be parallelized");
}
));

// result ==> [[0, 1, 2], [3, 4, 5]]

多年来,人们为 Stream API 建议了许多新的中间操作。当单独考虑时,它们中的大多数都是有意义的,但添加所有它们将使(已经很大的)Stream API 更难学习,因为它的操作将更难被发现。

Stream API 的设计者明白,希望有一个扩展点,以便任何人都可以定义中间流操作。然而,当时他们并不知道该扩展点应该是什么样子。最终结果表明,终端操作的扩展点,即Stream::collect(Collector),是有效的。我们现在可以对中间操作采取类似的方法。

总之,更多的中间操作可以创造更多的情境价值,使流更适合更多的任务。我们应该为自定义中间操作提供一个 API,允许开发人员以他们喜欢的方式转换有限和无限流。

描述

Stream::gather(Gatherer)_是一种新的中间流操作,它通过应用称为Gatherer_的用户定义实体来处理流的元素。通过该gather操作,我们可以构建高效、并行就绪的流,以实现几乎任何中间操作。Stream::gather(Gatherer)中间操作的含义与Stream::collect(Collector)终端操作的含义相同。

_收集器_代表流元素的转换;它是接口的一个实例java.util.stream.Gatherer。收集者可以以一对一、一对多、多对一或多对多的方式转换元素。它们可以跟踪先前看到的元素以影响后面元素的转换,它们可以短路以将无限流转换为有限流,并且它们可以启用并行执行。例如,收集器可以将一个输入元素转换为一个输出元素,直到某个条件变为真,此时它开始将一个输入元素转换为两个输出元素。

收集器由四个协同工作的函数定义:

  • 可选的_初始化_函数提供了一个在处理流元素时维护私有状态的对象。例如,收集器可以存储当前元素,以便下次应用时,它可以将新元素与当前的前一个元素进行比较,并且仅发出两者中较大的一个。实际上,这样的收集器将两个输入元素转换为一个输出元素。

  • _积分器_函数集成来自输入流的新元素,可能检查私有状态对象并可能将元素发送到输出流。它还可以在到达输入流末尾之前终止处理;例如,搜索最大整数流的收集器如果检测到 则可以终止Integer.MAX_VALUE

  • 当输入流标记为并行时,可选的_组合器_函数可用于并行评估收集器。如果收集器不支持并行,那么它仍然可以是并行流管道的一部分,但它是按顺序评估的。这对于操作本质上是有序的并且因此无法并行化的情况非常有用。

  • 当没有更多的输入元素可供使用时,将调用可选的_完成器_函数。该函数可以检查私有状态对象,并可能发出额外的输出元素。例如,在调用其终结器时,在其输入元素中搜索特定元素的收集器可以报告失败,例如通过抛出异常。

调用时,Stream::gather执行与以下步骤等效的操作:

  • 创建一个Downstream对象,当给定收集器输出类型的元素时,该对​​象会将其传递到管道中的下一个阶段。

  • get()通过调用其初始值设定项的方法来获取 Gatherer 的私有状态对象。

  • 通过调用采集器的方法来获取采集器的积分器integrator()

  • 当有更多输入元素时,调用积分器的integrate(...)方法,向其传递状态对象、下一个元素和下游对象。如果该方法返回则终止false

  • 获取 Gatherer 的Finisher并使用状态和下游对象调用它。

接口中声明的每个现有中间操作都Stream可以通过调用gather实现该操作的收集器来实现。例如,给定一个T类型元素流,通过应用函数Stream::map将每个T元素转换为一个元素,然后将该元素传递到下游;这只是一个无状态的一对一收集器。作为另一个例子,采用一个谓词来确定输入元素是否应该向下游传递;这只是一个无状态的一对多收集器。事实上,从概念上讲,每个流管道都相当于U``U``Stream::filter

source.gather(...).gather(...).gather(...).collect(...)

内置收集器

我们在java.util.stream.Gatherers类中引入以下内置收集器:

  • fold是一个有状态的多对一收集器,它增量地构造聚合,并在不再存在输入元素时发出该聚合。

  • mapConcurrent是一个有状态的一对一收集器,它同时为每个输入元素调用提供的函数,直至达到提供的限制。

  • scan是一个有状态的一对一收集器,它将提供的函数应用于当前状态和当前元素以生成下一个元素,并将其传递到下游。

  • windowFixed是一个有状态的多对多收集器,它将输入元素分组到指定大小的列表中,当窗口已满时向下游发出窗口。

  • windowSliding是一个有状态的多对多收集器,它将输入元素分组到指定大小的列表中。在第一个窗口之后,通过删除第一个元素并从输入流附加下一个元素,从其前一个窗口的副本创建每个后续窗口。

并行评估

收集器的并行评估分为两种不同的模式。当没有提供组合器时,流库仍然可以通过并行执行上游和下游操作来提取并行性,类似于可短路parallel().forEachOrdered()操作。当提供组合器时,并行评估类似于可短路parallel().reduce()操作。

撰写收集者

收集器通过该方法支持组合andThen(Gatherer),该方法连接两个收集器,其中第一个收集器生成第二个收集器可以消耗的元素。这使得可以通过组合更简单的收集器来创建复杂的收集器,就像函数组合一样。从语义上来说,

source.gather(a).gather(b).gather(c).collect(...)

相当于

source.gather(a.andThen(b).andThen(c)).collect(...)

收集者与收藏者

界面的设计Gatherer很大程度上受到Collector.主要区别是:

  • Gatherer使用 anIntegrator而不是 aBiConsumer进行每个元素的处理,因为它需要对象的额外输入参数Downstream,并且因为它需要返回 aboolean来指示处理是否应该继续。

  • Gatherer使用 aBiConsumer作为其终结器而不是 a ,Function因为它的对象需要额外的输入参数Downstream,并且因为它无法返回结果,因此是void

示例:拥抱流

有时,缺乏适当的中间操作迫使我们将流评估为列表并循环运行我们的分析逻辑。例如,假设我们有一个按时间顺序排列的温度读数流:

record Reading(Instant obtainedAt, int kelvins) {

Reading(String time, int kelvins) {
this(Instant.parse(time), kelvins);
}

static Stream<Reading> loadRecentReadings() {
// In reality these could be read from a file, a database,
// a service, or otherwise
return Stream.of(
new Reading("2023-09-21T10:15:30.00Z", 310),
new Reading("2023-09-21T10:15:31.00Z", 312),
new Reading("2023-09-21T10:15:32.00Z", 350),
new Reading("2023-09-21T10:15:33.00Z", 310)
);
}

}

进一步假设我们想要检测该流中的可疑变化,定义为五秒时间窗口内两个连续读数的温度变化超过 30° 开尔文:

boolean isSuspicious(Reading previous, Reading next) {
return next.obtainedAt().isBefore(previous.obtainedAt().plusSeconds(5))
&& (next.kelvins() > previous.kelvins() + 30
|| next.kelvins() < previous.kelvins() - 30);
}

这需要对输入流进行顺序扫描,因此我们必须避免声明式流处理并强制执行我们的分析:

List<List<Reading>> findSuspicious(Stream<Reading> source) {
var suspicious = new ArrayList<List<Reading>>();
Reading previous = null;
boolean hasPrevious = false;
for (Reading next : source.toList()) {
if (!hasPrevious) {
hasPrevious = true;
previous = next;
} else {
if (isSuspicious(previous, next))
suspicious.add(List.of(previous, next));
previous = next;
}
}
return suspicious;
}

var result = findSuspicious(Reading.loadRecentReadings());

// result ==> [[Reading[obtainedAt=2023-09-21T10:15:31Z, kelvins=312],
// Reading[obtainedAt=2023-09-21T10:15:32Z, kelvins=350]],
// [Reading[obtainedAt=2023-09-21T10:15:32Z, kelvins=350],
// Reading[obtainedAt=2023-09-21T10:15:33Z, kelvins=310]]]

然而,通过收集器,我们可以更简洁地表达这一点:

List<List<Reading>> findSuspicious(Stream<Reading> source) {
return source.gather(Gatherers.windowSliding(2))
.filter(window -> (window.size() == 2
&& isSuspicious(window.get(0),
window.get(1))))
.toList();
}

示例:定义收集器

windowFixed类中声明的 Gatherer可以Gatherers编写为接口的直接实现Gatherer

record WindowFixed<TR>(int windowSize)
implements Gatherer<TR, ArrayList<TR>, List<TR>>
{

public WindowFixed {
// Validate input
if (windowSize < 1)
throw new IllegalArgumentException("window size must be positive");
}

@Override
public Supplier<ArrayList<TR>> initializer() {
// Create an ArrayList to hold the current open window
return () -> new ArrayList<>(windowSize);
}

@Override
public Integrator<ArrayList<TR>, TR, List<TR>> integrator() {
// The integrator is invoked for each element consumed
return Gatherer.Integrator.ofGreedy((window, element, downstream) -> {

// Add the element to the current open window
window.add(element);

// Until we reach our desired window size,
// return true to signal that more elements are desired
if (window.size() < windowSize)
return true;

// When the window is full, close it by creating a copy
var result = new ArrayList<TR>(window);

// Clear the window so the next can be started
window.clear();

// Send the closed window downstream
return downstream.push(result);

});
}

// The combiner is omitted since this operation is intrinsically sequential,
// and thus cannot be parallelized

@Override
public BiConsumer<ArrayList<TR>, Downstream<? super List<TR>>> finisher() {
// The finisher runs when there are no more elements to pass from
// the upstream
return (window, downstream) -> {
// If the downstream still accepts more elements and the current
// open window is non-empty, then send a copy of it downstream
if(!downstream.isRejecting() && !window.isEmpty()) {
downstream.push(new ArrayList<TR>(window));
window.clear();
}
};
}
}

用法示例:

jshell> Stream.of(1,2,3,4,5,6,7,8,9).gather(new WindowFixed(3)).toList()
$1 ==> [[1, 2, 3], [4, 5, 6], [7, 8, 9]]

示例:临时收集器

windowFixed或者,可以通过工厂方法以临时方式编写收集器Gatherer.ofSequential(...)

/**
* Gathers elements into fixed-size groups. The last group may contain fewer
* elements.
* @param windowSize the maximum size of the groups
* @return a new gatherer which groups elements into fixed-size groups
* @param <TR> the type of elements the returned gatherer consumes and produces
*/
static <TR> Gatherer<TR, ?, List<TR>> fixedWindow(int windowSize) {

// Validate input
if (windowSize < 1)
throw new IllegalArgumentException("window size must be non-zero");

// This gatherer is inherently order-dependent,
// so it should not be parallelized
return Gatherer.ofSequential(

// The initializer creates an ArrayList which holds the current
// open window
() -> new ArrayList<TR>(windowSize),

// The integrator is invoked for each element consumed
Gatherer.Integrator.ofGreedy((window, element, downstream) -> {

// Add the element to the current open window
window.add(element);

// Until we reach our desired window size,
// return true to signal that more elements are desired
if (window.size() < windowSize)
return true;

// When window is full, close it by creating a copy
var result = new ArrayList<TR>(window);

// Clear the window so the next can be started
window.clear();

// Send the closed window downstream
return downstream.push(result);

}),

// The combiner is omitted since this operation is intrinsically sequential,
// and thus cannot be parallelized

// The finisher runs when there are no more elements to pass from the upstream
(window, downstream) -> {
// If the downstream still accepts more elements and the current
// open window is non-empty then send a copy of it downstream
if(!downstream.isRejecting() && !window.isEmpty()) {
downstream.push(new ArrayList<TR>(window));
window.clear();
}
}

);
}

用法示例:

jshell> Stream.of(1,2,3,4,5,6,7,8,9).gather(fixedWindow(3)).toList()
$1 ==> [[1, 2, 3], [4, 5, 6], [7, 8, 9]]

示例:可并行收集器

当在并行流中使用时,如果收集器提供组合器功能,则仅并行评估它。例如,这个可并行收集器根据提供的选择器函数最多发出一个元素:

static <TR> Gatherer<TR, ?, TR> selectOne(BinaryOperator<TR> selector) {

// Validate input
Objects.requireNonNull(selector, "selector must not be null");

// Private state to track information across elements
class State {
TR value; // The current best value
boolean hasValue; // true when value holds a valid value
}

// Use the `of` factory method to construct a gatherer given a set
// of functions for `initializer`, `integrator`, `combiner`, and `finisher`
return Gatherer.of(

// The initializer creates a new State instance
State::new,

// The integrator; in this case we use `ofGreedy` to signal
// that this integerator will never short-circuit
Gatherer.Integrator.ofGreedy((state, element, downstream) -> {
if (!state.hasValue) {
// The first element, just save it
state.value = element;
state.hasValue = true;
} else {
// Select which value of the two to save, and save it
state.value = selector.apply(state.value, element);
}
return true;
}),

// The combiner, used during parallel evaluation
(leftState, rightState) -> {
if (!leftState.hasValue) {
// If no value on the left, return the right
return rightState;
} else if (!rightState.hasValue) {
// If no value on the right, return the left
return leftState;
} else {
// If both sides have values, select one of them to keep
// and store it in the leftState, as that will be returned
leftState.value = selector.apply(leftState.value,
rightState.value);
return leftState;
}
},

// The finisher
(state, downstream) -> {
// Emit the selected value, if there is one, downstream
if (state.hasValue)
downstream.push(state.value);
}

);
}

在随机整数流上的用法示例:

jshell> Stream.generate(() -> ThreadLocalRandom.current().nextInt())
.limit(1000) // Take the first 1000 elements
.gather(selectOne(Math::max)) // Select the largest value seen
.parallel() // Execute in parallel
.findFirst() // Extract the largest value
$1 ==> Optional[99822]

备择方案

我们在单独的设计文档中探索了替代方案。

风险和假设

  • 自定义收集器以及类中声明的内置收集器的使用Gatherers不会像类中声明的内置中间操作的使用那样简洁Stream。然而,自定义收集器的定义在复杂性上与终端操作的自定义收集器的定义类似collect。此外,自定义收集器和内置收集器的使用在复杂性上与自定义收集器和类中声明的内置收集器的使用类似Collectors

  • 我们可能会在预览此功能的过程中修改内置收集器集,并且可能会在未来版本中修改内置收集器集。

  • 我们不会Stream为类中定义的每个内置收集器向类添加新的中间操作Gatherers,尽管为了统一起见很容易这样做。为了保持Stream类的可学习性,只有在经验表明它们广泛有用之后,我们才会考虑向其中添加新的中间操作。我们可能会在后面一轮预览中添加此类方法,甚至在该功能最终确定后也可能会添加此类方法。现在公开新的内置收集器并不排除Stream以后添加专用方法。