跳到主要内容

JEP 462:结构化并发(第二次预览)

QWen Max 中英对照 JEP 462: Structured Concurrency (Second Preview)

总结

通过引入 结构化并发 的 API 来简化并发编程。结构化并发将不同线程中运行的相关任务组视为一个工作单元,从而简化错误处理和取消操作,提高可靠性,并增强可观测性。这是一个 预览 API

历史

结构化并发(Structured Concurrency)由 JEP 428 提出,并在 JDK 19 中以孵化 API 的形式交付。随后,通过 JEP 437,它在 JDK 20 中重新孵化,并进行了小幅更新以继承作用域值(JEP 429)。它首次在 JDK 21 中预览,通过 JEP 453,将 StructuredTaskScope::fork(...) 的返回值从 Future 改为 Subtask。我们在此提议在 JDK 22 中不加修改地重新预览该 API,以获取更多反馈。

目标

  • 推广一种可以消除因取消和关闭而产生的常见风险(如线程泄漏和取消延迟)的并发编程风格。

  • 提高并发代码的可观测性。

非目标

  • 并不旨在替换 java.util.concurrent 包中的任何并发结构,例如 ExecutorServiceFuture

  • 并不旨在为 Java 平台定义最终的结构化并发 API。其他的结构化并发结构可以由第三方库或在未来的 JDK 版本中定义。

  • 并不旨在定义一种在线程之间共享数据流的方法(即 *[通道](https://en.wikipedia.org/wiki/Channel_\(programming\))*)。我们可能会在未来提出相关建议。

  • 并不旨在用新的线程取消机制替换现有的线程中断机制。我们可能会在未来提出相关建议。

动机

开发者通过将任务分解为多个子任务来管理复杂性。在普通的单线程代码中,子任务是按顺序执行的。然而,如果子任务之间足够独立,并且硬件资源充足,那么通过并发执行子任务,可以加快整个任务的运行速度(即降低延迟)。例如,一个组合多个 I/O 操作结果的任务,如果每个 I/O 操作都在自己的线程中并发执行,那么该任务将运行得更快。虚拟线程(JEP 444)使得为每个这样的 I/O 操作分配一个线程变得经济高效,但管理可能由此产生的大量线程仍然是一个挑战。

使用 ExecutorService 的非结构化并发

Java 5 中引入的 java.util.concurrent.ExecutorService API 可帮助开发者并发执行子任务。

例如,这里有一个方法 handle(),它代表了服务器应用程序中的一个任务。该方法通过向 ExecutorService 提交两个子任务来处理传入的请求。一个子任务执行 findUser() 方法,另一个子任务执行 fetchOrder() 方法。ExecutorService 会立即为每个子任务返回一个 Future,并根据 Executor 的调度策略并发执行这些子任务。handle() 方法通过阻塞调用其 futures 的 get() 方法来等待子任务的结果,因此该任务被称为 join 其子任务。

Response handle() throws ExecutionException, InterruptedException {
Future<String> user = esvc.submit(() -> findUser());
Future<Integer> order = esvc.submit(() -> fetchOrder());
String theUser = user.get(); // Join findUser
int theOrder = order.get(); // Join fetchOrder
return new Response(theUser, theOrder);
}

由于子任务是并发执行的,每个子任务可以独立成功或失败。(在此上下文中,失败意味着抛出异常。)通常,如果任何一个子任务失败,像 handle() 这样的任务也应该失败。当失败发生时,理解线程的生命周期可能会变得异常复杂:

  • 如果 findUser() 抛出异常,那么在调用 user.get()handle() 将抛出异常,但 fetchOrder() 将继续在其自己的线程中运行。这是一个 线程泄漏,最好的情况下会浪费资源;最坏的情况下,fetchOrder() 线程将干扰其他任务。

  • 如果执行 handle() 的线程被中断,该中断不会传播到子任务。findUser()fetchOrder() 线程都会泄漏,即使在 handle() 失败后仍将继续运行。

  • 如果 findUser() 执行时间过长,但在此期间 fetchOrder() 失败,那么 handle() 将通过阻塞在 user.get() 上而无谓地等待 findUser(),而不是取消它。只有在 findUser() 完成并且 user.get() 返回后,order.get() 才会抛出异常,导致 handle() 失败。

每个案例的问题在于,我们的程序按照任务-子任务关系进行了逻辑结构设计,但这些关系只存在于开发者的头脑中。

这不仅增加了出错的空间,还使得诊断和排查这类错误变得更加困难。例如,像线程转储这样的可观测性工具会在无关线程的调用栈中显示 handle()findUser()fetchOrder(),但不会提供任何关于任务与子任务关系的线索。

当发生错误时,我们可以通过显式取消其他子任务来尝试做得更好,例如使用 try-finally 包装任务,并在失败任务的 catch 块中调用其他任务的 future 的 cancel(boolean) 方法。我们还需要在 try-with-resources 语句 中使用 ExecutorService,正如 JEP 425 中的示例所示,因为 Future 不提供等待已取消任务的方式。但所有这些操作都很难正确实现,并且通常会使代码的逻辑意图更难以辨别。跟踪任务间的关系,并手动添加所需的任务间取消关联,这对开发者来说要求过高了。

这种需要手动协调生命周期的原因在于,ExecutorServiceFuture 允许无限制的并发模式。所涉及的任何线程都没有约束或顺序限制。一个线程可以创建一个 ExecutorService,第二个线程可以向其提交任务,而执行任务的线程与第一个线程或第二个线程没有任何关系。此外,在一个线程提交任务后,一个完全不同的线程可以等待执行结果。任何持有对 Future 引用的代码都可以加入它(即通过调用 get() 方法等待其结果),即使是获取该 Future 的线程之外的其他线程中的代码也可以这样做。实际上,由一个任务启动的子任务不必返回到提交它的任务。它可以返回到多个任务中的任何一个,甚至可能不返回。

由于 ExecutorServiceFuture 允许这种非结构化的使用方式,它们并不强制甚至不跟踪任务和子任务之间的关系,即使这些关系是常见且有用的。因此,即使子任务在同一个任务中被提交和合并,一个子任务的失败也不能自动导致另一个子任务的取消:在上述 handle() 方法中,fetchOrder() 的失败不能自动导致 findUser() 的取消。fetchOrder() 的 future 与 findUser() 的 future 毫无关联,而且它们都与最终通过其 get() 方法进行合并的线程无关。我们希望可靠地自动化这一过程,而不是要求开发人员手动管理这种取消操作。

任务结构应反映代码结构

ExecutorService 下自由组合的线程不同,单线程代码的执行始终强制任务和子任务之间的层级关系。一个方法的主体块 {...} 对应一个任务,而在此块内调用的方法则对应子任务。被调用的方法必须返回到调用它的方法,或者向其抛出异常。它不能比调用它的方法存在更久,也不能返回或向其他方法抛出异常。因此,所有子任务都会在任务之前完成,每个子任务都是其父任务的子级,并且每个子任务相对于其他子任务和任务的生命周期由代码的语法块结构决定。

例如,在这个单线程版本的 handle() 中,任务与子任务的关系从语法结构上就很明显:

Response handle() throws IOException {
String theUser = findUser();
int theOrder = fetchOrder();
return new Response(theUser, theOrder);
}

我们不会在 findUser() 子任务完成之前启动 fetchOrder() 子任务,无论 findUser() 成功还是失败。如果 findUser() 失败,那么我们根本不会启动 fetchOrder(),并且 handle() 任务会隐式失败。子任务只能返回给其父任务这一事实非常重要:这意味着父任务可以隐式地将一个子任务的失败作为触发器,取消其他未完成的子任务,然后自身也失败。

在单线程代码中,任务与子任务的层次结构在运行时会具体化为调用栈。因此,我们可以免费获得相应的父子关系,这些关系决定了错误传播的方式。当观察单个线程时,这种层级关系显而易见:findUser()(以及稍后的 fetchOrder())看起来是从属于 handle() 的。这使得回答“handle() 当前正在处理什么?”这个问题变得非常容易。

如果任务及其子任务之间的父子关系能够从代码的语法结构中清晰可见,并且在运行时也能够具体化(如同单线程代码那样),那么并发编程将会变得更加容易、更加可靠,也更便于观察。语法结构将明确子任务的生命周期,并支持线程间层次结构的运行时表示形式,类似于线程内的调用栈。这种表示形式将支持错误传播和取消操作,同时也能够对并发程序进行有意义的观察。

(Java 平台已经有一个用于对并发任务施加结构的 API,即 java.util.concurrent.ForkJoinPool,它是并行流背后的执行引擎。然而,该 API 是为计算密集型任务设计的,而不是为涉及 I/O 的任务设计的。)

结构化并发

结构化并发 是一种并发编程方法,它保留了任务与子任务之间的自然关系,从而生成更具可读性、可维护性和可靠性的并发代码。 “结构化并发”一词由 Martin Sústrik 提出,并由 Nathaniel J. Smith 推广。 其他语言中的思想,例如 Erlang 的分层监督器,为结构化并发中的错误处理设计提供了参考。

结构化并发源于一个简单的原则,即

如果一个任务拆分为并发子任务,那么它们都会返回到同一个地方,即任务的代码块。

在结构化并发中,子任务代表一个任务工作。该任务等待子任务的结果并监控它们是否失败。与单线程代码的结构化编程技术一样,多线程的结构化并发能力来自于两个理念:(1)通过代码块的执行流程有明确的入口和出口点,以及(2)操作生命周期的严格嵌套方式,这种方式反映了它们在代码中的语法嵌套。

由于一段代码块的入口和出口是明确定义的,因此并发子任务的生命周期被限制在其父任务的语法块内。由于同级子任务的生命周期嵌套在父任务的生命周期之中,它们可以作为一个整体进行推理和管理。而父任务的生命周期又依次嵌套在其父任务的生命周期之内,因此运行时可以将任务的层次结构具体化为一棵树,这棵树是单线程调用栈的并发对应物。这使得代码可以对整个任务子树应用策略(如截止时间),并允许可观测性工具将子任务呈现为其父任务的从属任务。

结构化并发非常适合虚拟线程,虚拟线程是由 JDK 实现的轻量级线程。许多虚拟线程共享同一个操作系统线程,从而允许存在大量的虚拟线程。虚拟线程不仅数量庞大,而且成本低廉,足以表示任何并发的行为单元,即使是涉及 I/O 的行为也可以胜任。这意味着服务器应用程序可以使用结构化并发一次处理成千上万甚至数百万个传入请求:它可以为每个请求的处理任务分配一个新的虚拟线程,并且当任务通过提交子任务进行并发执行时,还可以为每个子任务分配一个新的虚拟线程。在幕后,任务与子任务的关系通过一种树形结构具体化,这是通过让每个虚拟线程携带对其唯一父线程的引用来实现的,类似于调用栈中的一个帧引用其唯一的调用者。

总之,虚拟线程提供了大量的线程。结构化并发可以正确且稳健地协调这些线程,并使可观测性工具能够按照开发者理解的方式显示线程。在 JDK 中拥有结构化并发的 API 将使得构建可维护、可靠且可观测的服务器应用程序变得更加容易。

描述

结构化并发 API 的主要类是 java.util.concurrent 包中的 StructuredTaskScope。此类允许开发者将任务组织为一组并发的子任务,并将它们作为一个整体进行协调。子任务通过单独 forking(分叉)到自己的线程中执行,然后以一个整体进行 joining(合并),并且可能以一个整体被取消。子任务的成功结果或异常会被汇总并由父任务处理。StructuredTaskScope 将子任务的生命周期限制在一个明确的词法作用域内,在该作用域中完成任务与其子任务的所有交互 —— 分叉、合并、取消、错误处理以及结果组合。

以下是之前介绍的 handle() 示例,改写为使用 StructuredTaskScopeShutdownOnFailure 将在下方解释):

Response handle() throws ExecutionException, InterruptedException {
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
Supplier<String> user = scope.fork(() -> findUser());
Supplier<Integer> order = scope.fork(() -> fetchOrder());

scope.join() // Join both subtasks
.throwIfFailed(); // ... and propagate errors

// Here, both subtasks have succeeded, so compose their results
return new Response(user.get(), order.get());
}
}

