概要
Java 21 で正式導入された仮想スレッド(Virtual Threads)は、従来のプラットフォームスレッドとは異なり、OS スレッドに1対1で紐づかない軽量なスレッドです。これにより、スレッドプールのサイズを気にすることなく、I/O 待ちの多い処理を大量に並行実行できるようになりました。従来は数百スレッドが実用上の限界でしたが、仮想スレッドでは数万〜数十万の並行タスクを低コストで起動できます。ただし、CPU 集約的な処理には向かない点や、synchronized ブロック内でのブロッキングによる pinning 問題など、導入時に理解しておくべき注意点もあります。この記事では、Thread.ofVirtual() や Executors.newVirtualThreadPerTaskExecutor() の基本的な使い方から、既存コードの移行方針、pinning の回避策までを、外部ライブラリなしの完結したコードで示します。
使いどころ
Web サーバーで1リクエスト1仮想スレッドのモデルを採用し、数千の同時接続を少ない OS リソースで処理する
夜間バッチで数千件の外部 API 呼び出しを仮想スレッドで並行実行し、全体の処理時間を短縮する
大量のファイルを並列に読み込み、内容を集計する ETL 処理で、スレッドプールのサイズ調整から解放される
コード例
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.locks.ReentrantLock;
public class VirtualThreadsDemo {
/** 外部 API 呼び出しを模擬(I/O バウンド) */
static String callApi(String endpoint) {
try {
Thread.sleep(100); // ネットワーク遅延を模擬
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return "Response from " + endpoint
+ " [" + Thread.currentThread() + "]";
}
/** --- 1. 基本: Thread.ofVirtual() で仮想スレッドを生成 --- */
static void basicVirtualThread() throws InterruptedException {
var thread = Thread.ofVirtual()
.name("my-virtual-thread")
.start(() -> {
System.out.println("仮想スレッド: "
+ Thread.currentThread());
System.out.println("isVirtual: "
+ Thread.currentThread().isVirtual());
});
thread.join();
}
/** --- 2. Thread.startVirtualThread() で簡易起動 --- */
static void quickStart() throws InterruptedException {
var thread = Thread.startVirtualThread(() ->
System.out.println("簡易起動: "
+ Thread.currentThread().isVirtual()));
thread.join();
}
/** --- 3. ExecutorService で大量タスクを並行実行 --- */
static void massiveConcurrency() throws Exception {
var taskCount = 1000;
var start = Instant.now();
try (var executor = Executors
.newVirtualThreadPerTaskExecutor()) {
var futures = new ArrayList<Future<String>>();
for (var i = 0; i < taskCount; i++) {
var endpoint = "/api/resource/" + i;
futures.add(executor.submit(
() -> callApi(endpoint)));
}
var successCount = 0;
for (var future : futures) {
future.get();
successCount++;
}
var elapsed = Duration.between(start, Instant.now());
System.out.println(successCount + " タスク完了: "
+ elapsed.toMillis() + " ms");
}
}
/** --- 4. pinning 回避: synchronized → ReentrantLock --- */
static class SafeCounter {
private final ReentrantLock lock = new ReentrantLock();
private int count = 0;
/** ReentrantLock なら仮想スレッドが pinning されない */
void increment() {
lock.lock();
try {
count++;
} finally {
lock.unlock();
}
}
int getCount() {
lock.lock();
try {
return count;
} finally {
lock.unlock();
}
}
}
/** --- 5. プラットフォームスレッドとの性能比較 --- */
static long benchmark(ExecutorService executor, int tasks)
throws Exception {
var start = Instant.now();
var futures = new ArrayList<Future<String>>();
for (var i = 0; i < tasks; i++) {
var endpoint = "/api/bench/" + i;
futures.add(executor.submit(
() -> callApi(endpoint)));
}
for (var f : futures) {
f.get();
}
return Duration.between(start, Instant.now()).toMillis();
}
public static void main(String[] args) throws Exception {
System.out.println("=== 1. 基本的な仮想スレッド ===");
basicVirtualThread();
System.out.println("\n=== 2. 簡易起動 ===");
quickStart();
System.out.println("\n=== 3. 1000タスク並行実行 ===");
massiveConcurrency();
System.out.println("\n=== 4. pinning 回避カウンター ===");
var counter = new SafeCounter();
try (var executor = Executors
.newVirtualThreadPerTaskExecutor()) {
var futures = new ArrayList<Future<?>>();
for (var i = 0; i < 10000; i++) {
futures.add(executor.submit(counter::increment));
}
for (var f : futures) {
f.get();
}
}
System.out.println("カウンター結果: " + counter.getCount()
+ " (期待値: 10000)");
System.out.println("\n=== 5. 性能比較 ===");
var tasks = 500;
try (var platformPool = Executors
.newFixedThreadPool(50)) {
var platformMs = benchmark(platformPool, tasks);
System.out.println("プラットフォームスレッド (50本): "
+ platformMs + " ms");
}
try (var virtualPool = Executors
.newVirtualThreadPerTaskExecutor()) {
var virtualMs = benchmark(virtualPool, tasks);
System.out.println("仮想スレッド: "
+ virtualMs + " ms");
}
}
}Version Coverage
仮想スレッドは利用不可(Java 19 でプレビュー導入)。スレッドプールベースの設計が基本。Java 17 で追加された API はないが、var や record でコードは簡潔になる。
// Java 17: var で簡潔に、ただしスレッドプール型は同じ
var pool = Executors.newFixedThreadPool(200);
var futures = new ArrayList<Future<String>>();
for (var i = 0; i < 1000; i++) {
var taskId = i;
futures.add(pool.submit(
() -> callApi("task-" + taskId)));
}
for (var f : futures) {
System.out.println(f.get());
}
pool.shutdown();Library Comparison
注意点
仮想スレッドは I/O 待ちの多い処理に最適化されている。CPU 集約的な計算(暗号処理、画像変換など)にはプラットフォームスレッドのプールを使うほうが効率的
synchronized ブロック内でブロッキング I/O を行うと、仮想スレッドがキャリアスレッド(OS スレッド)に固定される pinning が発生する。ReentrantLock への置き換えで回避できる
仮想スレッドでは ThreadLocal の使用に注意が必要。大量の仮想スレッドが ThreadLocal を持つとメモリ消費が増大するため、ScopedValue(プレビュー)への移行を検討する
仮想スレッドのデバッグでは、従来のスレッドダンプに加えて jcmd の新しいスレッドダンプ形式(JSON)を使うと構造化された情報が得られる。-Djdk.virtualThreadScheduler.parallelism でキャリアスレッド数も調整可能
FAQ
Executors.newFixedThreadPool(n) を Executors.newVirtualThreadPerTaskExecutor() に置き換えるだけで基本的な移行は完了します。ただし synchronized 内のブロッキングがないか事前に確認してください。
スレッドプールはスレッドを再利用してコストを抑える仕組みですが、仮想スレッドは生成コストが極めて低いためプーリングが不要です。1タスク1仮想スレッドの使い捨てモデルが推奨されます。
-Djdk.tracePinnedThreads=short を JVM オプションに追加すると、pinning 発生時にスタックトレースが出力されます。synchronized を ReentrantLock に置き換えることで回避できます。