Skip to content
Published on

並行性 & 並列プログラミング完全ガイド 2025: Async/Await、Thread、Goroutine、Actorモデル

Authors

目次(もくじ)

1. 並行性(へいこうせい) vs 並列性(へいれつせい):Rob Pikeの区別(くべつ)

"Concurrency is about dealing with lots of things at once. Parallelism is about doing lots of things at once." -- Rob Pike

この一文が並行性と並列性の核心的な違いを要約しています。

1.1 概念的(がいねんてき)な区別

並行性(Concurrency) はプログラムの構造的特性です。複数のタスクを論理的に同時に扱えるようにプログラムを設計することです。実際に同時に実行されるかは重要ではありません。

並列性(Parallelism)実行の特性です。複数のタスクが物理的に同時に実行されることを意味します。マルチコアCPUで複数のスレッドが同時に動作するのが代表的です。

並行性 (Concurrency) - 構造
┌─────────────────────────────────────────┐
Task A: ██░░██░░░░██░░                │
Task B: ░░██░░██░░░░██                │
Task C: ░░░░░░░░██░░░░██              │
│         ─────────────────→ 時間         │
  (シングルコアで交互に実行)└─────────────────────────────────────────┘

並列性 (Parallelism) - 実行
┌─────────────────────────────────────────┐
Core 1: ████████████████               │
Core 2: ████████████████               │
Core 3: ████████████████               │
│         ─────────────────→ 時間         │
  (マルチコアで同時に実行)└─────────────────────────────────────────┘

1.2 実生活の例(たと)え

状況並行性並列性
料理人1人が3つの料理を交互に作るOX
料理人3人がそれぞれ1つずつ同時に作るOO
料理人1人が1つの料理だけ作るXX

1.3 なぜ区別が重要か

並行性なしに並列性だけを追求すると複雑度が爆発します。正しい並行性構造をまず設計し、その後並列実行でパフォーマンスを引き出すのが正道です。

# 並行性構造 (asyncio) - シングルスレッドでも効率的
import asyncio

async def fetch_data(url: str) -> str:
    print(f"Fetching {url}...")
    await asyncio.sleep(1)  # I/O待機シミュレーション
    return f"Data from {url}"

async def main():
    # 並行性:3つのリクエストを構造的に同時に扱う
    results = await asyncio.gather(
        fetch_data("https://api.example.com/users"),
        fetch_data("https://api.example.com/posts"),
        fetch_data("https://api.example.com/comments"),
    )
    for r in results:
        print(r)

asyncio.run(main())
# 合計約1秒(3秒ではない) - 並行性の力

2. スレッディングモデル:OSスレッドから仮想(かそう)スレッドまで

2.1 OSスレッド(カーネルスレッド)

オペレーティングシステムが直接管理するスレッドです。生成コストが高く(約1MBスタック)、コンテキストスイッチングオーバーヘッドが存在します。

// Java - OSスレッド
public class ThreadExample {
    public static void main(String[] args) throws InterruptedException {
        Thread t1 = new Thread(() -> {
            for (int i = 0; i < 5; i++) {
                System.out.println("Thread 1: " + i);
                try { Thread.sleep(100); } catch (InterruptedException e) { break; }
            }
        });

        Thread t2 = new Thread(() -> {
            for (int i = 0; i < 5; i++) {
                System.out.println("Thread 2: " + i);
                try { Thread.sleep(100); } catch (InterruptedException e) { break; }
            }
        });

        t1.start();
        t2.start();
        t1.join();
        t2.join();
        System.out.println("Both threads completed");
    }
}

2.2 ユーザースレッド(グリーンスレッド)

ランタイムが管理する軽量スレッドです。OSスレッドの上に多数のユーザースレッドをマッピングします。

スレッドモデル比較
┌───────────────────────────────────────────────┐
1:1 (OSスレッド)   N:1 (グリーンスレッド)│ ┌──┐ ┌──┐ ┌──┐    ┌──┐┌──┐┌──┐┌──┐┌──┐     │
│ │UT│ │UT│ │UT│    │GT││GT││GT││GT││GT│     │
│ └┬─┘ └┬─┘ └┬─┘    └┬─┘└┬─┘└┬─┘└┬─┘└┬─┘     │
│  │    │    │        └──┬──┘  │  └──┬──┘       │
│ ┌┴─┐ ┌┴─┐ ┌┴─┐       ┌┴─┐  │     │          │
│ │KT│ │KT│ │KT│       │KT│  │     │          │
│ └──┘ └──┘ └──┘       └──┘  │     │          │
│                              │     │          │
M:N (ハイブリッド)│ ┌──┐┌──┐┌──┐┌──┐┌──┐┌──┐                    │
│ │GT││GT││GT││GT││GT││GT│                    │
│ └┬─┘└┬─┘└┬─┘└┬─┘└┬─┘└┬─┘                    │
│  └─┬─┘ └──┬──┘ └──┬──┘                       │
│   ┌┴─┐   ┌┴─┐   ┌┴─┐                         │
│   │KT│   │KT│   │KT│                         │
│   └──┘   └──┘   └──┘                         │
└───────────────────────────────────────────────┘

2.3 Java 21 仮想スレッド(Virtual Thread)