与原始示例相比,理解此处涉及的线程生命周期非常容易:在所有条件下,它们的生命周期都被限制在一个词法作用域内,即 try-with-resources 语句的主体。此外,使用 StructuredTaskScope 确保了许多有价值的属性:

  • 通过短路进行错误处理 — 如果 findUser()fetchOrder() 子任务中的任何一个失败,另一个任务如果尚未完成,则会被取消。(这是由 ShutdownOnFailure 实现的关闭策略管理的;也可能存在其他策略)。

  • 取消传播 — 如果运行 handle() 的线程在调用 join() 之前或期间被中断,当线程退出作用域时,两个子任务都会自动取消。

  • 清晰性 — 上述代码具有清晰的结构:设置子任务,等待它们完成或被取消,然后决定是成功(并处理已经完成的子任务结果)还是失败(子任务已经完成,因此无需进一步清理)。

  • 可观测性 — 线程转储(如下 所述)清楚地显示了任务层次结构,运行 findUser()fetchOrder() 的线程显示为作用域的子线程。

StructuredTaskScope 是一个 预览 API,默认情况下处于禁用状态

要使用 StructuredTaskScope API,您必须启用预览版 API,如下所示:

  • 使用 javac --release 21 --enable-preview Main.java 编译程序,并使用 java --enable-preview Main 运行它;或者,

  • 当使用 源代码启动器 时,通过 java --source 21 --enable-preview Main.java 运行程序;或者,

  • 当使用 jshell 时,通过 jshell --enable-preview 启动它。

