概要

あるスレッドがデータを生産し、別のスレッドがそれを消費するというプロデューサー・コンシューマーのパターンは、非同期処理やメッセージキューの基盤となる構造です。Object.wait/notify でも実装できますが、ReentrantLock と Condition を使えば「キューが満杯でない」「キューが空でない」という2つの条件を分離でき、無駄な通知を減らせます。この記事では、容量制限付きキューを Condition で実装し、満杯時のブロッキングと空時のブロッキングを個別に制御する方法を示します。タイムアウト付きの取得も扱います。

使いどころ

バッチ処理で読み込みスレッドと書き込みスレッドを分離し、メモリ使用量を制御しながらパイプライン処理を行う

ログ収集で非同期にログをキューに投入し、別スレッドで一括書き込みするバッファリング機構を実装する

受注データの取込で、CSVパース(Producer)とDB登録(Consumer)を容量制限付きキューで接続する

コード例

ConditionQueueDemo.java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class ConditionQueueDemo {

    // Condition を使った容量制限付きキュー
    static class BoundedQueue<T> {
        private final Deque<T> queue = new ArrayDeque<>();
        private final int capacity;
        private final ReentrantLock lock = new ReentrantLock();
        private final Condition notFull = lock.newCondition();
        private final Condition notEmpty = lock.newCondition();

        BoundedQueue(int capacity) {
            this.capacity = capacity;
        }

        void put(T item) throws InterruptedException {
            lock.lock();
            try {
                while (queue.size() == capacity) {
                    notFull.await(); // 空きが出るまで待機
                }
                queue.addLast(item);
                System.out.println("[Producer] 追加: " + item
                    + " (サイズ: " + queue.size() + ")");
                notEmpty.signalAll(); // Consumer に通知
            } finally {
                lock.unlock();
            }
        }

        T take() throws InterruptedException {
            lock.lock();
            try {
                while (queue.isEmpty()) {
                    notEmpty.await(); // データが入るまで待機
                }
                var item = queue.removeFirst();
                System.out.println("[Consumer] 取得: " + item
                    + " (サイズ: " + queue.size() + ")");
                notFull.signalAll(); // Producer に通知
                return item;
            } finally {
                lock.unlock();
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        var queue = new BoundedQueue<Integer>(3);

        var producer = new Thread(() -> {
            try {
                for (var i = 1; i <= 5; i++) {
                    queue.put(i);
                    Thread.sleep(100);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        var consumer = new Thread(() -> {
            try {
                for (var i = 0; i < 5; i++) {
                    queue.take();
                    Thread.sleep(200);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        producer.start();
        consumer.start();
        producer.join();
        consumer.join();
        System.out.println("完了");
    }
}

Java 8 / 17 / 21 の完全なサンプルコードは GitHub リポジトリ で確認できます。

Version Coverage

ラムダ式 + var で簡潔に記述可能。record でキューアイテムを不変データとして表現できる。

Java 17
// Java 17: ラムダ式 + var で簡潔に
var producer = new Thread(() -> {
    try {
        for (var i = 1; i <= 5; i++) {
            queue.put(i);
        }
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
});

Library Comparison

Condition(ReentrantLock)条件ごとに待機キューを分離し、不要な起床を避けたいとき。容量制限付きキューの自作に向く。BlockingQueue を自作することになるため、バグのリスクが高い。プロダクション用途では標準の BlockingQueue を優先すべき。
BlockingQueue(ArrayBlockingQueue 等)プロデューサー・コンシューマーを標準 API だけで安全に実装したいとき。Condition の管理が不要。内部実装は Condition と同じ仕組みだが、API として隠蔽されている。仕組みを理解する学習用途には Condition 実装が有効。
Disruptor(LMAX)超低レイテンシが求められる金融系やリアルタイムデータ処理。ロックフリー設計で高性能だが、API が独特で学習コストが高い。一般的な業務システムではオーバーキル。

注意点

await() を if ではなく while ループで囲むのが鉄則。偽の起床(spurious wakeup)が発生する可能性があるため、条件の再確認が必要

signalAll() の代わりに signal() を使うと、通知対象が1スレッドに限定される。複数コンシューマーがいる場合は signalAll() が安全

lock() と unlock() の対応が崩れると Condition.await() から復帰できなくなる。必ず try-finally で unlock する

awaitNanos のタイムアウト計算は戻り値を使って残り時間を管理する。Thread.sleep のように単純に待つのとは異なる

FAQ

Object.wait/notify と Condition.await/signal の違いは何ですか。

Condition は1つの Lock に対して複数の待機キューを持てます。Object.wait/notify では1つのモニターに1つの待機キューしかないため、条件ごとの通知分離ができません。

await() を while ではなく if で囲むとどうなりますか。

偽の起床(spurious wakeup)で条件を満たさないまま処理が進む可能性があります。仕様上 spurious wakeup は起こりうるとされているため、while で条件を再確認するのが正しい書き方です。

BlockingQueue があるのに Condition を学ぶ必要はありますか。

BlockingQueue の内部は Condition で実装されています。仕組みを理解しておくと、カスタムキューの設計やデバッグで役立ちます。

関連書籍

この記事のテーマをさらに深く学びたい方へ。

※ Amazon アソシエイトリンクを含みます