Skip to content
Published on

[オペレーティングシステム] 04. スレッドと並行プログラミング

Authors

スレッドの概念

スレッド(Thread)はCPU利用の基本単位である。1つのプロセス内で複数のスレッドが実行でき、各スレッドは固有のスレッドID、プログラムカウンタ、レジスタセット、スタックを持つ。同じプロセスのスレッド同士はコード、データ、ファイルなどのリソースを共有する。

[シングルスレッド vs マルチスレッドプロセス]

シングルスレッドプロセス:         マルチスレッドプロセス:

+------------------+          +------------------+
| コード | データ | ファイル|          | コード | データ | ファイル|
+------------------+          +------------------+
| レジスタ | スタック  |          | レジスタ | レジスタ | レジスタ |
+------------------+          | スタック  | スタック  | スタック  |
                              | スレッド1 | スレッド2 | スレッド3 |
                              +----------------------------------+

スレッド使用の動機

  1. 応答性(Responsiveness): GUIアプリケーションで1つのスレッドがユーザー入力を処理する間、別のスレッドが計算を実行
  2. リソース共有(Resource Sharing): 同じプロセスのスレッドはメモリとリソースを自動的に共有
  3. 経済性(Economy): プロセス生成よりスレッド生成の方がはるかに軽く速い。コンテキストスイッチも高速
  4. スケーラビリティ(Scalability): マルチコアアーキテクチャでスレッドを並列実行して性能向上
[Webサーバーのマルチスレッド構造]

                     +-> [ワーカースレッド 1] -> リクエスト処理
クライアントリクエスト ---> [メインスレッド] -+-> [ワーカースレッド 2] -> リクエスト処理
                     +-> [ワーカースレッド 3] -> リクエスト処理

各ワーカースレッドが個別のクライアントリクエストを処理
コード、データ(キャッシュなど)はすべてのスレッドが共有

マルチコアプログラミング

マルチコアシステムでの並列処理

マルチコアシステムではスレッドが実際に並列実行できる。

[シングルコア vs マルチコアでのスレッド実行]

シングルコア(並行性 - Concurrency):
時間 -->
コア1: [T1][T2][T3][T1][T2][T3][T1]...
        交互に実行(タイムシェアリング)

マルチコア(並列性 - Parallelism):
時間 -->
コア1: [T1][T1][T1][T1][T1]...
コア2: [T2][T2][T2][T2][T2]...
コア3: [T3][T3][T3][T3][T3]...
        同時に実行