Java 21で正式導入された仮想スレッドはM:Nモデルを実装します。数百万の仮想スレッドを生成できます。

// Java 21 Virtual Threads
import java.util.concurrent.Executors;
import java.time.Duration;

public class VirtualThreadDemo {
    public static void main(String[] args) throws Exception {
        // 100万の仮想スレッドを生成可能!
        try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
            for (int i = 0; i < 1_000_000; i++) {
                final int taskId = i;
                executor.submit(() -> {
                    try {
                        Thread.sleep(Duration.ofSeconds(1));
                        if (taskId % 100_000 == 0) {
                            System.out.println("Task " + taskId + " done on " +
                                Thread.currentThread());
                        }
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
        }
        System.out.println("All tasks completed");
    }
}

2.4 スレッドモデル比較

特性OSスレッドグリーンスレッド仮想スレッド(Java 21)
スタックサイズ約1MB約KB数KB(動的)
生成コスト高い(syscall)低い非常に低い
同時数数千数百万数百万
プリエンプションOSプリエンプティブ協調的協調的(I/O時に譲渡)
I/Oブロッキングスレッドブロック全体ブロックの危険自動アンマウント

3. コルーチン:軽量(けいりょう)並行性の核心

3.1 Stackful vs Stackless コルーチン

Stackful Coroutine(例:Lua、Goのgoroutine)
┌──────────────────────────────────┐
│ 自前のスタックを持つ              │
│ スタックのどこでも一時中断可能     │
│ メモリ使用量が高い               │
│ ネストした関数呼び出し中でも譲渡可 │
└──────────────────────────────────┘

Stackless Coroutine(例:Python、Kotlin、Rust)
┌──────────────────────────────────┐
│ 状態をヒープに保存               │
│ 最上位でのみ一時中断可能          │
│ メモリ使用量が低い               │
│ コンパイラがステートマシンに変換   │
└──────────────────────────────────┘

3.2 Python asyncioコルーチン

import asyncio
import aiohttp
import time

async def fetch_url(session: aiohttp.ClientSession, url: str) -> dict:
    """非同期HTTPリクエスト"""
    async with session.get(url) as response:
        return {
            "url": url,
            "status": response.status,
            "length": len(await response.text()),
        }

async def main():
    urls = [
        "https://httpbin.org/delay/1",
        "https://httpbin.org/delay/1",
        "https://httpbin.org/delay/1",
        "https://httpbin.org/delay/1",
        "https://httpbin.org/delay/1",
    ]

    start = time.perf_counter()
    async with aiohttp.ClientSession() as session:
        # 5つのリクエストを同時に実行
        tasks = [fetch_url(session, url) for url in urls]
        results = await asyncio.gather(*tasks)

    elapsed = time.perf_counter() - start
    print(f"5つのリクエスト完了: {elapsed:.2f}秒")  # 約1秒(5秒ではない)

    for r in results:
        print(f"  {r['url']}: status={r['status']}, length={r['length']}")

asyncio.run(main())

3.3 Kotlinコルーチン

import kotlinx.coroutines.*
import kotlin.system.measureTimeMillis

suspend fun fetchData(id: Int): String {
    delay(1000) // 非同期待機(スレッドブロッキングなし)
    return "Result-$id"
}

fun main() = runBlocking {
    val time = measureTimeMillis {
        // 構造化された並行性:coroutineScopeが全子コルーチンを管理
        coroutineScope {
            val results = (1..10).map { id ->
                async(Dispatchers.IO) {
                    fetchData(id)
                }
            }
            results.awaitAll().forEach { println(it) }
        }
    }
    println("Completed in ${time}ms") // 約1000ms
}

4. GoroutineとGoの並行性モデル

4.1 Goroutine内部構造

Goランタイムは M:Nスケジューリングを使用します。G(Goroutine)、M(Machine/OSスレッド)、P(Processor)の3つの概念が核心です。

Goランタイムスケジューラ(GMPモデル)
┌─────────────────────────────────────────────┐
│                                             │
│  ┌──────┐ ┌──────┐ ┌──────┐ ┌──────┐      │
│  │ G1   │ │ G2   │ │ G3   │ │ G4...│  └──┬───┘ └──┬───┘ └──┬───┘ └──┬───┘      │
│     │        │        │        │            │
│  ┌──┴────────┴──┐  ┌──┴────────┴──┐        │
│  │   P (Local   │  │   P (Local   │        │
│  │    Queue)    │  │    Queue)    │        │
│  └──────┬───────┘  └──────┬───────┘        │
│         │                 │                 │
│  ┌──────┴───────┐  ┌──────┴───────┐        │
│  │ M (OSスレッド)│  │ M (OSスレッド)│        │
│  └──────────────┘  └──────────────┘        │
│                                             │
Global Run Queue: [G5, G6, G7, ...]└─────────────────────────────────────────────┘

GOMAXPROCS = Pの数(デフォルト:CPUコア数)

4.2 Goroutine基本使用法

package main

import (
    "fmt"
    "sync"
    "time"
)

func worker(id int, wg *sync.WaitGroup) {
    defer wg.Done()
    fmt.Printf("Worker %d starting\n", id)
    time.Sleep(time.Second)
    fmt.Printf("Worker %d done\n", id)
}

func main() {
    var wg sync.WaitGroup

    for i := 1; i <= 5; i++ {
        wg.Add(1)
        go worker(i, &wg) // goroutine生成 - 約2KBスタック
    }

    wg.Wait()
    fmt.Println("All workers completed")
}

4.3 ワークスティーリング

Pのローカルキューが空になると、他のPのキューからgoroutineを盗みます。これにより負荷を均等に分配します。

ワークスティーリングプロセス
┌─────────────┐     ┌─────────────┐
P1 (busy)   │     │ P2 (idle)[G1,G2,G3]  │ ──→ │ []│             │steal│             │
[G1,G2]    │ ──→ │ [G3]└─────────────┘     └─────────────┘

5. Async/Awaitパターン比較

5.1 JavaScript PromiseとAsync/Await

// Promiseチェーン vs Async/Await
// Promiseチェーン方式
function fetchUserData(userId) {
  return fetch(`/api/users/${userId}`)
    .then(response => response.json())
    .then(user => fetch(`/api/posts?author=${user.id}`))
    .then(response => response.json())
    .then(posts => {
      return { user, posts };
    })
    .catch(error => {
      console.error('Error:', error);
      throw error;
    });
}

// Async/Await方式(はるかに読みやすい)
async function fetchUserDataAsync(userId) {
  try {
    const userResponse = await fetch(`/api/users/${userId}`);
    const user = await userResponse.json();

    const postsResponse = await fetch(`/api/posts?author=${user.id}`);
    const posts = await postsResponse.json();

    return { user, posts };
  } catch (error) {
    console.error('Error:', error);
    throw error;
  }
}

// 並列実行
async function fetchMultipleUsers(userIds) {
  const promises = userIds.map(id => fetchUserDataAsync(id));
  return Promise.all(promises); // 全て並列実行
}

// Promise.allSettled - 一部失敗しても続行
async function fetchWithPartialFailure(urls) {
  const results = await Promise.allSettled(
    urls.map(url => fetch(url).then(r => r.json()))
  );

  const successes = results
    .filter(r => r.status === 'fulfilled')
    .map(r => r.value);
  const failures = results
    .filter(r => r.status === 'rejected')
    .map(r => r.reason);

  return { successes, failures };
}

5.2 Rust Async

use tokio;
use reqwest;
use futures::future::join_all;

#[derive(Debug)]
struct ApiResponse {
    url: String,
    status: u16,
    body_length: usize,
}

async fn fetch_url(client: &reqwest::Client, url: &str) -> Result<ApiResponse, reqwest::Error> {
    let response = client.get(url).send().await?;
    let status = response.status().as_u16();
    let body = response.text().await?;

    Ok(ApiResponse {
        url: url.to_string(),
        status,
        body_length: body.len(),
    })
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let client = reqwest::Client::new();
    let urls = vec![
        "https://httpbin.org/get",
        "https://httpbin.org/ip",
        "https://httpbin.org/user-agent",
    ];

    // 全リクエストを同時に実行
    let futures: Vec<_> = urls.iter()
        .map(|url| fetch_url(&client, url))
        .collect();

    let results = join_all(futures).await;

    for result in results {
        match result {
            Ok(resp) => println!("{}: status={}, length={}", resp.url, resp.status, resp.body_length),
            Err(e) => eprintln!("Error: {}", e),
        }
    }

    Ok(())
}

5.3 Async/Await言語比較

特性JavaScriptPythonRustC#Kotlin
ランタイムV8 Event Loopasynciotokio/async-stdCLR Thread PoolDispatchers
Future型PromiseCoroutineFuture(遅延)TaskDeferred
キャンセルAbortControllerTask.cancel()dropCancellationTokenJob.cancel()
ストリームAsyncIteratorasync forStream traitIAsyncEnumerableFlow
エラー処理try/catchtry/exceptResulttry/catchtry/catch
並列実行Promise.allgatherjoin!Task.WhenAllawaitAll
ゼロコストXXOXX

6. イベントループ深層分析(しんそうぶんせき)

6.1 Node.js libuvイベントループ

Node.js イベントループフェーズ
┌───────────────────────────────────┐
│         ┌──────────────┐          │
│    ┌────│   timers     │────┐     │
│    │    └──────────────┘    │     │
│    │    ┌──────────────┐    │     │
│    │    │ pending I/O  │    │     │
│    │    └──────────────┘    │     │
│    │    ┌──────────────┐    │     │
│    │    │  idle/prepare │    │     │
│    │    └──────────────┘    │     │
│    │    ┌──────────────┐    │     │
│    │    │    poll       │◄───┘    │
│    │    └──────────────┘         │
│    │    ┌──────────────┐         │
│    │    │    check      │         │
│    │     (setImmediate)│         │
│    │    └──────────────┘         │
│    │    ┌──────────────┐         │
│    └────│close callbacks│─────────│
│         └──────────────┘         │
│                                   │
│  各フェーズ間: microtaskキュー    │
  (Promise, queueMicrotask)└───────────────────────────────────┘
// イベントループの実行順序を理解する
console.log('1. 同期コード');

setTimeout(() => console.log('2. setTimeout (timers)'), 0);

setImmediate(() => console.log('3. setImmediate (check)'));

Promise.resolve().then(() => console.log('4. Promise (microtask)'));

process.nextTick(() => console.log('5. nextTick (microtask - 最優先)'));

console.log('6. 同期コード終了');

// 出力順序:
// 1. 同期コード
// 6. 同期コード終了
// 5. nextTick (microtask - 最優先)
// 4. Promise (microtask)
// 2. setTimeout (timers)
// 3. setImmediate (check)

6.2 Python asyncioイベントループ

import asyncio
import time

async def cpu_bound_simulation(name: str, duration: float):
    """CPU-boundタスクシミュレーション"""
    print(f"[{name}] 開始")
    await asyncio.sleep(duration)
    print(f"[{name}] 完了 ({duration}秒)")
    return f"{name}: done"

async def producer(queue: asyncio.Queue, n: int):
    """プロデューサー:キューにアイテム追加"""
    for i in range(n):
        await asyncio.sleep(0.1)
        item = f"item-{i}"
        await queue.put(item)
        print(f"  Produced: {item}")
    await queue.put(None)  # 終了シグナル

async def consumer(queue: asyncio.Queue, name: str):
    """コンシューマー:キューからアイテム消費"""
    while True:
        item = await queue.get()
        if item is None:
            await queue.put(None)
            break
        await asyncio.sleep(0.2)
        print(f"  {name} consumed: {item}")
        queue.task_done()

async def main():
    # 1. 同時実行
    start = time.perf_counter()
    results = await asyncio.gather(
        cpu_bound_simulation("TaskA", 1.0),
        cpu_bound_simulation("TaskB", 0.5),
        cpu_bound_simulation("TaskC", 0.8),
    )
    elapsed = time.perf_counter() - start
    print(f"合計時間: {elapsed:.2f}秒")  # 約1秒
    print(f"結果: {results}")

    # 2. プロデューサー-コンシューマーパターン
    print("\n--- プロデューサー-コンシューマー ---")
    queue = asyncio.Queue(maxsize=5)
    await asyncio.gather(
        producer(queue, 10),
        consumer(queue, "Consumer-1"),
        consumer(queue, "Consumer-2"),
    )

asyncio.run(main())

7. Actorモデル:共有状態(きょうゆうじょうたい)のない並行性

7.1 Actorモデルの概念

Actorモデル
┌─────────────────────────────────────────────────┐
│                                                 │
│  ┌─────────┐ メッセージ ┌─────────┐             │
│  │ Actor A │ ───────→  │ Actor B │             │
│  │ [State] │           │ [State] │             │
│  │ [Mailbox][Mailbox]│  └─────────┘           └────┬────┘             │
│       ↑                     │                   │
│       │    メッセージ        │ 新Actor生成       │
│       └─────────────────────┘                   │
│                              ↓                   │
│                        ┌─────────┐              │
│                        │ Actor C │              │
│                        │ [State] │              │
│                        └─────────┘              │
│                                                 │
│  ルール:                                       │
1. メッセージのみで通信(共有状態なし)          │
2. 一度に1つのメッセージのみ処理               │
3. 他のActorを生成可能                         │
4. 自分の状態のみ変更可能                       │
└─────────────────────────────────────────────────┘

7.2 Erlang/Elixir Actor

%% Erlang - カウンターActor
-module(counter).
-export([start/0, increment/1, get/1, loop/1]).

start() ->
    spawn(fun() -> loop(0) end).

increment(Pid) ->
    Pid ! {increment, 1}.

get(Pid) ->
    Pid ! {get, self()},
    receive
        {count, Value} -> Value
    after 5000 ->
        timeout
    end.

loop(Count) ->
    receive
        {increment, N} ->
            loop(Count + N);
        {get, From} ->
            From ! {count, Count},
            loop(Count);
        stop ->
            ok
    end.

%% 使用例:
%% Pid = counter:start().
%% counter:increment(Pid).
%% counter:increment(Pid).
%% counter:get(Pid). %% => 2

7.3 Akka (Scala) Actor

import akka.actor.typed.{ActorRef, ActorSystem, Behavior}
import akka.actor.typed.scaladsl.Behaviors

// メッセージ定義
sealed trait Command
case class Deposit(amount: Double) extends Command
case class Withdraw(amount: Double) extends Command
case class GetBalance(replyTo: ActorRef[Balance]) extends Command
case class Balance(amount: Double)

// 銀行口座Actor
object BankAccount {
  def apply(balance: Double = 0.0): Behavior[Command] =
    Behaviors.receive { (context, message) =>
      message match {
        case Deposit(amount) =>
          val newBalance = balance + amount
          context.log.info(s"Deposited $amount, balance: $newBalance")
          BankAccount(newBalance)

        case Withdraw(amount) =>
          if (amount <= balance) {
            val newBalance = balance - amount
            context.log.info(s"Withdrew $amount, balance: $newBalance")
            BankAccount(newBalance)
          } else {
            context.log.warn(s"Insufficient funds: $balance < $amount")
            Behaviors.same
          }

        case GetBalance(replyTo) =>
          replyTo ! Balance(balance)
          Behaviors.same
      }
    }
}

7.4 Actorモデルの長所と短所

長所短所
共有状態なし(競合状態防止)デバッグが難しい場合がある
位置透過性(分散可能)メッセージ順序保証が限定的
障害分離(let it crash)メールボックスオーバーフローの可能性
スケーラビリティが良い同期リクエスト-レスポンスが複雑

8. CSP:Goのチャネルベース並行性

8.1 チャネル基本

package main

import (
    "fmt"
    "time"
)

func main() {
    // バッファなしチャネル(同期)
    ch := make(chan string)

    go func() {
        time.Sleep(time.Second)
        ch <- "Hello from goroutine!"
    }()

    msg := <-ch // メッセージが来るまでブロック
    fmt.Println(msg)

    // バッファ付きチャネル(非同期)
    buffered := make(chan int, 3)
    buffered <- 1
    buffered <- 2
    buffered <- 3
    // buffered <- 4 // バッファが一杯でブロック!

    fmt.Println(<-buffered) // 1
    fmt.Println(<-buffered) // 2
}

8.2 Fan-In / Fan-Outパターン

package main

import (
    "fmt"
    "math/rand"
    "sync"
    "time"
)

// Fan-Out:1つの入力を複数ワーカーに分配
func fanOut(input <-chan int, workers int) []<-chan int {
    channels := make([]<-chan int, workers)
    for i := 0; i < workers; i++ {
        ch := make(chan int)
        channels[i] = ch
        go func(workerID int, out chan<- int) {
            defer close(out)
            for val := range input {
                time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
                out <- val * val
                fmt.Printf("  Worker %d processed %d\n", workerID, val)
            }
        }(i, ch)
    }
    return channels
}

// Fan-In:複数チャネルの結果を1つに合流
func fanIn(channels ...<-chan int) <-chan int {
    var wg sync.WaitGroup
    merged := make(chan int)

    for _, ch := range channels {
        wg.Add(1)
        go func(c <-chan int) {
            defer wg.Done()
            for val := range c {
                merged <- val
            }
        }(ch)
    }

    go func() {
        wg.Wait()
        close(merged)
    }()

    return merged
}

func main() {
    input := make(chan int)
    go func() {
        defer close(input)
        for i := 1; i <= 20; i++ {
            input <- i
        }
    }()

    // Fan-Out:4ワーカー
    workerChannels := fanOut(input, 4)

    // Fan-In:結果合流
    results := fanIn(workerChannels...)

    var sum int
    for result := range results {
        sum += result
    }
    fmt.Printf("Sum of squares: %d\n", sum)
}

8.3 Select文

package main

import (
    "fmt"
    "time"
)

func main() {
    ch1 := make(chan string)
    ch2 := make(chan string)

    go func() {
        time.Sleep(1 * time.Second)
        ch1 <- "from channel 1"
    }()

    go func() {
        time.Sleep(2 * time.Second)
        ch2 <- "from channel 2"
    }()

    // select:準備できたチャネルを選択
    for i := 0; i < 2; i++ {
        select {
        case msg := <-ch1:
            fmt.Println("Received", msg)
        case msg := <-ch2:
            fmt.Println("Received", msg)
        case <-time.After(3 * time.Second):
            fmt.Println("Timeout!")
        }
    }

    // ノンブロッキングselect
    ch := make(chan int, 1)
    select {
    case val := <-ch:
        fmt.Println("Got:", val)
    default:
        fmt.Println("Channel empty, moving on")
    }
}

9. 同期プリミティブ

9.1 MutexとRWLock

package main

import (
    "fmt"
    "sync"
)

// Mutex:相互排除
type SafeCounter struct {
    mu sync.Mutex
    v  map[string]int
}

func (c *SafeCounter) Inc(key string) {
    c.mu.Lock()
    defer c.mu.Unlock()
    c.v[key]++
}

func (c *SafeCounter) Value(key string) int {
    c.mu.Lock()
    defer c.mu.Unlock()
    return c.v[key]
}

// RWMutex:読み取りは同時に、書き込みは排他的
type SafeCache struct {
    mu    sync.RWMutex
    store map[string]string
}

func (c *SafeCache) Get(key string) (string, bool) {
    c.mu.RLock()
    defer c.mu.RUnlock()
    val, ok := c.store[key]
    return val, ok
}

func (c *SafeCache) Set(key, value string) {
    c.mu.Lock()
    defer c.mu.Unlock()
    c.store[key] = value
}

func main() {
    counter := SafeCounter{v: make(map[string]int)}
    var wg sync.WaitGroup
    for i := 0; i < 1000; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            counter.Inc("key")
        }()
    }
    wg.Wait()
    fmt.Println("Final count:", counter.Value("key")) // 正確に1000
}

9.2 セマフォ

import asyncio

async def worker(sem: asyncio.Semaphore, task_id: int):
    async with sem:  # 最大3つまで同時実行
        print(f"Task {task_id}: 開始(スロット取得)")
        await asyncio.sleep(1)
        print(f"Task {task_id}: 完了(スロット返却)")

async def main():
    sem = asyncio.Semaphore(3)  # 同時実行を3つに制限
    tasks = [worker(sem, i) for i in range(10)]
    await asyncio.gather(*tasks)

asyncio.run(main())

9.3 アトミック操作(そうさ)

import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

public class AtomicDemo {
    // AtomicLong - ロックフリーカウンター
    private static final AtomicLong counter = new AtomicLong(0);

