目次(もくじ)
1. 並行性(へいこうせい) vs 並列性(へいれつせい):Rob Pikeの区別(くべつ)
"Concurrency is about dealing with lots of things at once. Parallelism is about doing lots of things at once." -- Rob Pike
この一文が並行性と並列性の核心的な違いを要約しています。
1.1 概念的(がいねんてき)な区別
並行性(Concurrency) はプログラムの構造的特性です。複数のタスクを論理的に同時に扱えるようにプログラムを設計することです。実際に同時に実行されるかは重要ではありません。
並列性(Parallelism) は実行の特性です。複数のタスクが物理的に同時に実行されることを意味します。マルチコアCPUで複数のスレッドが同時に動作するのが代表的です。
並行性 (Concurrency) - 構造
┌─────────────────────────────────────────┐
│ Task A: ██░░██░░░░██░░ │
│ Task B: ░░██░░██░░░░██ │
│ Task C: ░░░░░░░░██░░░░██ │
│ ─────────────────→ 時間 │
│ (シングルコアで交互に実行) │
└─────────────────────────────────────────┘
並列性 (Parallelism) - 実行
┌─────────────────────────────────────────┐
│ Core 1: ████████████████ │
│ Core 2: ████████████████ │
│ Core 3: ████████████████ │
│ ─────────────────→ 時間 │
│ (マルチコアで同時に実行) │
└─────────────────────────────────────────┘
1.2 実生活の例(たと)え
| 状況 | 並行性 | 並列性 |
|---|---|---|
| 料理人1人が3つの料理を交互に作る | O | X |
| 料理人3人がそれぞれ1つずつ同時に作る | O | O |
| 料理人1人が1つの料理だけ作る | X | X |
1.3 なぜ区別が重要か
並行性なしに並列性だけを追求すると複雑度が爆発します。正しい並行性構造をまず設計し、その後並列実行でパフォーマンスを引き出すのが正道です。
# 並行性構造 (asyncio) - シングルスレッドでも効率的
import asyncio
async def fetch_data(url: str) -> str:
print(f"Fetching {url}...")
await asyncio.sleep(1) # I/O待機シミュレーション
return f"Data from {url}"
async def main():
# 並行性:3つのリクエストを構造的に同時に扱う
results = await asyncio.gather(
fetch_data("https://api.example.com/users"),
fetch_data("https://api.example.com/posts"),
fetch_data("https://api.example.com/comments"),
)
for r in results:
print(r)
asyncio.run(main())
# 合計約1秒(3秒ではない) - 並行性の力
2. スレッディングモデル:OSスレッドから仮想(かそう)スレッドまで
2.1 OSスレッド(カーネルスレッド)
オペレーティングシステムが直接管理するスレッドです。生成コストが高く(約1MBスタック)、コンテキストスイッチングオーバーヘッドが存在します。
// Java - OSスレッド
public class ThreadExample {
public static void main(String[] args) throws InterruptedException {
Thread t1 = new Thread(() -> {
for (int i = 0; i < 5; i++) {
System.out.println("Thread 1: " + i);
try { Thread.sleep(100); } catch (InterruptedException e) { break; }
}
});
Thread t2 = new Thread(() -> {
for (int i = 0; i < 5; i++) {
System.out.println("Thread 2: " + i);
try { Thread.sleep(100); } catch (InterruptedException e) { break; }
}
});
t1.start();
t2.start();
t1.join();
t2.join();
System.out.println("Both threads completed");
}
}
2.2 ユーザースレッド(グリーンスレッド)
ランタイムが管理する軽量スレッドです。OSスレッドの上に多数のユーザースレッドをマッピングします。
スレッドモデル比較
┌───────────────────────────────────────────────┐
│ 1:1 (OSスレッド) N:1 (グリーンスレッド) │
│ ┌──┐ ┌──┐ ┌──┐ ┌──┐┌──┐┌──┐┌──┐┌──┐ │
│ │UT│ │UT│ │UT│ │GT││GT││GT││GT││GT│ │
│ └┬─┘ └┬─┘ └┬─┘ └┬─┘└┬─┘└┬─┘└┬─┘└┬─┘ │
│ │ │ │ └──┬──┘ │ └──┬──┘ │
│ ┌┴─┐ ┌┴─┐ ┌┴─┐ ┌┴─┐ │ │ │
│ │KT│ │KT│ │KT│ │KT│ │ │ │
│ └──┘ └──┘ └──┘ └──┘ │ │ │
│ │ │ │
│ M:N (ハイブリッド) │
│ ┌──┐┌──┐┌──┐┌──┐┌──┐┌──┐ │
│ │GT││GT││GT││GT││GT││GT│ │
│ └┬─┘└┬─┘└┬─┘└┬─┘└┬─┘└┬─┘ │
│ └─┬─┘ └──┬──┘ └──┬──┘ │
│ ┌┴─┐ ┌┴─┐ ┌┴─┐ │
│ │KT│ │KT│ │KT│ │
│ └──┘ └──┘ └──┘ │
└───────────────────────────────────────────────┘
2.3 Java 21 仮想スレッド(Virtual Thread)
Java 21で正式導入された仮想スレッドはM:Nモデルを実装します。数百万の仮想スレッドを生成できます。
// Java 21 Virtual Threads
import java.util.concurrent.Executors;
import java.time.Duration;
public class VirtualThreadDemo {
public static void main(String[] args) throws Exception {
// 100万の仮想スレッドを生成可能!
try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
for (int i = 0; i < 1_000_000; i++) {
final int taskId = i;
executor.submit(() -> {
try {
Thread.sleep(Duration.ofSeconds(1));
if (taskId % 100_000 == 0) {
System.out.println("Task " + taskId + " done on " +
Thread.currentThread());
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
}
}
System.out.println("All tasks completed");
}
}
2.4 スレッドモデル比較
| 特性 | OSスレッド | グリーンスレッド | 仮想スレッド(Java 21) |
|---|---|---|---|
| スタックサイズ | 約1MB | 約KB | 数KB(動的) |
| 生成コスト | 高い(syscall) | 低い | 非常に低い |
| 同時数 | 数千 | 数百万 | 数百万 |
| プリエンプション | OSプリエンプティブ | 協調的 | 協調的(I/O時に譲渡) |
| I/Oブロッキング | スレッドブロック | 全体ブロックの危険 | 自動アンマウント |
3. コルーチン:軽量(けいりょう)並行性の核心
3.1 Stackful vs Stackless コルーチン
Stackful Coroutine(例:Lua、Goのgoroutine)
┌──────────────────────────────────┐
│ 自前のスタックを持つ │
│ スタックのどこでも一時中断可能 │
│ メモリ使用量が高い │
│ ネストした関数呼び出し中でも譲渡可 │
└──────────────────────────────────┘
Stackless Coroutine(例:Python、Kotlin、Rust)
┌──────────────────────────────────┐
│ 状態をヒープに保存 │
│ 最上位でのみ一時中断可能 │
│ メモリ使用量が低い │
│ コンパイラがステートマシンに変換 │
└──────────────────────────────────┘
3.2 Python asyncioコルーチン
import asyncio
import aiohttp
import time
async def fetch_url(session: aiohttp.ClientSession, url: str) -> dict:
"""非同期HTTPリクエスト"""
async with session.get(url) as response:
return {
"url": url,
"status": response.status,
"length": len(await response.text()),
}
async def main():
urls = [
"https://httpbin.org/delay/1",
"https://httpbin.org/delay/1",
"https://httpbin.org/delay/1",
"https://httpbin.org/delay/1",
"https://httpbin.org/delay/1",
]
start = time.perf_counter()
async with aiohttp.ClientSession() as session:
# 5つのリクエストを同時に実行
tasks = [fetch_url(session, url) for url in urls]
results = await asyncio.gather(*tasks)
elapsed = time.perf_counter() - start
print(f"5つのリクエスト完了: {elapsed:.2f}秒") # 約1秒(5秒ではない)
for r in results:
print(f" {r['url']}: status={r['status']}, length={r['length']}")
asyncio.run(main())
3.3 Kotlinコルーチン
import kotlinx.coroutines.*
import kotlin.system.measureTimeMillis
suspend fun fetchData(id: Int): String {
delay(1000) // 非同期待機(スレッドブロッキングなし)
return "Result-$id"
}
fun main() = runBlocking {
val time = measureTimeMillis {
// 構造化された並行性:coroutineScopeが全子コルーチンを管理
coroutineScope {
val results = (1..10).map { id ->
async(Dispatchers.IO) {
fetchData(id)
}
}
results.awaitAll().forEach { println(it) }
}
}
println("Completed in ${time}ms") // 約1000ms
}
4. GoroutineとGoの並行性モデル
4.1 Goroutine内部構造
Goランタイムは M:Nスケジューリングを使用します。G(Goroutine)、M(Machine/OSスレッド)、P(Processor)の3つの概念が核心です。
Goランタイムスケジューラ(GMPモデル)
┌─────────────────────────────────────────────┐
│ │
│ ┌──────┐ ┌──────┐ ┌──────┐ ┌──────┐ │
│ │ G1 │ │ G2 │ │ G3 │ │ G4 │ ... │
│ └──┬───┘ └──┬───┘ └──┬───┘ └──┬───┘ │
│ │ │ │ │ │
│ ┌──┴────────┴──┐ ┌──┴────────┴──┐ │
│ │ P (Local │ │ P (Local │ │
│ │ Queue) │ │ Queue) │ │
│ └──────┬───────┘ └──────┬───────┘ │
│ │ │ │
│ ┌──────┴───────┐ ┌──────┴───────┐ │
│ │ M (OSスレッド)│ │ M (OSスレッド)│ │
│ └──────────────┘ └──────────────┘ │
│ │
│ Global Run Queue: [G5, G6, G7, ...] │
└─────────────────────────────────────────────┘
GOMAXPROCS = Pの数(デフォルト:CPUコア数)
4.2 Goroutine基本使用法
package main
import (
"fmt"
"sync"
"time"
)
func worker(id int, wg *sync.WaitGroup) {
defer wg.Done()
fmt.Printf("Worker %d starting\n", id)
time.Sleep(time.Second)
fmt.Printf("Worker %d done\n", id)
}
func main() {
var wg sync.WaitGroup
for i := 1; i <= 5; i++ {
wg.Add(1)
go worker(i, &wg) // goroutine生成 - 約2KBスタック
}
wg.Wait()
fmt.Println("All workers completed")
}
4.3 ワークスティーリング
Pのローカルキューが空になると、他のPのキューからgoroutineを盗みます。これにより負荷を均等に分配します。
ワークスティーリングプロセス
┌─────────────┐ ┌─────────────┐
│ P1 (busy) │ │ P2 (idle) │
│ [G1,G2,G3] │ ──→ │ [] │
│ │steal│ │
│ [G1,G2] │ ──→ │ [G3] │
└─────────────┘ └─────────────┘
5. Async/Awaitパターン比較
5.1 JavaScript PromiseとAsync/Await
// Promiseチェーン vs Async/Await
// Promiseチェーン方式
function fetchUserData(userId) {
return fetch(`/api/users/${userId}`)
.then(response => response.json())
.then(user => fetch(`/api/posts?author=${user.id}`))
.then(response => response.json())
.then(posts => {
return { user, posts };
})
.catch(error => {
console.error('Error:', error);
throw error;
});
}
// Async/Await方式(はるかに読みやすい)
async function fetchUserDataAsync(userId) {
try {
const userResponse = await fetch(`/api/users/${userId}`);
const user = await userResponse.json();
const postsResponse = await fetch(`/api/posts?author=${user.id}`);
const posts = await postsResponse.json();
return { user, posts };
} catch (error) {
console.error('Error:', error);
throw error;
}
}
// 並列実行
async function fetchMultipleUsers(userIds) {
const promises = userIds.map(id => fetchUserDataAsync(id));
return Promise.all(promises); // 全て並列実行
}
// Promise.allSettled - 一部失敗しても続行
async function fetchWithPartialFailure(urls) {
const results = await Promise.allSettled(
urls.map(url => fetch(url).then(r => r.json()))
);
const successes = results
.filter(r => r.status === 'fulfilled')
.map(r => r.value);
const failures = results
.filter(r => r.status === 'rejected')
.map(r => r.reason);
return { successes, failures };
}
5.2 Rust Async
use tokio;
use reqwest;
use futures::future::join_all;
#[derive(Debug)]
struct ApiResponse {
url: String,
status: u16,
body_length: usize,
}
async fn fetch_url(client: &reqwest::Client, url: &str) -> Result<ApiResponse, reqwest::Error> {
let response = client.get(url).send().await?;
let status = response.status().as_u16();
let body = response.text().await?;
Ok(ApiResponse {
url: url.to_string(),
status,
body_length: body.len(),
})
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let client = reqwest::Client::new();
let urls = vec![
"https://httpbin.org/get",
"https://httpbin.org/ip",
"https://httpbin.org/user-agent",
];
// 全リクエストを同時に実行
let futures: Vec<_> = urls.iter()
.map(|url| fetch_url(&client, url))
.collect();
let results = join_all(futures).await;
for result in results {
match result {
Ok(resp) => println!("{}: status={}, length={}", resp.url, resp.status, resp.body_length),
Err(e) => eprintln!("Error: {}", e),
}
}
Ok(())
}
5.3 Async/Await言語比較
| 特性 | JavaScript | Python | Rust | C# | Kotlin |
|---|---|---|---|---|---|
| ランタイム | V8 Event Loop | asyncio | tokio/async-std | CLR Thread Pool | Dispatchers |
| Future型 | Promise | Coroutine | Future(遅延) | Task | Deferred |
| キャンセル | AbortController | Task.cancel() | drop | CancellationToken | Job.cancel() |
| ストリーム | AsyncIterator | async for | Stream trait | IAsyncEnumerable | Flow |
| エラー処理 | try/catch | try/except | Result | try/catch | try/catch |
| 並列実行 | Promise.all | gather | join! | Task.WhenAll | awaitAll |
| ゼロコスト | X | X | O | X | X |
6. イベントループ深層分析(しんそうぶんせき)
6.1 Node.js libuvイベントループ
Node.js イベントループフェーズ
┌───────────────────────────────────┐
│ ┌──────────────┐ │
│ ┌────│ timers │────┐ │
│ │ └──────────────┘ │ │
│ │ ┌──────────────┐ │ │
│ │ │ pending I/O │ │ │
│ │ └──────────────┘ │ │
│ │ ┌──────────────┐ │ │
│ │ │ idle/prepare │ │ │
│ │ └──────────────┘ │ │
│ │ ┌──────────────┐ │ │
│ │ │ poll │◄───┘ │
│ │ └──────────────┘ │
│ │ ┌──────────────┐ │
│ │ │ check │ │
│ │ │ (setImmediate)│ │
│ │ └──────────────┘ │
│ │ ┌──────────────┐ │
│ └────│close callbacks│─────────│
│ └──────────────┘ │
│ │
│ 各フェーズ間: microtaskキュー │
│ (Promise, queueMicrotask) │
└───────────────────────────────────┘
// イベントループの実行順序を理解する
console.log('1. 同期コード');
setTimeout(() => console.log('2. setTimeout (timers)'), 0);
setImmediate(() => console.log('3. setImmediate (check)'));
Promise.resolve().then(() => console.log('4. Promise (microtask)'));
process.nextTick(() => console.log('5. nextTick (microtask - 最優先)'));
console.log('6. 同期コード終了');
// 出力順序:
// 1. 同期コード
// 6. 同期コード終了
// 5. nextTick (microtask - 最優先)
// 4. Promise (microtask)
// 2. setTimeout (timers)
// 3. setImmediate (check)
6.2 Python asyncioイベントループ
import asyncio
import time
async def cpu_bound_simulation(name: str, duration: float):
"""CPU-boundタスクシミュレーション"""
print(f"[{name}] 開始")
await asyncio.sleep(duration)
print(f"[{name}] 完了 ({duration}秒)")
return f"{name}: done"
async def producer(queue: asyncio.Queue, n: int):
"""プロデューサー:キューにアイテム追加"""
for i in range(n):
await asyncio.sleep(0.1)
item = f"item-{i}"
await queue.put(item)
print(f" Produced: {item}")
await queue.put(None) # 終了シグナル
async def consumer(queue: asyncio.Queue, name: str):
"""コンシューマー:キューからアイテム消費"""
while True:
item = await queue.get()
if item is None:
await queue.put(None)
break
await asyncio.sleep(0.2)
print(f" {name} consumed: {item}")
queue.task_done()
async def main():
# 1. 同時実行
start = time.perf_counter()
results = await asyncio.gather(
cpu_bound_simulation("TaskA", 1.0),
cpu_bound_simulation("TaskB", 0.5),
cpu_bound_simulation("TaskC", 0.8),
)
elapsed = time.perf_counter() - start
print(f"合計時間: {elapsed:.2f}秒") # 約1秒
print(f"結果: {results}")
# 2. プロデューサー-コンシューマーパターン
print("\n--- プロデューサー-コンシューマー ---")
queue = asyncio.Queue(maxsize=5)
await asyncio.gather(
producer(queue, 10),
consumer(queue, "Consumer-1"),
consumer(queue, "Consumer-2"),
)
asyncio.run(main())
7. Actorモデル:共有状態(きょうゆうじょうたい)のない並行性
7.1 Actorモデルの概念
Actorモデル
┌─────────────────────────────────────────────────┐
│ │
│ ┌─────────┐ メッセージ ┌─────────┐ │
│ │ Actor A │ ───────→ │ Actor B │ │
│ │ [State] │ │ [State] │ │
│ │ [Mailbox] │ [Mailbox] │
│ └─────────┘ └────┬────┘ │
│ ↑ │ │
│ │ メッセージ │ 新Actor生成 │
│ └─────────────────────┘ │
│ ↓ │
│ ┌─────────┐ │
│ │ Actor C │ │
│ │ [State] │ │
│ └─────────┘ │
│ │
│ ルール: │
│ 1. メッセージのみで通信(共有状態なし) │
│ 2. 一度に1つのメッセージのみ処理 │
│ 3. 他のActorを生成可能 │
│ 4. 自分の状態のみ変更可能 │
└─────────────────────────────────────────────────┘
7.2 Erlang/Elixir Actor
%% Erlang - カウンターActor
-module(counter).
-export([start/0, increment/1, get/1, loop/1]).
start() ->
spawn(fun() -> loop(0) end).
increment(Pid) ->
Pid ! {increment, 1}.
get(Pid) ->
Pid ! {get, self()},
receive
{count, Value} -> Value
after 5000 ->
timeout
end.
loop(Count) ->
receive
{increment, N} ->
loop(Count + N);
{get, From} ->
From ! {count, Count},
loop(Count);
stop ->
ok
end.
%% 使用例:
%% Pid = counter:start().
%% counter:increment(Pid).
%% counter:increment(Pid).
%% counter:get(Pid). %% => 2
7.3 Akka (Scala) Actor
import akka.actor.typed.{ActorRef, ActorSystem, Behavior}
import akka.actor.typed.scaladsl.Behaviors
// メッセージ定義
sealed trait Command
case class Deposit(amount: Double) extends Command
case class Withdraw(amount: Double) extends Command
case class GetBalance(replyTo: ActorRef[Balance]) extends Command
case class Balance(amount: Double)
// 銀行口座Actor
object BankAccount {
def apply(balance: Double = 0.0): Behavior[Command] =
Behaviors.receive { (context, message) =>
message match {
case Deposit(amount) =>
val newBalance = balance + amount
context.log.info(s"Deposited $amount, balance: $newBalance")
BankAccount(newBalance)
case Withdraw(amount) =>
if (amount <= balance) {
val newBalance = balance - amount
context.log.info(s"Withdrew $amount, balance: $newBalance")
BankAccount(newBalance)
} else {
context.log.warn(s"Insufficient funds: $balance < $amount")
Behaviors.same
}
case GetBalance(replyTo) =>
replyTo ! Balance(balance)
Behaviors.same
}
}
}
7.4 Actorモデルの長所と短所
| 長所 | 短所 |
|---|---|
| 共有状態なし(競合状態防止) | デバッグが難しい場合がある |
| 位置透過性(分散可能) | メッセージ順序保証が限定的 |
| 障害分離(let it crash) | メールボックスオーバーフローの可能性 |
| スケーラビリティが良い | 同期リクエスト-レスポンスが複雑 |
8. CSP:Goのチャネルベース並行性
8.1 チャネル基本
package main
import (
"fmt"
"time"
)
func main() {
// バッファなしチャネル(同期)
ch := make(chan string)
go func() {
time.Sleep(time.Second)
ch <- "Hello from goroutine!"
}()
msg := <-ch // メッセージが来るまでブロック
fmt.Println(msg)
// バッファ付きチャネル(非同期)
buffered := make(chan int, 3)
buffered <- 1
buffered <- 2
buffered <- 3
// buffered <- 4 // バッファが一杯でブロック!
fmt.Println(<-buffered) // 1
fmt.Println(<-buffered) // 2
}
8.2 Fan-In / Fan-Outパターン
package main
import (
"fmt"
"math/rand"
"sync"
"time"
)
// Fan-Out:1つの入力を複数ワーカーに分配
func fanOut(input <-chan int, workers int) []<-chan int {
channels := make([]<-chan int, workers)
for i := 0; i < workers; i++ {
ch := make(chan int)
channels[i] = ch
go func(workerID int, out chan<- int) {
defer close(out)
for val := range input {
time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
out <- val * val
fmt.Printf(" Worker %d processed %d\n", workerID, val)
}
}(i, ch)
}
return channels
}
// Fan-In:複数チャネルの結果を1つに合流
func fanIn(channels ...<-chan int) <-chan int {
var wg sync.WaitGroup
merged := make(chan int)
for _, ch := range channels {
wg.Add(1)
go func(c <-chan int) {
defer wg.Done()
for val := range c {
merged <- val
}
}(ch)
}
go func() {
wg.Wait()
close(merged)
}()
return merged
}
func main() {
input := make(chan int)
go func() {
defer close(input)
for i := 1; i <= 20; i++ {
input <- i
}
}()
// Fan-Out:4ワーカー
workerChannels := fanOut(input, 4)
// Fan-In:結果合流
results := fanIn(workerChannels...)
var sum int
for result := range results {
sum += result
}
fmt.Printf("Sum of squares: %d\n", sum)
}
8.3 Select文
package main
import (
"fmt"
"time"
)
func main() {
ch1 := make(chan string)
ch2 := make(chan string)
go func() {
time.Sleep(1 * time.Second)
ch1 <- "from channel 1"
}()
go func() {
time.Sleep(2 * time.Second)
ch2 <- "from channel 2"
}()
// select:準備できたチャネルを選択
for i := 0; i < 2; i++ {
select {
case msg := <-ch1:
fmt.Println("Received", msg)
case msg := <-ch2:
fmt.Println("Received", msg)
case <-time.After(3 * time.Second):
fmt.Println("Timeout!")
}
}
// ノンブロッキングselect
ch := make(chan int, 1)
select {
case val := <-ch:
fmt.Println("Got:", val)
default:
fmt.Println("Channel empty, moving on")
}
}
9. 同期プリミティブ
9.1 MutexとRWLock
package main
import (
"fmt"
"sync"
)
// Mutex:相互排除
type SafeCounter struct {
mu sync.Mutex
v map[string]int
}
func (c *SafeCounter) Inc(key string) {
c.mu.Lock()
defer c.mu.Unlock()
c.v[key]++
}
func (c *SafeCounter) Value(key string) int {
c.mu.Lock()
defer c.mu.Unlock()
return c.v[key]
}
// RWMutex:読み取りは同時に、書き込みは排他的
type SafeCache struct {
mu sync.RWMutex
store map[string]string
}
func (c *SafeCache) Get(key string) (string, bool) {
c.mu.RLock()
defer c.mu.RUnlock()
val, ok := c.store[key]
return val, ok
}
func (c *SafeCache) Set(key, value string) {
c.mu.Lock()
defer c.mu.Unlock()
c.store[key] = value
}
func main() {
counter := SafeCounter{v: make(map[string]int)}
var wg sync.WaitGroup
for i := 0; i < 1000; i++ {
wg.Add(1)
go func() {
defer wg.Done()
counter.Inc("key")
}()
}
wg.Wait()
fmt.Println("Final count:", counter.Value("key")) // 正確に1000
}
9.2 セマフォ
import asyncio
async def worker(sem: asyncio.Semaphore, task_id: int):
async with sem: # 最大3つまで同時実行
print(f"Task {task_id}: 開始(スロット取得)")
await asyncio.sleep(1)
print(f"Task {task_id}: 完了(スロット返却)")
async def main():
sem = asyncio.Semaphore(3) # 同時実行を3つに制限
tasks = [worker(sem, i) for i in range(10)]
await asyncio.gather(*tasks)
asyncio.run(main())
9.3 アトミック操作(そうさ)
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
public class AtomicDemo {
// AtomicLong - ロックフリーカウンター
private static final AtomicLong counter = new AtomicLong(0);
// AtomicReference - ロックフリー参照更新
private static final AtomicReference<String> config =
new AtomicReference<>("default");
public static void main(String[] args) throws InterruptedException {
Thread[] threads = new Thread[10];
for (int i = 0; i < 10; i++) {
threads[i] = new Thread(() -> {
for (int j = 0; j < 10_000; j++) {
counter.incrementAndGet(); // アトミック増加
}
});
threads[i].start();
}
for (Thread t : threads) t.join();
System.out.println("Counter: " + counter.get()); // 正確に100,000
boolean updated = config.compareAndSet("default", "production");
System.out.println("Config updated: " + updated); // true
System.out.println("Config value: " + config.get()); // production
}
}
10. Lock-freeデータ構造
10.1 CAS(Compare-And-Swap)
CASはlock-freeプログラミングの基礎です。「現在の値が期待値と同じなら新しい値に置き換える」アトミック操作です。
CAS動作原理
┌──────────────────────────────────────┐
│ CAS(address, expected, new_value) │
│ │
│ if *address == expected: │
│ *address = new_value │
│ return true // 成功 │
│ else: │
│ return false // 失敗 → リトライ │
│ │
│ (全体がハードウェアレベルでアトミック) │
└──────────────────────────────────────┘
10.2 ABA問題
ABA問題
Thread 1: 値Aを読む → (一時停止)
Thread 2: A → Bに変更
Thread 3: B → Aに変更
Thread 1: (再開) CAS(A, A, C) → 成功!(しかしAは元のAではない)
解決: バージョンカウンター追加
AtomicStampedReference (Java)
TaggedPointer (C/C++)
10.3 Lock-freeスタック(Treiber Stack)
import java.util.concurrent.atomic.AtomicReference;
public class LockFreeStack<T> {
private static class Node<T> {
final T value;
Node<T> next;
Node(T value) {
this.value = value;
}
}
private final AtomicReference<Node<T>> top = new AtomicReference<>(null);
public void push(T value) {
Node<T> newNode = new Node<>(value);
Node<T> oldTop;
do {
oldTop = top.get();
newNode.next = oldTop;
} while (!top.compareAndSet(oldTop, newNode)); // CASループ
}
public T pop() {
Node<T> oldTop;
Node<T> newTop;
do {
oldTop = top.get();
if (oldTop == null) return null;
newTop = oldTop.next;
} while (!top.compareAndSet(oldTop, newTop)); // CASループ
return oldTop.value;
}
}
11. 並行性バグパターンと防止(ぼうし)
11.1 デッドロック
デッドロック条件(4つ全て充足時に発生)
1. 相互排除(Mutual Exclusion)
2. 保持待ち(Hold and Wait)
3. 非プリエンプション(No Preemption)
4. 循環待ち(Circular Wait)
┌──────────┐ ┌──────────┐
│ Thread A │──Lock1──→│ Thread B │
│ │←──Lock2──│ │
│ (Lock2 │ │ (Lock1 │
│ 待機中) │ │ 待機中) │
└──────────┘ └──────────┘
import threading
import time
lock_a = threading.Lock()
lock_b = threading.Lock()
# デッドロック発生コード
def thread_1():
with lock_a:
print("Thread 1: Lock A取得")
time.sleep(0.1)
with lock_b: # Lock B待ち → デッドロック!
print("Thread 1: Lock B取得")
def thread_2():
with lock_b:
print("Thread 2: Lock B取得")
time.sleep(0.1)
with lock_a: # Lock A待ち → デッドロック!
print("Thread 2: Lock A取得")
# 解決: ロック順序を統一
def thread_1_fixed():
with lock_a: # 常にA → B順序
print("Thread 1: Lock A取得")
time.sleep(0.1)
with lock_b:
print("Thread 1: Lock B取得")
def thread_2_fixed():
with lock_a: # 常にA → B順序(循環待ち防止)
print("Thread 2: Lock A取得")
time.sleep(0.1)
with lock_b:
print("Thread 2: Lock B取得")
11.2 レースコンディション
package main
import (
"fmt"
"sync"
)
func main() {
// レースコンディション例
counter := 0
var wg sync.WaitGroup
for i := 0; i < 1000; i++ {
wg.Add(1)
go func() {
defer wg.Done()
counter++ // DATA RACE! 読み-変更-書きがアトミックではない
}()
}
wg.Wait()
fmt.Println("Counter (unsafe):", counter) // 1000未満の可能性
// 解決1: Mutex
counter = 0
var mu sync.Mutex
for i := 0; i < 1000; i++ {
wg.Add(1)
go func() {
defer wg.Done()
mu.Lock()
counter++
mu.Unlock()
}()
}
wg.Wait()
fmt.Println("Counter (mutex):", counter) // 正確に1000
}
11.3 飢餓状態(きがじょうたい)と優先度逆転
飢餓状態 (Starvation)
- 特定のスレッドがリソースにアクセスできず無限待機
- 解決: 公平なロック (Fair Lock)、優先度エイジング
優先度逆転 (Priority Inversion)
┌─────────────────────────────────────┐
│ 高優先度タスク ← ブロックされる │
│ | (Lock待ち) │
│ 中優先度タスク ← 実行中 │
│ | (プリエンプト) │
│ 低優先度タスク ← Lock保持 │
│ │
│ 解決: Priority Inheritance Protocol │
│ 低優先度タスクがLock保持時に │
│ 高優先度を一時的に継承 │
└─────────────────────────────────────┘
12. 構造化された並行性(Structured Concurrency)
12.1 概念
構造化された並行性は、並行タスクの寿命をコードブロック(スコープ)に結び付けるパターンです。スコープ終了時に全子タスクの完了(またはキャンセル)を保証します。
非構造化並行性 構造化された並行性
┌────────────────┐ ┌────────────────┐
│ start task A │ │ scope { │
│ start task B │ │ task A │
│ ... (漏れ?) │ │ task B │
│ forget task A? │ │ } // A,B完了 │
└────────────────┘ └────────────────┘
12.2 Python TaskGroup(3.11+)
import asyncio
async def fetch(name: str, delay: float) -> str:
await asyncio.sleep(delay)
if name == "failing":
raise ValueError(f"{name} failed!")
return f"{name}: done"
async def main():
# TaskGroup: 構造化された並行性
try:
async with asyncio.TaskGroup() as tg:
task1 = tg.create_task(fetch("api-1", 0.5))
task2 = tg.create_task(fetch("api-2", 1.0))
task3 = tg.create_task(fetch("api-3", 0.8))
# ここに到達時、全タスク完了保証
print(task1.result(), task2.result(), task3.result())
except* ValueError as eg:
for exc in eg.exceptions:
print(f"Error: {exc}")
asyncio.run(main())
12.3 Java 21 Structured Concurrency
import java.util.concurrent.StructuredTaskScope;
import java.util.concurrent.Future;
public class StructuredConcurrencyDemo {
record User(String name) {}
record Order(String id) {}
static User fetchUser(String userId) throws Exception {
Thread.sleep(500);
return new User("Alice");
}
static Order fetchOrder(String orderId) throws Exception {
Thread.sleep(800);
return new Order("ORD-123");
}
public static void main(String[] args) throws Exception {
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
Future<User> userFuture = scope.fork(() -> fetchUser("user-1"));
Future<Order> orderFuture = scope.fork(() -> fetchOrder("order-1"));
scope.join();
scope.throwIfFailed();
User user = userFuture.resultNow();
Order order = orderFuture.resultNow();
System.out.printf("User: %s, Order: %s%n", user.name(), order.id());
}
}
}
13. 言語別(げんごべつ)並行性モデル比較
| 次元 | Go | Rust | Java 21 | Python | Kotlin | JavaScript | Erlang/Elixir | C# |
|---|---|---|---|---|---|---|---|---|
| モデル | CSP | Ownership + async | Virtual Thread | asyncio | Coroutine | Event Loop | Actor | Task |
| 軽量単位 | goroutine | Future | Virtual Thread | coroutine | coroutine | Promise | process | Task |
| スケジューリング | M:N (GMP) | ランタイム依存 | M:N | 1:1(シングル) | M:N | 1:1(シングル) | プリエンプティブM:N | M:N |
| 共有状態 | channel | ownership | synchronized | asyncio.Lock | Mutex | なし(シングル) | なし(メッセージ) | lock |
| エラー伝播 | panic/recover | Result | 例外 | 例外 | 例外 | reject | link/monitor | 例外 |
| キャンセル | context | drop | interrupt | cancel | Job.cancel | AbortController | exit | CancelToken |
| 構造化並行性 | errgroup | 限定的 | StructuredTaskScope | TaskGroup | coroutineScope | なし | supervisor | なし |
| CPU-bound | goroutine | thread/rayon | thread | multiprocessing | Dispatchers.Default | Worker Thread | process | Parallel.ForEach |
| メモリ安全性 | GC | コンパイル時 | GC | GC | GC | GC | GC | GC |
| デッドロック防止 | チャネル設計 | コンパイル時 | 注意必要 | 注意必要 | 注意必要 | 該当なし | メッセージベース | 注意必要 |
| パフォーマンス | 高い | 非常に高い | 高い | 普通 | 高い | I/Oのみ | 高い(BEAM) | 高い |
| 学習曲線 | 低い | 高い | 普通 | 低い | 普通 | 低い | 普通 | 普通 |
14. 実践パターン:レートリミッター
package main
import (
"context"
"fmt"
"sync"
"time"
)
// Token Bucket Rate Limiter
type RateLimiter struct {
tokens chan struct{}
refillRate time.Duration
ctx context.Context
cancel context.CancelFunc
}
func NewRateLimiter(maxTokens int, refillRate time.Duration) *RateLimiter {
ctx, cancel := context.WithCancel(context.Background())
rl := &RateLimiter{
tokens: make(chan struct{}, maxTokens),
refillRate: refillRate,
ctx: ctx,
cancel: cancel,
}
for i := 0; i < maxTokens; i++ {
rl.tokens <- struct{}{}
}
go func() {
ticker := time.NewTicker(refillRate)
defer ticker.Stop()
for {
select {
case <-ticker.C:
select {
case rl.tokens <- struct{}{}:
default:
}
case <-ctx.Done():
return
}
}
}()
return rl
}
func (rl *RateLimiter) Acquire(ctx context.Context) error {
select {
case <-rl.tokens:
return nil
case <-ctx.Done():
return ctx.Err()
}
}
func (rl *RateLimiter) Close() {
rl.cancel()
}
func main() {
limiter := NewRateLimiter(5, 200*time.Millisecond)
defer limiter.Close()
var wg sync.WaitGroup
for i := 0; i < 20; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()
if err := limiter.Acquire(ctx); err != nil {
fmt.Printf("Request %d: timed out\n", id)
return
}
fmt.Printf("Request %d: processed at %s\n", id,
time.Now().Format("15:04:05.000"))
}(i)
}
wg.Wait()
}
15. パフォーマンス最適化(さいてきか)ヒント
15.1 適切な並行性モデル選択
ワークロードタイプ別選択
I/O-bound(ネットワーク、ディスク):
→ async/await、coroutine、goroutine
→ イベントループベースが効率的
→ スレッドプールは過度なコンテキストスイッチを引き起こす
CPU-bound(計算、変換):
→ OSスレッド(Python: multiprocessing)
→ Rust: rayonクレート
→ Go: goroutine(自動並列化)
混合:
→ 別々のスレッドプール/ディスパッチャ分離
→ Kotlin: Dispatchers.IO vs Dispatchers.Default
→ Python: asyncio + ProcessPoolExecutor
15.2 コンテキストスイッチング最小化
import asyncio
import concurrent.futures
import time
# CPU-boundタスクを別プロセスで実行
def cpu_heavy_task(n: int) -> int:
"""素数計算 (CPU-bound)"""
count = 0
for i in range(2, n):
if all(i % j != 0 for j in range(2, int(i**0.5) + 1)):
count += 1
return count
async def main():
loop = asyncio.get_event_loop()
with concurrent.futures.ProcessPoolExecutor(max_workers=4) as pool:
start = time.perf_counter()
results = await asyncio.gather(
loop.run_in_executor(pool, cpu_heavy_task, 100_000),
loop.run_in_executor(pool, cpu_heavy_task, 100_000),
loop.run_in_executor(pool, cpu_heavy_task, 100_000),
loop.run_in_executor(pool, cpu_heavy_task, 100_000),
)
elapsed = time.perf_counter() - start
print(f"4 tasks completed in {elapsed:.2f}s: {results}")
asyncio.run(main())
16. クイズ
Q1. 並行性と並列性の核心的な違いは?
並行性はプログラムの構造的特性で、複数のタスクを論理的に同時に扱える設計です。並列性は実行の特性で、複数のタスクが物理的に同時に実行されることです。並行性はシングルコアでも可能ですが、並列性にはマルチコアが必要です。
Q2. GoroutineのGMPモデルでG、M、Pはそれぞれ何か?
- G(Goroutine):実行するタスク単位。軽量スレッド。
- M(Machine):OSスレッド。実際の実行を担当。
- P(Processor):論理プロセッサ。GをMにマッピングするスケジューラ。GOMAXPROCSで数を決定。
Pはローカルrunキューを持ち、ワークスティーリングで負荷を均等分配します。
Q3. デッドロックが発生するには4つの条件全てが充足される必要がある。その4つは?
- 相互排除(Mutual Exclusion):リソースを一度に1つのプロセスのみが使用
- 保持待ち(Hold and Wait):リソースを保持したまま他のリソースを待機
- 非プリエンプション(No Preemption):割り当てられたリソースを強制的に奪えない
- 循環待ち(Circular Wait):プロセス間で循環形態の待機関係
1つでも破れればデッドロックは発生しません。最も一般的な解決策はロック順序を統一して循環待ちを防止することです。
Q4. ActorモデルとCSPの核心的な違いは?
- Actorモデル:非同期メッセージ受け渡し。各Actorが固有のメールボックスを持つ。受信者を直接指定。Erlang、Akka。
- CSP:同期チャネル通信。チャネルが独立したエンティティ。送信者と受信者が互いを知らない。Goのgoroutine + channel。
Actorは「誰に送るか」を知り、CSPは「どのチャネルに送るか」を知るだけです。
Q5. 構造化された並行性が解決する問題は?
構造化された並行性は以下の問題を解決します:
- タスク漏れ(Task Leaking):fire-and-forgetで開始したタスクが忘れられる問題
- エラー伝播:子タスクのエラーが親に伝達されない問題
- キャンセル伝播:親がキャンセルされても子が実行し続ける問題
- ライフタイム管理:スコープ終了後もタスクが実行し続ける問題
Java 21のStructuredTaskScope、PythonのTaskGroup、KotlinのcoroutineScopeが代表的です。
17. 参考資料(さんこうしりょう)
- Rob Pike - "Concurrency is not Parallelism" (Go Blog)
- Java 21 Virtual Threads - JEP 444
- Java 21 Structured Concurrency - JEP 453
- Python asyncio公式ドキュメント
- Go公式ドキュメント - Effective Go (Concurrency)
- Kotlin Coroutines公式ガイド
- Rust async-book (The Async Book)
- Akka Documentation - Actor Model
- Erlang/OTP - Processes
- "The Art of Multiprocessor Programming" - Maurice Herlihy, Nir Shavit
- "Designing Data-Intensive Applications" - Martin Kleppmann (Chapter 8)
- "Concurrency in Go" - Katherine Cox-Buday (O'Reilly)
- tokio.rs - Rust非同期ランタイム
- libuv - Node.jsイベントループ実装
현재 단락 (1/1026)
"Concurrency is about dealing with lots of things at once. Parallelism is about doing lots of things...