The Complete Guide to Concurrency & Parallel Programming 2025: Async/Await, Threads, Goroutines, and the Actor Model


1. Concurrency vs. Parallelism: Rob Pike's Distinction

"Concurrency is about dealing with lots of things at once. Parallelism is about doing lots of things at once." -- Rob Pike

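That single sentence captures the essential difference between concurrency and parallelism.

1.1 Conceptual Distinction

Concurrency is a property of a program's structure. It means designing the program so that it can deal with several tasks as logically simultaneous. Whether they actually run at the same instant does not matter.

Parallelism is a property of execution. It means that several tasks physically run at the same time. Multiple threads running simultaneously on a multi-core CPU is the typical example.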

Concurrency: structure
┌─────────────────────────────────────────┐
│ Task A: ██░░██░░░░██░░                  │
│ Task B: ░░██░░██░░░░██                  │
│ Task C: ░░░░░░░░██░░░░██                │
│         ─────────────────→ time         │
│   (interleaved on a single core)        │
└─────────────────────────────────────────┘

Parallelism: execution
┌─────────────────────────────────────────┐
│ Core 1: ████████████████                │
│ Core 2: ████████████████                │
│ Core 3: ████████████████                │
│         ─────────────────→ time         │
│   (simultaneous on multiple cores)      │
└─────────────────────────────────────────┘

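1.2 A Real-World Analogy

| Situation | Concurrency | Parallelism |
| --- | --- | --- |
| One cook alternates between three dishes | Yes | No |
| Three cooks each make one dish at the same time | Yes | Yes |
| One cook makes a single dish | No | No |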

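1.3 Why the Distinction Matters

Chasing parallelism without a sound concurrent structure makes complexity explode. The standard approach is to design the right concurrent structure first, then use parallel execution to raise performance.

# Concurrent structure (asyncio): efficient even on a single thread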
import asyncio

async def fetch_data(url: str) -> str:
    print(f"Fetching {url}...")
    await asyncio.sleep(1)  # simulate waiting on I/O
    return f"Data from {url}"

async def main():
    # Concurrency: the three requests are handled as one concurrent structure
    results = await asyncio.gather(
        fetch_data("https://api.example.com/users"),
        fetch_data("https://api.example.com/posts"),
        fetch_data("https://api.example.com/comments"),
    )
    for r in results:
        print(r)

asyncio.run(main())
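# About 1 second in total (not 3): the power of concurrency

2. Threading Models: From OS Threads to Virtual Threads

2.1 OS Threads (Kernel Threads)

These threads are managed directly by the operating system. They are expensive to create (each typically reserves a stack of around 1 MB, depending on platform and settings) and they incur context-switching overhead.

// Java: OS threads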
public class ThreadExample {
    public static void main(String[] args) throws InterruptedException {
        Thread t1 = new Thread(() -> {
            for (int i = 0; i < 5; i++) {
                System.out.println("Thread 1: " + i);
                try { Thread.sleep(100); } catch (InterruptedException e) { break; }
            }
        });

        Thread t2 = new Thread(() -> {
            for (int i = 0; i < 5; i++) {
                System.out.println("Thread 2: " + i);
                try { Thread.sleep(100); } catch (InterruptedException e) { break; }
            }
        });

        t1.start();
        t2.start();
        t1.join();
        t2.join();
        System.out.println("Both threads completed");
    }
}

2.2 User Threads (Green Threads)

These are lightweight threads managed by a runtime rather than by the kernel. Many user threads are mapped onto the underlying OS threads.

Thread model comparison
┌───────────────────────────────────────────────┐
│ 1:1 (OS Thread)    N:1 (Green Thread)         │
│ ┌──┐ ┌──┐ ┌──┐    ┌──┐┌──┐┌──┐┌──┐┌──┐     │
│ │UT│ │UT│ │UT│    │GT││GT││GT││GT││GT│     │
│ └┬─┘ └┬─┘ └┬─┘    └┬─┘└┬─┘└┬─┘└┬─┘└┬─┘     │
│  │    │    │        └──┬──┘  │  └──┬──┘       │
│ ┌┴─┐ ┌┴─┐ ┌┴─┐       ┌┴─┐  │     │          │
│ │KT│ │KT│ │KT│       │KT│  │     │          │
│ └──┘ └──┘ └──┘       └──┘  │     │          │
│                              │     │          │
│ M:N (Hybrid)                                  │
│ ┌──┐┌──┐┌──┐┌──┐┌──┐┌──┐                    │
│ │GT││GT││GT││GT││GT││GT│                    │
│ └┬─┘└┬─┘└┬─┘└┬─┘└┬─┘└┬─┘                    │
│  └─┬─┘ └──┬──┘ └──┬──┘                       │
│   ┌┴─┐   ┌┴─┐   ┌┴─┐                         │
│   │KT│   │KT│   │KT│                         │
│   └──┘   └──┘   └──┘                         │
└───────────────────────────────────────────────┘

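2.3 Java 21 Virtual Threads

Virtual threads, finalized in Java 21, implement an M:N model: many virtual threads are multiplexed onto a small number of OS threads. Creating millions of virtual threads is practical.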

// Java 21 Virtual Threads
import java.util.concurrent.Executors;
import java.time.Duration;

public class VirtualThreadDemo {
    public static void main(String[] args) throws Exception {
        // Creating one million virtual threads is feasible
        try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
            for (int i = 0; i < 1_000_000; i++) {
                final int taskId = i;
                executor.submit(() -> {
                    try {
                        Thread.sleep(Duration.ofSeconds(1));
                        if (taskId % 100_000 == 0) {
                            System.out.println("Task " + taskId + " done on " +
                                Thread.currentThread());
                        }
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
        }
        System.out.println("All tasks completed");
    }
}

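2.4 Thread Model Comparison

| Property | OS thread | Green thread | Virtual thread (Java 21) |
| --- | --- | --- | --- |
| Stack size | ~1 MB | A few KB | A few KB (grows dynamically) |
| Creation cost | High (syscall) | Low | Very low |
| Concurrent count | Thousands | Millions | Millions |
| Preemption | Preempted by the OS | Cooperative | Cooperative (yields on I/O) |
| Blocking I/O | Blocks the thread | Risk of blocking everything | Automatically unmounted |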

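3. Coroutines: The Core of Lightweight Concurrency

3.1 Stackful vs. Stackless Coroutines

Stackful coroutines (e.g. Lua, Go's goroutines)
┌────────────────────────────────────────────────┐
│ Each has its own stack                         │
│ Can suspend anywhere in the call stack         │
│ Higher memory use                              │
│ Can yield from inside nested function calls    │
└────────────────────────────────────────────────┘

Stackless coroutines (e.g. Python, Kotlin, Rust)
┌────────────────────────────────────────────────┐
│ State lives in a separate object (often heap)  │
│ Can suspend only in the coroutine's own frame  │
│ Lower memory use                               │
│ Compiler rewrites them into a state machine    │
└────────────────────────────────────────────────┘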
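
To make the "state machine" row of the stackless box more concrete, here is a minimal Python sketch. The names `countdown` and `CountdownStateMachine` are invented for illustration. A generator behaves like a stackless coroutine: it can suspend only at a `yield` in its own body, and its locals live in a frame object rather than on a dedicated stack. The hand-written class shows roughly the kind of state machine such a function could be lowered to; it is an analogy, not what CPython actually generates.

```python
from typing import Iterator


def countdown(n: int) -> Iterator[int]:
    """Stackless coroutine: suspends only at the `yield` in its own body."""
    while n > 0:
        yield n   # suspension point; the local `n` survives in the generator frame
        n -= 1


class CountdownStateMachine:
    """Hand-written equivalent: the local `n` becomes a field, and each
    call to __next__ resumes from the stored state."""

    def __init__(self, n: int) -> None:
        self.n = n             # saved "local variable"
        self.started = False   # records whether we are resuming after a yield

    def __iter__(self) -> "CountdownStateMachine":
        return self

    def __next__(self) -> int:
        if self.started:
            self.n -= 1        # the code that followed the yield
        self.started = True
        if self.n <= 0:
            raise StopIteration
        return self.n


print(list(countdown(3)))              # [3, 2, 1]
print(list(CountdownStateMachine(3)))  # [3, 2, 1]
```

Both versions produce the same sequence; the difference is only who writes the bookkeeping.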

3.2 Python asyncio Coroutines

import asyncio
import aiohttp
import time

async def fetch_url(session: aiohttp.ClientSession, url: str) -> dict:
    """Asynchronous HTTP request."""
    async with session.get(url) as response:
        return {
            "url": url,
            "status": response.status,
            "length": len(await response.text()),
        }

async def main():
    urls = [
        "https://httpbin.org/delay/1",
        "https://httpbin.org/delay/1",
        "https://httpbin.org/delay/1",
        "https://httpbin.org/delay/1",
        "https://httpbin.org/delay/1",
    ]

    start = time.perf_counter()
    async with aiohttp.ClientSession() as session:
        # Run the 5 requests concurrently
        tasks = [fetch_url(session, url) for url in urls]
        results = await asyncio.gather(*tasks)

    elapsed = time.perf_counter() - start
    print(f"5 requests completed in {elapsed:.2f}s")  # about 1s, not 5s

    for r in results:
        print(f"  {r['url']}: status={r['status']}, length={r['length']}")

asyncio.run(main())

3.3 Kotlin Coroutines

import kotlinx.coroutines.*
import kotlin.system.measureTimeMillis

suspend fun fetchData(id: Int): String {
    delay(1000) // asynchronous wait (does not block the thread)
    return "Result-$id"
}

fun main() = runBlocking {
    val time = measureTimeMillis {
        // Structured concurrency: coroutineScope manages all child coroutines
        coroutineScope {
            val results = (1..10).map { id ->
                async(Dispatchers.IO) {
                    fetchData(id)
                }
            }
            results.awaitAll().forEach { println(it) }
        }
    }
    println("Completed in ${time}ms") // ~1000ms
}

4. Goroutines and Go's Concurrency Model

4.1 Goroutine Internals

The Go runtime uses M:N scheduling. Three concepts are central: G (goroutine), M (machine, i.e. an OS thread), and P (processor).

Go runtime scheduler (GMP model)
┌─────────────────────────────────────────────┐
│                                             │
│  ┌──────┐ ┌──────┐ ┌──────┐ ┌──────┐      │
│  │ G1   │ │ G2   │ │ G3   │ │ G4...│      │
│  └──┬───┘ └──┬───┘ └──┬───┘ └──┬───┘      │
│     │        │        │        │            │
│  ┌──┴────────┴──┐  ┌──┴────────┴──┐        │
│  │   P (Local   │  │   P (Local   │        │
│  │    Queue)    │  │    Queue)    │        │
│  └──────┬───────┘  └──────┬───────┘        │
│         │                 │                 │
│  ┌──────┴───────┐  ┌──────┴───────┐        │
│  │ M (OS Thread)│  │ M (OS Thread)│        │
│  └──────────────┘  └──────────────┘        │
│                                             │
│  Global Run Queue: [G5, G6, G7, ...]        │
└─────────────────────────────────────────────┘

GOMAXPROCS = number of Ps (default: number of CPU cores)

4.2 Basic Goroutine Usage

package main

import (
    "fmt"
    "sync"
    "time"
)

func worker(id int, wg *sync.WaitGroup) {
    defer wg.Done()
    fmt.Printf("Worker %d starting\n", id)
    time.Sleep(time.Second)
    fmt.Printf("Worker %d done\n", id)
}

func main() {
    var wg sync.WaitGroup

    for i := 1; i <= 5; i++ {
        wg.Add(1)
        go worker(i, &wg) // start a goroutine (initial stack of about 2 KB)
    }

    wg.Wait()
    fmt.Println("All workers completed")
}

4.3 Work Stealing

When a P's local queue runs empty, it steals goroutines from another P's queue. This spreads the load evenly across processors.

Work stealing
┌─────────────┐     ┌─────────────┐
│ P1 (busy)   │     │ P2 (idle)   │
│ [G1,G2,G3]  │ ──→ │ []          │
│             │steal│             │
│ [G1,G2]     │ ──→ │ [G3]        │
└─────────────┘     └─────────────┘
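
The stealing step in the diagram can be sketched as a small single-threaded simulation in Python. The `Processor` class and its `next_task` method are hypothetical names, and this is a deliberately simplified model: the real Go scheduler is more involved (it is lock-free and moves work in batches), so treat this only as an illustration of the idea.

```python
from collections import deque
from typing import Deque, List, Optional


class Processor:
    """A hypothetical P with a local run queue of task names."""

    def __init__(self, name: str, tasks: List[str]) -> None:
        self.name = name
        self.queue: Deque[str] = deque(tasks)

    def next_task(self, others: List["Processor"]) -> Optional[str]:
        # Prefer local work: take from the front of our own queue.
        if self.queue:
            return self.queue.popleft()
        # Local queue is empty: pick the busiest other processor as the victim.
        victim = max(others, key=lambda p: len(p.queue), default=None)
        if victim is None or not victim.queue:
            return None  # nothing to steal anywhere
        # Steal from the opposite end to the one the owner consumes from.
        return victim.queue.pop()


p1 = Processor("P1", ["G1", "G2", "G3"])
p2 = Processor("P2", [])
stolen = p2.next_task([p1])
print(stolen, list(p1.queue))  # G3 ['G1', 'G2']
```

Taking from the far end of the victim's queue keeps the thief and the owner away from the same element, which is why real work-stealing deques use two ends.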

5. Comparing Async/Await Patterns

5.1 JavaScript Promises and Async/Await

// Promise chaining vs. async/await
// Promise chaining style
function fetchUserData(userId) {
  return fetch(`/api/users/${userId}`)
    .then(response => response.json())
    .then(user =>
      // Nest the second request so `user` is still in scope for the result
      fetch(`/api/posts?author=${user.id}`)
        .then(response => response.json())
        .then(posts => ({ user, posts }))
    )
    .catch(error => {
      console.error('Error:', error);
      throw error;
    });
}

// Async/await style (much easier to read)
async function fetchUserDataAsync(userId) {
  try {
    const userResponse = await fetch(`/api/users/${userId}`);
    const user = await userResponse.json();

    const postsResponse = await fetch(`/api/posts?author=${user.id}`);
    const posts = await postsResponse.json();

    return { user, posts };
  } catch (error) {
    console.error('Error:', error);
    throw error;
  }
}

// Concurrent execution
async function fetchMultipleUsers(userIds) {
  const promises = userIds.map(id => fetchUserDataAsync(id));
  return Promise.all(promises); // all requests are in flight at once
}

// Promise.allSettled: keep going even if some requests fail
async function fetchWithPartialFailure(urls) {
  const results = await Promise.allSettled(
    urls.map(url => fetch(url).then(r => r.json()))
  );

  const successes = results
    .filter(r => r.status === 'fulfilled')
    .map(r => r.value);
  const failures = results
    .filter(r => r.status === 'rejected')
    .map(r => r.reason);

  return { successes, failures };
}

5.2 Rust Async

use tokio;
use reqwest;
use futures::future::join_all;

#[derive(Debug)]
struct ApiResponse {
    url: String,
    status: u16,
    body_length: usize,
}

async fn fetch_url(client: &reqwest::Client, url: &str) -> Result<ApiResponse, reqwest::Error> {
    let response = client.get(url).send().await?;
    let status = response.status().as_u16();
    let body = response.text().await?;

    Ok(ApiResponse {
        url: url.to_string(),
        status,
        body_length: body.len(),
    })
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let client = reqwest::Client::new();
    let urls = vec![
        "https://httpbin.org/get",
        "https://httpbin.org/ip",
        "https://httpbin.org/user-agent",
    ];

    // Run all requests concurrently
    let futures: Vec<_> = urls.iter()
        .map(|url| fetch_url(&client, url))
        .collect();

    let results = join_all(futures).await;

    for result in results {
        match result {
            Ok(resp) => println!("{}: status={}, length={}", resp.url, resp.status, resp.body_length),
            Err(e) => eprintln!("Error: {}", e),
        }
    }

    Ok(())
}

5.3 C# Task-Based Asynchrony

using System;
using System.Net.Http;
using System.Threading;
using System.Threading.Tasks;
using System.Collections.Generic;
using System.Linq;

public class AsyncDemo
{
    private static readonly HttpClient _client = new HttpClient();

    public static async Task<string> FetchDataAsync(string url)
    {
        var response = await _client.GetAsync(url);
        response.EnsureSuccessStatusCode();
        return await response.Content.ReadAsStringAsync();
    }

    public static async Task Main()
    {
        var urls = new List<string>
        {
            "https://httpbin.org/get",
            "https://httpbin.org/ip",
            "https://httpbin.org/user-agent",
        };

        // Run concurrently with WhenAll
        var tasks = urls.Select(url => FetchDataAsync(url));
        var results = await Task.WhenAll(tasks);

        for (int i = 0; i < urls.Count; i++)
        {
            Console.WriteLine($"URL: {urls[i]}, Length: {results[i].Length}");
        }

        // Timeout pattern
        using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
        try
        {
            var data = await _client.GetStringAsync(urls[0], cts.Token);
            Console.WriteLine($"Got data: {data.Length} chars");
        }
        catch (OperationCanceledException)
        {
            Console.WriteLine("Request timed out!");
        }
    }
}

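5.4 Async/Await Across Languages

| Property | JavaScript | Python | Rust | C# | Kotlin |
| --- | --- | --- | --- | --- | --- |
| Runtime | Event loop (browser / libuv) | asyncio | tokio/async-std | CLR Thread Pool | Dispatchers |
| Future type | Promise | Coroutine | Future (lazy) | Task | Deferred |
| Cancellation | AbortController | Task.cancel() | drop | CancellationToken | Job.cancel() |
| Streams | AsyncIterator | async for | Stream trait | IAsyncEnumerable | Flow |
| Error handling | try/catch | try/except | Result | try/catch | try/catch |
| Concurrent execution | Promise.all | gather | join! | Task.WhenAll | awaitAll |
| Zero-cost | No | No | Yes | No | No |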
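
The "lazy" note in the Future type row is worth a quick illustration. A Python coroutine object is also lazy in this sense: calling the coroutine function runs nothing until the object is awaited or wrapped in a task, whereas a JavaScript promise starts its work as soon as it is created. The sketch below uses invented names (`work`, `log`) and relies on `asyncio.sleep(0)` to hand control back to the loop once.

```python
import asyncio
from typing import List

log: List[str] = []


async def work(name: str) -> str:
    log.append(f"{name} started")
    await asyncio.sleep(0)
    return name


async def main() -> List[str]:
    coro = work("lazy")                        # nothing runs yet: just a coroutine object
    task = asyncio.create_task(work("eager"))  # scheduled on the loop right away
    await asyncio.sleep(0)                     # let the loop run scheduled tasks once
    snapshot = list(log)                       # only the task has started so far
    await coro                                 # the coroutine body starts only now
    await task
    return snapshot


snapshot = asyncio.run(main())
print(snapshot)  # ['eager started']
print(log)       # ['eager started', 'lazy started']
```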

6. Event Loop Deep Dive

6.1 The Node.js libuv Event Loop

Node.js event loop phases
┌───────────────────────────────────┐
│         ┌──────────────┐          │
│    ┌────│   timers     │────┐     │
│    │    └──────────────┘    │     │
│    │    ┌──────────────┐    │     │
│    │    │ pending I/O  │    │     │
│    │    └──────────────┘    │     │
│    │    ┌──────────────┐    │     │
│    │    │ idle/prepare │    │     │
│    │    └──────────────┘    │     │
│    │    ┌──────────────┐    │     │
│    │    │    poll      │◄───┘     │
│    │    └──────────────┘          │
│    │    ┌──────────────┐          │
│    │    │    check     │          │
│    │    │(setImmediate)│          │
│    │    └──────────────┘          │
│    │    ┌──────────────┐          │
│    └────│close callbacks│─────────│
│         └──────────────┘          │
│                                   │
│  Between phases: microtask queue  │
│  (Promise, queueMicrotask)        │
└───────────────────────────────────┘

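// Understanding event loop ordering
console.log('1. sync code');

setTimeout(() => console.log('2. setTimeout (timers)'), 0);

setImmediate(() => console.log('3. setImmediate (check)'));

Promise.resolve().then(() => console.log('4. Promise (microtask)'));

process.nextTick(() => console.log('5. nextTick (runs before Promise microtasks)'));

console.log('6. end of sync code');

// Output order (CommonJS main module):
// 1. sync code
// 6. end of sync code
// 5. nextTick (runs before Promise microtasks)
// 4. Promise (microtask)
// 2. setTimeout (timers)
// 3. setImmediate (check)
// Note: the relative order of 2 and 3 is not guaranteed when both are scheduled
// from the main module; inside an I/O callback, setImmediate always runs first.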
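
The rule "finish the current callback, then drain the microtask queue before the next macrotask" can be modeled in a few lines of Python. This is a toy model with invented names (`macrotasks`, `microtasks`, `run_loop`); it ignores libuv's phases and the separate `process.nextTick` queue and only illustrates why promise callbacks run ahead of timer callbacks.

```python
from collections import deque
from typing import Callable, Deque, List

macrotasks: Deque[Callable[[], None]] = deque()  # e.g. timer callbacks
microtasks: Deque[Callable[[], None]] = deque()  # e.g. promise reactions
output: List[str] = []


def run_loop() -> None:
    """Run one macrotask at a time, draining every microtask in between."""
    while macrotasks:
        task = macrotasks.popleft()
        task()
        while microtasks:  # microtasks queued by other microtasks also run here
            microtasks.popleft()()


def script() -> None:
    output.append("sync start")
    macrotasks.append(lambda: output.append("timer callback"))
    microtasks.append(lambda: output.append("promise callback"))
    output.append("sync end")


macrotasks.append(script)  # the main script is treated as the first macrotask
run_loop()
print(output)
# ['sync start', 'sync end', 'promise callback', 'timer callback']
```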

6.2 The Python asyncio Event Loop

import asyncio
import time

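async def cpu_bound_simulation(name: str, duration: float):
    """Stand-in for a long-running task (the sleep yields, so it does not block)."""
    print(f"[{name}] start")
    # A coroutine that never yields via await would block the whole event loop
    await asyncio.sleep(duration)
    print(f"[{name}] done ({duration}s)")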
    return f"{name}: done"

async def producer(queue: asyncio.Queue, n: int):
    """Producer: add items to the queue."""
    for i in range(n):
        await asyncio.sleep(0.1)
        item = f"item-{i}"
        await queue.put(item)
        print(f"  Produced: {item}")
    await queue.put(None)  # shutdown signal

async def consumer(queue: asyncio.Queue, name: str):
    """Consumer: take items off the queue."""
    while True:
        item = await queue.get()
        if item is None:
            await queue.put(None)  # pass the signal on so other consumers stop too
            break
        await asyncio.sleep(0.2)  # simulate processing
        print(f"  {name} consumed: {item}")
        queue.task_done()

async def main():
    # 1. Concurrent execution
    start = time.perf_counter()
    results = await asyncio.gather(
        cpu_bound_simulation("TaskA", 1.0),
        cpu_bound_simulation("TaskB", 0.5),
        cpu_bound_simulation("TaskC", 0.8),
    )
    elapsed = time.perf_counter() - start
    print(f"Total time: {elapsed:.2f}s")  # about 1s
    print(f"Results: {results}")

    # 2. Producer-consumer pattern
    print("\n--- Producer-consumer ---")
    queue = asyncio.Queue(maxsize=5)
    await asyncio.gather(
        producer(queue, 10),
        consumer(queue, "Consumer-1"),
        consumer(queue, "Consumer-2"),
    )

asyncio.run(main())
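
The comment about blocking the event loop deserves a concrete counterpart. One common way to keep the loop responsive is to push a blocking call onto a worker thread with `asyncio.to_thread` (Python 3.9+). In this sketch `blocking_work` and `heartbeat` are hypothetical names, and `time.sleep` stands in for any call that never yields to the loop.

```python
import asyncio
import time
from typing import List, Tuple


def blocking_work(duration: float) -> str:
    """Stand-in for a blocking call; it never yields to the event loop."""
    time.sleep(duration)
    return "blocking work done"


async def heartbeat(ticks: List[float], stop: asyncio.Event) -> None:
    """Records a tick every 10 ms for as long as the loop stays responsive."""
    while not stop.is_set():
        ticks.append(time.perf_counter())
        await asyncio.sleep(0.01)


async def main() -> Tuple[str, int]:
    ticks: List[float] = []
    stop = asyncio.Event()
    beat = asyncio.create_task(heartbeat(ticks, stop))
    # Run the blocking function in a worker thread so the loop keeps running.
    result = await asyncio.to_thread(blocking_work, 0.2)
    stop.set()
    await beat
    return result, len(ticks)


result, tick_count = asyncio.run(main())
print(result, tick_count)
```

If `blocking_work(0.2)` were called directly inside `main`, the heartbeat would record at most a single tick during those 200 ms, because nothing else can run while the loop's only thread is stuck.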

7. The Actor Model: Concurrency Without Shared State

7.1 Actor Model Concepts

Actor model
┌─────────────────────────────────────────────────┐
│                                                 │
│  ┌─────────┐  message  ┌─────────┐             │
│  │ Actor A │ ───────→  │ Actor B │             │
│  │ [State] │           │ [State] │             │
│  │[Mailbox]│           │[Mailbox]│             │
│  └─────────┘           └────┬────┘             │
│       ↑                     │                   │
│       │       message       │ spawns new actor  │
│       └─────────────────────┘                   │
│                              ↓                   │
│                        ┌─────────┐              │
│                        │ Actor C │              │
│                        │ [State] │              │
│                        └─────────┘              │
│                                                 │
│  Rules:                                         │
│  1. Communicate only by messages                │
│     (no shared state)                           │
│  2. Process one message at a time               │
│  3. May create other actors                     │
│  4. May change only its own state               │
└─────────────────────────────────────────────────┘
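
The four rules above can be sketched in Python with an `asyncio.Queue` acting as the mailbox. `CounterActor` and its message tuples are invented for this illustration; real actor runtimes add supervision, addressing, and distribution on top of this basic shape.

```python
import asyncio
from typing import Any, Tuple


class CounterActor:
    """Hypothetical counter actor: private state plus a mailbox."""

    def __init__(self) -> None:
        self._count = 0                                 # state only the actor touches
        self._mailbox: asyncio.Queue = asyncio.Queue()  # incoming messages

    def send(self, message: Tuple[Any, ...]) -> None:
        """Fire-and-forget: enqueue a message without waiting."""
        self._mailbox.put_nowait(message)

    async def run(self) -> None:
        """Process one message at a time until a ("stop",) message arrives."""
        while True:
            message = await self._mailbox.get()
            kind = message[0]
            if kind == "increment":
                self._count += message[1]
            elif kind == "get":
                reply: asyncio.Future = message[1]
                reply.set_result(self._count)
            elif kind == "stop":
                return


async def main() -> int:
    actor = CounterActor()
    runner = asyncio.create_task(actor.run())
    for _ in range(100):
        actor.send(("increment", 1))
    reply = asyncio.get_running_loop().create_future()
    actor.send(("get", reply))  # request-response through a reply future
    total = await reply
    actor.send(("stop",))
    await runner
    return total


total = asyncio.run(main())
print(total)  # 100
```

Because the mailbox is FIFO and the actor handles one message at a time, the `get` message is answered only after all 100 increments have been applied.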

7.2 Erlang/Elixir Actors

%% Erlang: a counter actor
-module(counter).
-export([start/0, increment/1, get/1, loop/1]).

start() ->
    spawn(fun() -> loop(0) end).

increment(Pid) ->
    Pid ! {increment, 1}.

get(Pid) ->
    Pid ! {get, self()},
    receive
        {count, Value} -> Value
    after 5000 ->
        timeout
    end.

loop(Count) ->
    receive
        {increment, N} ->
            loop(Count + N);
        {get, From} ->
            From ! {count, Count},
            loop(Count);
        stop ->
            ok
    end.

%% Example usage:
%% Pid = counter:start().
%% counter:increment(Pid).
%% counter:increment(Pid).
%% counter:get(Pid). %% => 2

7.3 Akka (Scala) Actors

import akka.actor.typed.{ActorRef, ActorSystem, Behavior, SupervisorStrategy}
import akka.actor.typed.scaladsl.Behaviors

// Message definitions
sealed trait Command
case class Deposit(amount: Double) extends Command
case class Withdraw(amount: Double) extends Command
case class GetBalance(replyTo: ActorRef[Balance]) extends Command
case class Balance(amount: Double)

// Bank account actor
object BankAccount {
  def apply(balance: Double = 0.0): Behavior[Command] =
    Behaviors.receive { (context, message) =>
      message match {
        case Deposit(amount) =>
          val newBalance = balance + amount
          context.log.info(s"Deposited $amount, balance: $newBalance")
          BankAccount(newBalance)

        case Withdraw(amount) =>
          if (amount <= balance) {
            val newBalance = balance - amount
            context.log.info(s"Withdrew $amount, balance: $newBalance")
            BankAccount(newBalance)
          } else {
            context.log.warn(s"Insufficient funds: $balance < $amount")
            Behaviors.same
          }

        case GetBalance(replyTo) =>
          replyTo ! Balance(balance)
          Behaviors.same
      }
    }
}

// Supervision (failure recovery)
object BankSupervisor {
  def apply(): Behavior[Command] =
    Behaviors.supervise(BankAccount())
      .onFailure(SupervisorStrategy.restart)
}

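7.4 Pros and Cons of the Actor Model

| Advantages | Disadvantages |
| --- | --- |
| No shared state (prevents race conditions) | Can be hard to debug |
| Location transparency (can be distributed) | Limited message-ordering guarantees |
| Fault isolation ("let it crash") | Mailboxes can overflow |
| Scales well | Synchronous request-response patterns get complicated |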

8. CSP: Channel-Based Concurrency in Go

8.1 Channel Basics

package main

import (
    "fmt"
    "time"
)

func main() {
    // Unbuffered channel (synchronous)
    ch := make(chan string)

    go func() {
        time.Sleep(time.Second)
        ch <- "Hello from goroutine!"
    }()

    msg := <-ch // blocks until a message arrives
    fmt.Println(msg)

    // Buffered channel (asynchronous until the buffer fills)
    buffered := make(chan int, 3)
    buffered <- 1
    buffered <- 2
    buffered <- 3
    // buffered <- 4 // would block: the buffer is full

    fmt.Println(<-buffered) // 1
    fmt.Println(<-buffered) // 2
}

8.2 Fan-In / Fan-Out Patterns

package main

import (
    "fmt"
    "math/rand"
    "sync"
    "time"
)

// Fan-out: distribute one input channel across several workers
func fanOut(input <-chan int, workers int) []<-chan int {
    channels := make([]<-chan int, workers)
    for i := 0; i < workers; i++ {
        ch := make(chan int)
        channels[i] = ch
        go func(workerID int, out chan<- int) {
            defer close(out)
            for val := range input {
                // Simulate processing
                time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
                out <- val * val
                fmt.Printf("  Worker %d processed %d\n", workerID, val)
            }
        }(i, ch)
    }
    return channels
}

// Fan-in: merge the results of several channels into one
func fanIn(channels ...<-chan int) <-chan int {
    var wg sync.WaitGroup
    merged := make(chan int)

    for _, ch := range channels {
        wg.Add(1)
        go func(c <-chan int) {
            defer wg.Done()
            for val := range c {
                merged <- val
            }
        }(ch)
    }

    go func() {
        wg.Wait()
        close(merged)
    }()

    return merged
}

func main() {
    // Input channel
    input := make(chan int)
    go func() {
        defer close(input)
        for i := 1; i <= 20; i++ {
            input <- i
        }
    }()

    // Fan-out: 4 workers
    workerChannels := fanOut(input, 4)

    // Fan-in: merge the results
    results := fanIn(workerChannels...)

    // Collect the results
    var sum int
    for result := range results {
        sum += result
    }
    fmt.Printf("Sum of squares: %d\n", sum)
}

8.3 The select Statement

package main

import (
    "fmt"
    "time"
)

func main() {
    ch1 := make(chan string)
    ch2 := make(chan string)

    go func() {
        time.Sleep(1 * time.Second)
        ch1 <- "from channel 1"
    }()

    go func() {
        time.Sleep(2 * time.Second)
        ch2 <- "from channel 2"
    }()

    // select: pick whichever of several channels is ready
    for i := 0; i < 2; i++ {
        select {
        case msg := <-ch1:
            fmt.Println("Received", msg)
        case msg := <-ch2:
            fmt.Println("Received", msg)
        case <-time.After(3 * time.Second):
            fmt.Println("Timeout!")
        }
    }

    // Non-blocking select
    ch := make(chan int, 1)
    select {
    case val := <-ch:
        fmt.Println("Got:", val)
    default:
        fmt.Println("Channel empty, moving on")
    }
}

9. Synchronization Primitives

9.1 Mutex and RWLock

package main

import (
    "fmt"
    "sync"
)

// Mutex: mutual exclusion
type SafeCounter struct {
    mu sync.Mutex
    v  map[string]int
}

func (c *SafeCounter) Inc(key string) {
    c.mu.Lock()
    defer c.mu.Unlock()
    c.v[key]++
}

func (c *SafeCounter) Value(key string) int {
    c.mu.Lock()
    defer c.mu.Unlock()
    return c.v[key]
}

// RWMutex: reads may run concurrently, writes are exclusive
type SafeCache struct {
    mu    sync.RWMutex
    store map[string]string
}

func (c *SafeCache) Get(key string) (string, bool) {
    c.mu.RLock()         // read lock (many goroutines may read at once)
    defer c.mu.RUnlock()
    val, ok := c.store[key]
    return val, ok
}

func (c *SafeCache) Set(key, value string) {
    c.mu.Lock()          // write lock (exclusive)
    defer c.mu.Unlock()
    c.store[key] = value
}

func main() {
    counter := SafeCounter{v: make(map[string]int)}
    var wg sync.WaitGroup
    for i := 0; i < 1000; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            counter.Inc("key")
        }()
    }
    wg.Wait()
    fmt.Println("Final count:", counter.Value("key")) // exactly 1000
}

9.2 Semaphore

import asyncio

async def worker(sem: asyncio.Semaphore, task_id: int):
    async with sem:  # at most 3 run at the same time
        print(f"Task {task_id}: started (slot acquired)")
        await asyncio.sleep(1)
        print(f"Task {task_id}: finished (slot released)")

async def main():
    sem = asyncio.Semaphore(3)  # limit concurrency to 3
    tasks = [worker(sem, i) for i in range(10)]
    await asyncio.gather(*tasks)

asyncio.run(main())

9.3 Condition Variable

import threading
import time
import random

class BoundedBuffer:
    def __init__(self, capacity: int):
        self.buffer = []
        self.capacity = capacity
        self.lock = threading.Lock()
        self.not_full = threading.Condition(self.lock)
        self.not_empty = threading.Condition(self.lock)

    def produce(self, item):
        with self.not_full:
            while len(self.buffer) >= self.capacity:
                self.not_full.wait()  # wait until the buffer has room
            self.buffer.append(item)
            self.not_empty.notify()  # wake a consumer

    def consume(self):
        with self.not_empty:
            while len(self.buffer) == 0:
                self.not_empty.wait()  # wait until an item arrives
            item = self.buffer.pop(0)
            self.not_full.notify()  # wake a producer
            return item

buf = BoundedBuffer(5)

def producer():
    for i in range(20):
        buf.produce(i)
        print(f"Produced: {i}")
        time.sleep(random.uniform(0.01, 0.1))

def consumer(name):
    for _ in range(10):
        item = buf.consume()
        print(f"{name} consumed: {item}")
        time.sleep(random.uniform(0.05, 0.15))

t1 = threading.Thread(target=producer)
t2 = threading.Thread(target=consumer, args=("C1",))
t3 = threading.Thread(target=consumer, args=("C2",))
t1.start(); t2.start(); t3.start()
t1.join(); t2.join(); t3.join()

9.4 Atomic Operations

import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

public class AtomicDemo {
    // AtomicLong: a lock-free counter
    private static final AtomicLong counter = new AtomicLong(0);

    // AtomicReference: lock-free reference updates
    private static final AtomicReference<String> config =
        new AtomicReference<>("default");

    public static void main(String[] args) throws InterruptedException {
        // Ten threads update the counter with atomic increments (no locks)
        Thread[] threads = new Thread[10];
        for (int i = 0; i < 10; i++) {
            threads[i] = new Thread(() -> {
                for (int j = 0; j < 10_000; j++) {
                    counter.incrementAndGet(); // atomic increment
                }
            });
            threads[i].start();
        }

        for (Thread t : threads) t.join();
        System.out.println("Counter: " + counter.get()); // exactly 100,000

        // Using CAS directly
        boolean updated = config.compareAndSet("default", "production");
        System.out.println("Config updated: " + updated); // true
        System.out.println("Config value: " + config.get()); // production
    }
}

10. Lock-Free Data Structures

10.1 CAS (Compare-And-Swap)

CAS is the foundation of lock-free programming. It is an atomic operation that means "if the current value equals the expected value, replace it with the new value."

How CAS works
┌──────────────────────────────────────┐
│ CAS(address, expected, new_value)    │
│                                      │
│   if *address == expected:           │
│       *address = new_value           │
│       return true   // success       │
│   else:                              │
│       return false  // failed: retry │
│                                      │
│ (the whole step is atomic at the     │
│  hardware level)                     │
└──────────────────────────────────────┘
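
The usual way to build on CAS is a retry loop: read the current value, compute the new one, try to publish it, and start over if another thread got there first. Python exposes no hardware CAS, so the sketch below emulates one with a lock; `AtomicInt`, `increment`, and `run_demo` are hypothetical names, and the lock only stands in for the atomicity a real CAS instruction provides.

```python
import threading


class AtomicInt:
    """Emulated atomic cell. A lock stands in for hardware-level atomicity."""

    def __init__(self, value: int = 0) -> None:
        self._value = value
        self._guard = threading.Lock()

    def load(self) -> int:
        with self._guard:
            return self._value

    def compare_and_swap(self, expected: int, new_value: int) -> bool:
        with self._guard:  # models "atomic at the hardware level"
            if self._value == expected:
                self._value = new_value
                return True
            return False


def increment(cell: AtomicInt) -> None:
    """CAS retry loop: read, compute, try to publish, retry on failure."""
    while True:
        current = cell.load()
        if cell.compare_and_swap(current, current + 1):
            return


def run_demo(threads: int = 4, per_thread: int = 1000) -> int:
    cell = AtomicInt()

    def worker() -> None:
        for _ in range(per_thread):
            increment(cell)

    pool = [threading.Thread(target=worker) for _ in range(threads)]
    for t in pool:
        t.start()
    for t in pool:
        t.join()
    return cell.load()


print(run_demo())  # 4000
```

Note that `load` and `compare_and_swap` are two separate steps. Another thread may change the value in between, and that is exactly the case the retry loop handles.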

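10.2 The ABA Problem

The ABA problem
Thread 1: reads A  (then is suspended)
Thread 2: changes A → B
Thread 3: changes B → A
Thread 1: (resumes) CAS(addr, A, C) → succeeds, even though this A is not the A it originally read

Fix: add a version counter
→ AtomicStampedReference (Java)
→ tagged pointers (C/C++)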
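
The version-counter fix can be replayed deterministically in Python. `StampedRef` below is a hypothetical class written in the spirit of Java's `AtomicStampedReference`; as Python has no hardware CAS, a lock again stands in for atomicity. The scenario function walks through the same four steps as the trace above.

```python
import threading
from typing import Any, Tuple


class StampedRef:
    """Emulated (value, stamp) cell; every successful update bumps the stamp."""

    def __init__(self, value: Any) -> None:
        self._value = value
        self._stamp = 0
        self._guard = threading.Lock()

    def get(self) -> Tuple[Any, int]:
        with self._guard:
            return self._value, self._stamp

    def cas_value_only(self, expected: Any, new: Any) -> bool:
        """Plain CAS: compares the value only, so it is blind to A -> B -> A."""
        with self._guard:
            if self._value == expected:
                self._value, self._stamp = new, self._stamp + 1
                return True
            return False

    def cas_stamped(self, expected: Any, expected_stamp: int, new: Any) -> bool:
        """Stamped CAS: succeeds only if value and stamp are both unchanged."""
        with self._guard:
            if self._value == expected and self._stamp == expected_stamp:
                self._value, self._stamp = new, self._stamp + 1
                return True
            return False


def aba_scenario(use_stamp: bool) -> bool:
    ref = StampedRef("A")
    seen_value, seen_stamp = ref.get()  # Thread 1 reads A, then is paused
    ref.cas_value_only("A", "B")        # Thread 2: A -> B
    ref.cas_value_only("B", "A")        # Thread 3: B -> A
    if use_stamp:                       # Thread 1 resumes
        return ref.cas_stamped(seen_value, seen_stamp, "C")
    return ref.cas_value_only(seen_value, "C")


print(aba_scenario(use_stamp=False))  # True: the intervening changes go unnoticed
print(aba_scenario(use_stamp=True))   # False: the stamp moved from 0 to 2
```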

10.3 Lock-Free Stack (Treiber Stack)

import java.util.concurrent.atomic.AtomicReference;

public class LockFreeStack<T> {
    private static class Node<T> {
        final T value;
        Node<T> next;

        Node(T value) {
            this.value = value;
        }
    }

    private final AtomicReference<Node<T>> top = new AtomicReference<>(null);

    public void push(T value) {
        Node<T> newNode = new Node<>(value);
        Node<T> oldTop;
        do {
            oldTop = top.get();
            newNode.next = oldTop;
        } while (!top.compareAndSet(oldTop, newNode)); // CAS loop
    }

    public T pop() {
        Node<T> oldTop;
        Node<T> newTop;
        do {
            oldTop = top.get();
            if (oldTop == null) return null;
            newTop = oldTop.next;
        } while (!top.compareAndSet(oldTop, newTop)); // CAS loop
        return oldTop.value;
    }
}

11. Concurrency Bug Patterns and How to Prevent Them

11.1 Deadlock

Deadlock conditions (a deadlock can occur only when all four hold)
1. Mutual exclusion
2. Hold and wait
3. No preemption
4. Circular wait

┌──────────┐          ┌──────────┐
│ Thread A │──Lock1──→│ Thread B │
│          │←──Lock2──│          │
│ (waiting │          │ (waiting │
│ on Lock2)│          │ on Lock1)│
└──────────┘          └──────────┘

import threading
import time

lock_a = threading.Lock()
lock_b = threading.Lock()

# Code that deadlocks
def thread_1():
    with lock_a:
        print("Thread 1: acquired Lock A")
        time.sleep(0.1)
        with lock_b:  # waits for Lock B: deadlock
            print("Thread 1: acquired Lock B")

def thread_2():
    with lock_b:
        print("Thread 2: acquired Lock B")
        time.sleep(0.1)
        with lock_a:  # waits for Lock A: deadlock
            print("Thread 2: acquired Lock A")

# Fix: use one consistent lock order
def thread_1_fixed():
    with lock_a:     # always A, then B
        print("Thread 1: acquired Lock A")
        time.sleep(0.1)
        with lock_b:
            print("Thread 1: acquired Lock B")

def thread_2_fixed():
    with lock_a:     # always A, then B (prevents circular wait)
        print("Thread 2: acquired Lock A")
        time.sleep(0.1)
        with lock_b:
            print("Thread 2: acquired Lock B")
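
Keeping a consistent lock order by hand gets fragile as the number of locks grows. One possible way to enforce it, sketched here under the assumption that ordering locks by `id()` is acceptable within a single process, is a small helper that always acquires in the same global order regardless of how the caller lists the locks. `acquire_in_order` and `worker` are hypothetical names.

```python
import threading
from contextlib import ExitStack, contextmanager
from typing import Iterator, List


@contextmanager
def acquire_in_order(*locks) -> Iterator[None]:
    """Acquire every lock in one global order (by id), whatever order the
    caller listed them in, and release them all on exit."""
    with ExitStack() as stack:
        for lock in sorted(locks, key=id):
            stack.enter_context(lock)
        yield


lock_a = threading.Lock()
lock_b = threading.Lock()
finished: List[str] = []


def worker(name: str, first, second) -> None:
    for _ in range(1000):
        with acquire_in_order(first, second):
            pass  # critical section that needs both locks
    finished.append(name)


t1 = threading.Thread(target=worker, args=("t1", lock_a, lock_b))
t2 = threading.Thread(target=worker, args=("t2", lock_b, lock_a))  # opposite order
t1.start()
t2.start()
t1.join(timeout=10)
t2.join(timeout=10)
print(sorted(finished))  # ['t1', 't2']
```

Because both threads end up locking in the same order, the circular-wait condition can never form, even though the callers pass the locks in opposite orders.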

11.2 Race Condition

package main

import (
    "fmt"
    "sync"
)

func main() {
    // Race condition example
    counter := 0
    var wg sync.WaitGroup

    for i := 0; i < 1000; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            counter++ // DATA RACE: read-modify-write is not atomic
        }()
    }
    wg.Wait()
    fmt.Println("Counter (unsafe):", counter) // may be less than 1000

    // Fix 1: Mutex
    counter = 0
    var mu sync.Mutex
    for i := 0; i < 1000; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            mu.Lock()
            counter++
            mu.Unlock()
        }()
    }
    wg.Wait()
    fmt.Println("Counter (mutex):", counter) // exactly 1000

    // Fix 2: Channel
    counter = 0
    ch := make(chan int, 1000)
    for i := 0; i < 1000; i++ {
        go func() {
            ch <- 1
        }()
    }
    for i := 0; i < 1000; i++ {
        counter += <-ch
    }
    fmt.Println("Counter (channel):", counter) // exactly 1000
}

11.3 Starvation and Priority Inversion

Starvation
- A thread is never granted the resource it needs and waits indefinitely
- Fix: fair locks, priority aging

Priority inversion
┌──────────────────────────────────────────┐
│ High-priority task   ← blocked           │
│   (waiting for the lock)                 │
│ Medium-priority task ← running           │
│   (preempts the low-priority task)       │
│ Low-priority task    ← holds the lock    │
│                                          │
│ Fix: priority inheritance protocol       │
│ While a low-priority task holds the      │
│ lock, it temporarily inherits the        │
│ higher priority                          │
└──────────────────────────────────────────┘
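
As one illustration of the "fair lock" remedy for starvation, here is a minimal ticket lock in Python. `TicketLock` and `run_demo` are hypothetical names, and this is a sketch rather than a production primitive: every thread takes a numbered ticket and is served strictly in ticket order, so no waiter can be overtaken indefinitely. It does not address priority inversion.

```python
import threading
import time
from typing import List


class TicketLock:
    """FIFO lock sketch: threads enter strictly in the order they asked."""

    def __init__(self) -> None:
        self._cond = threading.Condition()
        self._next_ticket = 0   # next ticket to hand out
        self._now_serving = 0   # ticket currently allowed in

    def acquire(self) -> None:
        with self._cond:
            ticket = self._next_ticket
            self._next_ticket += 1
            while ticket != self._now_serving:
                self._cond.wait()

    def release(self) -> None:
        with self._cond:
            self._now_serving += 1
            self._cond.notify_all()

    def tickets_issued(self) -> int:
        with self._cond:
            return self._next_ticket


def run_demo(workers: int = 5) -> List[int]:
    lock = TicketLock()
    order: List[int] = []
    lock.acquire()  # the main thread takes ticket 0 and holds the lock

    def worker(i: int) -> None:
        lock.acquire()
        order.append(i)
        lock.release()

    threads = []
    for i in range(workers):
        t = threading.Thread(target=worker, args=(i,))
        t.start()
        # Wait until worker i has taken its ticket before starting the next one.
        while lock.tickets_issued() < i + 2:
            time.sleep(0.001)
        threads.append(t)

    lock.release()
    for t in threads:
        t.join()
    return order


print(run_demo())  # [0, 1, 2, 3, 4]
```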

12. Structured Concurrency

12.1 Concept

Structured concurrency is a pattern that ties the lifetime of concurrent tasks to a code block (a scope). When the scope ends, every child task is guaranteed to have completed or been cancelled.

Unstructured concurrency   Structured concurrency
┌────────────────┐      ┌────────────────┐
│ start task A   │      │ scope {        │
│ start task B   │      │   task A       │
│ ... (leak?)    │      │   task B       │
│ forget task A? │      │ } // A, B done │
└────────────────┘      └────────────────┘

12.2 Python TaskGroup (3.11+)

import asyncio

async def fetch(name: str, delay: float) -> str:
    await asyncio.sleep(delay)
    if name == "failing":
        raise ValueError(f"{name} failed!")
    return f"{name}: done"

async def main():
    # TaskGroup: structured concurrency
    try:
        async with asyncio.TaskGroup() as tg:
            task1 = tg.create_task(fetch("api-1", 0.5))
            task2 = tg.create_task(fetch("api-2", 1.0))
            task3 = tg.create_task(fetch("api-3", 0.8))
        # Reaching this point guarantees that every task has completed
        print(task1.result(), task2.result(), task3.result())
    except* ValueError as eg:
        # Handle errors delivered as an ExceptionGroup
        for exc in eg.exceptions:
            print(f"Error: {exc}")

asyncio.run(main())

12.3 Java 21 Structured Concurrency

import java.util.concurrent.StructuredTaskScope;
import java.util.concurrent.StructuredTaskScope.Subtask;

// StructuredTaskScope is a preview API in Java 21: compile and run with --enable-preview
public class StructuredConcurrencyDemo {

    record User(String name) {}
    record Order(String id) {}

    static User fetchUser(String userId) throws Exception {
        Thread.sleep(500);
        return new User("Alice");
    }

    static Order fetchOrder(String orderId) throws Exception {
        Thread.sleep(800);
        return new Order("ORD-123");
    }

    public static void main(String[] args) throws Exception {
        // ShutdownOnFailure: if any subtask fails, the rest are cancelled
        try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
            // In Java 21, fork() returns a Subtask (not a Future)
            Subtask<User> userTask = scope.fork(() -> fetchUser("user-1"));
            Subtask<Order> orderTask = scope.fork(() -> fetchOrder("order-1"));

            scope.join();           // wait for all subtasks to finish
            scope.throwIfFailed();  // propagate the exception if one failed

            // Reaching this point means every result is available
            User user = userTask.get();
            Order order = orderTask.get();
            System.out.printf("User: %s, Order: %s%n", user.name(), order.id());
        }
    }
}

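13. Concurrency Model Comparison by Language

| Dimension | Go | Rust | Java 21 | Python | Kotlin | JavaScript | Erlang/Elixir | C# |
|---|---|---|---|---|---|---|---|---|
| Model | CSP | Ownership + async | Virtual Thread | asyncio | Coroutine | Event Loop | Actor | Task |
| Lightweight unit | goroutine | Future | Virtual Thread | coroutine | coroutine | Promise | process | Task |
| Scheduling | M:N (GMP) | Runtime-dependent | M:N | N:1 (single thread) | M:N | N:1 (single thread) | Preemptive M:N | M:N |
| Shared state | channel | ownership | synchronized | asyncio.Lock | Mutex | None (single thread) | None (messages) | lock |
| Error propagation | panic/recover | Result | Exceptions | Exceptions | Exceptions | reject | link/monitor | Exceptions |
| Cancellation | context | drop | interrupt | cancel | Job.cancel | AbortController | exit | CancellationToken |
| Structured concurrency | errgroup | Limited | StructuredTaskScope | TaskGroup | coroutineScope | None | supervisor | None |
| CPU-bound | goroutine | thread/rayon | thread | multiprocessing | Dispatchers.Default | Worker Thread | process | Parallel.ForEach |
| Memory safety | GC | Compile time | GC | GC | GC | GC | GC | GC |
| Deadlock prevention | Channel design | Care required (the compiler prevents data races, not deadlocks) | Care required | Care required | Care required | Not applicable | Message-based | Care required |
| Performance | High | Very high | High | Moderate | High | I/O only | High (BEAM) | High |
| Learning curve | Low | High | Moderate | Low | Moderate | Low | Moderate | Moderate |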

14. Practical Pattern: Rate Limiter

package main

import (
    "context"
    "fmt"
    "sync"
    "time"
)

// Token Bucket Rate Limiter
type RateLimiter struct {
    tokens     chan struct{}
    refillRate time.Duration
    ctx        context.Context
    cancel     context.CancelFunc
}

func NewRateLimiter(maxTokens int, refillRate time.Duration) *RateLimiter {
    ctx, cancel := context.WithCancel(context.Background())
    rl := &RateLimiter{
        tokens:     make(chan struct{}, maxTokens),
        refillRate: refillRate,
        ctx:        ctx,
        cancel:     cancel,
    }

    // Fill the bucket with the initial tokens
    for i := 0; i < maxTokens; i++ {
        rl.tokens <- struct{}{}
    }

    // Refill one token per tick
    go func() {
        ticker := time.NewTicker(refillRate)
        defer ticker.Stop()
        for {
            select {
            case <-ticker.C:
                select {
                case rl.tokens <- struct{}{}:
                default: // bucket already full
                }
            case <-ctx.Done():
                return
            }
        }
    }()

    return rl
}

func (rl *RateLimiter) Acquire(ctx context.Context) error {
    select {
    case <-rl.tokens:
        return nil
    case <-ctx.Done():
        return ctx.Err()
    }
}

func (rl *RateLimiter) Close() {
    rl.cancel()
}

func main() {
    limiter := NewRateLimiter(5, 200*time.Millisecond)
    defer limiter.Close()

    var wg sync.WaitGroup
    for i := 0; i < 20; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
            defer cancel()

            if err := limiter.Acquire(ctx); err != nil {
                fmt.Printf("Request %d: timed out\n", id)
                return
            }
            fmt.Printf("Request %d: processed at %s\n", id,
                time.Now().Format("15:04:05.000"))
        }(i)
    }
    wg.Wait()
}

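15. Performance Optimization Tips

15.1 Choosing the Right Concurrency Model

Choosing by workload type

I/O-bound (network, disk):
  → async/await, coroutines, goroutines
  → Event-loop-based models are efficient
  → Thread pools cause excessive context switching

CPU-bound (computation, transformation):
  → OS threads (Python: multiprocessing)
  → Rust: the rayon crate
  → Go: goroutines (parallelized automatically)

Mixed:
  → Use separate thread pools/dispatchers
  → Kotlin: Dispatchers.IO vs Dispatchers.Default
  → Python: asyncio + ProcessPoolExecutor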

15.2 Minimizing Context Switching

import asyncio
import concurrent.futures
import time

# Run CPU-bound work in separate processes
def cpu_heavy_task(n: int) -> int:
    """Count the primes below n (CPU-bound)"""
    count = 0
    for i in range(2, n):
        if all(i % j != 0 for j in range(2, int(i**0.5) + 1)):
            count += 1
    return count

async def main():
    # Inside a coroutine, get_running_loop() is preferred over get_event_loop()
    loop = asyncio.get_running_loop()

    # Offload the CPU-bound work to a ProcessPoolExecutor
    with concurrent.futures.ProcessPoolExecutor(max_workers=4) as pool:
        start = time.perf_counter()
        results = await asyncio.gather(
            loop.run_in_executor(pool, cpu_heavy_task, 100_000),
            loop.run_in_executor(pool, cpu_heavy_task, 100_000),
            loop.run_in_executor(pool, cpu_heavy_task, 100_000),
            loop.run_in_executor(pool, cpu_heavy_task, 100_000),
        )
        elapsed = time.perf_counter() - start
        print(f"4 tasks completed in {elapsed:.2f}s: {results}")

# The guard is required where worker processes start with "spawn" (Windows, macOS)
if __name__ == "__main__":
    asyncio.run(main())

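16. Quiz

Q1. What is the key difference between concurrency and parallelism?

Concurrency is a structural property of a program: a design that can logically handle multiple tasks at the same time. Parallelism is a property of execution: multiple tasks physically running at the same time. Concurrency is possible even on a single core, whereas parallelism requires multiple cores.

Q2. In the goroutine GMP model, what do G, M, and P stand for?
  • G (Goroutine): the unit of work to execute. A lightweight thread.
  • M (Machine): an OS thread. Performs the actual execution.
  • P (Processor): a logical processor. The scheduling context that maps Gs onto Ms. The GOMAXPROCS value sets how many exist.

Each P has a local run queue, and work stealing balances the load across them.

Q3. Which four conditions must all hold for a deadlock to occur?
  1. Mutual Exclusion: only one process can use a resource at a time
  2. Hold and Wait: a process holds one resource while waiting for another
  3. No Preemption: an allocated resource cannot be taken away by force
  4. Circular Wait: processes wait on each other in a cycle

Breaking any one of them prevents deadlock. The most common remedy is to enforce a consistent lock order, which rules out circular wait.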
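The lock-ordering remedy from Q3 can be sketched as follows. Two threads transfer money in opposite directions, the classic setup for circular wait, but every transfer locks the lower-ranked account first. `Account`, its `rank` field, `transfer`, and `demo` are hypothetical names used only for this illustration.

```python
import threading

class Account:
    _next_rank = 0

    def __init__(self, balance: int):
        self.balance = balance
        self.lock = threading.Lock()
        self.rank = Account._next_rank  # global rank that defines the lock order
        Account._next_rank += 1

def transfer(src: Account, dst: Account, amount: int) -> None:
    # Always lock the lower-ranked account first, whatever the transfer direction.
    first, second = sorted((src, dst), key=lambda account: account.rank)
    with first.lock, second.lock:
        src.balance -= amount
        dst.balance += amount

def demo(rounds: int = 1000):
    a, b = Account(1000), Account(1000)

    def a_to_b():
        for _ in range(rounds):
            transfer(a, b, 1)

    def b_to_a():
        for _ in range(rounds):
            transfer(b, a, 1)

    threads = [threading.Thread(target=a_to_b), threading.Thread(target=b_to_a)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return a.balance, b.balance

print(demo())  # → (1000, 1000)
```

If `transfer` locked `src` before `dst` instead, the two threads could each hold one lock while waiting for the other, which is exactly the circular wait the ordering removes.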

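Q4. What is the key difference between the Actor model and CSP?
  • Actor model: asynchronous message passing. Each actor has its own mailbox. The sender addresses the receiver directly. Erlang, Akka.
  • CSP: synchronous channel communication. Channels are independent entities. Sender and receiver do not know each other. Go's goroutines + channels.

An actor knows "whom to send to"; in CSP a process only knows "which channel to send on".

Q5. What problems does structured concurrency solve?

Structured concurrency addresses the following problems:

  1. Task leaking: tasks started fire-and-forget are forgotten
  2. Error propagation: a child task's error never reaches its parent
  3. Cancellation propagation: children keep running after the parent is cancelled
  4. Lifetime management: tasks keep running after their scope has ended

Representative implementations are Java 21's StructuredTaskScope, Python's TaskGroup, and Kotlin's coroutineScope.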


17. References

  1. Rob Pike - "Concurrency is not Parallelism" (Go Blog)
  2. Java 21 Virtual Threads - JEP 444
  3. Java 21 Structured Concurrency - JEP 453
  4. Python asyncio official documentation
  5. Go official documentation - Effective Go (Concurrency)
  6. Kotlin Coroutines official guide
  7. Rust async-book (The Async Book)
  8. Akka Documentation - Actor Model
  9. Erlang/OTP - Processes
  10. "The Art of Multiprocessor Programming" - Maurice Herlihy, Nir Shavit
  11. "Designing Data-Intensive Applications" - Martin Kleppmann (Chapter 8)
  12. "Concurrency in Go" - Katherine Cox-Buday (O'Reilly)
  13. tokio.rs - Rust async runtime
  14. libuv - Node.js event loop implementation

Concurrency & Parallel Programming Complete Guide 2025: Async/Await, Thread, Goroutine, Actor Model

1. Concurrency vs Parallelism: Rob Pike's Distinction

"Concurrency is about dealing with lots of things at once. Parallelism is about doing lots of things at once." -- Rob Pike

This single sentence captures the essential difference between concurrency and parallelism.

1.1 Conceptual Distinction

Concurrency is a structural property of a program. It means designing a program so that it can logically handle multiple tasks at the same time. Whether they actually execute simultaneously is irrelevant.

Parallelism is a property of execution. It means multiple tasks physically running at the same time. The classic example is multiple threads running simultaneously on a multi-core CPU.

Concurrency - Structure
┌─────────────────────────────────────────┐
│ Task A: ██░░██░░░░██░░                  │
│ Task B: ░░██░░██░░░░██                  │
│ Task C: ░░░░░░░░██░░░░██                │
│         ─────────────────→ Time         │
│   (Interleaved on a single core)        │
└─────────────────────────────────────────┘

Parallelism - Execution
┌─────────────────────────────────────────┐
│ Core 1: ████████████████                │
│ Core 2: ████████████████                │
│ Core 3: ████████████████                │
│         ─────────────────→ Time         │
│   (Simultaneous on multiple cores)      │
└─────────────────────────────────────────┘

1.2 Real-World Analogy

| Scenario | Concurrent | Parallel |
|---|---|---|
| 1 chef alternating between 3 dishes | Yes | No |
| 3 chefs each cooking 1 dish simultaneously | Yes | Yes |
| 1 chef cooking only 1 dish | No | No |

1.3 Why the Distinction Matters

Pursuing parallelism without concurrency leads to exploding complexity. The correct approach is to first design proper concurrent structure, then leverage parallel execution for performance.

# Concurrent structure (asyncio) - efficient even on single thread
import asyncio

async def fetch_data(url: str) -> str:
    print(f"Fetching {url}...")
    await asyncio.sleep(1)  # I/O wait simulation
    return f"Data from {url}"

async def main():
    # Concurrency: structurally handling 3 requests simultaneously
    results = await asyncio.gather(
        fetch_data("https://api.example.com/users"),
        fetch_data("https://api.example.com/posts"),
        fetch_data("https://api.example.com/comments"),
    )
    for r in results:
        print(r)

asyncio.run(main())
# Total ~1 second (not 3) - the power of concurrency

2. Threading Models: From OS Threads to Virtual Threads

2.1 OS Threads (Kernel Threads)

OS threads are managed directly by the operating system. They are expensive to create (each typically reserves about 1MB of stack), and switching between them incurs context-switching overhead.

// Java - OS Threads
public class ThreadExample {
    public static void main(String[] args) throws InterruptedException {
        Thread t1 = new Thread(() -> {
            for (int i = 0; i < 5; i++) {
                System.out.println("Thread 1: " + i);
                try { Thread.sleep(100); } catch (InterruptedException e) { break; }
            }
        });

        Thread t2 = new Thread(() -> {
            for (int i = 0; i < 5; i++) {
                System.out.println("Thread 2: " + i);
                try { Thread.sleep(100); } catch (InterruptedException e) { break; }
            }
        });

        t1.start();
        t2.start();
        t1.join();
        t2.join();
        System.out.println("Both threads completed");
    }
}

2.2 User Threads (Green Threads)

Lightweight threads managed by the runtime. Multiple user threads are mapped onto OS threads.

Thread Model Comparison (UT = user thread, GT = green thread, KT = kernel thread)
┌───────────────────────────────────────────────┐
│ 1:1 (OS Thread)      N:1 (Green Thread)       │
│ ┌──┐ ┌──┐ ┌──┐       ┌──┐┌──┐┌──┐┌──┐┌──┐     │
│ │UT│ │UT│ │UT│       │GT││GT││GT││GT││GT│     │
│ └┬─┘ └┬─┘ └┬─┘       └┬─┘└┬─┘└┬─┘└┬─┘└┬─┘     │
│  │    │    │          └───┴───┼───┴───┘       │
│ ┌┴─┐ ┌┴─┐ ┌┴─┐               ┌┴─┐             │
│ │KT│ │KT│ │KT│               │KT│             │
│ └──┘ └──┘ └──┘               └──┘             │
│                                               │
│ M:N (Hybrid)                                  │
│ ┌──┐┌──┐┌──┐┌──┐┌──┐┌──┐                      │
│ │GT││GT││GT││GT││GT││GT│                      │
│ └┬─┘└┬─┘└┬─┘└┬─┘└┬─┘└┬─┘                      │
│  └─┬─┘   └─┬─┘   └─┬─┘                        │
│   ┌┴─┐    ┌┴─┐    ┌┴─┐                        │
│   │KT│    │KT│    │KT│                        │
│   └──┘    └──┘    └──┘                        │
└───────────────────────────────────────────────┘
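To make the N:1 mapping concrete, here is a minimal sketch of green threads built from Python generators: several cooperative tasks share one OS thread, and a round-robin scheduler switches between them at each `yield`. The names `green_thread` and `run_round_robin` are hypothetical; real runtimes add I/O polling, timers, and in the M:N case several worker threads.

```python
from collections import deque

def green_thread(name, steps, log):
    """A cooperative task: does one unit of work, then yields control."""
    for i in range(steps):
        log.append(f"{name}:{i}")
        yield  # cooperative yield point: hand control back to the scheduler

def run_round_robin(threads):
    """N:1 scheduler sketch: every green thread runs on the calling OS thread."""
    ready = deque(threads)
    while ready:
        current = ready.popleft()
        try:
            next(current)          # resume until the next yield
            ready.append(current)  # still alive: back of the queue
        except StopIteration:
            pass                   # finished: drop it

log = []
run_round_robin([green_thread("A", 2, log), green_thread("B", 3, log)])
print(log)  # → ['A:0', 'B:0', 'A:1', 'B:1', 'B:2']
```

A task that never reaches a `yield` would stall every other task, which is the "risk of blocking all" that N:1 green threads carry.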

2.3 Java 21 Virtual Threads

Virtual threads, officially introduced in Java 21, implement the M:N model. You can create millions of virtual threads.

// Java 21 Virtual Threads
import java.util.concurrent.Executors;
import java.time.Duration;

public class VirtualThreadDemo {
    public static void main(String[] args) throws Exception {
        // Can create 1 million virtual threads!
        try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
            for (int i = 0; i < 1_000_000; i++) {
                final int taskId = i;
                executor.submit(() -> {
                    try {
                        Thread.sleep(Duration.ofSeconds(1));
                        if (taskId % 100_000 == 0) {
                            System.out.println("Task " + taskId + " done on " +
                                Thread.currentThread());
                        }
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
        }
        System.out.println("All tasks completed");
    }
}

2.4 Thread Model Comparison

| Property | OS Thread | Green Thread | Virtual Thread (Java 21) |
|---|---|---|---|
| Stack Size | ~1MB | ~KB | ~Few KB (dynamic) |
| Creation Cost | High (syscall) | Low | Very Low |
| Concurrent Count | Thousands | Millions | Millions |
| Preemption | OS preemptive | Cooperative | Cooperative (yields on I/O) |
| I/O Blocking | Blocks thread | Risk of blocking all | Auto-unmount |

3. Coroutines: The Core of Lightweight Concurrency

3.1 Stackful vs Stackless Coroutines

Stackful Coroutine (e.g., Lua, Go's goroutine)
┌──────────────────────────────────────┐
│ Has its own stack                    │
│ Can suspend anywhere in stack        │
│ Higher memory usage                  │
│ Can yield from nested calls          │
└──────────────────────────────────────┘

Stackless Coroutine (e.g., Python, Kotlin, Rust)
┌──────────────────────────────────────┐
│ State stored on heap                 │
│ Can only suspend at top level        │
│ Lower memory usage                   │
│ Compiler transforms to state machine │
└──────────────────────────────────────┘
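The "transforms to state machine" point can be illustrated by writing the transformation by hand. The sketch below pairs a generator with an equivalent class whose local variable lives on a heap object and whose resume point is an explicit state field. `CountdownStateMachine` and `drain` are hypothetical names, and actual compiler output differs by language.

```python
def countdown_gen(n):
    """Generator form: the runtime saves the local `n` and the resume point."""
    while n > 0:
        yield n
        n -= 1

class CountdownStateMachine:
    """Hand-written equivalent: locals become fields, the resume point a state."""
    START, RESUMED, DONE = range(3)

    def __init__(self, n):
        self.n = n               # the former local variable, now on the heap
        self.state = self.START

    def resume(self):
        if self.state == self.RESUMED:
            self.n -= 1          # the code that follows the suspension point
        if self.state != self.DONE and self.n > 0:
            self.state = self.RESUMED
            return self.n        # "suspend": return the value to the caller
        self.state = self.DONE
        raise StopIteration

def drain(machine):
    """Resume the machine until it finishes, collecting what it produces."""
    out = []
    while True:
        try:
            out.append(machine.resume())
        except StopIteration:
            return out

print(list(countdown_gen(3)))           # → [3, 2, 1]
print(drain(CountdownStateMachine(3)))  # → [3, 2, 1]
```

No separate stack is needed because all the state that survives a suspension fits in the object's fields, which is why stackless coroutines are cheap.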

3.2 Python asyncio Coroutines

import asyncio
import aiohttp
import time

async def fetch_url(session: aiohttp.ClientSession, url: str) -> dict:
    """Asynchronous HTTP request"""
    async with session.get(url) as response:
        return {
            "url": url,
            "status": response.status,
            "length": len(await response.text()),
        }

async def main():
    urls = [
        "https://httpbin.org/delay/1",
        "https://httpbin.org/delay/1",
        "https://httpbin.org/delay/1",
        "https://httpbin.org/delay/1",
        "https://httpbin.org/delay/1",
    ]

    start = time.perf_counter()
    async with aiohttp.ClientSession() as session:
        # Execute 5 requests concurrently
        tasks = [fetch_url(session, url) for url in urls]
        results = await asyncio.gather(*tasks)

    elapsed = time.perf_counter() - start
    print(f"5 requests completed: {elapsed:.2f}s")  # ~1s (not 5s)

    for r in results:
        print(f"  {r['url']}: status={r['status']}, length={r['length']}")

asyncio.run(main())

3.3 Kotlin Coroutines

import kotlinx.coroutines.*
import kotlin.system.measureTimeMillis

suspend fun fetchData(id: Int): String {
    delay(1000) // Async wait (no thread blocking)
    return "Result-$id"
}

fun main() = runBlocking {
    val time = measureTimeMillis {
        // Structured concurrency: coroutineScope manages all child coroutines
        coroutineScope {
            val results = (1..10).map { id ->
                async(Dispatchers.IO) {
                    fetchData(id)
                }
            }
            results.awaitAll().forEach { println(it) }
        }
    }
    println("Completed in ${time}ms") // ~1000ms
}

4. Goroutines and Go's Concurrency Model

4.1 Goroutine Internal Structure

Go's runtime uses M:N scheduling. Three concepts are central: G (Goroutine), M (Machine/OS Thread), and P (Processor).

Go Runtime Scheduler (GMP Model)
┌─────────────────────────────────────────────┐
│                                             │
│  ┌──────┐ ┌──────┐ ┌──────┐ ┌──────┐        │
│  │ G1   │ │ G2   │ │ G3   │ │ G4...│        │
│  └──┬───┘ └──┬───┘ └──┬───┘ └──┬───┘        │
│     │        │        │        │            │
│  ┌──┴────────┴──┐  ┌──┴────────┴──┐         │
│  │   P (Local   │  │   P (Local   │         │
│  │    Queue)    │  │    Queue)    │         │
│  └──────┬───────┘  └──────┬───────┘         │
│         │                 │                 │
│  ┌──────┴───────┐  ┌──────┴───────┐         │
│  │ M (OS Thread)│  │ M (OS Thread)│         │
│  └──────────────┘  └──────────────┘         │
│                                             │
│  Global Run Queue: [G5, G6, G7, ...]        │
└─────────────────────────────────────────────┘

GOMAXPROCS = number of P's (default: CPU core count)

4.2 Basic Goroutine Usage

package main

import (
    "fmt"
    "sync"
    "time"
)

func worker(id int, wg *sync.WaitGroup) {
    defer wg.Done()
    fmt.Printf("Worker %d starting\n", id)
    time.Sleep(time.Second)
    fmt.Printf("Worker %d done\n", id)
}

func main() {
    var wg sync.WaitGroup

    for i := 1; i <= 5; i++ {
        wg.Add(1)
        go worker(i, &wg) // goroutine creation - ~2KB stack
    }

    wg.Wait()
    fmt.Println("All workers completed")
}

4.3 Work Stealing

When a P's local queue is empty, it steals goroutines from another P's queue. This distributes load evenly.

Work Stealing Process
┌─────────────┐     ┌─────────────┐
│ P1 (busy)   │     │ P2 (idle)   │
│ [G1,G2,G3]  │ ──→ │ []          │
│             │steal│             │
│ [G1,G2]     │ ──→ │ [G3]        │
└─────────────┘     └─────────────┘
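The stealing step can be simulated deterministically in a few lines. In this sketch each processor runs tasks from the front of its own queue and, when idle, takes a single task from the tail of a peer's queue, mirroring the diagram. `Processor` and `simulate` are hypothetical names, and stealing exactly one task is a simplification; production schedulers commonly steal in batches.

```python
from collections import deque

class Processor:
    def __init__(self, name, tasks=()):
        self.name = name
        self.queue = deque(tasks)  # local run queue

    def next_task(self, peers):
        if self.queue:
            return self.queue.popleft()  # local work first, from the front
        for peer in peers:
            if peer is not self and peer.queue:
                return peer.queue.pop()  # idle: steal from the victim's tail
        return None

def simulate(processors):
    """Lockstep simulation: each round, every processor runs at most one task."""
    schedule = []
    progress = True
    while progress:
        progress = False
        for p in processors:
            task = p.next_task(processors)
            if task is not None:
                schedule.append((p.name, task))
                progress = True
    return schedule

p1 = Processor("P1", ["G1", "G2", "G3"])
p2 = Processor("P2")
schedule = simulate([p1, p2])
print(schedule)  # → [('P1', 'G1'), ('P2', 'G3'), ('P1', 'G2')]
```

Taking from opposite ends keeps the owner and the thief away from the same task, which is why work-stealing queues are usually double-ended.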

5. Async/Await Pattern Comparison

5.1 JavaScript Promise and Async/Await

// Promise chaining vs Async/Await
// Promise chaining approach
function fetchUserData(userId) {
  return fetch(`/api/users/${userId}`)
    .then(response => response.json())
    .then(user =>
      // Nest the posts request so `user` stays in scope for the final result
      fetch(`/api/posts?author=${user.id}`)
        .then(response => response.json())
        .then(posts => ({ user, posts }))
    )
    .catch(error => {
      console.error('Error:', error);
      throw error;
    });
}

// Async/Await approach (much more readable)
async function fetchUserDataAsync(userId) {
  try {
    const userResponse = await fetch(`/api/users/${userId}`);
    const user = await userResponse.json();

    const postsResponse = await fetch(`/api/posts?author=${user.id}`);
    const posts = await postsResponse.json();

    return { user, posts };
  } catch (error) {
    console.error('Error:', error);
    throw error;
  }
}

// Concurrent execution
async function fetchMultipleUsers(userIds) {
  const promises = userIds.map(id => fetchUserDataAsync(id));
  return Promise.all(promises); // All requests are in flight concurrently
}

// Promise.allSettled - continues even if some fail
async function fetchWithPartialFailure(urls) {
  const results = await Promise.allSettled(
    urls.map(url => fetch(url).then(r => r.json()))
  );

  const successes = results
    .filter(r => r.status === 'fulfilled')
    .map(r => r.value);
  const failures = results
    .filter(r => r.status === 'rejected')
    .map(r => r.reason);

  return { successes, failures };
}

5.2 Rust Async

use tokio;
use reqwest;
use futures::future::join_all;

#[derive(Debug)]
struct ApiResponse {
    url: String,
    status: u16,
    body_length: usize,
}

async fn fetch_url(client: &reqwest::Client, url: &str) -> Result<ApiResponse, reqwest::Error> {
    let response = client.get(url).send().await?;
    let status = response.status().as_u16();
    let body = response.text().await?;

    Ok(ApiResponse {
        url: url.to_string(),
        status,
        body_length: body.len(),
    })
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let client = reqwest::Client::new();
    let urls = vec![
        "https://httpbin.org/get",
        "https://httpbin.org/ip",
        "https://httpbin.org/user-agent",
    ];

    // Execute all requests concurrently
    let futures: Vec<_> = urls.iter()
        .map(|url| fetch_url(&client, url))
        .collect();

    let results = join_all(futures).await;

    for result in results {
        match result {
            Ok(resp) => println!("{}: status={}, length={}", resp.url, resp.status, resp.body_length),
            Err(e) => eprintln!("Error: {}", e),
        }
    }

    Ok(())
}

5.3 C# Task-Based Asynchrony

using System;
using System.Net.Http;
using System.Threading;        // CancellationTokenSource
using System.Threading.Tasks;
using System.Collections.Generic;
using System.Linq;

public class AsyncDemo
{
    private static readonly HttpClient _client = new HttpClient();

    public static async Task<string> FetchDataAsync(string url)
    {
        var response = await _client.GetAsync(url);
        response.EnsureSuccessStatusCode();
        return await response.Content.ReadAsStringAsync();
    }

    public static async Task Main()
    {
        var urls = new List<string>
        {
            "https://httpbin.org/get",
            "https://httpbin.org/ip",
            "https://httpbin.org/user-agent",
        };

        // Parallel execution with WhenAll
        var tasks = urls.Select(url => FetchDataAsync(url));
        var results = await Task.WhenAll(tasks);

        for (int i = 0; i < urls.Count; i++)
        {
            Console.WriteLine($"URL: {urls[i]}, Length: {results[i].Length}");
        }

        // Timeout pattern
        using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
        try
        {
            var data = await _client.GetStringAsync(urls[0], cts.Token);
            Console.WriteLine($"Got data: {data.Length} chars");
        }
        catch (OperationCanceledException)
        {
            Console.WriteLine("Request timed out!");
        }
    }
}

5.4 Async/Await Language Comparison

| Property | JavaScript | Python | Rust | C# | Kotlin |
|---|---|---|---|---|---|
| Runtime | V8 Event Loop | asyncio | tokio/async-std | CLR Thread Pool | Dispatchers |
| Future Type | Promise | Coroutine | Future (lazy) | Task | Deferred |
| Cancellation | AbortController | Task.cancel() | drop | CancellationToken | Job.cancel() |
| Streams | AsyncIterator | async for | Stream trait | IAsyncEnumerable | Flow |
| Error Handling | try/catch | try/except | Result | try/catch | try/catch |
| Parallel Exec | Promise.all | gather | join! | Task.WhenAll | awaitAll |
| Zero-cost | No | No | Yes | No | No |
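The Cancellation row deserves a concrete case. In Python, `Task.cancel()` raises `asyncio.CancelledError` inside the task at its current `await`, which gives the task a chance to clean up. The sketch below is a minimal illustration; `slow_job` and the `log` list are hypothetical names.

```python
import asyncio

async def slow_job(log):
    try:
        await asyncio.sleep(10)
        log.append("finished")
    except asyncio.CancelledError:
        log.append("cleanup")  # release resources here
        raise                  # re-raise so the task is marked as cancelled

async def main():
    log = []
    task = asyncio.create_task(slow_job(log))
    await asyncio.sleep(0.01)  # let the task start and reach its await
    task.cancel()              # request cancellation
    try:
        await task
    except asyncio.CancelledError:
        log.append("cancelled")
    return log, task.cancelled()

result = asyncio.run(main())
print(result)  # → (['cleanup', 'cancelled'], True)
```

Swallowing `CancelledError` instead of re-raising it would leave the task reported as completed normally, so the re-raise is the important part of the pattern.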

6. Event Loop Deep Dive

6.1 Node.js libuv Event Loop

Node.js Event Loop Phases
┌─────────────────────────────────────┐
│         ┌─────────────────┐         │
│    ┌────│     timers      │────┐    │
│    │    └─────────────────┘    │    │
│    │    ┌─────────────────┐    │    │
│    │    │   pending I/O   │    │    │
│    │    └─────────────────┘    │    │
│    │    ┌─────────────────┐    │    │
│    │    │  idle/prepare   │    │    │
│    │    └─────────────────┘    │    │
│    │    ┌─────────────────┐    │    │
│    │    │      poll       │◄───┘    │
│    │    └─────────────────┘         │
│    │    ┌─────────────────┐         │
│    │    │      check      │         │
│    │    │ (setImmediate)  │         │
│    │    └─────────────────┘         │
│    │    ┌─────────────────┐         │
│    └────│ close callbacks │         │
│         └─────────────────┘         │
│                                     │
│ Between each phase: microtask       │
│ queue (Promise, queueMicrotask)     │
└─────────────────────────────────────┘

// Understanding Event Loop execution order
console.log('1. Synchronous code');

setTimeout(() => console.log('2. setTimeout (timers)'), 0);

setImmediate(() => console.log('3. setImmediate (check)'));

Promise.resolve().then(() => console.log('4. Promise (microtask)'));

process.nextTick(() => console.log('5. nextTick (microtask - highest priority)'));

console.log('6. End of synchronous code');

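// Typical output order:
// 1. Synchronous code
// 6. End of synchronous code
// 5. nextTick (microtask - highest priority)
// 4. Promise (microtask)
// 2. setTimeout (timers)
// 3. setImmediate (check)
// Note: when both are scheduled from the main module, the relative order of
// 2 and 3 is not guaranteed and depends on process timing. Inside an I/O
// callback, setImmediate always runs before the timer.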

6.2 Python asyncio Event Loop

import asyncio
import time

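async def cpu_bound_simulation(name: str, duration: float):
    """Simulated work (asyncio.sleep stands in for the real workload)"""
    print(f"[{name}] Starting")
    # Real CPU-bound code with no await would block the event loop;
    # asyncio.sleep yields control so the other tasks can run
    await asyncio.sleep(duration)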
    print(f"[{name}] Complete ({duration}s)")
    return f"{name}: done"

async def producer(queue: asyncio.Queue, n: int):
    """Producer: add items to queue"""
    for i in range(n):
        await asyncio.sleep(0.1)
        item = f"item-{i}"
        await queue.put(item)
        print(f"  Produced: {item}")
    await queue.put(None)  # Sentinel value

async def consumer(queue: asyncio.Queue, name: str):
    """Consumer: consume items from queue"""
    while True:
        item = await queue.get()
        if item is None:
            await queue.put(None)  # Pass sentinel to other consumers
            break
        await asyncio.sleep(0.2)  # Processing simulation
        print(f"  {name} consumed: {item}")
        queue.task_done()

async def main():
    # 1. Concurrent execution
    start = time.perf_counter()
    results = await asyncio.gather(
        cpu_bound_simulation("TaskA", 1.0),
        cpu_bound_simulation("TaskB", 0.5),
        cpu_bound_simulation("TaskC", 0.8),
    )
    elapsed = time.perf_counter() - start
    print(f"Total time: {elapsed:.2f}s")  # ~1s
    print(f"Results: {results}")

    # 2. Producer-consumer pattern
    print("\n--- Producer-Consumer ---")
    queue = asyncio.Queue(maxsize=5)
    await asyncio.gather(
        producer(queue, 10),
        consumer(queue, "Consumer-1"),
        consumer(queue, "Consumer-2"),
    )

asyncio.run(main())

7. Actor Model: Concurrency Without Shared State

7.1 Actor Model Concepts

Actor Model
┌─────────────────────────────────────────────────────┐
│                                                     │
│  ┌─────────┐  message  ┌─────────┐                  │
│  │ Actor A │ ───────→  │ Actor B │                  │
│  │ [State] │           │ [State] │                  │
│  │[Mailbox]│           │[Mailbox]│                  │
│  └─────────┘           └────┬────┘                  │
│       ↑                     │                       │
│       │       message       │                       │
│       └─────────────────────┤ spawn new Actor       │
│                             ↓                       │
│                        ┌─────────┐                  │
│                        │ Actor C │                  │
│                        │ [State] │                  │
│                        └─────────┘                  │
│                                                     │
│ Rules:                                              │
│ 1. Communicate only via messages (no shared state)  │
│ 2. Process one message at a time                    │
│ 3. Can spawn new Actors                             │
│ 4. Can only modify own state                        │
└─────────────────────────────────────────────────────┘
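The four rules can be seen in a minimal actor sketch built on asyncio. The actor owns its state, receives messages only through its mailbox, and handles them one at a time. `CounterActor` and the tuple-based message format are hypothetical choices for illustration; frameworks such as Akka or Erlang/OTP add supervision, addressing, and distribution on top.

```python
import asyncio

class CounterActor:
    def __init__(self):
        self._mailbox = asyncio.Queue()  # the only way in: messages
        self._count = 0                  # private state, never shared

    def send(self, message):
        """Asynchronous send: enqueue the message and return immediately."""
        self._mailbox.put_nowait(message)

    async def run(self):
        while True:
            message = await self._mailbox.get()  # one message at a time
            kind = message[0]
            if kind == "increment":
                self._count += message[1]
            elif kind == "get":
                message[1].set_result(self._count)  # reply through a future
            elif kind == "stop":
                return

async def main():
    actor = CounterActor()
    runner = asyncio.create_task(actor.run())
    actor.send(("increment", 1))
    actor.send(("increment", 1))
    reply = asyncio.get_running_loop().create_future()
    actor.send(("get", reply))
    count = await reply
    actor.send(("stop",))
    await runner
    return count

count = asyncio.run(main())
print(count)  # → 2
```

Because only `run` ever touches `_count`, no lock is needed; the mailbox serializes every access.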

7.2 Erlang/Elixir Actor

%% Erlang - Counter Actor
-module(counter).
-export([start/0, increment/1, get/1, loop/1]).

start() ->
    spawn(fun() -> loop(0) end).

increment(Pid) ->
    Pid ! {increment, 1}.

get(Pid) ->
    Pid ! {get, self()},
    receive
        {count, Value} -> Value
    after 5000 ->
        timeout
    end.

loop(Count) ->
    receive
        {increment, N} ->
            loop(Count + N);
        {get, From} ->
            From ! {count, Count},
            loop(Count);
        stop ->
            ok
    end.

%% Usage:
%% Pid = counter:start().
%% counter:increment(Pid).
%% counter:increment(Pid).
%% counter:get(Pid). %% => 2

7.3 Akka (Scala) Actor

import akka.actor.typed.{ActorRef, ActorSystem, Behavior, SupervisorStrategy}
import akka.actor.typed.scaladsl.Behaviors

// Message definitions
sealed trait Command
case class Deposit(amount: Double) extends Command
case class Withdraw(amount: Double) extends Command
case class GetBalance(replyTo: ActorRef[Balance]) extends Command
case class Balance(amount: Double)

// Bank Account Actor
object BankAccount {
  def apply(balance: Double = 0.0): Behavior[Command] =
    Behaviors.receive { (context, message) =>
      message match {
        case Deposit(amount) =>
          val newBalance = balance + amount
          context.log.info(s"Deposited $amount, balance: $newBalance")
          BankAccount(newBalance)

        case Withdraw(amount) =>
          if (amount <= balance) {
            val newBalance = balance - amount
            context.log.info(s"Withdrew $amount, balance: $newBalance")
            BankAccount(newBalance)
          } else {
            context.log.warn(s"Insufficient funds: $balance < $amount")
            Behaviors.same
          }

        case GetBalance(replyTo) =>
          replyTo ! Balance(balance)
          Behaviors.same
      }
    }
}

// Supervision (fault recovery)
object BankSupervisor {
  def apply(): Behavior[Command] =
    Behaviors.supervise(BankAccount())
      .onFailure(SupervisorStrategy.restart)
}

7.4 Actor Model Pros and Cons

| Pros | Cons |
|---|---|
| No shared state (prevents race conditions) | Can be difficult to debug |
| Location transparency (distributable) | Limited message ordering guarantees |
| Fault isolation (let it crash) | Potential mailbox overflow |
| Good scalability | Synchronous request-response is complex |

8. CSP: Go's Channel-Based Concurrency

8.1 Channel Basics

package main

import (
    "fmt"
    "time"
)

func main() {
    // Unbuffered channel (synchronous)
    ch := make(chan string)

    go func() {
        time.Sleep(time.Second)
        ch <- "Hello from goroutine!"
    }()

    msg := <-ch // Blocks until message arrives
    fmt.Println(msg)

    // Buffered channel (asynchronous)
    buffered := make(chan int, 3)
    buffered <- 1
    buffered <- 2
    buffered <- 3
    // buffered <- 4 // Would block - buffer full!

    fmt.Println(<-buffered) // 1
    fmt.Println(<-buffered) // 2
}

8.2 Fan-In / Fan-Out Pattern

package main

import (
    "fmt"
    "math/rand"
    "sync"
    "time"
)

// Fan-Out: distribute one input to multiple workers
func fanOut(input <-chan int, workers int) []<-chan int {
    channels := make([]<-chan int, workers)
    for i := 0; i < workers; i++ {
        ch := make(chan int)
        channels[i] = ch
        go func(workerID int, out chan<- int) {
            defer close(out)
            for val := range input {
                // Processing simulation
                time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
                out <- val * val
                fmt.Printf("  Worker %d processed %d\n", workerID, val)
            }
        }(i, ch)
    }
    return channels
}

// Fan-In: merge results from multiple channels into one
func fanIn(channels ...<-chan int) <-chan int {
    var wg sync.WaitGroup
    merged := make(chan int)

    for _, ch := range channels {
        wg.Add(1)
        go func(c <-chan int) {
            defer wg.Done()
            for val := range c {
                merged <- val
            }
        }(ch)
    }

    go func() {
        wg.Wait()
        close(merged)
    }()

    return merged
}

func main() {
    // Input channel
    input := make(chan int)
    go func() {
        defer close(input)
        for i := 1; i <= 20; i++ {
            input <- i
        }
    }()

    // Fan-Out: 4 workers
    workerChannels := fanOut(input, 4)

    // Fan-In: merge results
    results := fanIn(workerChannels...)

    // Collect results
    var sum int
    for result := range results {
        sum += result
    }
    fmt.Printf("Sum of squares: %d\n", sum)
}

8.3 Select Statement

package main

import (
    "fmt"
    "time"
)

func main() {
    ch1 := make(chan string)
    ch2 := make(chan string)

    go func() {
        time.Sleep(1 * time.Second)
        ch1 <- "from channel 1"
    }()

    go func() {
        time.Sleep(2 * time.Second)
        ch2 <- "from channel 2"
    }()

    // select: choose whichever channel is ready
    for i := 0; i < 2; i++ {
        select {
        case msg := <-ch1:
            fmt.Println("Received", msg)
        case msg := <-ch2:
            fmt.Println("Received", msg)
        case <-time.After(3 * time.Second):
            fmt.Println("Timeout!")
        }
    }

    // Non-blocking select
    ch := make(chan int, 1)
    select {
    case val := <-ch:
        fmt.Println("Got:", val)
    default:
        fmt.Println("Channel empty, moving on")
    }
}

9. Synchronization Primitives

9.1 Mutex and RWLock

package main

import (
    "fmt"
    "sync"
)

// Mutex: mutual exclusion
type SafeCounter struct {
    mu sync.Mutex
    v  map[string]int
}

func (c *SafeCounter) Inc(key string) {
    c.mu.Lock()
    defer c.mu.Unlock()
    c.v[key]++
}

func (c *SafeCounter) Value(key string) int {
    c.mu.Lock()
    defer c.mu.Unlock()
    return c.v[key]
}

// RWMutex: concurrent reads, exclusive writes
type SafeCache struct {
    mu    sync.RWMutex
    store map[string]string
}

func (c *SafeCache) Get(key string) (string, bool) {
    c.mu.RLock()         // Read lock (multiple goroutines can read concurrently)
    defer c.mu.RUnlock()
    val, ok := c.store[key]
    return val, ok
}

func (c *SafeCache) Set(key, value string) {
    c.mu.Lock()          // Write lock (exclusive)
    defer c.mu.Unlock()
    c.store[key] = value
}

func main() {
    counter := SafeCounter{v: make(map[string]int)}
    var wg sync.WaitGroup
    for i := 0; i < 1000; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            counter.Inc("key")
        }()
    }
    wg.Wait()
    fmt.Println("Final count:", counter.Value("key")) // Exactly 1000
}

9.2 Semaphore

import asyncio

async def worker(sem: asyncio.Semaphore, task_id: int):
    async with sem:  # Max 3 concurrent executions
        print(f"Task {task_id}: started (slot acquired)")
        await asyncio.sleep(1)
        print(f"Task {task_id}: completed (slot released)")

async def main():
    sem = asyncio.Semaphore(3)  # Limit to 3 concurrent
    tasks = [worker(sem, i) for i in range(10)]
    await asyncio.gather(*tasks)

asyncio.run(main())

9.3 Condition Variable

import threading
import time
import random

class BoundedBuffer:
    def __init__(self, capacity: int):
        self.buffer = []
        self.capacity = capacity
        self.lock = threading.Lock()
        self.not_full = threading.Condition(self.lock)
        self.not_empty = threading.Condition(self.lock)

    def produce(self, item):
        with self.not_full:
            while len(self.buffer) >= self.capacity:
                self.not_full.wait()  # Wait until buffer has space
            self.buffer.append(item)
            self.not_empty.notify()  # Notify consumer

    def consume(self):
        with self.not_empty:
            while len(self.buffer) == 0:
                self.not_empty.wait()  # Wait until item available
            item = self.buffer.pop(0)
            self.not_full.notify()  # Notify producer
            return item

buf = BoundedBuffer(5)

def producer():
    for i in range(20):
        buf.produce(i)
        print(f"Produced: {i}")
        time.sleep(random.uniform(0.01, 0.1))

def consumer(name):
    for _ in range(10):
        item = buf.consume()
        print(f"{name} consumed: {item}")
        time.sleep(random.uniform(0.05, 0.15))

t1 = threading.Thread(target=producer)
t2 = threading.Thread(target=consumer, args=("C1",))
t3 = threading.Thread(target=consumer, args=("C2",))
t1.start(); t2.start(); t3.start()
t1.join(); t2.join(); t3.join()

9.4 Atomic Operations

import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

public class AtomicDemo {
    // AtomicLong - lock-free counter
    private static final AtomicLong counter = new AtomicLong(0);

    // AtomicReference - lock-free reference update
    private static final AtomicReference<String> config =
        new AtomicReference<>("default");

    public static void main(String[] args) throws InterruptedException {
        // 10 threads each increment the shared counter 10,000 times without a lock
        Thread[] threads = new Thread[10];
        for (int i = 0; i < 10; i++) {
            threads[i] = new Thread(() -> {
                for (int j = 0; j < 10_000; j++) {
                    counter.incrementAndGet(); // Atomic increment
                }
            });
            threads[i].start();
        }

        for (Thread t : threads) t.join();
        System.out.println("Counter: " + counter.get()); // Exactly 100,000

        // Direct CAS usage
        boolean updated = config.compareAndSet("default", "production");
        System.out.println("Config updated: " + updated); // true
        System.out.println("Config value: " + config.get()); // production
    }
}

10. Lock-Free Data Structures

10.1 CAS (Compare-And-Swap)

CAS is the foundation of lock-free programming. It is an atomic operation that says "if the current value equals the expected value, replace it with the new value."

CAS Operation
┌──────────────────────────────────────┐
│ CAS(address, expected, new_value)    │
│                                      │
│ if *address == expected:             │
│     *address = new_value             │
│     return true   // success         │
│ else:                                │
│     return false  // fail -> retry   │
│                                      │
│ (entire operation is hardware-atomic)│
└──────────────────────────────────────┘
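
The retry pattern that goes with CAS can be sketched as follows. Python exposes no hardware CAS instruction, so this sketch assumes a lock as a stand-in for the atomicity; the class `AtomicInt` and the function `increment` are hypothetical names used only to show the semantics and the loop shape.

```python
import threading

class AtomicInt:
    """Emulated atomic integer: a lock stands in for hardware atomicity."""

    def __init__(self, value: int = 0):
        self._value = value
        self._lock = threading.Lock()

    def get(self) -> int:
        with self._lock:
            return self._value

    def compare_and_swap(self, expected: int, new_value: int) -> bool:
        # Replace the value only if it still equals what the caller last saw
        with self._lock:
            if self._value == expected:
                self._value = new_value
                return True
            return False

def increment(counter: AtomicInt) -> int:
    """CAS retry loop: read, compute, try to publish, retry on conflict."""
    while True:
        current = counter.get()
        if counter.compare_and_swap(current, current + 1):
            return current + 1
        # Another thread changed the value in between: loop and try again

counter = AtomicInt()

def worker():
    for _ in range(1000):
        increment(counter)

threads = [threading.Thread(target=worker) for _ in range(8)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(counter.get())                    # 8000
print(counter.compare_and_swap(0, 1))   # False: the value is no longer 0
```

No update is lost because a thread that loses the race simply observes a failed CAS and retries with the fresh value.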

10.2 The ABA Problem

ABA Problem
Thread 1: reads value A -> (suspended)
Thread 2: changes A -> B
Thread 3: changes B -> A
Thread 1: (resumes) CAS(expected=A, new=C) -> succeeds, although the value changed in between

Solution: pair the value with a version counter that changes on every update
- AtomicStampedReference (Java)
- Tagged pointers (C/C++)
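
The version-counter idea can be sketched in a few lines. The class below is a hypothetical `StampedRef`, loosely modeled on the argument order of Java's `AtomicStampedReference.compareAndSet`; a lock again stands in for hardware atomicity, and the interleaving is replayed sequentially to keep the example deterministic.

```python
import threading

class StampedRef:
    """A value and a version stamp that are compared and replaced together."""

    def __init__(self, value, stamp: int = 0):
        self._value, self._stamp = value, stamp
        self._lock = threading.Lock()

    def get(self):
        with self._lock:
            return self._value, self._stamp

    def compare_and_set(self, exp_value, new_value, exp_stamp, new_stamp) -> bool:
        with self._lock:
            if self._value == exp_value and self._stamp == exp_stamp:
                self._value, self._stamp = new_value, new_stamp
                return True
            return False

ref = StampedRef("A")

# "Thread 1" reads ("A", 0) and is then suspended
value, stamp = ref.get()

# Meanwhile other threads change A -> B -> A, bumping the stamp each time
ref.compare_and_set("A", "B", 0, 1)
ref.compare_and_set("B", "A", 1, 2)

# "Thread 1" resumes: the value matches, but the stamp reveals the change
stale_ok = ref.compare_and_set(value, "C", stamp, stamp + 1)
print(stale_ok, ref.get())   # False ('A', 2)

# After re-reading the current state, the update goes through
value, stamp = ref.get()
fresh_ok = ref.compare_and_set(value, "C", stamp, stamp + 1)
print(fresh_ok, ref.get())   # True ('C', 3)
```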

10.3 Lock-Free Stack (Treiber Stack)

import java.util.concurrent.atomic.AtomicReference;

public class LockFreeStack<T> {
    private static class Node<T> {
        final T value;
        Node<T> next;

        Node(T value) {
            this.value = value;
        }
    }

    private final AtomicReference<Node<T>> top = new AtomicReference<>(null);

    public void push(T value) {
        Node<T> newNode = new Node<>(value);
        Node<T> oldTop;
        do {
            oldTop = top.get();
            newNode.next = oldTop;
        } while (!top.compareAndSet(oldTop, newNode)); // CAS loop
    }

    public T pop() {
        Node<T> oldTop;
        Node<T> newTop;
        do {
            oldTop = top.get();
            if (oldTop == null) return null;
            newTop = oldTop.next;
        } while (!top.compareAndSet(oldTop, newTop)); // CAS loop
        return oldTop.value;
    }
}

11. Concurrency Bug Patterns and Prevention

11.1 Deadlock

Deadlock Conditions (all 4 must be met)
1. Mutual Exclusion
2. Hold and Wait
3. No Preemption
4. Circular Wait

┌──────────┐          ┌──────────┐
│ Thread A │──Lock1──→│ Thread B │
│          │←──Lock2──│          │
│ (waiting │          │ (waiting │
│  Lock2)  │          │  Lock1)  │
└──────────┘          └──────────┘

import threading
import time

lock_a = threading.Lock()
lock_b = threading.Lock()

# Code that causes deadlock
def thread_1():
    with lock_a:
        print("Thread 1: Acquired Lock A")
        time.sleep(0.1)
        with lock_b:  # Waiting for Lock B -> deadlock!
            print("Thread 1: Acquired Lock B")

def thread_2():
    with lock_b:
        print("Thread 2: Acquired Lock B")
        time.sleep(0.1)
        with lock_a:  # Waiting for Lock A -> deadlock!
            print("Thread 2: Acquired Lock A")

# Solution: consistent lock ordering
def thread_1_fixed():
    with lock_a:     # Always A -> B order
        print("Thread 1: Acquired Lock A")
        time.sleep(0.1)
        with lock_b:
            print("Thread 1: Acquired Lock B")

def thread_2_fixed():
    with lock_a:     # Always A -> B order (prevents circular wait)
        print("Thread 2: Acquired Lock A")
        time.sleep(0.1)
        with lock_b:
            print("Thread 2: Acquired Lock B")
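
When the locks are not known in advance, the same ordering rule can be enforced by a helper instead of by convention. The sketch below assumes a hypothetical context manager `acquire_in_order` that sorts locks by `id()` before acquiring them; any stable global order would serve equally well.

```python
import threading
from contextlib import ExitStack, contextmanager

@contextmanager
def acquire_in_order(*locks):
    """Acquire all locks in one globally consistent order (sorted by id)."""
    with ExitStack() as stack:
        for lock in sorted(locks, key=id):
            stack.enter_context(lock)
        yield  # locks are released in reverse order when the block exits

lock_a = threading.Lock()
lock_b = threading.Lock()
completed = []

def work(name, first, second):
    # Callers pass the locks in opposite orders, as in the deadlock example
    for _ in range(1000):
        with acquire_in_order(first, second):
            pass
    completed.append(name)

t1 = threading.Thread(target=work, args=("t1", lock_a, lock_b))
t2 = threading.Thread(target=work, args=("t2", lock_b, lock_a))
t1.start(); t2.start()
t1.join(timeout=10); t2.join(timeout=10)

print(sorted(completed))  # ['t1', 't2']
```

Because both threads end up taking the locks in the same order regardless of argument order, the circular-wait condition cannot arise.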

11.2 Race Condition

package main

import (
    "fmt"
    "sync"
)

func main() {
    // Race condition example
    counter := 0
    var wg sync.WaitGroup

    for i := 0; i < 1000; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            counter++ // DATA RACE! read-modify-write is not atomic
        }()
    }
    wg.Wait()
    fmt.Println("Counter (unsafe):", counter) // May be less than 1000

    // Solution 1: Mutex
    counter = 0
    var mu sync.Mutex
    for i := 0; i < 1000; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            mu.Lock()
            counter++
            mu.Unlock()
        }()
    }
    wg.Wait()
    fmt.Println("Counter (mutex):", counter) // Exactly 1000

    // Solution 2: Channel
    counter = 0
    ch := make(chan int, 1000)
    for i := 0; i < 1000; i++ {
        go func() {
            ch <- 1
        }()
    }
    for i := 0; i < 1000; i++ {
        counter += <-ch
    }
    fmt.Println("Counter (channel):", counter) // Exactly 1000
}

11.3 Starvation and Priority Inversion

Starvation
- A specific thread waits indefinitely for resource access
- Solution: Fair Lock, Priority Aging
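
One way to picture a fair lock is a ticket lock, where threads are served strictly in arrival order. The following is a minimal sketch under that assumption; `TicketLock` is a hypothetical class, not a standard-library type, and the demo peeks at a private field only to make the arrival order deterministic.

```python
import threading
import time

class TicketLock:
    """FIFO lock: each thread takes a ticket and waits for its number."""

    def __init__(self):
        self._cond = threading.Condition()
        self._next_ticket = 0   # next ticket to hand out
        self._now_serving = 0   # ticket currently allowed to hold the lock

    def acquire(self):
        with self._cond:
            my_ticket = self._next_ticket
            self._next_ticket += 1
            while self._now_serving != my_ticket:
                self._cond.wait()

    def release(self):
        with self._cond:
            self._now_serving += 1
            self._cond.notify_all()

    def __enter__(self):
        self.acquire()
        return self

    def __exit__(self, *exc):
        self.release()

lock = TicketLock()
order = []

def worker(name):
    with lock:
        order.append(name)

lock.acquire()  # the main thread holds the lock while the workers queue up
threads = []
for i in range(3):
    t = threading.Thread(target=worker, args=(i,))
    t.start()
    # Wait until worker i has taken its ticket so the arrival order is fixed
    while lock._next_ticket < i + 2:
        time.sleep(0.001)
    threads.append(t)

lock.release()
for t in threads:
    t.join()

print(order)  # [0, 1, 2]
```

No waiter can be overtaken indefinitely, which is exactly the property starvation violates.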

Priority Inversion
┌─────────────────────────────────────┐
│ High Priority Task   <- blocked     │
│   | (waiting for Lock)              │
│ Medium Priority Task <- running     │
│   | (preempts)                      │
│ Low Priority Task    <- holds Lock  │
│                                     │
│ Solution: Priority Inheritance      │
│ Low priority task temporarily       │
│ inherits high priority when holding │
│ a lock needed by high priority task │
└─────────────────────────────────────┘

12. Structured Concurrency

12.1 Concept

Structured concurrency is a pattern that ties the lifetime of concurrent operations to a code block (scope). When the scope ends, all child tasks are guaranteed to be complete (or cancelled).

Unstructured Concurrency     Structured Concurrency
┌────────────────┐         ┌────────────────┐
│ start task A   │         │ scope {        │
│ start task B   │         │   task A       │
│ ... (leaked?)  │         │   task B       │
│ forget task A? │         │ } // A,B done  │
└────────────────┘         └────────────────┘

12.2 Python TaskGroup (3.11+)

import asyncio

async def fetch(name: str, delay: float) -> str:
    await asyncio.sleep(delay)
    if name == "failing":
        raise ValueError(f"{name} failed!")
    return f"{name}: done"

async def main():
    # TaskGroup: structured concurrency
    try:
        async with asyncio.TaskGroup() as tg:
            task1 = tg.create_task(fetch("api-1", 0.5))
            task2 = tg.create_task(fetch("api-2", 1.0))
            task3 = tg.create_task(fetch("api-3", 0.8))
        # All tasks guaranteed complete when we reach here
        print(task1.result(), task2.result(), task3.result())
    except* ValueError as eg:
        # Error handling with ExceptionGroup
        for exc in eg.exceptions:
            print(f"Error: {exc}")

asyncio.run(main())

12.3 Java 21 Structured Concurrency

import java.util.concurrent.StructuredTaskScope;
import java.util.concurrent.StructuredTaskScope.Subtask;

// Preview API in Java 21 (JEP 453): compile and run with --enable-preview
public class StructuredConcurrencyDemo {

    record User(String name) {}
    record Order(String id) {}

    static User fetchUser(String userId) throws Exception {
        Thread.sleep(500);
        return new User("Alice");
    }

    static Order fetchOrder(String orderId) throws Exception {
        Thread.sleep(800);
        return new Order("ORD-123");
    }

    public static void main(String[] args) throws Exception {
        // ShutdownOnFailure: cancel remaining if any fails
        try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
            // In Java 21, fork() returns a Subtask, not a Future
            Subtask<User> userTask = scope.fork(() -> fetchUser("user-1"));
            Subtask<Order> orderTask = scope.fork(() -> fetchOrder("order-1"));

            scope.join();           // Wait for all tasks
            scope.throwIfFailed();  // Propagate failure

            // All results available when we reach here
            User user = userTask.get();
            Order order = orderTask.get();
            System.out.printf("User: %s, Order: %s%n", user.name(), order.id());
        }
    }
}

13. Language Concurrency Model Comparison

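| Dimension | Go | Rust | Java 21 | Python | Kotlin | JavaScript | Erlang/Elixir | C# |
| --- | --- | --- | --- | --- | --- | --- | --- | --- |
| Model | CSP | Ownership + async | Virtual Thread | asyncio | Coroutine | Event Loop | Actor | Task |
| Lightweight Unit | goroutine | Future | Virtual Thread | coroutine | coroutine | Promise | process | Task |
| Scheduling | M:N (GMP) | Runtime-dependent | M:N | N:1 (single thread) | M:N | N:1 (single thread) | Preemptive M:N | M:N |
| Shared State | channel | ownership | synchronized | asyncio.Lock | Mutex | none (single thread) | none (message passing) | lock |
| Error Propagation | error values (panic/recover) | Result | exception | exception | exception | reject | link/monitor | exception |
| Cancellation | context | drop | interrupt | cancel | Job.cancel | AbortController | exit | CancellationToken |
| Structured Conc. | errgroup | limited | StructuredTaskScope | TaskGroup | coroutineScope | none | supervisor | none |
| CPU-bound | goroutine | thread/rayon | thread | multiprocessing | Dispatchers.Default | Worker Thread | process | Parallel.ForEach |
| Memory Safety | GC | compile-time | GC | GC | GC | GC | GC | GC |
| Deadlock Prev. | channel design | care needed (only data races are caught at compile time) | care needed | care needed | care needed | N/A | message-based | care needed |
| Performance | high | very high | high | moderate | high | I/O only | high (BEAM) | high |
| Learning Curve | low | high | medium | low | medium | low | medium | medium |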

14. Practical Pattern: Rate Limiter

package main

import (
    "context"
    "fmt"
    "sync"
    "time"
)

// Token Bucket Rate Limiter
type RateLimiter struct {
    tokens     chan struct{}
    refillRate time.Duration
    ctx        context.Context
    cancel     context.CancelFunc
}

func NewRateLimiter(maxTokens int, refillRate time.Duration) *RateLimiter {
    ctx, cancel := context.WithCancel(context.Background())
    rl := &RateLimiter{
        tokens:     make(chan struct{}, maxTokens),
        refillRate: refillRate,
        ctx:        ctx,
        cancel:     cancel,
    }

    // Fill tokens
    for i := 0; i < maxTokens; i++ {
        rl.tokens <- struct{}{}
    }

    // Periodic refill
    go func() {
        ticker := time.NewTicker(refillRate)
        defer ticker.Stop()
        for {
            select {
            case <-ticker.C:
                select {
                case rl.tokens <- struct{}{}:
                default: // Already full
                }
            case <-ctx.Done():
                return
            }
        }
    }()

    return rl
}

func (rl *RateLimiter) Acquire(ctx context.Context) error {
    select {
    case <-rl.tokens:
        return nil
    case <-ctx.Done():
        return ctx.Err()
    }
}

func (rl *RateLimiter) Close() {
    rl.cancel()
}

func main() {
    limiter := NewRateLimiter(5, 200*time.Millisecond)
    defer limiter.Close()

    var wg sync.WaitGroup
    for i := 0; i < 20; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
            defer cancel()

            if err := limiter.Acquire(ctx); err != nil {
                fmt.Printf("Request %d: timed out\n", id)
                return
            }
            fmt.Printf("Request %d: processed at %s\n", id,
                time.Now().Format("15:04:05.000"))
        }(i)
    }
    wg.Wait()
}
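
A common alternative design drops the background refill goroutine and instead computes the refill lazily from elapsed time on each call. The sketch below shows that variant in Python under stated assumptions: `TokenBucket`, `try_acquire`, and the injectable `clock` parameter are hypothetical names, and the fake clock exists only to make the demo deterministic.

```python
import threading
import time

class TokenBucket:
    """Token bucket that refills lazily, based on time elapsed since the last call."""

    def __init__(self, capacity: int, refill_interval: float, clock=time.monotonic):
        self.capacity = capacity
        self.refill_interval = refill_interval  # seconds needed to earn one token
        self._clock = clock
        self._tokens = float(capacity)          # start with a full bucket
        self._last = clock()
        self._lock = threading.Lock()

    def try_acquire(self) -> bool:
        """Take one token if available; never blocks."""
        with self._lock:
            now = self._clock()
            earned = (now - self._last) / self.refill_interval
            self._tokens = min(self.capacity, self._tokens + earned)
            self._last = now
            if self._tokens >= 1:
                self._tokens -= 1
                return True
            return False

# Deterministic demo with a fake clock (same parameters as the Go example)
now = [0.0]
bucket = TokenBucket(5, 0.2, clock=lambda: now[0])

burst = [bucket.try_acquire() for _ in range(6)]
print(burst)         # [True, True, True, True, True, False]

now[0] += 0.2        # one refill interval passes
after_refill = [bucket.try_acquire(), bucket.try_acquire()]
print(after_refill)  # [True, False]
```

The lazy variant needs no background task or shutdown call, at the cost of not being able to block waiters on a channel the way the Go version does.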

15. Performance Optimization Tips

15.1 Choosing the Right Concurrency Model

Selection by Workload Type

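I/O-bound (network, disk):
  -> async/await, coroutine, goroutine
  -> Event-loop designs are usually the most efficient: a waiting task occupies no OS thread
  -> One OS thread per request adds context-switch and stack-memory overhead at high connection counts

CPU-bound (computation, transformation):
  -> OS threads (Python: multiprocessing, because the GIL in standard CPython serializes threads)
  -> Rust: rayon crate
  -> Go: goroutines (the runtime spreads them across cores, up to GOMAXPROCS)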

Mixed:
  -> Separate thread pools/dispatchers
  -> Kotlin: Dispatchers.IO vs Dispatchers.Default
  -> Python: asyncio + ProcessPoolExecutor

15.2 Minimizing Context Switching

import asyncio
import concurrent.futures
import time

# Execute CPU-bound work in separate processes
def cpu_heavy_task(n: int) -> int:
    """Prime counting (CPU-bound)"""
    count = 0
    for i in range(2, n):
        if all(i % j != 0 for j in range(2, int(i**0.5) + 1)):
            count += 1
    return count

async def main():
    loop = asyncio.get_running_loop()  # Preferred inside a coroutine

    # Separate CPU-bound work with ProcessPoolExecutor
    with concurrent.futures.ProcessPoolExecutor(max_workers=4) as pool:
        start = time.perf_counter()
        results = await asyncio.gather(
            loop.run_in_executor(pool, cpu_heavy_task, 100_000),
            loop.run_in_executor(pool, cpu_heavy_task, 100_000),
            loop.run_in_executor(pool, cpu_heavy_task, 100_000),
            loop.run_in_executor(pool, cpu_heavy_task, 100_000),
        )
        elapsed = time.perf_counter() - start
        print(f"4 tasks completed in {elapsed:.2f}s: {results}")

if __name__ == "__main__":  # Required where worker processes are spawned (Windows, macOS)
    asyncio.run(main())

16. Quiz

Q1. What is the key difference between concurrency and parallelism?

Concurrency is a structural property of a program -- designing it so multiple tasks can be logically handled at the same time. Parallelism is a property of execution -- multiple tasks physically running at the same time. Concurrency is possible on a single core, but parallelism requires multiple cores.

Q2. In Go's GMP model, what are G, M, and P?
  • G (Goroutine): The unit of work to execute. A lightweight thread managed by the Go runtime.
  • M (Machine): An OS thread. Responsible for actual execution.
  • P (Processor): A logical processor. The scheduling context an M must hold to run Gs. The number of Ps is set by GOMAXPROCS.

Each P has a local run queue, and idle Ps use work stealing to balance the load.

Q3. What 4 conditions must all be met for deadlock to occur?
  1. Mutual Exclusion: Only one process can use a resource at a time
  2. Hold and Wait: Holding a resource while waiting for another
  3. No Preemption: Resources cannot be forcibly taken from a process
  4. Circular Wait: Circular chain of processes each waiting for the next

Breaking any one prevents deadlock. The most common solution is consistent lock ordering to prevent circular wait.

Q4. What is the core difference between the Actor model and CSP?
  • Actor Model: Asynchronous message passing. Each Actor has its own mailbox. Sender specifies the receiver directly. Erlang, Akka.
  • CSP: Channel communication, synchronous by default (in Go, a send on an unbuffered channel blocks until a receiver is ready). Channels are independent entities. Sender and receiver are unaware of each other. Go's goroutine + channel.

Actors know "who to send to," while CSP knows "which channel to send to."

Q5. What problems does structured concurrency solve?

Structured concurrency solves:

  1. Task Leaking: Fire-and-forget tasks that are forgotten
  2. Error Propagation: Child task errors not reaching the parent
  3. Cancellation Propagation: Children continuing after parent cancellation
  4. Lifetime Management: Tasks continuing after their scope has ended

Key implementations include Java 21's StructuredTaskScope, Python's TaskGroup, and Kotlin's coroutineScope.

