虚拟线程

此页面由 Cay HorstmannUPL 下贡献

 

为什么使用虚拟线程?

1995 年发布 Java 1.0 时,其 API 约有 100 个类,其中包括 java.lang.Thread。Java 是第一个直接支持并发编程的主流编程语言。

从 Java 1.2 开始,每个 Java 线程都在由底层操作系统提供的平台线程上运行。(在 Java 1.1 之前,在某些平台上,所有 Java 线程都由单个平台线程执行。)

平台线程的成本非同小可。它们需要几千个 CPU 指令才能启动,并且会消耗几兆字节的内存。服务器应用程序可以处理如此多的并发请求,以至于让每个请求都在单独的平台线程上执行变得不可行。在典型的服务器应用程序中,这些请求会将大部分时间花在阻塞上,等待来自数据库或其他服务的响应。

提高吞吐量的经典方法是非阻塞 API。程序员无需等待结果,而是指示在结果可用时应调用哪个方法,以及在发生错误时应调用哪个方法。这很快就会变得令人不快,因为回调会不断嵌套得越来越深。

JEP 425 在 Java 19 中引入了虚拟线程。许多虚拟线程运行在单个平台线程上。每当虚拟线程阻塞时,它就会被卸载,平台线程就会运行另一个虚拟线程。(“虚拟线程”这个名称应该让人想起映射到实际 RAM 的虚拟内存。)虚拟线程在 Java 20(JEP 436)中成为预览功能,并在 Java 21 中成为最终功能。

使用虚拟线程,阻塞变得很便宜。如果结果无法立即获得,您只需在虚拟线程中阻塞。您可以使用熟悉的编程结构(分支、循环、try 块)来代替回调管道。

当并发任务数量很大,并且任务主要阻塞在网络 I/O 上时,虚拟线程非常有用。它们对 CPU 密集型任务没有帮助。对于此类任务,请考虑使用 并行流递归 fork-join 任务

 

创建虚拟线程

工厂方法 Executors.newVirtualThreadPerTaskExecutor() 会生成一个 ExecutorService,该服务会在单独的虚拟线程中运行每个任务。例如

import java.util.concurrent.*;

public class VirtualThreadDemo {
   public static void main(String[] args) {
     final int NTASKS = 100; 
     ExecutorService service = Executors.newVirtualThreadPerTaskExecutor();
      for (int i = 0; i < NTASKS; i++) {
         service.submit(() -> {
            long id = Thread.currentThread().threadId(); 
            LockSupport.parkNanos(1_000_000_000);
            System.out.println(id);
         });
      }
      service.close();
   }
}

顺便说一下,代码使用 LockSupport.parkNanos 而不是 Thread.sleep,这样我们就不必捕获讨厌的 InterruptedException

也许您正在使用更低级的 API,该 API 需要线程工厂。要获取虚拟线程的工厂,请使用新的 Thread.Builder

Thread.Builder builder = Thread.ofVirtual().name("request-", 1);
ThreadFactory factory = builder.factory();

现在,调用 factory.newThread(myRunnable) 会创建一个新的(未启动的)虚拟线程。name 方法会配置构建器以设置线程名称 request-1request-2 等。

您也可以使用构建器来创建一个单独的虚拟线程

Thread t = builder.unstarted(myRunnable);

或者,如果您想立即启动线程

Thread t = builder.started(myRunnable);

最后,为了快速演示,有一个便捷方法

Thread t = Thread.startVirtualThread(myRunnable);

请注意,只有第一种方法(使用执行器服务)适用于带有结果的任务(可调用对象)。

 

线程 API 更改

在对不同的 API 进行了一系列实验后,Java 虚拟线程的设计人员决定简单地重用熟悉的 Thread API。虚拟线程是 Thread 的实例。取消操作与平台线程的工作方式相同,通过调用 interrupt 来实现。与往常一样,线程代码必须检查“中断”标志或调用执行此操作的方法。(大多数阻塞方法都会这样做。)