使用 StructuredTaskScope

public class StructuredTaskScope<T> implements AutoCloseable {

public <U extends T> Subtask<U> fork(Callable<? extends U> task);
public void shutdown();

public StructuredTaskScope<T> join() throws InterruptedException;
public StructuredTaskScope<T> joinUntil(Instant deadline)
throws InterruptedException, TimeoutException;
public void close();

protected void handleComplete(Subtask<? extends T> handle);
protected final void ensureOwnerAndJoined();

}

使用 StructuredTaskScope 的代码的一般工作流程是:

  1. 创建一个作用域。创建该作用域的线程是其拥有者

  2. 使用 fork(Callable) 方法在作用域中分叉子任务。

  3. 在任何时候,任何子任务或作用域的拥有者都可以调用作用域的 shutdown() 方法来取消未完成的子任务,并防止分叉新的子任务。

  4. 作用域的拥有者以单元的形式加入作用域,即加入其所有的子任务。拥有者可以调用作用域的 join() 方法,等待所有子任务完成(成功或失败)或通过 shutdown() 被取消。或者,它可以调用作用域的 joinUntil(java.time.Instant) 方法,等待直到某个截止时间。

  5. 加入完成后,处理子任务中的任何错误并处理其结果。

  6. 关闭作用域,通常通过 try-with-resources 隐式完成。这会在作用域尚未关闭时将其关闭,并等待任何已被取消但尚未完成的子任务完成。

