Skip to content
Published on

[オペレーティングシステム] 07. 同期問題:生産者-消費者、哲学者の食事

Authors

古典的同期問題

同期ツールの正しい使い方を理解するために、オペレーティングシステム分野で伝統的に扱われる3つの古典的問題を見ていく。


1. 有界バッファ問題(Bounded-Buffer Problem)

生産者はデータを作ってバッファに入れ、消費者はバッファからデータを取り出して使用する。バッファのサイズが有限なので、バッファが満杯なら生産者は待機し、空なら消費者が待機しなければならない。

[有界バッファ構造]

生産者 -->  [  |  |  |  |  ]  --> 消費者
            0  1  2  3  4
            ^           ^
           out          in

セマフォ:
  mutex = 1      (バッファアクセスの相互排除)
  empty = N      (空きスロット数、初期値 = バッファサイズ)
  full  = 0      (埋まったスロット数、初期値 = 0
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <semaphore.h>
#include <unistd.h>

#define BUFFER_SIZE 5

int buffer[BUFFER_SIZE];
int in = 0, out = 0;

sem_t empty;    // 空きスロット数
sem_t full;     // 埋まったスロット数
pthread_mutex_t mutex;

void *producer(void *arg) {
    int id = *(int *)arg;
    for (int i = 0; i < 10; i++) {
        int item = rand() % 100;

        sem_wait(&empty);           // 空きスロットがあるまで待機
        pthread_mutex_lock(&mutex); // バッファアクセスをロック

        // クリティカルセクション:バッファにアイテムを追加
        buffer[in] = item;
        printf("生産者 %d: buffer[%d] = %d 生産\n", id, in, item);
        in = (in + 1) % BUFFER_SIZE;

        pthread_mutex_unlock(&mutex);
        sem_post(&full);            // 埋まったスロット数を増加

        usleep(rand() % 500000);
    }
    return NULL;
}

void *consumer(void *arg) {
    int id = *(int *)arg;
    for (int i = 0; i < 10; i++) {
        sem_wait(&full);            // 埋まったスロットがあるまで待機
        pthread_mutex_lock(&mutex); // バッファアクセスをロック

        // クリティカルセクション:バッファからアイテムを取り出す
        int item = buffer[out];
        printf("消費者 %d: buffer[%d] = %d 消費\n", id, out, item);
        out = (out + 1) % BUFFER_SIZE;

        pthread_mutex_unlock(&mutex);
        sem_post(&empty);           // 空きスロット数を増加

        usleep(rand() % 800000);
    }
    return NULL;
}

int main() {
    pthread_mutex_init(&mutex, NULL);
    sem_init(&empty, 0, BUFFER_SIZE);
    sem_init(&full, 0, 0);

    pthread_t prod[2], cons[2];
    int ids[] = {0, 1};

    for (int i = 0; i < 2; i++) {
        pthread_create(&prod[i], NULL, producer, &ids[i]);
        pthread_create(&cons[i], NULL, consumer, &ids[i]);
    }

    for (int i = 0; i < 2; i++) {
        pthread_join(prod[i], NULL);
        pthread_join(cons[i], NULL);
    }

    pthread_mutex_destroy(&mutex);
    sem_destroy(&empty);
    sem_destroy(&full);
    return 0;
}

2. 読者-書者問題(Readers-Writers Problem)

データベースを複数のプロセスが共有する場合、読み取りのみを行うプロセス(reader)は同時にアクセスしても安全だが、書き込みプロセス(writer)は排他的アクセスが必要である。

[読者-書者ルール]

Reader + Reader = 許可(同時読み取り可能)
Reader + Writer = 不許可(衝突)
Writer + Writer = 不許可(衝突)

第一変形:読者優先

読者がいる間、書者は待機する。読者が次々と入ってくると書者が飢餓状態に陥る可能性がある。

#include <pthread.h>
#include <stdio.h>

pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_mutex_t rw_mutex = PTHREAD_MUTEX_INITIALIZER;
int read_count = 0;
int shared_data = 0;

void *reader(void *arg) {
    int id = *(int *)arg;

    pthread_mutex_lock(&mutex);
    read_count++;
    if (read_count == 1) {
        // 最初の読者が書者をブロック
        pthread_mutex_lock(&rw_mutex);
    }
    pthread_mutex_unlock(&mutex);

    // クリティカルセクション:読み取り
    printf("読者 %d: データ読み取り = %d(現在の読者数:%d)\n",
           id, shared_data, read_count);

    pthread_mutex_lock(&mutex);
    read_count--;
    if (read_count == 0) {
        // 最後の読者が書者のブロックを解除
        pthread_mutex_unlock(&rw_mutex);
    }
    pthread_mutex_unlock(&mutex);

    return NULL;
}

void *writer(void *arg) {
    int id = *(int *)arg;

    pthread_mutex_lock(&rw_mutex);  // 排他的アクセス

    // クリティカルセクション:書き込み
    shared_data++;
    printf("書者 %d: データ更新 = %d\n", id, shared_data);

    pthread_mutex_unlock(&rw_mutex);

    return NULL;
}

第二変形:書者優先

書者の準備ができたら新しい読者は入れない。既存の読者が全員出た後に書者が実行される。


3. 食事する哲学者問題(Dining Philosophers Problem)

5人の哲学者が円形テーブルに座っている。各哲学者の間に箸が1本ずつあり、食事するには両側の箸2本が必要である。

[食事する哲学者]

        P0
    C4      C0
  P4          P1
    C3      C1
        P3
      C2
        P2

P: 哲学者(Philosopher)
C: 箸(Chopstick)

哲学者 iは箸 iと箸 (i+1)%5を使用

セマフォを使った簡単な解法(デッドロックの危険)

// 注意:この解法はデッドロックが発生する可能性がある!
sem_t chopstick[5];

void *philosopher(void *arg) {
    int id = *(int *)arg;

    while (1) {
        printf("哲学者 %d: 考え中\n", id);
        usleep(rand() % 1000000);

        // 両側の箸を取る
        sem_wait(&chopstick[id]);           // 左
        sem_wait(&chopstick[(id + 1) % 5]); // 右

        printf("哲学者 %d: 食事中\n", id);
        usleep(rand() % 1000000);

        // 箸を置く
        sem_post(&chopstick[id]);
        sem_post(&chopstick[(id + 1) % 5]);
    }
}

// デッドロック:5人全員が同時に左の箸を取ると
// 誰も右の箸を取れない!

デッドロックを防止する解法

// 解法 1: 非対称 - 偶数哲学者は左から、奇数は右から取る
void *philosopher_asymmetric(void *arg) {
    int id = *(int *)arg;

    while (1) {
        printf("哲学者 %d: 考え中\n", id);

        if (id % 2 == 0) {
            sem_wait(&chopstick[id]);
            sem_wait(&chopstick[(id + 1) % 5]);
        } else {
            sem_wait(&chopstick[(id + 1) % 5]);
            sem_wait(&chopstick[id]);
        }

        printf("哲学者 %d: 食事中\n", id);
        usleep(rand() % 1000000);

        sem_post(&chopstick[id]);
        sem_post(&chopstick[(id + 1) % 5]);
    }
}

モニターベースの解法

// モニターベースの解法(Pthreads条件変数を使用)
#include <pthread.h>
#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>

#define N 5
#define THINKING 0
#define HUNGRY   1
#define EATING   2

int state[N];
pthread_mutex_t monitor_mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t self[N];

// 左右の隣人インデックス
#define LEFT(i)  ((i + N - 1) % N)
#define RIGHT(i) ((i + 1) % N)

void test(int i) {
    // 自分が空腹で、両隣が食事中でなければ食べられる
    if (state[i] == HUNGRY &&
        state[LEFT(i)] != EATING &&
        state[RIGHT(i)] != EATING) {
        state[i] = EATING;
        pthread_cond_signal(&self[i]);
    }
}

void pickup(int i) {
    pthread_mutex_lock(&monitor_mutex);

    state[i] = HUNGRY;
    printf("哲学者 %d: 空腹\n", i);
    test(i);  // すぐに食べられるか確認

    while (state[i] != EATING) {
        pthread_cond_wait(&self[i], &monitor_mutex);
    }

    printf("哲学者 %d: 食事開始\n", i);
    pthread_mutex_unlock(&monitor_mutex);
}

void putdown(int i) {
    pthread_mutex_lock(&monitor_mutex);

    state[i] = THINKING;
    printf("哲学者 %d: 食事完了、考え始める\n", i);

    // 隣人が食べられるか確認
    test(LEFT(i));
    test(RIGHT(i));

    pthread_mutex_unlock(&monitor_mutex);
}

void *philosopher(void *arg) {
    int id = *(int *)arg;
    while (1) {
        usleep(rand() % 1000000);  // 考える
        pickup(id);                 // 箸を取る
        usleep(rand() % 1000000);  // 食事
        putdown(id);                // 箸を置く
    }
}

POSIX同期

POSIXミューテックス

#include <pthread.h>

pthread_mutex_t lock;

// 静的初期化
pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;

// 動的初期化
pthread_mutex_init(&lock, NULL);

// 使用
pthread_mutex_lock(&lock);
// クリティカルセクション
pthread_mutex_unlock(&lock);

// ノンブロッキング試行
if (pthread_mutex_trylock(&lock) == 0) {
    // ロック獲得成功
    pthread_mutex_unlock(&lock);
} else {
    // 他のスレッドが保有中
}

// クリーンアップ
pthread_mutex_destroy(&lock);

POSIXセマフォ

POSIXは名前付き(named)セマフォと名前なし(unnamed)セマフォを提供する。

#include <semaphore.h>

// 名前なしセマフォ(スレッド間)
sem_t sem;
sem_init(&sem, 0, 1);  // 0: スレッド間、初期値1
sem_wait(&sem);
sem_post(&sem);
sem_destroy(&sem);

// 名前付きセマフォ(プロセス間)
sem_t *sem = sem_open("/my_sem", O_CREAT, 0644, 1);
sem_wait(sem);
sem_post(sem);
sem_close(sem);
sem_unlink("/my_sem");

POSIX条件変数

#include <pthread.h>

pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
int data_ready = 0;

// 生産者
void *producer(void *arg) {
    pthread_mutex_lock(&mutex);

    // データ生成
    data_ready = 1;
    printf("生産者:データ準備完了\n");

    pthread_cond_signal(&cond);    // 待機中の消費者を起こす
    pthread_mutex_unlock(&mutex);
    return NULL;
}

// 消費者
void *consumer(void *arg) {
    pthread_mutex_lock(&mutex);

    while (!data_ready) {
        // 条件が満たされるまで待機
        // 待機中はmutexが自動的に解放される
        pthread_cond_wait(&cond, &mutex);
    }

    printf("消費者:データ消費\n");
    data_ready = 0;

    pthread_mutex_unlock(&mutex);
    return NULL;
}

Java同期

synchronizedキーワード

public class Counter {
    private int count = 0;

    // メソッドレベルの同期
    public synchronized void increment() {
        count++;
    }

    public synchronized int getCount() {
        return count;
    }

    // ブロックレベルの同期
    public void incrementBlock() {
        synchronized (this) {
            count++;
        }
    }
}

ReentrantLock

synchronizedよりも柔軟なロックメカニズムである。

import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.Condition;

public class BoundedBufferLock {
    private final Object[] buffer;
    private int count = 0, in = 0, out = 0;

    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notFull = lock.newCondition();
    private final Condition notEmpty = lock.newCondition();

    public BoundedBufferLock(int size) {
        buffer = new Object[size];
    }

    public void produce(Object item) throws InterruptedException {
        lock.lock();
        try {
            while (count == buffer.length) {
                notFull.await();    // バッファが空くまで待機
            }
            buffer[in] = item;
            in = (in + 1) % buffer.length;
            count++;
            notEmpty.signal();      // 消費者を起こす
        } finally {
            lock.unlock();
        }
    }

    public Object consume() throws InterruptedException {
        lock.lock();
        try {
            while (count == 0) {
                notEmpty.await();   // データが来るまで待機
            }
            Object item = buffer[out];
            out = (out + 1) % buffer.length;
            count--;
            notFull.signal();       // 生産者を起こす
            return item;
        } finally {
            lock.unlock();
        }
    }
}

代替アプローチ

トランザクショナルメモリ(Transactional Memory)

データベースのトランザクション概念をメモリアクセスに適用する。

// 概念的コード(実際の文法は実装により異なる)
atomic {
    // このブロック内のすべてのメモリ操作はアトミックに実行
    // 衝突発生時は自動的にリトライ
    account1.balance -= amount;
    account2.balance += amount;
}

関数型プログラミング

不変(immutable)データ構造を使用すれば共有状態が変更されないため、同期が不要になる。Erlang、Scala、Haskellなどの関数型言語がこのアプローチを活用している。


まとめ

有界バッファ、読者-書者、食事する哲学者問題は同期の核心的な課題を示す古典的な例題である。POSIXとJavaはミューテックス、セマフォ、条件変数など様々な同期ツールを提供する。トランザクショナルメモリや関数型プログラミングなどの代替アプローチも並行性問題を解決する方法として注目されている。