    // AtomicReference - ロックフリー参照更新
    private static final AtomicReference<String> config =
        new AtomicReference<>("default");

    public static void main(String[] args) throws InterruptedException {
        Thread[] threads = new Thread[10];
        for (int i = 0; i < 10; i++) {
            threads[i] = new Thread(() -> {
                for (int j = 0; j < 10_000; j++) {
                    counter.incrementAndGet(); // アトミック増加
                }
            });
            threads[i].start();
        }

        for (Thread t : threads) t.join();
        System.out.println("Counter: " + counter.get()); // 正確に100,000

        boolean updated = config.compareAndSet("default", "production");
        System.out.println("Config updated: " + updated); // true
        System.out.println("Config value: " + config.get()); // production
    }
}

10. Lock-freeデータ構造

10.1 CAS(Compare-And-Swap)

CASはlock-freeプログラミングの基礎です。「現在の値が期待値と同じなら新しい値に置き換える」アトミック操作です。

CAS動作原理
┌──────────────────────────────────────┐
CAS(address, expected, new_value)│                                      │
if *address == expected:*address = new_value             │
return true   // 成功            │
else:return false  // 失敗 → リトライ │
│                                      │
 (全体がハードウェアレベルでアトミック)└──────────────────────────────────────┘

10.2 ABA問題

ABA問題
Thread 1:Aを読む  (一時停止)
Thread 2: ABに変更
Thread 3: BAに変更
Thread 1: (再開) CAS(A, A, C) → 成功!(しかしAは元のAではない)

解決: バージョンカウンター追加
AtomicStampedReference (Java)
TaggedPointer (C/C++)

10.3 Lock-freeスタック(Treiber Stack)

import java.util.concurrent.atomic.AtomicReference;

public class LockFreeStack<T> {
    private static class Node<T> {
        final T value;
        Node<T> next;