有一些区别。特别是,所有虚拟线程

  • 都在单个线程组中
  • 优先级为 NORM_PRIORITY
  • 是守护线程

没有 API 可以使用其他线程组来构造虚拟线程。尝试在虚拟线程上调用 setPrioritysetDaemon 不会有任何效果。

静态 Thread::getAllStackTraces 方法会返回所有平台线程的堆栈跟踪映射。不包括虚拟线程。

新的 Thread::isVirtual 实例方法会告知线程是否为虚拟线程。

请注意,无法找到虚拟线程执行所在的平台线程。

Java 19 对 Thread API 进行了一些更改,这些更改与虚拟线程无关

  • 现在有实例方法 join(Duration)sleep(Duration)
  • 非 final getId 方法已弃用,因为有人可能会覆盖它以返回线程 ID 以外的内容。请改用 final threadId 方法。

从 Java 20 开始,stopsuspendresume 方法会对平台线程和虚拟线程都抛出 UnsupportedOperationException。这些方法自 Java 1.2 起已弃用,自 Java 18 起已弃用并计划在未来版本中移除。

 

捕获任务结果

您通常希望合并多个并发任务的结果

Future<T1> f1 = service.submit(callable1);
Future<T2> f2 = service.submit(callable2);
result = combine(f1.get(), f2.get());

在虚拟线程出现之前,您可能对阻塞的 get 调用感到很糟糕。但现在阻塞很便宜。以下是一个更具体的示例程序

import java.util.concurrent.*;
import java.net.*;
import java.net.http.*;

public class VirtualThreadDemo {
   public static void main(String[] args) throws InterruptedException, ExecutionException {
      ExecutorService service = Executors.newVirtualThreadPerTaskExecutor();
      Future<String> f1 = service.submit(() -> get("https://horstmann.com/random/adjective"));
      Future<String> f2 = service.submit(() -> get("https://horstmann.com/random/noun"));
      String result = f1.get() + " " + f2.get();
      System.out.println(result);
      service.close();
   }

   private static HttpClient client = HttpClient.newHttpClient();

   public static String get(String url) {
      try {
         var request = HttpRequest.newBuilder().uri(new URI(url)).GET().build();
         return client.send(request, HttpResponse.BodyHandlers.ofString()).body();
      } catch (Exception ex) {
         var rex = new RuntimeException();
         rex.initCause(ex);
         throw rex;
      }
   }   
}

如果您有一个结果类型相同的任务列表,可以使用 invokeAll 方法,然后对每个 Future 调用 get

List<Callable<T>> callables = ...;
List<T> results = new ArrayList<>();
for (Future<T> f : service.invokeAll(callables))
  results.add(f.get());

同样,以下是一个更具体的示例程序

import java.util.*;
import java.util.concurrent.*;
import java.net.*;
import java.net.http.*;

public class VirtualThreadDemo {
   public static void main(String[] args) throws InterruptedException, ExecutionException {
      ExecutorService service = Executors.newVirtualThreadPerTaskExecutor();
      List<Callable<String>> callables = new ArrayList<>();
      final int ADJECTIVES = 4;
      for (int i = 1; i <= ADJECTIVES; i++)
         callables.add(() -> get("https://horstmann.com/random/adjective"));
      callables.add(() -> get("https://horstmann.com/random/noun"));
      List<String> results = new ArrayList<>();
      for (Future<String> f : service.invokeAll(callables))
         results.add(f.get());
      System.out.println(String.join(" ", results));
      service.close();
   }

   private static HttpClient client = HttpClient.newHttpClient();

   public static String get(String url) {
      try {
         var request = HttpRequest.newBuilder().uri(new URI(url)).GET().build();
         return client.send(request, HttpResponse.BodyHandlers.ofString()).body();
      } catch (Exception ex) {
         var rex = new RuntimeException();
         rex.initCause(ex);
         throw rex;
      }
   }   
}

 

速率限制

