JEP 428:结构化并发(孵化中)
概述
通过引入 结构化并发 的 API 来简化多线程编程。结构化并发将运行在不同线程中的多个任务视为一个单一的工作单元,从而简化错误处理和取消操作,提高可靠性,并增强可观测性。这是一个 孵化阶段的 API。
目标
-
提高多线程代码的可维护性、可靠性和可观测性。
-
推广一种并发编程风格,这种风格可以消除因取消和关闭操作而引发的常见风险,例如线程泄漏和取消延迟。
非目标
-
不打算替换
java.util.concurrent
包中的任何并发结构,例如ExecutorService
和Future
。 -
不打算为 Java 定义最终的结构化并发 API。其他的结构化并发结构可以由第三方库或在未来的 JDK 版本中定义。
-
不打算定义一种在线程之间共享数据流的机制(即 通道)。我们可能会在将来提出这样的目标。
-
不打算用新的线程取消机制替换现有的线程中断机制。我们可能会在将来提出这样的目标。
动机
开发者通过将任务分解为多个子任务来管理复杂性。在普通的单线程代码中,子任务是按顺序执行的。然而,如果子任务之间足够独立,并且硬件资源充足,那么通过并发执行子任务,可以加快整个任务的运行速度(即降低延迟)。例如,一个组合多个 I/O 操作结果的任务,如果每个 I/O 操作都在自己的线程中并发执行,那么该任务会运行得更快。虚拟线程(JEP 425)使得为每个这样的 I/O 操作分配一个线程变得经济高效,但管理由此可能产生的大量线程仍然是一个挑战。
使用 ExecutorService
的非结构化并发
Java 5 中引入的 java.util.concurrent.ExecutorService
API 可帮助开发人员并发执行子任务。
Response handle() throws ExecutionException, InterruptedException {
Future<String> user = esvc.submit(() -> findUser());
Future<Integer> order = esvc.submit(() -> fetchOrder());
String theUser = user.get(); // Join findUser
int theOrder = order.get(); // Join fetchOrder
return new Response(theUser, theOrder);
}
由于子任务是并发执行的,每个子任务可以独立成功或失败。(在此上下文中,失败意味着抛出异常。)通常,像 handle()
这样的任务在任意一个子任务失败时也应该失败。当发生失败时,理解线程的生命周期可能会变得非常复杂:
-
如果
findUser()
抛出异常,那么在调用user.get()
时handle()
将抛出异常,但fetchOrder()
将继续在其自己的线程中运行。这是一种 线程泄漏,最好的情况下会浪费资源;最坏的情况下,fetchOrder()
线程将干扰其他任务。 -
如果执行
handle()
的线程被中断,该中断不会传播到子任务。findUser()
和fetchOrder()
两个线程都会泄漏,即使在handle()
失败后仍会继续运行。 -
如果
findUser()
执行时间过长,但在此期间fetchOrder()
失败,那么handle()
将通过阻塞在user.get()
上而无谓地等待findUser()
,而不是取消它。只有在findUser()
完成并且user.get()
返回后,order.get()
才会抛出异常,导致handle()
失败。
在每种情况下,问题在于我们的程序按照任务-子任务关系进行了逻辑结构化,但这些关系只存在于开发者的脑海中。这不仅为错误的产生提供了更多空间,还使得诊断和排查此类错误变得更加困难。例如,像线程转储这样的可观测性工具会在无关线程的调用栈中显示 handle()
、findUser()
和 fetchOrder()
,而没有任何关于任务-子任务关系的提示。
当发生错误时,我们可以通过显式取消其他子任务来尝试做得更好,例如使用 try-finally
包装任务,并在失败任务的 catch 块中调用其他任务的 futures 的 cancel(boolean)
方法。我们还需要在 try
-with-resources 语句 中使用 ExecutorService
,正如 JEP 425 中的示例所示,因为 Future
不提供等待已取消任务的方式。但所有这些操作都很难正确实现,并且常常使代码的逻辑意图更难以辨别。跟踪任务之间的关系,并手动添加所需的任务间取消关联,这对开发者来说要求很高。
这种需要手动协调生命周期的原因在于,ExecutorService
和 Future
允许无限制的并发模式。所涉及的任何线程都没有约束或顺序要求。一个线程可以创建一个 ExecutorService
,第二个线程可以向其提交任务,而执行任务的线程与第一个线程或第二个线程没有任何关系。此外,在一个线程提交任务后,另一个完全不同的线程可以等待执行结果。任何持有对 Future
引用的代码都可以加入它(即通过调用 get()
方法等待其结果),即使是获取该 Future
的线程之外的其他线程中的代码也可以这样做。实际上,由一个任务启动的子任务不必返回到提交它的任务。它可以返回到多个任务中的任何一个,甚至可能不返回。
由于 ExecutorService
和 Future
允许这种非结构化的使用方式,它们并不强制甚至不跟踪任务与子任务之间的关系,即使这些关系是常见且有用的。因此,即使子任务在同一个任务中被提交和合并,一个子任务的失败也不能自动导致另一个子任务的取消:在上述 handle()
方法中,fetchOrder()
的失败无法自动导致 findUser()
的取消。fetchOrder()
的 future 与 findUser()
的 future 毫无关联,而且两者都与最终通过其 get()
方法进行合并的线程无关。我们希望可靠地自动化这一过程,而不是要求开发人员手动管理此类取消操作。
任务结构应反映代码结构
与 ExecutorService
下各种自由组合的线程不同,单线程代码的执行始终强制任务和子任务之间的层级关系。一个方法的主体代码块 {...}
对应一个任务,而代码块中调用的方法则对应子任务。被调用的方法必须返回到调用它的方法,或者向其抛出异常。它不能比调用它的方法存在更久,也不能返回或向其他不同的方法抛出异常。因此,所有子任务都会在任务之前完成,每个子任务都是其父任务的子级,并且每个子任务相对于其他子任务和任务的生命周期由代码的语法块结构决定。
例如,在这个单线程版本的 handle()
中,任务与子任务的关系从语法结构上就可以明显看出:
Response handle() throws IOException {
String theUser = findUser();
int theOrder = fetchOrder();
return new Response(theUser, theOrder);
}
我们不会在 findUser()
子任务完成之前(无论是成功还是失败)启动 fetchOrder()
子任务。如果 findUser()
失败,我们将完全不会启动 fetchOrder()
,并且 handle()
任务会隐式失败。子任务只能返回给其父任务这一事实非常重要:这意味着父任务可以隐式地将某个子任务的失败视为触发取消所有剩余子任务的信号,然后自身也失败。
在单线程代码中,任务与子任务的层次结构在运行时会具体化为调用栈。因此,我们可以免费获得相应的父子关系,这些关系决定了错误传播的方式。当观察单个线程时,这种层次关系显而易见:findUser()
(以及后来的 fetchOrder()
)看起来是从属于 handle()
的。
如果任务及其子任务之间的父子关系能够像单线程代码一样在语法上表达并在运行时具体化,那么多线程编程将会变得更加容易、更加可靠且更可观测。语法结构将明确子任务的生命周期,并支持线程间层次结构的运行时表示形式,类似于线程内的调用栈。这种表示形式将支持错误传播和取消操作,同时也能对并发程序进行有意义的观测。
(Java 已经有一个用于对并发任务施加结构的 API,即 java.util.concurrent.ForkJoinPool
,它是并行流背后的执行引擎。然而,该 API 是为计算密集型任务设计的,而不是涉及 I/O 的任务。)
结构化并发
结构化并发 是一种多线程编程的方法,它保留了单线程代码的可读性、可维护性和可观测性。它体现了以下原则:
如果一个任务拆分为并发子任务,那么它们都会返回到同一个地方,即任务的代码块。
“结构化并发”一词由 Martin Sústrik 提出,并由 Nathaniel J. Smith 推广。其他语言中的思想,例如 Erlang 的分层监督器,为结构化并发中的错误处理设计提供了参考。
在结构化并发中,子任务代表一个任务进行工作。该任务等待子任务的结果并监控它们是否出现故障。与单线程代码中的结构化编程技术一样,多线程的结构化并发能力来自于两个理念:(1) 通过代码块的执行流程有明确的入口和出口点,以及 (2) 操作生命周期的严格嵌套方式,这种方式反映了它们在代码中的语法嵌套关系。
由于一段代码块的入口和出口是明确定义的,因此并发子任务的生命周期被限制在其父任务的语法块之内。由于同级子任务的生命周期嵌套在父任务的生命周期之中,它们可以作为一个整体来进行推理和管理。由于父任务的生命周期又依次嵌套在其父级任务的生命周期之内,运行时可以将任务的层次结构具体化为一棵树。这棵树是单线程调用栈的并发对应物,可观测性工具可以利用它来将子任务呈现为隶属于其父任务的状态。
结构化并发非常适合虚拟线程,虚拟线程是由 JDK 实现的轻量级线程。许多虚拟线程共享同一个操作系统线程,从而允许存在大量的虚拟线程。除了数量众多之外,虚拟线程的成本也足够低,足以表示任何并发的行为单元,即使是涉及 I/O 的行为也可以胜任。这意味着服务器应用程序可以使用结构化并发一次处理成千上万甚至数百万个传入请求:它可以为每个请求的处理任务分配一个新的虚拟线程,并且当一个任务通过提交子任务进行并发执行时,它还可以为每个子任务分配一个新的虚拟线程。在幕后,任务与子任务之间的关系通过一种树形结构具体化,方法是让每个虚拟线程携带对其唯一父线程的引用,这类似于调用栈中的一个栈帧指向其唯一的调用者。
总之,虚拟线程提供了大量的线程。结构化并发确保它们得到正确且稳健的协调,并使可观测性工具能够按照开发者所理解的方式显示线程。在 JDK 中拥有一个用于结构化并发的 API 将提高服务器应用程序的可维护性、可靠性和可观测性。
描述
结构化并发 API 的主要类是 StructuredTaskScope
。这个类允许开发者将一个任务组织为一组并发的子任务,并将它们作为一个整体进行协调。子任务通过单独 forking(分叉)到自己的线程中执行,然后以整体的形式 joining(合并),并且可以选择整体取消。子任务的成功结果或异常会被父任务汇总并处理。StructuredTaskScope
将子任务(或称 forks)的生命周期限定在一个明确的 词法作用域 中,在此范围内完成任务与其子任务之间的所有交互 —— 包括分叉、合并、取消、错误处理和结果组合等操作。
以下是之前使用 StructuredTaskScope
编写的 handle()
示例(ShutdownOnFailure
将在下方解释):
Response handle() throws ExecutionException, InterruptedException {
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
Future<String> user = scope.fork(() -> findUser());
Future<Integer> order = scope.fork(() -> fetchOrder());
scope.join(); // Join both forks
scope.throwIfFailed(); // ... and propagate errors
// Here, both forks have succeeded, so compose their results
return new Response(user.resultNow(), order.resultNow());
}
}
与原始示例相比,理解此处涉及的线程生命周期非常容易:在所有条件下,它们的生命周期都被限制在一个词法作用域内,即 try
-with-resources 语句的主体。此外,使用 StructuredTaskScope
确保了许多有价值的特性:
-
通过短路进行错误处理 — 如果
findUser()
或fetchOrder()
子任务中的任何一个失败,另一个任务如果尚未完成的话将被取消。(这是由ShutdownOnFailure
实现的取消策略管理的;也可能存在其他策略)。 -
取消传播 — 如果运行
handle()
的线程在调用join()
之前或期间被中断,当线程退出作用域时,两个分支会自动取消。 -
清晰性 — 上述代码具有清晰的结构:设置子任务,等待它们完成或被取消,然后决定是成功(并处理已经完成的子任务结果)还是失败(子任务已经完成,因此无需进一步清理)。
-
可观测性 — 正如下文 所述,线程转储清楚地展示了任务层次结构,其中运行
findUser()
和fetchOrder()
的线程显示为作用域的子线程。
像 ExecutorService.submit(...)
一样,StructuredTaskScope.fork(...)
方法接收一个 Callable
并返回一个 Future
。然而,与 ExecutorService
不同的是,返回的 future 并不打算通过其 get()
方法来获取结果,也不通过其 cancel()
方法来取消任务。相反,一个作用域中的所有分支(forks)是作为一个整体来进行合并或取消的。两个新的 Future
方法 resultNow()
和 exceptionNow()
被设计为在子任务完成后使用,例如在调用 scope.join()
后。
使用 StructuredTaskScope
使用 StructuredTaskScope
的代码的一般工作流程如下:
-
创建一个作用域。创建该作用域的线程是其 拥有者。
-
在作用域中分叉并发子任务。
-
作用域中的任何分叉或作用域的拥有者都可以调用作用域的
shutdown()
方法,以请求取消所有剩余的子任务。 -
作用域的拥有者将作用域作为一个整体加入,即加入所有的分叉。拥有者可以调用作用域的
join()
方法,该方法会阻塞,直到所有分叉已完成(成功或失败)或通过shutdown()
被取消。或者,拥有者可以调用作用域的joinUntil(java.time.Instant)
方法,该方法接受一个截止时间。 -
加入后,处理分叉中的任何错误并处理它们的结果。
-
关闭作用域,通常通过
try
-with-resources 隐式完成。这将关闭作用域并等待任何滞后的分叉完成。
如果所有者是现有作用域的成员(即,作为其中一个分支创建的),那么该作用域将成为新作用域的父级。任务因此形成了一棵树,作用域作为中间节点,而线程作为叶节点。
每个分支(fork)都在其自身新创建的线程中运行,默认情况下该线程是一个虚拟线程。分支的线程由其作用域(scope)拥有,而作用域又由创建它的线程拥有,从而形成一个层级结构。任何分支都可以创建自己的嵌套 StructuredTaskScope
以分叉其自身的子任务,从而扩展该层级结构。这一层级结构反映在代码的块结构中,它限制了分支的生命周期:一旦作用域关闭,所有分支的线程都保证已终止,并且在块退出时不会留下任何线程。
作用域中的任何分叉、嵌套作用域中的任何分叉,以及作用域的所有者都可以随时调用作用域的 shutdown()
方法来表示任务已完成 —— 即使其他分叉仍在运行。shutdown()
方法会中断作用域中所有仍在活动的分叉线程。因此,所有分叉都应该以能够响应中断的方式编写。实际上,shutdown()
是顺序代码中 break
语句的并发类比。
当 join()
返回时,所有分叉(fork)都已完成(成功或失败)或已被取消。可以通过它们的 futures 的 resultNow()
或 exceptionNow()
方法获取其结果或异常,并且无需额外的阻塞。(如果在 future 完成之前调用这些方法,会抛出 IllegalStateException
异常。)
在作用域内调用 join()
或 joinUntil()
是强制性的。如果作用域的代码块在执行 join 操作前就退出了,那么该作用域将会等待所有分叉(fork)终止,然后抛出一个异常。
一个作用域的拥有线程可能在加入之前或期间被中断。例如,它可能是已被关闭的封闭作用域的一个分支。如果发生这种情况,join()
和 joinUntil(Instant)
将抛出异常,因为继续执行已无意义。随后,try
-with-resources 语句将关闭该作用域,这将取消所有分支并等待它们终止。这会产生自动将任务的取消传播到其子任务的效果。如果 joinUntil(Instant)
方法的截止时间在分支终止或调用 shutdown()
之前到期,则它将抛出异常,并且再次由 try
-with-resources 语句来关闭该作用域。
StructuredTaskScope
的结构化使用在运行时会被强制执行。例如,尝试从一个不在该作用域树层次结构中的线程调用 fork(Callable)
—— 即,拥有者、派生线程以及嵌套作用域中的派生线程之外的线程 —— 将会因异常而失败。在 try
-with-resources 块之外使用作用域,并且在未调用 close()
或未保持正确的 close()
调用嵌套顺序的情况下返回,可能会导致该作用域的方法抛出 StructureViolationException
异常。
StructuredTaskScope
对并发操作施加了结构和顺序。因此,它没有实现 ExecutorService
或 Executor
接口,因为这些接口的实例通常以非结构化的方式使用(见下文)。然而,将使用 ExecutorService
的代码迁移到 StructuredTaskScope
是非常简单的,特别是那些能从结构化中受益的代码。
StructuredTaskScope 位于一个 孵化模块 中,默认情况下被排除
上面的示例使用了 StructuredTaskScope
API,因此要在 JDK XX 上运行它们,必须添加 jdk.incubator.concurrent
模块,并且还要启用预览功能以开启虚拟线程:
# 添加模块和启用预览功能的示例命令
java --add-modules jdk.incubator.concurrent --enable-preview YourClass.java
-
使用
javac --release XX --enable-preview --add-modules jdk.incubator.concurrent Main.java
编译程序,并使用java --enable-preview --add-modules jdk.incubator.concurrent Main
运行它;或者, -
当使用 源代码启动器 时,使用
java --source XX --enable-preview --add-modules jdk.incubator.concurrent Main.java
运行程序;或者, -
当使用 jshell 时,使用
jshell --enable-preview --add-modules jdk.incubator.concurrent
启动它。
关闭政策
在处理并发子任务时,通常会使用短路模式来避免不必要的工作。例如,有时如果其中一个子任务失败(即 invoke all),或者其中一个子任务成功(即 invoke any),取消所有子任务是有意义的。StructuredTaskScope
的两个子类 ShutdownOnFailure
和 ShutdownOnSuccess
支持这些模式,分别在首次分叉失败或成功时关闭作用域的策略。它们还提供了用于处理异常和成功结果的方法。
下面是一个带有故障关闭策略的 StructuredTaskScope
(在上面的 handle()
示例中也使用了该策略),它会并发运行一组任务,如果其中任何一个任务失败,则整体失败:
下面是一个带有故障关闭策略的 `StructuredTaskScope`(在上面的 `handle()` 示例中也使用了该策略),它会并发运行一组任务,如果其中任何一个任务失败,则整体失败:
<T> List<T> runAll(List<Callable<T>> tasks) throws Throwable {
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
List<Future<T>> futures = tasks.stream().map(scope::fork).toList();
scope.join();
scope.throwIfFailed(e -> e); // Propagate exception as-is if any fork fails
// Here, all tasks have succeeded, so compose their results
return futures.stream().map(Future::resultNow).toList();
}
}
下面是一个带有 成功即关闭(shutdown-on-success) 策略的 StructuredTaskScope
,它会返回第一个成功子任务的结果:
<T> T race(List<Callable<T>> tasks, Instant deadline) throws ExecutionException {
try (var scope = new StructuredTaskScope.ShutdownOnSuccess<T>()) {
for (var task : tasks) {
scope.fork(task);
}
scope.joinUntil(deadline);
return scope.result(); // Throws if none of the forks completed successfully
}
}
一旦某个分支成功,此作用域会自动关闭,并取消剩余的活动分支。如果所有分支都失败或者超过了给定的截止时间,任务就会失败。例如,在需要从一组冗余服务中获取结果的服务器应用程序中,这种模式可能会很有用。
虽然提供了这两种关闭策略,但开发者可以通过扩展 StructuredTaskScope
并重写 handleComplete(Future)
方法来创建抽象其他模式的自定义策略。
扇入场景
上面的示例聚焦于 扇出 场景,该场景管理多个并发的传出 I/O 操作。StructuredTaskScope
在管理多个并发传入 I/O 操作的 扇入 场景中也非常有用。在这样的场景中,我们通常会根据传入的请求创建未知数量的分支。以下是一个服务器的示例,它在 StructuredTaskScope
内部分支子任务以处理传入的连接:
void serve(ServerSocket serverSocket) throws IOException, InterruptedException {
try (var scope = new StructuredTaskScope<Void>()) {
try {
while (true) {
var socket = serverSocket.accept();
scope.fork(() -> handle(socket));
}
} finally {
// If there's been an error or we're interrupted, we stop accepting
scope.shutdown(); // Close all active connections
scope.join();
}
}
}
因为所有连接处理子任务都是在作用域内创建的,线程转储会将它们显示为作用域所有者的子级。
可观察性
我们扩展了 JEP 425 添加的新的 JSON 线程转储格式,以展示 StructuredTaskScope
将线程分组为层次结构的方式:
$ jcmd <pid> Thread.dump_to_file -format=json <file>
每个作用域的 JSON 对象包含在该作用域中分叉的线程及其堆栈跟踪的数组。一个作用域的拥有线程通常会阻塞在 join
方法中,等待子任务完成;线程转储通过展示结构化并发所施加的树状层次结构,可以轻松查看子任务线程正在执行的操作。作用域的 JSON 对象还包含对其父作用域的引用,以便可以从转储中重建程序的结构。
com.sun.management.HotSpotDiagnosticsMXBean
API 也可以用来生成此类线程转储,可以直接生成,也可以通过平台 MBeanServer
以及本地或远程 JMX 工具间接生成。
替代方案
-
什么都不做。让开发者继续使用现有的低级
java.util.concurrent
API,并且继续需要仔细考虑并发代码中出现的所有异常情况和生命周期协调问题。 -
增强
ExecutorService
接口。我们为该接口制作了一个原型实现,该实现始终强制执行结构化,并限制哪些线程可以提交任务。然而,我们发现这存在一些问题,因为在 JDK 和生态系统中,大多数ExecutorService
(及其父接口Executor
)的使用并不是结构化的。为一个限制性更强的概念重用相同的 API 必然会引起混淆。例如,在大多数情况下,将一个结构化的ExecutorService
实例传递给接受此类型的现有方法几乎肯定会抛出异常。