        Node(T value) {
            this.value = value;
        }
    }

    private final AtomicReference<Node<T>> top = new AtomicReference<>(null);

    public void push(T value) {
        Node<T> newNode = new Node<>(value);
        Node<T> oldTop;
        do {
            oldTop = top.get();
            newNode.next = oldTop;
        } while (!top.compareAndSet(oldTop, newNode)); // CASループ
    }

    public T pop() {
        Node<T> oldTop;
        Node<T> newTop;
        do {
            oldTop = top.get();
            if (oldTop == null) return null;
            newTop = oldTop.next;
        } while (!top.compareAndSet(oldTop, newTop)); // CASループ
        return oldTop.value;
    }
}

11. 並行性バグパターンと防止(ぼうし)

11.1 デッドロック

デッドロック条件(4つ全て充足時に発生)
1. 相互排除(Mutual Exclusion)
2. 保持待ち(Hold and Wait)
3. 非プリエンプション(No Preemption)
4. 循環待ち(Circular Wait)

┌──────────┐          ┌──────────┐
Thread A │──Lock1──→│ Thread B│          │←──Lock2──│          │
 (Lock2 (Lock1│  待機中)  │          │  待機中)└──────────┘          └──────────┘
import threading
import time

lock_a = threading.Lock()
lock_b = threading.Lock()

# デッドロック発生コード
def thread_1():
    with lock_a:
        print("Thread 1: Lock A取得")
        time.sleep(0.1)
        with lock_b:  # Lock B待ち → デッドロック!
            print("Thread 1: Lock B取得")

def thread_2():
    with lock_b:
        print("Thread 2: Lock B取得")
        time.sleep(0.1)
        with lock_a:  # Lock A待ち → デッドロック!
            print("Thread 2: Lock A取得")

# 解決: ロック順序を統一
def thread_1_fixed():
    with lock_a:     # 常にA → B順序
        print("Thread 1: Lock A取得")
        time.sleep(0.1)
        with lock_b:
            print("Thread 1: Lock B取得")

def thread_2_fixed():
    with lock_a:     # 常にA → B順序(循環待ち防止)
        print("Thread 2: Lock A取得")
        time.sleep(0.1)
        with lock_b:
            print("Thread 2: Lock B取得")

11.2 レースコンディション

package main

import (
    "fmt"
    "sync"
)

func main() {
    // レースコンディション例
    counter := 0
    var wg sync.WaitGroup

    for i := 0; i < 1000; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            counter++ // DATA RACE! 読み-変更-書きがアトミックではない
        }()
    }
    wg.Wait()
    fmt.Println("Counter (unsafe):", counter) // 1000未満の可能性

    // 解決1: Mutex
    counter = 0
    var mu sync.Mutex
    for i := 0; i < 1000; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            mu.Lock()
            counter++
            mu.Unlock()
        }()
    }
    wg.Wait()
    fmt.Println("Counter (mutex):", counter) // 正確に1000
}

