JEP 428:结构化并发(孵化器)
概括
_通过引入结构化_并发 API 来简化多线程编程。结构化并发将在不同线程中运行的多个任务视为单个工作单元,从而简化错误处理和取消、提高可靠性并增强可观察性。这是一个正在孵化的API。
目标
-
提高多线程代码的可维护性、可靠性和可观察性。
-
推广并发编程风格,可以消除因取消和关闭而产生的常见风险,例如线程泄漏和取消延迟。
非目标
-
java.util.concurrent
替换包中的任何并发结构(例如ExecutorService
和 )并不是目标Future
。 -
为 Java 定义明确的结构化并发 API 并不是我们的目标。其他结构化并发构造可以由第三方库或在未来的 JDK 版本中定义。
-
定义在线程(即通道)之间共享数据流的方法并不是目标。我们可能会建议将来这样做。
-
用新的线程取消机制取代现有的线程中断机制并不是目标。我们可能会建议将来这样做。
动机
开发人员通过将任务分解为多个子任务来管理复杂性。在普通的单线程代码中,子任务顺序执行。然而,如果子任务彼此充分独立,并且如果有足够的硬件资源,则可以通过同时执行子任务来使整体任务运行得更快(即,具有更 低的延迟)。例如,如果每个 I/O 操作在自己的线程中并发执行,则由多个 I/O 操作组成的任务将运行得更快。虚拟线程 ( JEP 425 ) 使为每个此类 I/O 操作专用一个线程变得经济高效,但管理可能产生的大量线程仍然是一个挑战。
非结构化并发ExecutorService
java.util.concurrent.ExecutorService
Java 5 中引入的 API 可以帮助开发人员同时执行子任务。
例如,这里有一个方法handle()
,它代表服务器应用程序中的任务。它通过将两个子任务提交到ExecutorService
.一个子任务执行该方法findUser()
,另一个子任务执行该方法fetchOrder()
。立即为每个子任务ExecutorService
返回一个Future
,并在其自己的线程中执行每个子任务。该handle()
方法通过阻止调用子任务的 futureget()
方法来等待子任务的结果,因此该任务被称为_加入_其子任务。
Response handle() throws ExecutionException, InterruptedException {
Future<String> user = esvc.submit(() -> findUser());
Future<Integer> order = esvc.submit(() -> fetchOrder());
String theUser = user.get(); // Join findUser
int theOrder = order.get(); // Join fetchOrder
return new Response(theUser, theOrder);
}
由于子任务是并发执行的,因此每个子任务可以独立成功或失败。 (在此上下文中,失败意味着抛出异常。 )通常,handle()
如果某个任务的任何子任务失败,则此类任务应该失败。当发生故障时,了解线程的生命周期可能会非常复杂:
-
如果
findUser()
抛出异常,那么handle()
在调用时会抛出异常user.get()
,但fetchOrder()
会继续在自己的线程中运行。这是一个_线程泄漏_,充其量只是浪费资源;最坏的情况是,该fetchOrder()
线程会干扰其他任务。 -
如果线程执行
handle()
被中断,中断不会传播到子任务。findUser()
和线程都会泄漏,即使失败fetchOrder()
后也会继续运行。handle()
-
如果
findUser()
执行时间较长,但fetchOrder()
同时失败,则将通过阻塞而不是取消来handle()
不必要地等待。只有完成并返回后才会抛出异常,导致失败。findUser()``user.get()``findUser()``user.get()``order.get()``handle()
在每种情况下,问题在于我们的程序在逻辑上是由任务子任务关系构成的,但这些关系只存在于开发人员的脑海中。这不仅会产生更多的错误空间,而且会使诊断和排除此类错误变得更加困难。例如,线程转储等可观察性工具将在不相关线程的调用堆栈上显示handle()
、findUser()
、 和fetchOrder()
,而不会提示任务-子任务关系。
我们可能会尝试通过在发生错误时显式取消其他子任务来做得更好,例如,通过在失败任务的 catch 块中包装任务try-finally
并调用cancel(boolean)
其他任务的 future 方法。我们还需要使用ExecutorService
内部的try
-with-resources 语句,如JEP 425中的示例所示,因为Future
它没有提供等待已取消任务的方法。但所有这些都很难正确执行,并且常常使代码的逻辑意图更难以辨别。跟踪任务间关系,并手动添加回所需的任务间取消边,是许多开发人员面临的问题。
需要手动协调生命周期是因为允许ExecutorService
不受限制Future
的并发模式。对所涉及的任何线程都没有限制或顺序。一个线程可以创建ExecutorService
,第二个线程可以向其提交工作,并且执行该工作的线程与第一个或第二个线程没有任何关系。此外,在一个线程提交工作后,另一个完全不同的线程可以等待执行结果。任何引用 a 的代码Future
都可以加入它(即,通过调用 等待其结果get()
),甚至是获取 .a 的线程以外的线程中的代码Future
。实际上,由一个任务启动的子任务不必返回到提交它的任务。它可以返回许多任务中的任何一个,甚至不返回任何任务。
因为ExecutorService
并Future
允许这种非结构化使用,所以它们不强制甚至不跟踪任务和子任务之间的关系,即使这样的关系是常见且有用的。因此,即使在同一个任务中提交并加入子任务,一个子任务的失败也不会自动导致另一个子任务的取消:在上述handle()
方法中, 的失败fetchOrder()
不会自动导致 的取消findUser()
。 for 的未来fetchOrder()
与 for 的未 来无关findUser()
,也与最终通过其方法加入它的线程无关get()
。我们不想要求开发人员手动管理此类取消,而是希望可靠地实现自动化。
任务结构应反映代码结构
与 下的随心所欲的线程分类相反ExecutorService
,单线程代码的执行始终强制执行任务和子任务的层次结构。{...}
方法的主体块对应于任务,块内调用的方法对应于子任务。被调用的方法必须返回调用它的方法,或者向调用它的方法抛出异常。它不能比调用它的方法寿命更长,也不能向其他方法返回或抛出异常。因此,所有子任务都在该任务之前完成,每个子任务都是其父任务的子任务,并且每个子任务相对于其他子任务和任务的生命周期由代码的语法块结构控制。
例如,在这个单线程版本中,handle()
任务-子任务关系从语法结构中显而易见:
Response handle() throws IOException {
String theUser = findUser();
int theOrder = fetchOrder();
return new Response(theUser, theOrder);
}
在子任务完成fetchOrder()
之前,无论成功还是不成功,我们都不会启动子任务。findUser()
如果findUser()
失败,那么我们fetchOrder()
根本不会开始,并且handle()
任务隐式失败。子任务只能返回其父任务这一事实很重要:这意味着父任务可以隐式地将一个子任务的失败视为取消所有剩余子任务的触发器,然后自行失败。
在单线程代码中,任务-子任务层次结构在运行时在调用堆栈中具体化。因此,我们免费获得了相应的控制错误传播的父子关系。当观察单个线程时,层次关系很明显:(findUser()
以及后来的fetchOrder()
)出现从属于handle()
。
如果任务及其子任务之间的父子关系能够在语法上表达并在运行时具体化,那么多线程编程将会更容易、更可靠、更容易观察——就像单线程代码一样。语法结构将描述子任务的生命周期,并启用线程间层次结构的运行时表示,类似于线程内调用堆栈。该表示将实现错误传播和取消以及对并发程序的有意义的观察。
(Java 已经有一个用于对并发任务强加结构的 API,即java.util.concurrent.ForkJoinPool
,它是并行流背后的执行引擎。但是,该 API 是为计算密集型任务而不是涉及 I/O 的任务设计的。)
结构化并发
_结构化并发_是一种多线程编程方法,它保留了单线程代码的可读性、可维护性和可观察性。它体现的原则是
如果一个任务分成并发的子任务,那么它们都返回到同一个地方,即任务的代码块。
“结构化并发”一词由Martin Sústrik创造,并由Nathaniel J. Smith推广。来自其他语言的思想,例如 Erlang 的分层管理器,为结构化并发中的错误处理设计提供了信息。
在结构化并发中,子任务代表任务工作。该任务等待子任务的结果并监视它们是否失败。与单线程中代码的结构化编程技术一样,多线程结构化并发的强大功能来自两个想法:(1) 代码块执行流的明确定义的入口点和出口点,以及 (2)操作生命周期的严格嵌套,其方式反映了它们在代码中的语法嵌套。
由于代码块的入口点和出口点已明确定义,因此并发子任务的生命周期仅限于其父任务的语法块。由于同级子任务的生命周期嵌套在其父任务的生命周期内,因此可以将它们作为一个单元进行推理和管理。因为父任务的生命周期又嵌套在其父任务的生命周期内,所以运行时可以将任务的层次结构具体化为 树。该树是单个线程的调用堆栈的并发对应部分,可观察性工具可以使用它来将子任务呈现为从属于其父任务。
结构化并发非常适合虚拟线程,虚拟线程是 JDK 实现的轻量级线程。许多虚拟线程共享相同的操作系统线程,从而允许大量虚拟线程。除了数量充足之外,虚拟线程还足够便宜,可以表示任何并发的行为单元,甚至涉及 I/O 的行为。这意味着服务器应用程序可以使用结构化并发来同时处理数千或数百万个传入请求:它可以专用一个新的虚拟线程来处理每个请求的任务,并且当任务通过提交子任务进行并发执行而扇出时,它可以可以为每个子任务分配一个新的虚拟线程。在幕后,通过安排每个虚拟线程携带对其唯一父级的引用,将任务-子任务关系具体化为树,类似于调用堆栈中的帧引用其唯一调用者的方式。
总之,虚拟线程提供了大量的线程。结构化并发确保它们正确且稳健地协调,并使可观察性工具能够按照开发人员理解的方式显示线程。在 JDK 中拥有用于结构化并发的 API 将提高服务器应用程序的可维护性、可靠性和可观察性。
描述
结构化并发 API 的主要类是StructuredTaskScope
.此类允许开发人员将任务构建为一系列并发子任务,并将它们作为一个单元进行协调。子任务在它们自己的线程中执行,方法是单独_分叉_它们,然后将它们_连接_为一个单元,并且可能将它们作为一个单元取消。子任务的成功结果或异常由父任务聚合和处理。StructuredTaskScope
将子任务或_fork_的生命周期限制在一个明确的词法范围内,任务与其子任务的所有交互(分叉、加入、取消、处理错误和组合结果)都在该范围内发生。
这是handle()
之前的示例,编写用于使用StructuredTaskScope
(下面ShutdownOnFailure
解释):
Response handle() throws ExecutionException, InterruptedException {
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
Future<String> user = scope.fork(() -> findUser());
Future<Integer> order = scope.fork(() -> fetchOrder());
scope.join(); // Join both forks
scope.throwIfFailed(); // ... and propagate errors
// Here, both forks have succeeded, so compose their results
return new Response(user.resultNow(), order.resultNow());
}
}
try
与原始示例相反,理解这里涉及的线程的生命周期很容易:在所有条件下,它们的生命周期都限制在词法范围内,即-with-resources 语句的主体。此外,使用StructuredTaskScope
确保了许多有价值的特性:
-
短路错误处理— 如果一个
findUser()
或fetchOrder()
一个子任务失败,则另一个尚未完成的任务将被取消。 (这是由 实施的取消政策管理的ShutdownOnFailure
;也可能有其他政策)。 -
取消传播——如果线程运行
handle()
在调用之前或期间被中断join()
,则当线程退出作用域时,两个分叉都会自动取消。 -
清晰性——上面的代码有一个清晰的结构:设置子任务,等待它们完成或被取消,然后决定是成功(并处理已经完成的子任务的结果)还是失败(和子任务已经完成,所以没有什么需要清理的)。
-
可观察性——线程转储,如下所述,清楚地显示任务层次结构,其中线程正在运行
findUser()
并fetchOrder()
显示为范围的子级。
例如ExecutorService.submit(...)
,该StructuredTaskScope.fork(...)
方法接受 aCallable
并返回 a Future
。然而,与 不同的是ExecutorService
,返回的 future 并不打算通过其方法加入get()
或通过其cancel()
方法取消。相反,范围内的所有分叉都旨在作为一个单元加入或取消。两个新Future
方法resultNow()
和exceptionNow()
被设计为在子任务完成后使用,例如在调用 后scope.join()
。
使用StructuredTaskScope
代码使用的大致流程StructuredTaskScope
如下:
-
创建一个范围。创建作用域的线程是它的_所有者_。
-
在范围内分叉并发子任务。
-
作用域中的任何分支或作用域的所有者都可以调用作用域的
shutdown()
方法来请求取消所有剩余的子任务。 -
范围的所有者将范围(即其所有分叉)作为一个单元加入。所有者可以调用作用域的
join()
方法,该方法会阻塞,直到所有分叉完成(成功或失败)或通过 取消shutdown()
。或者,所有者可以调用范围的joinUntil(java.time.Instant)
方法,该方法接受截止日期。 -
加入后,处理分叉中的任何错误并处理其结果。
-
关闭范围,通常通过
try
-with-resources 隐式关闭。这将关闭作用域并等待任何落后的分叉完成。
如果所有者是现有作用域的成员(即,作为一个作用域的分支创建),则该作用域将成为新作用域的父作用域。因此,任务形成一棵树,范围作为中间节点,线程作为叶子。
每个 fork 都在自己新创建的线程中运行,默认情况下该线程是虚拟线程。分叉的线程由作用域拥有,而作用域又由其创建线程拥有,从而形成层次结构。任何 fork 都可以创建自己的嵌套StructuredTaskScope
来 fork 自己的子任务,从而扩展层次结构。该层次结构反映在代码的块结构中,它限制了分叉的生命周期:一旦作用域关闭,所有分叉的线程都保证终止,并且当块退出时不会留下任何线程。
作用域中的任何分叉、嵌套作用域中的任何分叉以及作用域的所有者都可以shutdown()
随时调用作用域的方法来表示任务已完成 - 即使其他分叉仍在运行时也是如此。该shutdown()
方法会中断范围内仍处于活动状态的所有分叉的线程。因此,所有分叉都应该以响应中断的方式编写。实际上,它是顺序代码中语句shutdown()
的并发模拟。break
返回时join()
,所有分叉要么已完成(成功或失败),要么已被取消。他们的结果或异常可以通过他们的 futureresultNow()
或exceptionNow()
方法获得,而无需任何额外的阻塞。 (这些方法IllegalStateException
在 future 完成之前抛出一个 if 调用。)
在某个范围内调用join()
或是强制的。joinUntil()
如果作用域的块在加入之前退出,则作用域将等待所有分叉终止,然后抛出异常。
作用域所属的线程有可能在加入之前或加入时被中断。例如,它可能是已关闭的封闭范围的分支。如果发生这种情况,join()
则会joinUntil(Instant)
抛出异常,因为没有继续下去的意义。然后-with try
-resources 语句将关闭作用域,这将取消所有分叉并等待它们终止。这具有自动将任务的取消传播到其子任务的效果。如果该joinUntil(Instant)
方法的截止日期在分叉终止或shutdown()
调用之前到期,那么它将抛出异常,并且try
-with-resources 语句将再次关闭作用域。
的结构化使用StructuredTaskScope
是在运行时强制执行的。例如,尝试fork(Callable)
从不在作用域的树层次结构中的线程(即所有者、分叉和嵌套作用域中的分叉)进行调用将失败并出现异常。在try
-with-resources 块之外使用作用域并在不调用close()
或不维护close()
调用的正确嵌套的情况下返回,可能会导致作用域的方法抛出StructureViolationException
.
StructuredTaskScope
强制并发操作的结构和顺序。因此,它不实现ExecutorService
或Executor
接口,因为这些接口的实例通常以非结构化方式使用(见下文)。然而,将使用 的代码迁移到使用 很简单ExecutorService
,但会从结构中受益StructuredTaskScope
。
StructuredTaskScope 驻留在孵化器模块中,默认排除
上面的示例使用StructuredTaskScope
API,因此要在 JDK XX 上运行它们,您必须添加jdk.incubator.concurrent
模块,并且还必须启用预览功能才能启用虚拟线程:
-
使用 编译程序
javac --release XX --enable-preview --add-modules jdk.incubator.concurrent Main.java
并使用java --enable-preview --add-modules jdk.incubator.concurrent Main
;运行它或者, -
使用源代码启动器时,使用
java --source XX --enable-preview --add-modules jdk.incubator.concurrent Main.java
;运行程序或者, -
使用jshell时,启动它
jshell --enable-preview --add-modules jdk.incubator.concurrent
关停政策
在处理并发子任务时,通常使用_短路模式_来避免做不必要的工作。有时,如果其中一个子任务失败(即_调用 all_),或者如果其中一个子任务成功(即_调用 any_),则取消所有子任务是有意义的。和StructuredTaskScope
的两个子类通过分别在第一次分叉失败或成功时关闭范围的策略来支持这些模式。它们还提供了处理异常和成功结果的方法。ShutdownOnFailure
ShutdownOnSuccess
这是一个StructuredTaskScope
失败时关闭策略(也在handle()
上面的示例中使用),它同时运行一组任务,如果其中任何一个 失败,则失败:
<T> List<T> runAll(List<Callable<T>> tasks) throws Throwable {
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
List<Future<T>> futures = tasks.stream().map(scope::fork).toList();
scope.join();
scope.throwIfFailed(e -> e); // Propagate exception as-is if any fork fails
// Here, all tasks have succeeded, so compose their results
return futures.stream().map(Future::resultNow).toList();
}
}
这是一个StructuredTaskScope
成功关闭策略,返回第一个成功子任务的结果:
<T> T race(List<Callable<T>> tasks, Instant deadline) throws ExecutionException {
try (var scope = new StructuredTaskScope.ShutdownOnSuccess<T>()) {
for (var task : tasks) {
scope.fork(task);
}
scope.joinUntil(deadline);
return scope.result(); // Throws if none of the forks completed successfully
}
}
一旦一个分叉成功,该作用域就会自动关闭,从而取消剩余的活动分叉。如果所有分叉都失败或者给定的截止日期已过,则任务将失败。例如,此模式在需要来自任何一个冗余服务集合的结果的服务器应用程序中非常有用。
虽然这两个关闭策略是开箱即用的,但开发人员可以创建自定义策略,通过扩展StructuredTaskScope
和重写该handleComplete(Future)
方法来抽象其他模式。
扇入场景
上面的示例重点关注_扇出_场景,管理多个并发传出 I/O 操作。在管理多个并发传入 I/O 操作的_扇入_StructuredTaskScope
场景中也很有用。在这种情况下,我们通常会创建未知数量的分叉来响应传入的请求。下面是一个服务器的示例,它派生子任务来处理 a 内的传入连接:StructuredTaskScope
void serve(ServerSocket serverSocket) throws IOException, InterruptedException {
try (var scope = new StructuredTaskScope<Void>()) {
try {
while (true) {
var socket = serverSocket.accept();
scope.fork(() -> handle(socket));
}
} finally {
// If there's been an error or we're interrupted, we stop accepting
scope.shutdown(); // Close all active connections
scope.join();
}
}
}
由于所有连接处理子任务都是在作用域内创建的,因此线程转储会将它们显示为作用域所有者的子任务。
可观察性
我们扩展了JEP 425添加的新 JSON 线程转储格式,以将StructuredTaskScope
线程分组显示为层次结构:
$ jcmd <pid> Thread.dump_to_file -format=json <file>
每个作用域的 JSON 对象包含作用域中分叉的线程数组及其堆栈跟踪。作用域的拥有线程通常会被阻塞在 join 方法中,等待子任务完成;线程转储通过显示结构化并发所施加的树层次结构,可以轻松查看子任务的线程正在执行的操作。作用域的 JSON 对象还具有对其父级的引用,以便可以从转储中重建程序的结构。
APIcom.sun.management.HotSpotDiagnosticsMXBean
还可用于直接或间接通过平台MBeanServer
和本地或远程 JMX 工具生成此类线程转储。
备择方案
-
没做什么。让开发人员继续使用现有的低级
java.util.concurrent
API,并继续仔细考虑并发代码中出现的所有异常条件和生命周期协调问题。 -
增强
ExecutorService
界面。我们对该接口的实现进行了原型设计,该接口始终强制执行结构并限制哪些线程可以提交任务。然而,我们发现它是有问题的,因为JDK 和生态系统中的大多数使用ExecutorService
(及其父接口)都不是结构化的。Executor
将相同的 API 重复用于限制性更大的概念必然会引起混乱。例如,ExecutorService
在大多数情况 下,将结构化实例传递给接受该类型的现有方法几乎肯定会引发异常。