概要
あるスレッドがデータを生産し、別のスレッドがそれを消費するというプロデューサー・コンシューマーのパターンは、非同期処理やメッセージキューの基盤となる構造です。Object.wait/notify でも実装できますが、ReentrantLock と Condition を使えば「キューが満杯でない」「キューが空でない」という2つの条件を分離でき、無駄な通知を減らせます。この記事では、容量制限付きキューを Condition で実装し、満杯時のブロッキングと空時のブロッキングを個別に制御する方法を示します。タイムアウト付きの取得も扱います。
使いどころ
バッチ処理で読み込みスレッドと書き込みスレッドを分離し、メモリ使用量を制御しながらパイプライン処理を行う
ログ収集で非同期にログをキューに投入し、別スレッドで一括書き込みするバッファリング機構を実装する
受注データの取込で、CSVパース(Producer)とDB登録(Consumer)を容量制限付きキューで接続する
コード例
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
public class ConditionQueueDemo {
// Condition を使った容量制限付きキュー
static class BoundedQueue<T> {
private final Deque<T> queue = new ArrayDeque<>();
private final int capacity;
private final ReentrantLock lock = new ReentrantLock();
private final Condition notFull = lock.newCondition();
private final Condition notEmpty = lock.newCondition();
BoundedQueue(int capacity) {
this.capacity = capacity;
}
void put(T item) throws InterruptedException {
lock.lock();
try {
while (queue.size() == capacity) {
notFull.await(); // 空きが出るまで待機
}
queue.addLast(item);
System.out.println("[Producer] 追加: " + item
+ " (サイズ: " + queue.size() + ")");
notEmpty.signalAll(); // Consumer に通知
} finally {
lock.unlock();
}
}
T take() throws InterruptedException {
lock.lock();
try {
while (queue.isEmpty()) {
notEmpty.await(); // データが入るまで待機
}
var item = queue.removeFirst();
System.out.println("[Consumer] 取得: " + item
+ " (サイズ: " + queue.size() + ")");
notFull.signalAll(); // Producer に通知
return item;
} finally {
lock.unlock();
}
}
}
public static void main(String[] args) throws InterruptedException {
var queue = new BoundedQueue<Integer>(3);
var producer = new Thread(() -> {
try {
for (var i = 1; i <= 5; i++) {
queue.put(i);
Thread.sleep(100);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
var consumer = new Thread(() -> {
try {
for (var i = 0; i < 5; i++) {
queue.take();
Thread.sleep(200);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
producer.start();
consumer.start();
producer.join();
consumer.join();
System.out.println("完了");
}
}Version Coverage
ラムダ式 + var で簡潔に記述可能。record でキューアイテムを不変データとして表現できる。
// Java 17: ラムダ式 + var で簡潔に
var producer = new Thread(() -> {
try {
for (var i = 1; i <= 5; i++) {
queue.put(i);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});Library Comparison
注意点
await() を if ではなく while ループで囲むのが鉄則。偽の起床(spurious wakeup)が発生する可能性があるため、条件の再確認が必要
signalAll() の代わりに signal() を使うと、通知対象が1スレッドに限定される。複数コンシューマーがいる場合は signalAll() が安全
lock() と unlock() の対応が崩れると Condition.await() から復帰できなくなる。必ず try-finally で unlock する
awaitNanos のタイムアウト計算は戻り値を使って残り時間を管理する。Thread.sleep のように単純に待つのとは異なる
FAQ
Condition は1つの Lock に対して複数の待機キューを持てます。Object.wait/notify では1つのモニターに1つの待機キューしかないため、条件ごとの通知分離ができません。
偽の起床(spurious wakeup)で条件を満たさないまま処理が進む可能性があります。仕様上 spurious wakeup は起こりうるとされているため、while で条件を再確認するのが正しい書き方です。
BlockingQueue の内部は Condition で実装されています。仕組みを理解しておくと、カスタムキューの設計やデバッグで役立ちます。