11.3 飢餓状態(きがじょうたい)と優先度逆転

飢餓状態 (Starvation)
- 特定のスレッドがリソースにアクセスできず無限待機
- 解決: 公平なロック (Fair Lock)、優先度エイジング

優先度逆転 (Priority Inversion)
┌─────────────────────────────────────┐
│ 高優先度タスク    ← ブロックされる   │
| (Lock待ち)│ 中優先度タスク    ← 実行中          │
| (プリエンプト)│ 低優先度タスク    ← Lock保持│                                     │
│ 解決: Priority Inheritance Protocol│ 低優先度タスクがLock保持時に         │
│ 高優先度を一時的に継承              │
└─────────────────────────────────────┘

12. 構造化された並行性(Structured Concurrency)

12.1 概念

構造化された並行性は、並行タスクの寿命をコードブロック(スコープ)に結び付けるパターンです。スコープ終了時に全子タスクの完了(またはキャンセル)を保証します。

非構造化並行性               構造化された並行性
┌────────────────┐         ┌────────────────┐
│ start task A   │         │ scope {│ start task B   │         │   task A... (漏れ?)    │         │   task B│ forget task A? │         │ } // A,B完了   │
└────────────────┘         └────────────────┘

12.2 Python TaskGroup(3.11+)

import asyncio