虚拟线程可以提高应用程序吞吐量,因为您可以拥有比平台线程更多的并发任务。这会给任务调用的服务带来压力。例如,Web 服务可能无法容忍大量的并发请求。

使用平台线程,一个简单的(虽然粗略的)调整因素是这些任务的线程池大小。但您不应该池化虚拟线程。在虚拟线程上调度任务,然后在平台线程上调度这些任务显然效率低下。那么这样做有什么好处呢?将虚拟线程数量限制为您的服务可以容忍的少量并发请求?那么您为什么要使用虚拟线程呢?

使用虚拟线程,您应该使用其他机制来控制对有限资源的访问。不要对并发任务设置总体限制,而是以适当的方式保护每个资源。对于数据库连接,连接池可能已经做到了这一点。当访问 Web 服务时,您了解自己的服务,并且可以提供适当的速率限制。

例如,在我的个人网站上,我提供用于生成随机项目的演示服务。如果来自同一 IP 地址的大量请求同时到达,托管公司会将该 IP 地址列入黑名单。

以下示例程序展示了使用简单的信号量来限制速率,该信号量允许少量并发请求。当超过最大值时,acquire 方法会阻塞,但这没关系。使用虚拟线程,阻塞很便宜。

import java.util.*;
import java.util.concurrent.*;
import java.net.*;
import java.net.http.*;

public class RateLimitDemo {
   public static void main(String[] args) throws InterruptedException, ExecutionException {
      ExecutorService service = Executors.newVirtualThreadPerTaskExecutor();
      List<Future<String>> futures = new ArrayList<>();
      final int TASKS = 250;
      for (int i = 1; i <= TASKS; i++)
         futures.add(service.submit(() -> get("https://horstmann.com/random/word")));
      for (Future<String> f : futures)
         System.out.print(f.get() + " ");
      System.out.println();
      service.close();
   }

   private static HttpClient client = HttpClient.newHttpClient();

   private static final Semaphore SEMAPHORE = new Semaphore(20);
   
   public static String get(String url) {
      try {
         var request = HttpRequest.newBuilder().uri(new URI(url)).GET().build();
         SEMAPHORE.acquire();
         try {
            Thread.sleep(100);
            return client.send(request, HttpResponse.BodyHandlers.ofString()).body();
         } finally {
            SEMAPHORE.release();
         }
      } catch (Exception ex) {
         ex.printStackTrace();
         var rex = new RuntimeException();
         rex.initCause(ex);
         throw rex;
      }
   }   
}

 

固定

虚拟线程调度程序会将虚拟线程安装到载体线程上。默认情况下,载体线程的数量与 CPU 内核的数量相同。您可以使用 jdk.virtualThreadScheduler.parallelism VM 选项来调整该数量。

当虚拟线程执行阻塞操作时,它应该从其载体线程中卸载,载体线程可以执行其他虚拟线程。但是,在某些情况下,无法进行此卸载。在某些情况下,虚拟线程调度程序会通过启动另一个载体线程来进行补偿。例如,在 JDK 21 中,这会发生在许多文件 I/O 操作以及调用 Object.wait 时。您可以使用 jdk.virtualThreadScheduler.maxPoolSize VM 选项来控制载体线程的最大数量。

线程在以下两种情况之一中被称为固定

  1. 当执行 synchronized 方法或块时
  2. 当调用本机方法或外部函数时

固定本身并不坏。但是,当固定线程阻塞时,它无法卸载。载体线程被阻塞,并且在 Java 21 中,不会启动额外的载体线程。这会导致可用于运行虚拟线程的载体线程更少。

如果 synchronized 用于避免内存中操作的竞争条件,则固定是无害的。但是,如果有阻塞调用,最好用 ReentrantLock 替换 synchronized。当然,只有在您控制源代码的情况下,这才是可行的选择。

要找出固定线程是否被阻塞,请使用以下选项之一启动 JVM

-Djdk.tracePinnedThreads=short
-Djdk.tracePinnedThreads=full

