JEP 461: Stream Gatherers (Preview)
Summary
Enhance the Stream API to support custom intermediate operations. This will allow stream pipelines to transform data in ways that are not easily achievable with the existing built-in intermediate operations. This is a preview API.
Goals
- Make stream pipelines more flexible and expressive.
- Insofar as possible, allow custom intermediate operations to manipulate streams of infinite size.
Non-Goals
- It is not a goal to change the Java programming language to better facilitate stream processing.
- It is not a goal to special-case the compilation of code that uses the Stream API.
Motivation
Java 8 introduced the first API designed specifically for lambda expressions: the Stream API, java.util.stream. A stream is a lazily computed, potentially unbounded sequence of values. The API can process a stream either sequentially or in parallel.
A stream pipeline consists of three parts: a source of elements, any number of intermediate operations, and a terminal operation. For example:
long numberOfWords =
    Stream.of("the", "", "fox", "jumps", "over", "the", "", "dog") // (1)
          .filter(Predicate.not(String::isEmpty))                  // (2)
          .collect(Collectors.counting());                         // (3)
This programming style is both expressive and efficient. With the builder-style API, each intermediate operation returns a new stream; evaluation begins only when a terminal operation is invoked. In this example, line (1) creates a stream but does not evaluate it, line (2) sets up an intermediate filter operation but still does not evaluate the stream, and finally the terminal collect operation on line (3) evaluates the entire stream pipeline.
The Stream API provides a reasonably rich, albeit fixed, set of intermediate and terminal operations: mapping, filtering, reduction, sorting, and so forth. It also includes an extensible terminal operation, Stream::collect, which enables the output of a pipeline to be summarized in a variety of ways.
The use of streams in the Java ecosystem is by now pervasive, and ideal for many tasks, but the fixed set of intermediate operations means that some complex tasks cannot easily be expressed as stream pipelines. Either a required intermediate operation does not exist, or it exists but does not directly support the task.
As an example, suppose the task is to take a stream of strings and make it distinct, but with distinctness based on string length rather than content. That is, at most one string of length 1 should be emitted, and at most one string of length 2, and at most one string of length 3, and so forth. Ideally, the code would look something like this:
var result = Stream.of("foo", "bar", "baz", "quux")
                   .distinctBy(String::length)      // Hypothetical
                   .toList();
// result ==> [foo, quux]
Unfortunately, distinctBy is not a built-in intermediate operation. The closest built-in operation, distinct, tracks the elements it has already seen by using object equality to compare them. That is, distinct is stateful but in this case uses the wrong state: we want it to track elements based on equality of string length, not string content. We could work around this limitation by declaring a class that defines object equality in terms of string length, wrapping each string in an instance of that class, and applying distinct to those instances. This expression of the task is not intuitive, however, and makes for code that is difficult to maintain:
record DistinctByLength(String str) {
    @Override public boolean equals(Object obj) {
        return obj instanceof DistinctByLength(String other)
               && str.length() == other.length();
    }
    @Override public int hashCode() {
        return str == null ? 0 : Integer.hashCode(str.length());
    }
}

var result = Stream.of("foo", "bar", "baz", "quux")
                   .map(DistinctByLength::new)
                   .distinct()
                   .map(DistinctByLength::str)
                   .toList();
// result ==> [foo, quux]
As another example, suppose the task is to group elements into fixed-size groups of three, but retain only the first two groups: [0, 1, 2, 3, 4, 5, 6, ...] should produce [[0, 1, 2], [3, 4, 5]]. Ideally, the code would look like this:
var result = Stream.iterate(0, i -> i + 1)
                   .windowFixed(3)                  // Hypothetical
                   .limit(2)
                   .toList();
// result ==> [[0, 1, 2], [3, 4, 5]]
Unfortunately, no built-in intermediate operation supports this task. The best option is to place the fixed-window grouping logic in the terminal operation, by invoking collect with a custom Collector. However, we must precede the collect operation with a fixed-size limit operation, since the collector cannot signal to collect that it is finished while new elements are appearing, which happens forever with an infinite stream. Also, the task is inherently about ordered data, so it is not feasible to have the collector perform grouping in parallel, and it must signal this fact by throwing an exception if its combiner is invoked. The resulting code is difficult to understand:
var result
    = Stream.iterate(0, i -> i + 1)
            .limit(3 * 2)
            .collect(Collector.of(
                () -> new ArrayList<ArrayList<Integer>>(),
                (groups, element) -> {
                    if (groups.isEmpty() || groups.getLast().size() == 3) {
                        var current = new ArrayList<Integer>();
                        current.add(element);
                        groups.addLast(current);
                    } else {
                        groups.getLast().add(element);
                    }
                },
                (left, right) -> {
                    throw new UnsupportedOperationException("Cannot be parallelized");
                }
            ));
// result ==> [[0, 1, 2], [3, 4, 5]]
Over the years, many new intermediate operations have been suggested for the Stream API. Most of them make sense when considered in isolation, but adding all of them would make the (already large) Stream API more difficult to learn because its operations would be less discoverable.
The designers of the Stream API understood that it would be desirable to have an extension point so that anyone could define intermediate stream operations. At the time, however, they did not know what that extension point should look like. It eventually became clear that the extension point for terminal operations, namely Stream::collect(Collector), was effective. We can now take a similar approach for intermediate operations.
In summary, more intermediate operations create more situational value, making streams a better fit for even more tasks. We should provide an API for custom intermediate operations that allows developers to transform finite and infinite streams in their preferred ways.
Description
Stream::gather(Gatherer) is a new intermediate stream operation that processes the elements of a stream by applying a user-defined entity called a gatherer. With the gather operation we can build efficient, parallel-ready streams that implement almost any intermediate operation. Stream::gather(Gatherer) is to intermediate operations what Stream::collect(Collector) is to terminal operations.
A gatherer represents a transform of the elements of a stream; it is an instance of the java.util.stream.Gatherer interface. Gatherers can transform elements in a one-to-one, one-to-many, many-to-one, or many-to-many fashion. They can track previously seen elements in order to influence the transformation of later elements, they can short-circuit in order to transform infinite streams to finite ones, and they can enable parallel execution. For example, a gatherer can transform one input element to one output element until some condition becomes true, at which time it starts to transform one input element to two output elements.
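As an illustration of a gatherer that tracks previously seen elements, the hypothetical distinctBy operation from the Motivation section might be sketched roughly as follows. This is only a sketch: the class name DistinctByExample and the helper distinctBy are this example's own, it builds the gatherer with the Gatherer.ofSequential factory method of the preview API, and it assumes a JDK on which that API is available (JDK 22 with --enable-preview).

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Gatherer;
import java.util.stream.Stream;

final class DistinctByExample {

    // Hypothetical helper (not part of the JDK): passes an element downstream
    // only the first time its key is encountered.
    static <T, K> Gatherer<T, ?, T> distinctBy(Function<? super T, ? extends K> keyOf) {
        return Gatherer.<T, Set<K>, T>ofSequential(
            // Private state: the keys seen so far, created afresh per evaluation.
            HashSet::new,
            // Called once per input element.
            (seen, element, downstream) -> {
                if (seen.add(keyOf.apply(element))) {
                    // New key: emit the element; push reports whether more is wanted.
                    return downstream.push(element);
                }
                return true; // Duplicate key: emit nothing, keep consuming input.
            });
    }

    static List<String> demo() {
        return Stream.of("foo", "bar", "baz", "quux")
                     .gather(distinctBy(String::length))
                     .toList();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [foo, quux]
    }
}
```

Because the set of seen keys lives in the gatherer's private state, no wrapper class with customized equality is needed.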
A gatherer is defined by four functions that work together:
- The optional initializer function provides an object that maintains private state while processing stream elements. For example, a gatherer can store the current element so that, the next time it is applied, it can compare the new element with the now-previous element and, say, emit only the larger of the two. In effect, such a gatherer transforms two input elements into one output element.
- The integrator function integrates a new element from the input stream, possibly inspecting the private state object and possibly emitting elements to the output stream. It can also terminate processing before reaching the end of the input stream; for example, a gatherer searching for the largest of a stream of integers can terminate if it detects Integer.MAX_VALUE.
- The optional combiner function can be used to evaluate the gatherer in parallel when the input stream is marked as parallel. If a gatherer is not parallel-capable then it can still be part of a parallel stream pipeline, but it is evaluated sequentially. This is useful for cases where an operation is inherently ordered in nature and thus cannot be parallelized.
- The optional finisher function is invoked when there are no more input elements to consume. This function can inspect the private state object and, possibly, emit additional output elements. For example, a gatherer searching for a specific element amongst its input elements can report failure, say by throwing an exception, when its finisher is invoked.
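To make these roles concrete, the "larger of the two" gatherer mentioned above might be sketched as follows, with an initializer, an integrator, and a finisher but no combiner. The names LargerOfPairs, Pending, and largerOfEachPair are hypothetical, and emitting an unpaired trailing element from the finisher is a choice made for this example rather than something the text prescribes. The sketch assumes a JDK on which the preview API is available (JDK 22 with --enable-preview).

```java
import java.util.List;
import java.util.stream.Gatherer;
import java.util.stream.Stream;

final class LargerOfPairs {

    // Private state: the element still waiting for its partner, or null.
    private static final class Pending {
        Integer value;
    }

    // Hypothetical gatherer: consumes elements two at a time, emits the larger.
    static Gatherer<Integer, ?, Integer> largerOfEachPair() {
        return Gatherer.<Integer, Pending, Integer>ofSequential(
            // Initializer: a fresh state object for each evaluation.
            Pending::new,
            // Integrator: remember the first element of a pair, emit on the second.
            (state, element, downstream) -> {
                if (state.value == null) {
                    state.value = element;
                    return true;
                }
                int larger = Math.max(state.value, element);
                state.value = null;
                return downstream.push(larger);
            },
            // Finisher: an odd element left over at the end is emitted as-is
            // (an assumption of this sketch).
            (state, downstream) -> {
                if (state.value != null) {
                    downstream.push(state.value);
                }
            });
        // No combiner is supplied, so the gatherer is evaluated sequentially.
    }

    static List<Integer> demo() {
        return Stream.of(3, 7, 9, 2, 5)
                     .gather(largerOfEachPair())
                     .toList();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [7, 9, 5]
    }
}
```

The pairing depends on encounter order, which is why the sketch uses the sequential factory method and leaves out a combiner.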
When invoked, Stream::gather performs the equivalent of the following steps:
- Create a Downstream object which, when given an element of the gatherer’s output type, passes it to the next stage in the pipeline.
- Obtain the gatherer’s private state object by invoking the get() method of its initializer.
- Obtain the gatherer’s integrator by invoking its integrator() method.
- While there are more input elements, invoke the integrator’s integrate(...) method, passing it the state object, the next element, and the downstream object. Terminate if that method returns false.
- Obtain the gatherer’s finisher and invoke it with the state and downstream objects.
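The steps above can be mirrored in a small sequential driver. The following is a sketch for illustration only, not the JDK's implementation: runSequentially, runningSumUpTo, and the class name are hypothetical, the "next stage" is simply a list, and the sketch reads "terminate" as ending the input loop before the finisher runs. It assumes a JDK on which the preview API is available (JDK 22 with --enable-preview).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Gatherer;

final class GatherStepsSketch {

    // Hypothetical driver: applies a gatherer to the given input, sequentially.
    static <T, A, R> List<R> runSequentially(Gatherer<T, A, R> gatherer,
                                             Iterable<? extends T> input) {
        List<R> output = new ArrayList<>();

        // Step 1: a downstream that hands each element to the next stage
        // (here, a list that always accepts more elements).
        Gatherer.Downstream<R> downstream = element -> {
            output.add(element);
            return true;
        };

        // Step 2: obtain the private state object from the initializer.
        A state = gatherer.initializer().get();

        // Step 3: obtain the integrator.
        Gatherer.Integrator<A, T, R> integrator = gatherer.integrator();

        // Step 4: feed elements to the integrator until it returns false.
        for (T element : input) {
            if (!integrator.integrate(state, element, downstream)) {
                break;
            }
        }

        // Step 5: invoke the finisher with the state and downstream objects.
        gatherer.finisher().accept(state, downstream);
        return output;
    }

    // Hypothetical gatherer used to exercise the driver: emits running sums
    // and stops requesting input once the sum reaches the limit.
    static Gatherer<Integer, int[], Integer> runningSumUpTo(int limit) {
        return Gatherer.<Integer, int[], Integer>ofSequential(
            () -> new int[1],
            (sum, element, downstream) -> {
                sum[0] += element;
                return downstream.push(sum[0]) && sum[0] < limit;
            },
            (sum, downstream) -> { /* nothing left to emit */ });
    }

    public static void main(String[] args) {
        System.out.println(runSequentially(runningSumUpTo(6), List.of(1, 2, 3, 4, 5)));
        // [1, 3, 6]
    }
}
```

The integrator returns false after pushing 6, so the elements 4 and 5 are never consumed; this is the short-circuiting that lets a gatherer turn an infinite stream into a finite one.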
Every existing intermediate operation declared in the Stream interface can be implemented by invoking gather with a gatherer that implements that operation. For example, given a stream of T-typed elements, Stream::map turns each T element into a U element by applying a function and then passes the U element downstream; this is simply a stateless one-to-one gatherer. As another example, Stream::filter takes a predicate that determines whether an input element should be passed downstream; this is simply a stateless one-to-many gatherer, where each input element yields zero or one output elements. In fact every stream pipeline is, conceptually, equivalent to
source.gather(...).gather(...).gather(...).collect(...)
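As a rough sketch of this equivalence, map and filter might be written as stateless gatherers along the following lines. The class BuiltInsAsGatherers and its static methods are hypothetical stand-ins for the built-in operations, written with the Gatherer.of factory method of the preview API; this is not how the JDK implements Stream::map or Stream::filter. The sketch assumes a JDK on which the preview API is available (JDK 22 with --enable-preview).

```java
import java.util.List;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Gatherer;
import java.util.stream.Stream;

final class BuiltInsAsGatherers {

    // Stateless one-to-one: every input element produces exactly one output.
    static <T, U> Gatherer<T, ?, U> map(Function<? super T, ? extends U> mapper) {
        return Gatherer.<T, U>of(
            (unused, element, downstream) -> downstream.push(mapper.apply(element)));
    }

    // Stateless: every input element produces zero or one output.
    static <T> Gatherer<T, ?, T> filter(Predicate<? super T> predicate) {
        return Gatherer.<T, T>of((unused, element, downstream) -> {
            if (predicate.test(element)) {
                return downstream.push(element);
            }
            return true; // Element dropped; keep consuming input.
        });
    }

    static List<String> demo() {
        return Stream.of(1, 2, 3, 4)
                     .gather(filter(n -> n % 2 == 0))
                     .gather(map(n -> "#" + n))
                     .toList();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [#2, #4]
    }
}
```

Neither gatherer needs an initializer, a combiner, or a finisher, since each decision depends only on the element at hand.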