async def fetch(name: str, delay: float) -> str:
    await asyncio.sleep(delay)
    if name == "failing":
        raise ValueError(f"{name} failed!")
    return f"{name}: done"

async def main():
    # TaskGroup: 構造化された並行性
    try:
        async with asyncio.TaskGroup() as tg:
            task1 = tg.create_task(fetch("api-1", 0.5))
            task2 = tg.create_task(fetch("api-2", 1.0))
            task3 = tg.create_task(fetch("api-3", 0.8))
        # ここに到達時、全タスク完了保証
        print(task1.result(), task2.result(), task3.result())
    except* ValueError as eg:
        for exc in eg.exceptions:
            print(f"Error: {exc}")

asyncio.run(main())

12.3 Java 21 Structured Concurrency

import java.util.concurrent.StructuredTaskScope;
import java.util.concurrent.Future;

public class StructuredConcurrencyDemo {
    record User(String name) {}
    record Order(String id) {}

    static User fetchUser(String userId) throws Exception {
        Thread.sleep(500);
        return new User("Alice");
    }

    static Order fetchOrder(String orderId) throws Exception {
        Thread.sleep(800);
        return new Order("ORD-123");
    }

    public static void main(String[] args) throws Exception {
        try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
            Future<User> userFuture = scope.fork(() -> fetchUser("user-1"));
            Future<Order> orderFuture = scope.fork(() -> fetchOrder("order-1"));

            scope.join();
            scope.throwIfFailed();

            User user = userFuture.resultNow();
            Order order = orderFuture.resultNow();
            System.out.printf("User: %s, Order: %s%n", user.name(), order.id());
        }
    }
}