您会得到一个堆栈跟踪,显示固定线程何时阻塞

...
org.apache.tomcat.util.net.SocketProcessorBase.run(SocketProcessorBase.java:49) <== monitors:1
...

请注意,您只会收到每个固定位置的一个警告!

或者,使用 Java Flight Recorder 记录,使用您喜欢的任务控制查看器查看,并查找 VirtualThreadPinnedVirtualThreadSubmitFailed 事件。

JVM 最终将实现,以便 synchronized 方法或块不再导致固定。然后您只需要担心本机代码的固定。

以下示例程序展示了固定操作。我们启动了一些虚拟线程,这些线程在同步方法中休眠,从而阻塞其载体线程。添加了一些不做任何工作的虚拟线程。但它们无法调度,因为载体线程池已被完全耗尽。请注意,当您

  • 使用 ReentrantLock
  • 不使用虚拟线程
import java.util.concurrent.*;
import java.util.concurrent.locks.*;

public class PinningDemo {
   public static void main(String[] args) throws InterruptedException, ExecutionException {
      ExecutorService service =
         Executors.newVirtualThreadPerTaskExecutor(); 
         // Executors.newCachedThreadPool();
      
      final int TASKS = 20;
      long start = System.nanoTime();
      for (int i = 1; i <= TASKS; i++) {
         service.submit(() -> block());
         // service.submit(() -> rblock());
      }
      for (int i = 1; i <= TASKS; i++) {    
         service.submit(() -> noblock());
      }
      service.close();
      long end = System.nanoTime();
      System.out.printf("%.2f%n", (end - start) * 1E-9);
   }

   public static synchronized void block() {
      System.out.println("Entering block " + Thread.currentThread());      
      LockSupport.parkNanos(1_000_000_000);
      System.out.println("Exiting block " + Thread.currentThread());
   }
   private static Lock lock = new ReentrantLock(); 
   public static void rblock() {
      lock.lock();
      try {
         System.out.println("Entering rblock " + Thread.currentThread());      
         LockSupport.parkNanos(1_000_000_000);
         System.out.println("Exiting rblock " + Thread.currentThread());
      } finally {
         lock.unlock();
      }
   }   
   public static void noblock() {
      System.out.println("Entering noblock " + Thread.currentThread());      
      LockSupport.parkNanos(1_000_000_000);
      System.out.println("Exiting noblock " + Thread.currentThread());
   }   
}

 

线程局部变量

线程局部变量是一个对象,其 getset 方法访问的值取决于当前线程。为什么要使用这样的变量,而不是使用全局变量或局部变量?经典的应用是一个非线程安全的服务,例如 SimpleDateFormat,或者一个会因争用而受到影响的服务,例如随机数生成器。每个线程的实例比受锁保护的全局实例的性能更好。

线程局部变量的另一个常见用途是提供“隐式”上下文,例如数据库连接,该连接已针对每个任务正确配置。任务代码无需将上下文从一个方法传递到另一个方法,而是在需要访问数据库时只需读取线程局部变量即可。

在迁移到虚拟线程时,线程局部变量可能会成为问题。虚拟线程的数量很可能远大于线程池中的线程数量,这意味着现在会有更多线程局部变量实例。在这种情况下,您应该重新考虑您的共享策略。

要定位应用程序中线程局部变量的使用,请使用 VM 标志 `jdk.traceVirtualThreadLocals` 运行。当虚拟线程修改线程局部变量时,您将获得一个堆栈跟踪。

 

结论

  • 当您有许多主要阻塞在网络 I/O 上的任务时,使用虚拟线程来提高吞吐量。
  • 主要优势是熟悉的“同步”编程风格,无需回调。
  • 不要池化虚拟线程;使用其他机制进行速率限制。
  • 检查固定并根据需要进行缓解。
  • 最小化虚拟线程中的线程局部变量。

上次更新: 2021 年 9 月 14 日


返回教程列表