每次调用 fork(...) 都会启动一个新线程来执行子任务,默认情况下这是一个虚拟线程。子任务可以创建自己的嵌套 StructuredTaskScope 来派生其子任务,从而形成一个层次结构。该层次结构反映在代码的块结构中,它限制了子任务的生命周期:一旦作用域关闭,所有子任务的线程都保证已终止,并且在块退出时不会留下任何线程。

在某个范围内的任何子任务、嵌套范围内的任何子子任务,以及该范围的所有者都可以随时调用该范围的 shutdown() 方法,以表明任务已完成 —— 即使其他子任务仍在执行中。shutdown() 方法会中断仍在执行子任务的线程,并导致 join()joinUntil(Instant) 方法返回。因此,所有子任务的编写方式都应能够响应中断。在调用 shutdown() 后分叉的新子任务将处于 UNAVAILABLE 状态,并且不会运行。实际上,shutdown() 是顺序代码中 break 语句的并发类比。

在作用域内调用 join()joinUntil(Instant) 是强制性的。如果作用域的代码块在加入(join)之前退出,那么该作用域将会等待所有子任务终止,然后抛出一个异常。

一个作用域的拥有线程有可能在加入之前或加入过程中被中断。例如,它可能是已被关闭的封闭作用域的一个子任务。如果发生这种情况,join()joinUntil(Instant) 将抛出异常,因为继续执行已无意义。随后,try-with-resources 语句将关闭该作用域,这会取消所有子任务并等待它们终止。这会产生自动将任务的取消传播到其子任务的效果。如果 joinUntil(Instant) 方法的截止时间在子任务终止或调用 shutdown() 之前到期,那么它将抛出异常,并且同样地,try-with-resources 语句将关闭该作用域。

join() 成功完成时,每个子任务要么已成功完成,要么失败,要么因为作用域被关闭而被取消。