アムダールの法則(Amdahl's Law)

プログラムに順次的にしか実行できない部分があると、コアをいくら追加しても速度向上に限界がある。

[アムダールの法則]

速度向上 = 1 / (S + (1 - S) / N)

S = 順次実行比率
N = プロセッサ(コア)数

例:プログラムの25%が順次的(S = 0.25
コア数(N)  |  速度向上
-----------+-----------
    1      |   1.00    2      |   1.60    4      |   2.29    8      |   2.91   16      |   3.20   無限大   |   4.00倍(理論的最大値)

順次部分が25%ならコアをいくら追加しても4倍が限界!

マルチコアプログラミングの課題

  1. タスク分割: 独立して実行可能なタスクを識別
  2. バランス: 各スレッドに均等な量の作業を配分
  3. データ分割: データを個別のコアで使用できるように分割
  4. データ依存性: 共有データに対する同期が必要
  5. テストとデバッグ: 非決定的な実行パスによる困難

マルチスレッドモデル

ユーザースレッドとカーネルスレッド

  • ユーザースレッド: ユーザーレベルのライブラリで管理。カーネルは存在を知らない
  • カーネルスレッド: OSカーネルが直接管理。カーネルがスケジューリング
[マルチスレッドモデル]

1. 多対一(Many-to-One):
   ユーザースレッド: T1 T2 T3 T4
                    \  |  |  /
   カーネルスレッド:   K1
   - 1つがブロックすると全体がブロック
   - マルチコア活用不可

2. 一対一(One-to-One):
   ユーザースレッド: T1  T2  T3  T4
                    |   |   |   |
   カーネルスレッド: K1  K2  K3  K4
   - より高い並列性
   - スレッド数に制限がある可能性
   - Linux、Windowsが使用

3. 多対多(Many-to-Many):
   ユーザースレッド: T1 T2 T3 T4 T5
                    \  | X  |  /
   カーネルスレッド: K1  K2  K3
   - 柔軟なマッピング
   - 実装が複雑

現代のほとんどのOSは一対一モデルを使用する。Linuxはclone()システムコールでカーネルスレッドを生成する。


スレッドライブラリ

Pthreads

POSIX標準のスレッドAPIである。Linux、macOSで使用する。

#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>

#define NUM_THREADS 5

// スレッド関数
void *thread_func(void *arg) {
    int thread_id = *(int *)arg;
    long sum = 0;

    // 各スレッドが異なる範囲の合計を計算
    long start = thread_id * 1000000L + 1;
    long end = (thread_id + 1) * 1000000L;

    for (long i = start; i <= end; i++) {
        sum += i;
    }

    printf("スレッド %d: %ldから%ldまでの合計 = %ld\n",
           thread_id, start, end, sum);

    long *result = malloc(sizeof(long));
    *result = sum;
    return (void *)result;
}

int main() {
    pthread_t threads[NUM_THREADS];
    int thread_ids[NUM_THREADS];
    long total = 0;

    // スレッド生成
    for (int i = 0; i < NUM_THREADS; i++) {
        thread_ids[i] = i;
        int rc = pthread_create(&threads[i], NULL,
                                thread_func, &thread_ids[i]);
        if (rc) {
            fprintf(stderr, "スレッド生成失敗: %d\n", rc);
            exit(EXIT_FAILURE);
        }
    }

    // すべてのスレッドの終了を待ち結果を収集
    for (int i = 0; i < NUM_THREADS; i++) {
        void *result;
        pthread_join(threads[i], &result);
        total += *(long *)result;
        free(result);
    }

    printf("合計: %ld\n", total);
    return 0;
}
# コンパイル時に-pthreadフラグが必要
gcc -pthread thread_sum.c -o thread_sum

Javaスレッド

JavaではThreadクラスの継承またはRunnableインターフェースの実装でスレッドを生成する。

// Runnableインターフェースを使用したマルチスレッド合算
public class ThreadSum {

    static long[] partialSums;

    static class SumTask implements Runnable {
        private final int threadId;
        private final long start;
        private final long end;

        SumTask(int threadId, long start, long end) {
            this.threadId = threadId;
            this.start = start;
            this.end = end;
        }

        @Override
        public void run() {
            long sum = 0;
            for (long i = start; i <= end; i++) {
                sum += i;
            }
            partialSums[threadId] = sum;
            System.out.printf("スレッド %d: %d~%d 合計 = %d%n",
                              threadId, start, end, sum);
        }
    }

    public static void main(String[] args) throws Exception {
        int numThreads = 5;
        partialSums = new long[numThreads];
        Thread[] threads = new Thread[numThreads];

        for (int i = 0; i < numThreads; i++) {
            long start = i * 1_000_000L + 1;
            long end = (i + 1) * 1_000_000L;
            threads[i] = new Thread(new SumTask(i, start, end));
            threads[i].start();
        }

        // すべてのスレッドの終了を待つ
        for (Thread t : threads) {
            t.join();
        }

        long total = 0;
        for (long s : partialSums) {
            total += s;
        }
        System.out.println("合計: " + total);
    }
}

暗黙的スレッディング(Implicit Threading)

マルチスレッドプログラミングは難しくエラーが発生しやすい。暗黙的スレッディングはスレッドの生成と管理をコンパイラやランタイムライブラリに委ねるアプローチである。

スレッドプール(Thread Pool)

スレッドを事前に複数生成しておき、タスクが来たらアイドルスレッドに割り当てる。

// Javaのスレッドプール使用例
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.Callable;
import java.util.ArrayList;
import java.util.List;

public class ThreadPoolExample {

    static class SumTask implements Callable<Long> {
        private final long start, end;

        SumTask(long start, long end) {
            this.start = start;
            this.end = end;
        }

        @Override
        public Long call() {
            long sum = 0;
            for (long i = start; i <= end; i++) {
                sum += i;
            }
            return sum;
        }
    }

    public static void main(String[] args) throws Exception {
        // CPUコア数分のスレッドプールを生成
        int cores = Runtime.getRuntime().availableProcessors();
        ExecutorService pool = Executors.newFixedThreadPool(cores);

        List<Future<Long>> futures = new ArrayList<>();

        // タスクの提出
        for (int i = 0; i < 10; i++) {
            long start = i * 1_000_000L + 1;
            long end = (i + 1) * 1_000_000L;
            futures.add(pool.submit(new SumTask(start, end)));
        }

        // 結果の収集
        long total = 0;
        for (Future<Long> f : futures) {
            total += f.get();
        }

        System.out.println("合計: " + total);
        pool.shutdown();
    }
}

スレッドプールの利点は以下の通りである。

  • 既存のスレッドを再利用するため生成オーバーヘッドが減少
  • 同時スレッド数を制限してシステムリソースを保護
  • タスクの生成と実行を分離して柔軟なスケジューリングが可能

Fork-Joinフレームワーク

大きなタスクを小さなサブタスクに分割(fork)し、結果を結合する(join)パターンである。

[Fork-Join動作]

       [全体タスク: 1~1000]
              |
        fork /  \ fork
            /    \
  [1~500]      [500+1~1000]
     |                |
  fork/ \fork      fork/ \fork
   /    \           /    \
[1~250][250+1  [500+1 [750+1
         ~500]  ~750]  ~1000]
   \    /           \    /
  join\ /join      join\ /join
     |                |
  [部分和1]         [部分和2]
        \    /
      join\ /join
           |
       [全体合計]
import java.util.concurrent.RecursiveTask;
import java.util.concurrent.ForkJoinPool;

public class ForkJoinSum extends RecursiveTask<Long> {
    private static final long THRESHOLD = 100_000;
    private final long start, end;

    ForkJoinSum(long start, long end) {
        this.start = start;
        this.end = end;
    }

    @Override
    protected Long compute() {
        if (end - start <= THRESHOLD) {
            // 基本ケース:直接計算
            long sum = 0;
            for (long i = start; i <= end; i++) {
                sum += i;
            }
            return sum;
        }

        // 再帰的分割
        long mid = (start + end) / 2;
        ForkJoinSum left = new ForkJoinSum(start, mid);
        ForkJoinSum right = new ForkJoinSum(mid + 1, end);

        left.fork();              // 左を非同期実行
        long rightResult = right.compute();  // 右を現在のスレッドで実行
        long leftResult = left.join();       // 左の結果を待つ

        return leftResult + rightResult;
    }

    public static void main(String[] args) {
        ForkJoinPool pool = new ForkJoinPool();
        long result = pool.invoke(new ForkJoinSum(1, 10_000_000));
        System.out.println("結果: " + result);
    }
}

OpenMP

コンパイラ指示文(pragma)を使用してC/C++コードを並列化する。

#include <stdio.h>
#include <omp.h>

int main() {
    long sum = 0;
    int n = 10000000;

    // OpenMP並列forループ
    // コンパイラが自動的にイテレーションをスレッドに分配
    #pragma omp parallel for reduction(+:sum)
    for (int i = 1; i <= n; i++) {
        sum += i;
    }

    printf("合計: %ld\n", sum);
    printf("使用スレッド数: %d\n", omp_get_max_threads());

    return 0;
}
# OpenMPコンパイル
gcc -fopenmp parallel_sum.c -o parallel_sum

# スレッド数を指定して実行
OMP_NUM_THREADS=4 ./parallel_sum

OpenMPの主な指示文は以下の通りである。

  • parallel: 並列領域の開始
  • for: ループの並列化
  • critical: クリティカルセクションの指定
  • atomic: アトミック操作
  • reduction: リダクション操作(和、積など)

スレッド関連の問題

fork()とexec()のセマンティクス

マルチスレッドプロセスでfork()を呼び出すと2つの選択肢がある。

  • すべてのスレッドをコピーするfork
  • 呼び出したスレッドのみをコピーするfork(ほとんどのUnix実装)

exec()を呼び出すとすべてのスレッドが新しいプログラムに置き換わる。

シグナル処理

シグナルはどのスレッドに配信すべきか?

  1. シグナルが該当するスレッドに配信
  2. すべてのスレッドに配信
  3. 特定のスレッドに配信
  4. 特定のスレッドをシグナル受信専用に指定

スレッドキャンセル

実行中のスレッドを早期終了させることで、2つの方式がある。

  • 非同期キャンセル: 即座に対象スレッドを終了(リソースリークのリスク)
  • 遅延キャンセル: 対象スレッドが定期的にキャンセル要求を確認(安全)

まとめ

スレッドはプロセス内で実行フローを分ける軽量な単位である。マルチコアシステムで真の並列実行が可能だが、アムダールの法則により順次部分がボトルネックになる。スレッドプール、Fork-Join、OpenMPなどの暗黙的スレッディング技法はプログラマの負担を軽減しながら効率的な並列処理を可能にする。