13. 言語別(げんごべつ)並行性モデル比較

次元GoRustJava 21PythonKotlinJavaScriptErlang/ElixirC#
モデルCSPOwnership + asyncVirtual ThreadasyncioCoroutineEvent LoopActorTask
軽量単位goroutineFutureVirtual ThreadcoroutinecoroutinePromiseprocessTask
スケジューリングM:N (GMP)ランタイム依存M:N1:1(シングル)M:N1:1(シングル)プリエンプティブM:NM:N
共有状態channelownershipsynchronizedasyncio.LockMutexなし(シングル)なし(メッセージ)lock
エラー伝播panic/recoverResult例外例外例外rejectlink/monitor例外
キャンセルcontextdropinterruptcancelJob.cancelAbortControllerexitCancelToken
構造化並行性errgroup限定的StructuredTaskScopeTaskGroupcoroutineScopeなしsupervisorなし
CPU-boundgoroutinethread/rayonthreadmultiprocessingDispatchers.DefaultWorker ThreadprocessParallel.ForEach
メモリ安全性GCコンパイル時GCGCGCGCGCGC
デッドロック防止チャネル設計コンパイル時注意必要注意必要注意必要該当なしメッセージベース注意必要
パフォーマンス高い非常に高い高い普通高いI/Oのみ高い(BEAM)高い
学習曲線低い高い普通低い普通低い普通普通

14. 実践パターン:レートリミッター

package main

import (
    "context"
    "fmt"
    "sync"
    "time"
)

// Token Bucket Rate Limiter
type RateLimiter struct {
    tokens     chan struct{}
    refillRate time.Duration
    ctx        context.Context
    cancel     context.CancelFunc
}

func NewRateLimiter(maxTokens int, refillRate time.Duration) *RateLimiter {
    ctx, cancel := context.WithCancel(context.Background())
    rl := &RateLimiter{
        tokens:     make(chan struct{}, maxTokens),
        refillRate: refillRate,
        ctx:        ctx,
        cancel:     cancel,
    }

    for i := 0; i < maxTokens; i++ {
        rl.tokens <- struct{}{}
    }

    go func() {
        ticker := time.NewTicker(refillRate)
        defer ticker.Stop()
        for {
            select {
            case <-ticker.C:
                select {
                case rl.tokens <- struct{}{}:
                default:
                }
            case <-ctx.Done():
                return
            }
        }
    }()

    return rl
}

func (rl *RateLimiter) Acquire(ctx context.Context) error {
    select {
    case <-rl.tokens:
        return nil
    case <-ctx.Done():
        return ctx.Err()
    }
}

func (rl *RateLimiter) Close() {
    rl.cancel()
}