一旦加入,作用域的所有者将处理失败的子任务,并处理成功完成的子任务的结果;这通常由关闭策略完成(参见下文)。成功完成的任务的结果可以通过 Subtask.get() 方法获取。get() 方法永远不会阻塞;如果在加入之前错误地调用它,或者子任务未成功完成时,它会抛出 IllegalStateException 异常。

在作用域中分叉的子任务会继承 ScopedValue 绑定(JEP 446)。如果某个作用域的所有者从绑定的 ScopedValue 中读取了一个值,那么每个子任务都会读取到相同的值。

如果一个作用域的所有者本身是现有作用域的子任务,即它是作为分叉的子任务创建的,那么该作用域将成为新作用域的父作用域。因此,作用域和子任务形成了一棵树。

StructuredTaskScope 的结构化使用在运行时强制执行。例如,尝试从不属于该作用域树层次结构中的线程调用 fork(Callable) —— 即,拥有者、子任务以及嵌套作用域中的子任务(子子任务)—— 将因异常而失败。在 try-with-resources 块之外使用作用域,并且在未调用 close() 或未保持正确的 close() 调用嵌套顺序的情况下返回,可能会导致该作用域的方法抛出 StructureViolationException 异常。

StructuredTaskScope 对并发操作强制实施结构和顺序。因此它没有实现 ExecutorServiceExecutor 接口,因为这些接口的实例通常以非结构化的方式使用(见下文)。然而,将使用 ExecutorService 的代码迁移到 StructuredTaskScope 是非常简单的,特别是那些能从结构化中受益的代码。

在实践中,大多数 StructuredTaskScope 的使用都不会直接利用 StructuredTaskScope 类,而是使用下一节中描述的两个实现关闭策略的子类之一。在其他场景中,用户可能会编写自己的子类来实现自定义的关闭策略。

关闭策略

在处理并发子任务时,通常会使用短路模式来避免不必要的工作。例如,如果其中一个子任务失败(即,invoke all),或者其中一个子任务成功(即,invoke any),取消所有子任务有时是有意义的。StructuredTaskScope 的两个子类 ShutdownOnFailureShutdownOnSuccess 支持这些模式,分别在第一个子任务失败或成功时关闭作用域。

关闭策略还提供了集中式的方法来处理异常,以及可能的成功结果。这与结构化并发的精神是一致的,根据这种精神,整个作用域被视为一个单元。

下面是一个带有故障时关闭策略的 StructuredTaskScope(在上面的 handle() 示例中也使用了该策略),它会并发运行一组任务,如果其中任何一个任务失败,则整体失败:

<T> List<T> runAll(List<Callable<T>> tasks) 
throws InterruptedException, ExecutionException {
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
List<? extends Supplier<T>> suppliers = tasks.stream().map(scope::fork).toList();
scope.join()
.throwIfFailed(); // Propagate exception if any subtask fails
// Here, all tasks have succeeded, so compose their results
return suppliers.stream().map(Supplier::get).toList();
}
}

下面是一个具有成功后关闭策略的 StructuredTaskScope,它会返回第一个成功子任务的结果:

<T> T race(List<Callable<T>> tasks, Instant deadline) 
throws InterruptedException, ExecutionException, TimeoutException {
try (var scope = new StructuredTaskScope.ShutdownOnSuccess<T>()) {
for (var task : tasks) {
scope.fork(task);
}
return scope.joinUntil(deadline)
.result(); // Throws if none of the subtasks completed successfully
}
}

一旦一个子任务成功,此作用域就会自动关闭,取消未完成的子任务。如果所有子任务都失败或者超过了给定的截止时间,则任务失败。例如,在需要从一组冗余服务中获取结果的服务器应用程序中,这种模式可能会很有用。

虽然提供了这两种关闭策略,但开发人员可以创建抽象其他模式的自定义策略(参见下方)。

处理结果

在通过关闭策略(例如,使用 ShutdownOnFailure::throwIfFailed) 联合处理并集中处理异常之后,如果子任务的结果未被策略处理(例如,通过 ShutdownOnSuccess::result()),作用域的所有者可以使用从调用 fork(...) 返回的 Subtask 对象来处理子任务的结果。

