虚拟线程
此页面由 Cay Horstmann 在 UPL 下贡献为什么使用虚拟线程?
1995 年发布 Java 1.0 时,其 API 约有 100 个类,其中包括 java.lang.Thread
。Java 是第一个直接支持并发编程的主流编程语言。
从 Java 1.2 开始,每个 Java 线程都在由底层操作系统提供的平台线程上运行。(在 Java 1.1 之前,在某些平台上,所有 Java 线程都由单个平台线程执行。)
平台线程的成本非同小可。它们需要几千个 CPU 指令才能启动,并且会消耗几兆字节的内存。服务器应用程序可以处理如此多的并发请求,以至于让每个请求都在单独的平台线程上执行变得不可行。在典型的服务器应用程序中,这些请求会将大部分时间花在阻塞上,等待来自数据库或其他服务的响应。
提高吞吐量的经典方法是非阻塞 API。程序员无需等待结果,而是指示在结果可用时应调用哪个方法,以及在发生错误时应调用哪个方法。这很快就会变得令人不快,因为回调会不断嵌套得越来越深。
JEP 425 在 Java 19 中引入了虚拟线程。许多虚拟线程运行在单个平台线程上。每当虚拟线程阻塞时,它就会被卸载,平台线程就会运行另一个虚拟线程。(“虚拟线程”这个名称应该让人想起映射到实际 RAM 的虚拟内存。)虚拟线程在 Java 20(JEP 436)中成为预览功能,并在 Java 21 中成为最终功能。
使用虚拟线程,阻塞变得很便宜。如果结果无法立即获得,您只需在虚拟线程中阻塞。您可以使用熟悉的编程结构(分支、循环、try 块)来代替回调管道。
当并发任务数量很大,并且任务主要阻塞在网络 I/O 上时,虚拟线程非常有用。它们对 CPU 密集型任务没有帮助。对于此类任务,请考虑使用 并行流 或 递归 fork-join 任务。
创建虚拟线程
工厂方法 Executors.newVirtualThreadPerTaskExecutor()
会生成一个 ExecutorService
,该服务会在单独的虚拟线程中运行每个任务。例如
import java.util.concurrent.*;
public class VirtualThreadDemo {
public static void main(String[] args) {
final int NTASKS = 100;
ExecutorService service = Executors.newVirtualThreadPerTaskExecutor();
for (int i = 0; i < NTASKS; i++) {
service.submit(() -> {
long id = Thread.currentThread().threadId();
LockSupport.parkNanos(1_000_000_000);
System.out.println(id);
});
}
service.close();
}
}
顺便说一下,代码使用 LockSupport.parkNanos
而不是 Thread.sleep
,这样我们就不必捕获讨厌的 InterruptedException
。
也许您正在使用更低级的 API,该 API 需要线程工厂。要获取虚拟线程的工厂,请使用新的 Thread.Builder
类
Thread.Builder builder = Thread.ofVirtual().name("request-", 1);
ThreadFactory factory = builder.factory();
现在,调用 factory.newThread(myRunnable)
会创建一个新的(未启动的)虚拟线程。name
方法会配置构建器以设置线程名称 request-1
、request-2
等。
您也可以使用构建器来创建一个单独的虚拟线程
Thread t = builder.unstarted(myRunnable);
或者,如果您想立即启动线程
Thread t = builder.started(myRunnable);
最后,为了快速演示,有一个便捷方法
Thread t = Thread.startVirtualThread(myRunnable);
请注意,只有第一种方法(使用执行器服务)适用于带有结果的任务(可调用对象)。
线程 API 更改
在对不同的 API 进行了一系列实验后,Java 虚拟线程的设计人员决定简单地重用熟悉的 Thread
API。虚拟线程是 Thread
的实例。取消操作与平台线程的工作方式相同,通过调用 interrupt
来实现。与往常一样,线程代码必须检查“中断”标志或调用执行此操作的方法。(大多数阻塞方法都会这样做。)
有一些区别。特别是,所有虚拟线程
- 都在单个线程组中
- 优先级为
NORM_PRIORITY
- 是守护线程
没有 API 可以使用其他线程组来构造虚拟线程。尝试在虚拟线程上调用 setPriority
或 setDaemon
不会有任何效果。
静态 Thread::getAllStackTraces
方法会返回所有平台线程的堆栈跟踪映射。不包括虚拟线程。
新的 Thread::isVirtual
实例方法会告知线程是否为虚拟线程。
请注意,无法找到虚拟线程执行所在的平台线程。
Java 19 对 Thread
API 进行了一些更改,这些更改与虚拟线程无关
- 现在有实例方法
join(Duration)
和sleep(Duration)
。 - 非 final
getId
方法已弃用,因为有人可能会覆盖它以返回线程 ID 以外的内容。请改用 finalthreadId
方法。
从 Java 20 开始,stop
、suspend
和 resume
方法会对平台线程和虚拟线程都抛出 UnsupportedOperationException
。这些方法自 Java 1.2 起已弃用,自 Java 18 起已弃用并计划在未来版本中移除。
捕获任务结果
您通常希望合并多个并发任务的结果
Future<T1> f1 = service.submit(callable1);
Future<T2> f2 = service.submit(callable2);
result = combine(f1.get(), f2.get());
在虚拟线程出现之前,您可能对阻塞的 get
调用感到很糟糕。但现在阻塞很便宜。以下是一个更具体的示例程序
import java.util.concurrent.*;
import java.net.*;
import java.net.http.*;
public class VirtualThreadDemo {
public static void main(String[] args) throws InterruptedException, ExecutionException {
ExecutorService service = Executors.newVirtualThreadPerTaskExecutor();
Future<String> f1 = service.submit(() -> get("https://horstmann.com/random/adjective"));
Future<String> f2 = service.submit(() -> get("https://horstmann.com/random/noun"));
String result = f1.get() + " " + f2.get();
System.out.println(result);
service.close();
}
private static HttpClient client = HttpClient.newHttpClient();
public static String get(String url) {
try {
var request = HttpRequest.newBuilder().uri(new URI(url)).GET().build();
return client.send(request, HttpResponse.BodyHandlers.ofString()).body();
} catch (Exception ex) {
var rex = new RuntimeException();
rex.initCause(ex);
throw rex;
}
}
}
如果您有一个结果类型相同的任务列表,可以使用 invokeAll
方法,然后对每个 Future
调用 get
List<Callable<T>> callables = ...;
List<T> results = new ArrayList<>();
for (Future<T> f : service.invokeAll(callables))
results.add(f.get());
同样,以下是一个更具体的示例程序
import java.util.*;
import java.util.concurrent.*;
import java.net.*;
import java.net.http.*;
public class VirtualThreadDemo {
public static void main(String[] args) throws InterruptedException, ExecutionException {
ExecutorService service = Executors.newVirtualThreadPerTaskExecutor();
List<Callable<String>> callables = new ArrayList<>();
final int ADJECTIVES = 4;
for (int i = 1; i <= ADJECTIVES; i++)
callables.add(() -> get("https://horstmann.com/random/adjective"));
callables.add(() -> get("https://horstmann.com/random/noun"));
List<String> results = new ArrayList<>();
for (Future<String> f : service.invokeAll(callables))
results.add(f.get());
System.out.println(String.join(" ", results));
service.close();
}
private static HttpClient client = HttpClient.newHttpClient();
public static String get(String url) {
try {
var request = HttpRequest.newBuilder().uri(new URI(url)).GET().build();
return client.send(request, HttpResponse.BodyHandlers.ofString()).body();
} catch (Exception ex) {
var rex = new RuntimeException();
rex.initCause(ex);
throw rex;
}
}
}
速率限制
虚拟线程可以提高应用程序吞吐量,因为您可以拥有比平台线程更多的并发任务。这会给任务调用的服务带来压力。例如,Web 服务可能无法容忍大量的并发请求。
使用平台线程,一个简单的(虽然粗略的)调整因素是这些任务的线程池大小。但您不应该池化虚拟线程。在虚拟线程上调度任务,然后在平台线程上调度这些任务显然效率低下。那么这样做有什么好处呢?将虚拟线程数量限制为您的服务可以容忍的少量并发请求?那么您为什么要使用虚拟线程呢?
使用虚拟线程,您应该使用其他机制来控制对有限资源的访问。不要对并发任务设置总体限制,而是以适当的方式保护每个资源。对于数据库连接,连接池可能已经做到了这一点。当访问 Web 服务时,您了解自己的服务,并且可以提供适当的速率限制。
例如,在我的个人网站上,我提供用于生成随机项目的演示服务。如果来自同一 IP 地址的大量请求同时到达,托管公司会将该 IP 地址列入黑名单。
以下示例程序展示了使用简单的信号量来限制速率,该信号量允许少量并发请求。当超过最大值时,acquire
方法会阻塞,但这没关系。使用虚拟线程,阻塞很便宜。
import java.util.*;
import java.util.concurrent.*;
import java.net.*;
import java.net.http.*;
public class RateLimitDemo {
public static void main(String[] args) throws InterruptedException, ExecutionException {
ExecutorService service = Executors.newVirtualThreadPerTaskExecutor();
List<Future<String>> futures = new ArrayList<>();
final int TASKS = 250;
for (int i = 1; i <= TASKS; i++)
futures.add(service.submit(() -> get("https://horstmann.com/random/word")));
for (Future<String> f : futures)
System.out.print(f.get() + " ");
System.out.println();
service.close();
}
private static HttpClient client = HttpClient.newHttpClient();
private static final Semaphore SEMAPHORE = new Semaphore(20);
public static String get(String url) {
try {
var request = HttpRequest.newBuilder().uri(new URI(url)).GET().build();
SEMAPHORE.acquire();
try {
Thread.sleep(100);
return client.send(request, HttpResponse.BodyHandlers.ofString()).body();
} finally {
SEMAPHORE.release();
}
} catch (Exception ex) {
ex.printStackTrace();
var rex = new RuntimeException();
rex.initCause(ex);
throw rex;
}
}
}
固定
虚拟线程调度程序会将虚拟线程安装到载体线程上。默认情况下,载体线程的数量与 CPU 内核的数量相同。您可以使用 jdk.virtualThreadScheduler.parallelism
VM 选项来调整该数量。
当虚拟线程执行阻塞操作时,它应该从其载体线程中卸载,载体线程可以执行其他虚拟线程。但是,在某些情况下,无法进行此卸载。在某些情况下,虚拟线程调度程序会通过启动另一个载体线程来进行补偿。例如,在 JDK 21 中,这会发生在许多文件 I/O 操作以及调用 Object.wait
时。您可以使用 jdk.virtualThreadScheduler.maxPoolSize
VM 选项来控制载体线程的最大数量。
线程在以下两种情况之一中被称为固定
- 当执行
synchronized
方法或块时 - 当调用本机方法或外部函数时
固定本身并不坏。但是,当固定线程阻塞时,它无法卸载。载体线程被阻塞,并且在 Java 21 中,不会启动额外的载体线程。这会导致可用于运行虚拟线程的载体线程更少。
如果 synchronized
用于避免内存中操作的竞争条件,则固定是无害的。但是,如果有阻塞调用,最好用 ReentrantLock
替换 synchronized
。当然,只有在您控制源代码的情况下,这才是可行的选择。
要找出固定线程是否被阻塞,请使用以下选项之一启动 JVM
-Djdk.tracePinnedThreads=short
-Djdk.tracePinnedThreads=full
您会得到一个堆栈跟踪,显示固定线程何时阻塞
...
org.apache.tomcat.util.net.SocketProcessorBase.run(SocketProcessorBase.java:49) <== monitors:1
...
请注意,您只会收到每个固定位置的一个警告!
或者,使用 Java Flight Recorder 记录,使用您喜欢的任务控制查看器查看,并查找 VirtualThreadPinned
和 VirtualThreadSubmitFailed
事件。
JVM 最终将实现,以便 synchronized
方法或块不再导致固定。然后您只需要担心本机代码的固定。
以下示例程序展示了固定操作。我们启动了一些虚拟线程,这些线程在同步方法中休眠,从而阻塞其载体线程。添加了一些不做任何工作的虚拟线程。但它们无法调度,因为载体线程池已被完全耗尽。请注意,当您
- 使用
ReentrantLock
- 不使用虚拟线程
import java.util.concurrent.*;
import java.util.concurrent.locks.*;
public class PinningDemo {
public static void main(String[] args) throws InterruptedException, ExecutionException {
ExecutorService service =
Executors.newVirtualThreadPerTaskExecutor();
// Executors.newCachedThreadPool();
final int TASKS = 20;
long start = System.nanoTime();
for (int i = 1; i <= TASKS; i++) {
service.submit(() -> block());
// service.submit(() -> rblock());
}
for (int i = 1; i <= TASKS; i++) {
service.submit(() -> noblock());
}
service.close();
long end = System.nanoTime();
System.out.printf("%.2f%n", (end - start) * 1E-9);
}
public static synchronized void block() {
System.out.println("Entering block " + Thread.currentThread());
LockSupport.parkNanos(1_000_000_000);
System.out.println("Exiting block " + Thread.currentThread());
}
private static Lock lock = new ReentrantLock();
public static void rblock() {
lock.lock();
try {
System.out.println("Entering rblock " + Thread.currentThread());
LockSupport.parkNanos(1_000_000_000);
System.out.println("Exiting rblock " + Thread.currentThread());
} finally {
lock.unlock();
}
}
public static void noblock() {
System.out.println("Entering noblock " + Thread.currentThread());
LockSupport.parkNanos(1_000_000_000);
System.out.println("Exiting noblock " + Thread.currentThread());
}
}
线程局部变量
线程局部变量是一个对象,其 get
和 set
方法访问的值取决于当前线程。为什么要使用这样的变量,而不是使用全局变量或局部变量?经典的应用是一个非线程安全的服务,例如 SimpleDateFormat
,或者一个会因争用而受到影响的服务,例如随机数生成器。每个线程的实例比受锁保护的全局实例的性能更好。
线程局部变量的另一个常见用途是提供“隐式”上下文,例如数据库连接,该连接已针对每个任务正确配置。任务代码无需将上下文从一个方法传递到另一个方法,而是在需要访问数据库时只需读取线程局部变量即可。
在迁移到虚拟线程时,线程局部变量可能会成为问题。虚拟线程的数量很可能远大于线程池中的线程数量,这意味着现在会有更多线程局部变量实例。在这种情况下,您应该重新考虑您的共享策略。
要定位应用程序中线程局部变量的使用,请使用 VM 标志 `jdk.traceVirtualThreadLocals` 运行。当虚拟线程修改线程局部变量时,您将获得一个堆栈跟踪。
结论
- 当您有许多主要阻塞在网络 I/O 上的任务时,使用虚拟线程来提高吞吐量。
- 主要优势是熟悉的“同步”编程风格,无需回调。
- 不要池化虚拟线程;使用其他机制进行速率限制。
- 检查固定并根据需要进行缓解。
- 最小化虚拟线程中的线程局部变量。
上次更新: 2021 年 9 月 14 日
返回教程列表