func main() {
    limiter := NewRateLimiter(5, 200*time.Millisecond)
    defer limiter.Close()

    var wg sync.WaitGroup
    for i := 0; i < 20; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
            defer cancel()

            if err := limiter.Acquire(ctx); err != nil {
                fmt.Printf("Request %d: timed out\n", id)
                return
            }
            fmt.Printf("Request %d: processed at %s\n", id,
                time.Now().Format("15:04:05.000"))
        }(i)
    }
    wg.Wait()
}

15. パフォーマンス最適化(さいてきか)ヒント

15.1 適切な並行性モデル選択

ワークロードタイプ別選択

I/O-bound(ネットワーク、ディスク):
  → async/await、coroutine、goroutine
  → イベントループベースが効率的
  → スレッドプールは過度なコンテキストスイッチを引き起こす

CPU-bound(計算、変換):
OSスレッド(Python: multiprocessing)
Rust: rayonクレート
Go: goroutine(自動並列化)

混合:
  → 別々のスレッドプール/ディスパッチャ分離
Kotlin: Dispatchers.IO vs Dispatchers.Default
Python: asyncio + ProcessPoolExecutor

15.2 コンテキストスイッチング最小化

import asyncio
import concurrent.futures
import time

# CPU-boundタスクを別プロセスで実行
def cpu_heavy_task(n: int) -> int:
    """素数計算 (CPU-bound)"""
    count = 0
    for i in range(2, n):
        if all(i % j != 0 for j in range(2, int(i**0.5) + 1)):
            count += 1
    return count

async def main():
    loop = asyncio.get_event_loop()

    with concurrent.futures.ProcessPoolExecutor(max_workers=4) as pool:
        start = time.perf_counter()
        results = await asyncio.gather(
            loop.run_in_executor(pool, cpu_heavy_task, 100_000),
            loop.run_in_executor(pool, cpu_heavy_task, 100_000),
            loop.run_in_executor(pool, cpu_heavy_task, 100_000),
            loop.run_in_executor(pool, cpu_heavy_task, 100_000),
        )
        elapsed = time.perf_counter() - start
        print(f"4 tasks completed in {elapsed:.2f}s: {results}")

asyncio.run(main())

16. クイズ

Q1. 並行性と並列性の核心的な違いは?

並行性はプログラムの構造的特性で、複数のタスクを論理的に同時に扱える設計です。並列性は実行の特性で、複数のタスクが物理的に同時に実行されることです。並行性はシングルコアでも可能ですが、並列性にはマルチコアが必要です。

Q2. GoroutineのGMPモデルでG、M、Pはそれぞれ何か?
  • G(Goroutine):実行するタスク単位。軽量スレッド。
  • M(Machine):OSスレッド。実際の実行を担当。
  • P(Processor):論理プロセッサ。GをMにマッピングするスケジューラ。GOMAXPROCSで数を決定。

Pはローカルrunキューを持ち、ワークスティーリングで負荷を均等分配します。

Q3. デッドロックが発生するには4つの条件全てが充足される必要がある。その4つは?
  1. 相互排除(Mutual Exclusion):リソースを一度に1つのプロセスのみが使用
  2. 保持待ち(Hold and Wait):リソースを保持したまま他のリソースを待機
  3. 非プリエンプション(No Preemption):割り当てられたリソースを強制的に奪えない
  4. 循環待ち(Circular Wait):プロセス間で循環形態の待機関係

1つでも破れればデッドロックは発生しません。最も一般的な解決策はロック順序を統一して循環待ちを防止することです。

Q4. ActorモデルとCSPの核心的な違いは?
  • Actorモデル:非同期メッセージ受け渡し。各Actorが固有のメールボックスを持つ。受信者を直接指定。Erlang、Akka。
  • CSP:同期チャネル通信。チャネルが独立したエンティティ。送信者と受信者が互いを知らない。Goのgoroutine + channel。

Actorは「誰に送るか」を知り、CSPは「どのチャネルに送るか」を知るだけです。

Q5. 構造化された並行性が解決する問題は?

構造化された並行性は以下の問題を解決します:

  1. タスク漏れ(Task Leaking):fire-and-forgetで開始したタスクが忘れられる問題
  2. エラー伝播:子タスクのエラーが親に伝達されない問題
  3. キャンセル伝播:親がキャンセルされても子が実行し続ける問題
  4. ライフタイム管理:スコープ終了後もタスクが実行し続ける問題

Java 21のStructuredTaskScope、PythonのTaskGroup、KotlinのcoroutineScopeが代表的です。


17. 参考資料(さんこうしりょう)

  1. Rob Pike - "Concurrency is not Parallelism" (Go Blog)
  2. Java 21 Virtual Threads - JEP 444
  3. Java 21 Structured Concurrency - JEP 453
  4. Python asyncio公式ドキュメント
  5. Go公式ドキュメント - Effective Go (Concurrency)
  6. Kotlin Coroutines公式ガイド
  7. Rust async-book (The Async Book)
  8. Akka Documentation - Actor Model
  9. Erlang/OTP - Processes
  10. "The Art of Multiprocessor Programming" - Maurice Herlihy, Nir Shavit
  11. "Designing Data-Intensive Applications" - Martin Kleppmann (Chapter 8)
  12. "Concurrency in Go" - Katherine Cox-Buday (O'Reilly)
  13. tokio.rs - Rust非同期ランタイム
  14. libuv - Node.jsイベントループ実装