通常,范围所有者调用的唯一 Subtask 方法是 get() 方法。所有其他 Subtask 方法通常只会在自定义关闭策略(参见下文)的 handleComplete(...) 方法实现中使用。实际上,我们建议将引用由 fork(...) 返回的 Subtask 的变量类型定义为例如 Supplier<String> 而不是 Subtask<String>(当然,除非你选择使用 var)。如果关闭策略本身处理子任务结果 —— 例如在 ShutdownOnSuccess 的情况下 —— 那么应完全避免使用由 fork(...) 返回的 Subtask 对象,并将 fork(...) 方法视为返回 void。子任务应将其结果作为范围所有者在策略集中异常处理后需要处理的任何信息返回。

如果作用域所有者处理子任务异常以生成复合结果,而不是使用关闭策略,那么异常可以作为值从子任务中返回。例如,下面是一个并行运行任务列表的方法,它返回一个包含每个任务相应成功或异常结果的已完成 Future 列表:

<T> List<Future<T>> executeAll(List<Callable<T>> tasks)
throws InterruptedException {
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
List<? extends Supplier<Future<T>>> futures = tasks.stream()
.map(task -> asFuture(task))
.map(scope::fork)
.toList();
scope.join();
return futures.stream().map(Supplier::get).toList();
}
}

static <T> Callable<Future<T>> asFuture(Callable<T> task) {
return () -> {
try {
return CompletableFuture.completedFuture(task.call());
} catch (Exception ex) {
return CompletableFuture.failedFuture(ex);
}
};
}

自定义关闭策略

StructuredTaskScope 可以被扩展,并且其受保护的 handleComplete(...) 方法可以被重写,以实现不同于 ShutdownOnSuccessShutdownOnFailure 的策略。例如,子类可以:

  • 收集成功完成的子任务的结果,并忽略失败的子任务,
  • 在子任务失败时收集异常,或者
  • 调用 shutdown() 方法进行关闭,并在出现某种条件时使 join() 唤醒。

当一个子任务完成时,即使在调用 shutdown() 之后,它也会作为 Subtask 被报告给 handleComplete(...) 方法:

public sealed interface Subtask<T> extends Supplier<T> {
enum State { SUCCESS, FAILED, UNAVAILABLE }

State state();
Callable<? extends T> task();
T get();
Throwable exception();
}

在调用 shutdown() 之前,对于已经完成的子任务(无论是成功完成即 SUCCESS 状态,还是失败完成即 FAILED 状态),都会调用 handleComplete(...) 方法。只有当子任务处于 SUCCESS 状态时才可以调用 get() 方法,而只有当子任务处于 FAILED 状态时才可以调用 exception() 方法;在其他情况下调用 get()exception() 将导致抛出 IllegalStateException 异常。UNAVAILABLE 状态表示以下情况之一:(1) 子任务已分叉但尚未完成;(2) 子任务在关闭后完成,或者 (3) 子任务在关闭后分叉,因此尚未开始。对于处于 UNAVAILABLE 状态的子任务,永远不会调用 handleComplete(...) 方法。

子类通常会定义一些方法,以在 join() 方法返回后,向后续执行的代码提供结果、状态或其他输出。一个收集结果并忽略失败子任务的子类可以定义一个返回结果集合的方法。一个在子任务失败时实施关闭策略的子类可以定义一个方法,以获取第一个失败子任务的异常。

下面是一个 StructuredTaskScope 子类的示例,它收集成功完成的子任务的结果。它定义了 results() 方法,供主任务用来检索结果。

class MyScope<T> extends StructuredTaskScope<T> {

private final Queue<T> results = new ConcurrentLinkedQueue<>();

MyScope() { super(null, Thread.ofVirtual().factory()); }

@Override
protected void handleComplete(Subtask<? extends T> subtask) {
if (subtask.state() == Subtask.State.SUCCESS)
results.add(subtask.get());
}

@Override
public MyScope<T> join() throws InterruptedException {
super.join();
return this;
}

// Returns a stream of results from the subtasks that completed successfully
public Stream<T> results() {
super.ensureOwnerAndJoined();
return results.stream();
}

}

此自定义策略可以这样使用:

<T> List<T> allSuccessful(List<Callable<T>> tasks) throws InterruptedException {
try (var scope = new MyScope<T>()) {
for (var task : tasks) scope.fork(task);
return scope.join()
.results().toList();
}
}

扇入场景

上面的示例聚焦于扇出场景,即管理多个并发传出 I/O 操作的场景。StructuredTaskScope 在管理多个并发传入 I/O 操作的扇入场景中也非常有用。在这样的场景中,我们通常会根据传入的请求创建未知数量的子任务。

下面是一个在 StructuredTaskScope 中派生子任务来处理传入连接的服务器示例:

void serve(ServerSocket serverSocket) throws IOException, InterruptedException {
try (var scope = new StructuredTaskScope<Void>()) {
try {
while (true) {
var socket = serverSocket.accept();
scope.fork(() -> handle(socket));
}
} finally {
// If there's been an error or we're interrupted, we stop accepting
scope.shutdown(); // Close all active connections
scope.join();
}
}
}

从并发的角度来看,这个场景的不同之处不在于请求的方向,而在于任务的持续时间和数量。在这里,与前面的示例不同,作用域的所有者在其持续时间上是无界的 —— 它只会在被中断时停止。子任务的数量也是未知的,因为它们是动态地响应外部事件而分叉的。

所有连接处理的子任务都是在作用域内创建的,因此在线程转储中很容易看出它们的用途,线程转储会将它们显示为作用域所有者的子级。整个服务作为一个单元也很容易关闭。

可观察性

我们扩展了 JEP 444 添加的新的 JSON 线程转储格式,以展示 StructuredTaskScope 将线程分组为层次结构的方式:

$ jcmd <pid> Thread.dump_to_file -format=json <file>

每个作用域的 JSON 对象包含该作用域中分叉出的线程及其堆栈跟踪的数组。一个作用域的拥有线程通常会阻塞在 join 方法中,等待子任务完成;线程转储通过展示结构化并发施加的树状层次结构,可以轻松查看子任务线程正在做什么。一个作用域的 JSON 对象还包含对其父作用域的引用,以便可以从转储中重建程序的结构。

com.sun.management.HotSpotDiagnosticsMXBean API 也可用于生成此类线程转储,既可以直接生成,也可以通过平台 MBeanServer 以及本地或远程 JMX 工具间接生成。

为什么 fork(...) 不返回 Future

StructuredTaskScope API 处于孵化阶段时,fork(...) 方法返回的是一个 Future。这通过使 fork(...) 类似于现有的 ExecutorService::submit 方法,提供了一种熟悉感。然而,鉴于 StructuredTaskScope 的使用方式与 ExecutorService 不同——如上所述,是以一种结构化的方式使用的——使用 Future 带来的困惑多于清晰。

  • 通常使用 Future 的方式是调用其 get() 方法,该方法会阻塞直到结果可用。但在 StructuredTaskScope 的上下文中,以这种方式使用 Future 不仅不被推荐,而且适得其反。结构化的 Future 对象应该仅在 join() 返回后进行查询,此时可以确定它们已完成或被取消,并且应使用的方法不是常见的 get(),而是新引入的 resultNow(),它永远不会阻塞。

  • 一些开发者疑惑为什么 fork(...) 不返回功能更强大的 CompletableFuture 对象。由于 fork(...) 返回的 Future 应该仅在其已知完成之后使用,CompletableFuture 并未提供任何好处,因为它的高级功能仅对未完成的 future 有帮助。此外,CompletableFuture 是为异步编程范式设计的,而 StructuredTaskScope 鼓励的是阻塞范式。

    简而言之,FutureCompletableFuture 提供的自由度在结构化并发中反而会适得其反。

  • 结构化并发是将运行在不同线程中的多个任务视为一个工作单元,而 Future 在将多个任务视为独立任务时最有用。一个作用域应该只阻塞一次以等待其子任务的结果,然后集中处理异常。因此,在绝大多数情况下,从 fork(...) 返回的 Future 上唯一应调用的方法是 resultNow()。这与 Future 的常规用法相比是一个显著的变化,而 Future 接口在此上下文中成为了正确使用的干扰。

在当前的 API 中,Subtask::get() 的行为与 API 处于孵化阶段时的 Future::resultNow() 完全相同。

替代方案

  • 增强 ExecutorService 接口。我们原型化了该接口的一个实现,该实现始终强制执行结构并限制哪些线程可以提交任务。然而,我们发现这是有问题的,因为 JDK 和生态系统中大多数对 ExecutorService(及其父接口 Executor)的使用都不是结构化的。为一个受限得多的概念重用相同的 API 必然会引起混淆。例如,将一个结构化的 ExecutorService 实例传递给接受这种类型的现有方法,在大多数情况下几乎肯定会抛出异常。