Skip to content

✍️ 필사 모드: 동시성 & 병렬 프로그래밍 완전 가이드 2025: Async/Await, Thread, Goroutine, Actor 모델

한국어
0%
정확도 0%
💡 왼쪽 원문을 읽으면서 오른쪽에 따라 써보세요. Tab 키로 힌트를 받을 수 있습니다.

목차

1. 동시성 vs 병렬성: Rob Pike의 구분

"Concurrency is about dealing with lots of things at once. Parallelism is about doing lots of things at once." -- Rob Pike

이 한 문장이 동시성과 병렬성의 핵심 차이를 요약합니다.

1.1 개념적 구분

동시성(Concurrency) 은 프로그램의 구조적 특성입니다. 여러 작업을 논리적으로 동시에 다룰 수 있도록 프로그램을 설계하는 것입니다. 실제로 동시에 실행되는지는 중요하지 않습니다.

병렬성(Parallelism)실행의 특성입니다. 여러 작업이 물리적으로 동시에 실행되는 것을 의미합니다. 멀티코어 CPU에서 여러 스레드가 동시에 동작하는 것이 대표적입니다.

동시성 (Concurrency) - 구조
┌─────────────────────────────────────────┐
Task A: ██░░██░░░░██░░                │
Task B: ░░██░░██░░░░██                │
Task C: ░░░░░░░░██░░░░██              │
│         ─────────────────→ 시간         │
  (단일 코어에서 번갈아 실행)└─────────────────────────────────────────┘

병렬성 (Parallelism) - 실행
┌─────────────────────────────────────────┐
Core 1: ████████████████               │
Core 2: ████████████████               │
Core 3: ████████████████               │
│         ─────────────────→ 시간         │
  (다중 코어에서 동시에 실행)└─────────────────────────────────────────┘

1.2 실생활 비유

상황동시성병렬성
요리사 1명이 3개 요리를 번갈아 만듦OX
요리사 3명이 각각 1개씩 동시에 만듦OO
요리사 1명이 1개 요리만 만듦XX

1.3 왜 구분이 중요한가

동시성 없이 병렬성만 추구하면 복잡도가 폭발합니다. 올바른 동시성 구조를 먼저 설계한 뒤, 병렬 실행으로 성능을 끌어올리는 것이 정석입니다.

# 동시성 구조 (asyncio) - 단일 스레드에서도 효율적
import asyncio

async def fetch_data(url: str) -> str:
    print(f"Fetching {url}...")
    await asyncio.sleep(1)  # I/O 대기 시뮬레이션
    return f"Data from {url}"

async def main():
    # 동시성: 3개 요청을 구조적으로 동시에 다룸
    results = await asyncio.gather(
        fetch_data("https://api.example.com/users"),
        fetch_data("https://api.example.com/posts"),
        fetch_data("https://api.example.com/comments"),
    )
    for r in results:
        print(r)

asyncio.run(main())
# 총 ~1초 (3초가 아닌) - 동시성의 힘

2. 스레딩 모델: OS 스레드부터 가상 스레드까지

2.1 OS 스레드 (Kernel Thread)

운영체제가 직접 관리하는 스레드입니다. 생성 비용이 높고 (약 1MB 스택), 컨텍스트 스위칭 오버헤드가 존재합니다.

// Java - OS 스레드
public class ThreadExample {
    public static void main(String[] args) throws InterruptedException {
        Thread t1 = new Thread(() -> {
            for (int i = 0; i < 5; i++) {
                System.out.println("Thread 1: " + i);
                try { Thread.sleep(100); } catch (InterruptedException e) { break; }
            }
        });

        Thread t2 = new Thread(() -> {
            for (int i = 0; i < 5; i++) {
                System.out.println("Thread 2: " + i);
                try { Thread.sleep(100); } catch (InterruptedException e) { break; }
            }
        });

        t1.start();
        t2.start();
        t1.join();
        t2.join();
        System.out.println("Both threads completed");
    }
}

2.2 유저 스레드 (User Thread / Green Thread)

런타임이 관리하는 경량 스레드입니다. OS 스레드 위에 다수의 유저 스레드를 매핑합니다.

스레드 모델 비교
┌───────────────────────────────────────────────┐
1:1 (OS Thread)    N:1 (Green Thread)│ ┌──┐ ┌──┐ ┌──┐    ┌──┐┌──┐┌──┐┌──┐┌──┐     │
│ │UT│ │UT│ │UT│    │GT││GT││GT││GT││GT│     │
│ └┬─┘ └┬─┘ └┬─┘    └┬─┘└┬─┘└┬─┘└┬─┘└┬─┘     │
│  │    │    │        └──┬──┘  │  └──┬──┘       │
│ ┌┴─┐ ┌┴─┐ ┌┴─┐       ┌┴─┐  │     │          │
│ │KT│ │KT│ │KT│       │KT│  │     │          │
│ └──┘ └──┘ └──┘       └──┘  │     │          │
│                              │     │          │
M:N (Hybrid)│ ┌──┐┌──┐┌──┐┌──┐┌──┐┌──┐                    │
│ │GT││GT││GT││GT││GT││GT│                    │
│ └┬─┘└┬─┘└┬─┘└┬─┘└┬─┘└┬─┘                    │
│  └─┬─┘ └──┬──┘ └──┬──┘                       │
│   ┌┴─┐   ┌┴─┐   ┌┴─┐                         │
│   │KT│   │KT│   │KT│                         │
│   └──┘   └──┘   └──┘                         │
└───────────────────────────────────────────────┘

2.3 Java 21 가상 스레드 (Virtual Thread)

Java 21에서 정식 도입된 가상 스레드는 M:N 모델을 구현합니다. 수백만 개의 가상 스레드를 생성할 수 있습니다.

// Java 21 Virtual Threads
import java.util.concurrent.Executors;
import java.time.Duration;

public class VirtualThreadDemo {
    public static void main(String[] args) throws Exception {
        // 100만 개의 가상 스레드 생성 가능!
        try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
            for (int i = 0; i < 1_000_000; i++) {
                final int taskId = i;
                executor.submit(() -> {
                    try {
                        Thread.sleep(Duration.ofSeconds(1));
                        if (taskId % 100_000 == 0) {
                            System.out.println("Task " + taskId + " done on " +
                                Thread.currentThread());
                        }
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
        }
        System.out.println("All tasks completed");
    }
}

2.4 스레드 모델 비교

특성OS 스레드Green ThreadVirtual Thread (Java 21)
스택 크기~1MB~KB~수 KB (동적)
생성 비용높음 (syscall)낮음매우 낮음
동시 개수수천수백만수백만
선점OS 선점협력적협력적 (I/O 시 양보)
I/O 블로킹스레드 블로킹전체 블로킹 위험자동 언마운트

3. 코루틴: 경량 동시성의 핵심

3.1 Stackful vs Stackless 코루틴

Stackful Coroutine (: Lua, Go의 goroutine)
┌──────────────────────────────┐
│ 자체 스택을 가짐               │
│ 스택 어디서든 일시중단 가능     │
│ 메모리 사용 높음               │
│ 중첩 함수 호출 중에도 양보 가능 │
└──────────────────────────────┘

Stackless Coroutine (: Python, Kotlin, Rust)
┌──────────────────────────────┐
│ 상태를 힙에 저장               │
│ 최상위에서만 일시중단 가능      │
│ 메모리 사용 낮음               │
│ 컴파일러가 상태 머신으로 변환   │
└──────────────────────────────┘

3.2 Python asyncio 코루틴

import asyncio
import aiohttp
import time

async def fetch_url(session: aiohttp.ClientSession, url: str) -> dict:
    """비동기 HTTP 요청"""
    async with session.get(url) as response:
        return {
            "url": url,
            "status": response.status,
            "length": len(await response.text()),
        }

async def main():
    urls = [
        "https://httpbin.org/delay/1",
        "https://httpbin.org/delay/1",
        "https://httpbin.org/delay/1",
        "https://httpbin.org/delay/1",
        "https://httpbin.org/delay/1",
    ]

    start = time.perf_counter()
    async with aiohttp.ClientSession() as session:
        # 5개 요청을 동시에 실행
        tasks = [fetch_url(session, url) for url in urls]
        results = await asyncio.gather(*tasks)

    elapsed = time.perf_counter() - start
    print(f"5개 요청 완료: {elapsed:.2f}초")  # ~1초 (5초가 아닌)

    for r in results:
        print(f"  {r['url']}: status={r['status']}, length={r['length']}")

asyncio.run(main())

3.3 Kotlin 코루틴

import kotlinx.coroutines.*
import kotlin.system.measureTimeMillis

suspend fun fetchData(id: Int): String {
    delay(1000) // 비동기 대기 (스레드 블로킹 없음)
    return "Result-$id"
}

fun main() = runBlocking {
    val time = measureTimeMillis {
        // 구조적 동시성: coroutineScope가 모든 자식 코루틴 관리
        coroutineScope {
            val results = (1..10).map { id ->
                async(Dispatchers.IO) {
                    fetchData(id)
                }
            }
            results.awaitAll().forEach { println(it) }
        }
    }
    println("Completed in ${time}ms") // ~1000ms
}

4. Goroutine과 Go의 동시성 모델

4.1 Goroutine 내부 구조

Go 런타임은 M:N 스케줄링을 사용합니다. G(Goroutine), M(Machine/OS Thread), P(Processor) 세 가지 개념이 핵심입니다.

Go 런타임 스케줄러 (GMP 모델)
┌─────────────────────────────────────────────┐
│                                             │
│  ┌──────┐ ┌──────┐ ┌──────┐ ┌──────┐      │
│  │ G1   │ │ G2   │ │ G3   │ │ G4...│  └──┬───┘ └──┬───┘ └──┬───┘ └──┬───┘      │
│     │        │        │        │            │
│  ┌──┴────────┴──┐  ┌──┴────────┴──┐        │
│  │   P (Local   │  │   P (Local   │        │
│  │    Queue)    │  │    Queue)    │        │
│  └──────┬───────┘  └──────┬───────┘        │
│         │                 │                 │
│  ┌──────┴───────┐  ┌──────┴───────┐        │
│  │ M (OS Thread)│  │ M (OS Thread)│        │
│  └──────────────┘  └──────────────┘        │
│                                             │
Global Run Queue: [G5, G6, G7, ...]└─────────────────────────────────────────────┘

GOMAXPROCS = P (기본값: CPU 코어 수)

4.2 Goroutine 기본 사용

package main

import (
    "fmt"
    "sync"
    "time"
)

func worker(id int, wg *sync.WaitGroup) {
    defer wg.Done()
    fmt.Printf("Worker %d starting\n", id)
    time.Sleep(time.Second)
    fmt.Printf("Worker %d done\n", id)
}

func main() {
    var wg sync.WaitGroup

    for i := 1; i <= 5; i++ {
        wg.Add(1)
        go worker(i, &wg) // goroutine 생성 - 약 2KB 스택
    }

    wg.Wait()
    fmt.Println("All workers completed")
}

4.3 Work Stealing

P의 로컬 큐가 비면 다른 P의 큐에서 goroutine을 훔쳐옵니다. 이를 통해 부하를 균등하게 분배합니다.

Work Stealing 과정
┌─────────────┐     ┌─────────────┐
P1 (busy)   │     │ P2 (idle)[G1,G2,G3]  │ ──→ │ []│             │steal│             │
[G1,G2]    │ ──→ │ [G3]└─────────────┘     └─────────────┘

5. Async/Await 패턴 비교

5.1 JavaScript Promise와 Async/Await

// Promise 체이닝 vs Async/Await
// Promise 체이닝 방식
function fetchUserData(userId) {
  return fetch(`/api/users/${userId}`)
    .then(response => response.json())
    .then(user => fetch(`/api/posts?author=${user.id}`))
    .then(response => response.json())
    .then(posts => {
      return { user, posts };
    })
    .catch(error => {
      console.error('Error:', error);
      throw error;
    });
}

// Async/Await 방식 (훨씬 읽기 좋음)
async function fetchUserDataAsync(userId) {
  try {
    const userResponse = await fetch(`/api/users/${userId}`);
    const user = await userResponse.json();

    const postsResponse = await fetch(`/api/posts?author=${user.id}`);
    const posts = await postsResponse.json();

    return { user, posts };
  } catch (error) {
    console.error('Error:', error);
    throw error;
  }
}

// 병렬 실행
async function fetchMultipleUsers(userIds) {
  const promises = userIds.map(id => fetchUserDataAsync(id));
  return Promise.all(promises); // 모두 병렬 실행
}

// Promise.allSettled - 일부 실패해도 계속
async function fetchWithPartialFailure(urls) {
  const results = await Promise.allSettled(
    urls.map(url => fetch(url).then(r => r.json()))
  );

  const successes = results
    .filter(r => r.status === 'fulfilled')
    .map(r => r.value);
  const failures = results
    .filter(r => r.status === 'rejected')
    .map(r => r.reason);

  return { successes, failures };
}

5.2 Rust Async

use tokio;
use reqwest;
use futures::future::join_all;

#[derive(Debug)]
struct ApiResponse {
    url: String,
    status: u16,
    body_length: usize,
}

async fn fetch_url(client: &reqwest::Client, url: &str) -> Result<ApiResponse, reqwest::Error> {
    let response = client.get(url).send().await?;
    let status = response.status().as_u16();
    let body = response.text().await?;

    Ok(ApiResponse {
        url: url.to_string(),
        status,
        body_length: body.len(),
    })
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let client = reqwest::Client::new();
    let urls = vec![
        "https://httpbin.org/get",
        "https://httpbin.org/ip",
        "https://httpbin.org/user-agent",
    ];

    // 모든 요청을 동시에 실행
    let futures: Vec<_> = urls.iter()
        .map(|url| fetch_url(&client, url))
        .collect();

    let results = join_all(futures).await;

    for result in results {
        match result {
            Ok(resp) => println!("{}: status={}, length={}", resp.url, resp.status, resp.body_length),
            Err(e) => eprintln!("Error: {}", e),
        }
    }

    Ok(())
}

5.3 C# Task 기반 비동기

using System;
using System.Net.Http;
using System.Threading.Tasks;
using System.Collections.Generic;
using System.Linq;

public class AsyncDemo
{
    private static readonly HttpClient _client = new HttpClient();

    public static async Task<string> FetchDataAsync(string url)
    {
        var response = await _client.GetAsync(url);
        response.EnsureSuccessStatusCode();
        return await response.Content.ReadAsStringAsync();
    }

    public static async Task Main()
    {
        var urls = new List<string>
        {
            "https://httpbin.org/get",
            "https://httpbin.org/ip",
            "https://httpbin.org/user-agent",
        };

        // 병렬 실행 with WhenAll
        var tasks = urls.Select(url => FetchDataAsync(url));
        var results = await Task.WhenAll(tasks);

        for (int i = 0; i < urls.Count; i++)
        {
            Console.WriteLine($"URL: {urls[i]}, Length: {results[i].Length}");
        }

        // 타임아웃 패턴
        using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
        try
        {
            var data = await _client.GetStringAsync(urls[0], cts.Token);
            Console.WriteLine($"Got data: {data.Length} chars");
        }
        catch (OperationCanceledException)
        {
            Console.WriteLine("Request timed out!");
        }
    }
}

5.4 Async/Await 언어 비교

특성JavaScriptPythonRustC#Kotlin
런타임V8 Event Loopasynciotokio/async-stdCLR Thread PoolDispatchers
Future 타입PromiseCoroutineFuture (lazy)TaskDeferred
취소AbortControllerTask.cancel()dropCancellationTokenJob.cancel()
스트림AsyncIteratorasync forStream traitIAsyncEnumerableFlow
에러 처리try/catchtry/exceptResulttry/catchtry/catch
병렬 실행Promise.allgatherjoin!Task.WhenAllawaitAll
Zero-costXXOXX

6. Event Loop 심층 분석

6.1 Node.js libuv Event Loop

Node.js Event Loop 단계
┌───────────────────────────────────┐
│         ┌──────────────┐          │
│    ┌────│   timers     │────┐     │
│    │    └──────────────┘    │     │
│    │    ┌──────────────┐    │     │
│    │    │ pending I/O  │    │     │
│    │    └──────────────┘    │     │
│    │    ┌──────────────┐    │     │
│    │    │  idle/prepare │    │     │
│    │    └──────────────┘    │     │
│    │    ┌──────────────┐    │     │
│    │    │    poll       │◄───┘    │
│    │    └──────────────┘         │
│    │    ┌──────────────┐         │
│    │    │    check      │         │
│    │     (setImmediate)│         │
│    │    └──────────────┘         │
│    │    ┌──────────────┐         │
│    └────│close callbacks│─────────│
│         └──────────────┘         │
│                                   │
│  각 단계 사이: microtask queue    │
  (Promise, queueMicrotask)└───────────────────────────────────┘
// Event Loop 동작 순서 이해
console.log('1. 동기 코드');

setTimeout(() => console.log('2. setTimeout (timers)'), 0);

setImmediate(() => console.log('3. setImmediate (check)'));

Promise.resolve().then(() => console.log('4. Promise (microtask)'));

process.nextTick(() => console.log('5. nextTick (microtask - 최우선)'));

console.log('6. 동기 코드 끝');

// 출력 순서:
// 1. 동기 코드
// 6. 동기 코드 끝
// 5. nextTick (microtask - 최우선)
// 4. Promise (microtask)
// 2. setTimeout (timers)
// 3. setImmediate (check)

6.2 Python asyncio Event Loop

import asyncio
import time

async def cpu_bound_simulation(name: str, duration: float):
    """CPU-bound 작업 시뮬레이션"""
    print(f"[{name}] 시작")
    # await로 양보하지 않으면 이벤트 루프를 블로킹
    await asyncio.sleep(duration)
    print(f"[{name}] 완료 ({duration}초)")
    return f"{name}: done"

async def producer(queue: asyncio.Queue, n: int):
    """생산자: 큐에 아이템 추가"""
    for i in range(n):
        await asyncio.sleep(0.1)
        item = f"item-{i}"
        await queue.put(item)
        print(f"  Produced: {item}")
    await queue.put(None)  # 종료 신호

async def consumer(queue: asyncio.Queue, name: str):
    """소비자: 큐에서 아이템 소비"""
    while True:
        item = await queue.get()
        if item is None:
            await queue.put(None)  # 다른 소비자도 종료
            break
        await asyncio.sleep(0.2)  # 처리 시뮬레이션
        print(f"  {name} consumed: {item}")
        queue.task_done()

async def main():
    # 1. 동시 실행
    start = time.perf_counter()
    results = await asyncio.gather(
        cpu_bound_simulation("TaskA", 1.0),
        cpu_bound_simulation("TaskB", 0.5),
        cpu_bound_simulation("TaskC", 0.8),
    )
    elapsed = time.perf_counter() - start
    print(f"총 시간: {elapsed:.2f}초")  # ~1초
    print(f"결과: {results}")

    # 2. 생산자-소비자 패턴
    print("\n--- 생산자-소비자 ---")
    queue = asyncio.Queue(maxsize=5)
    await asyncio.gather(
        producer(queue, 10),
        consumer(queue, "Consumer-1"),
        consumer(queue, "Consumer-2"),
    )

asyncio.run(main())

7. Actor 모델: 공유 상태 없는 동시성

7.1 Actor 모델 개념

Actor 모델
┌─────────────────────────────────────────────────┐
│                                                 │
│  ┌─────────┐  메시지   ┌─────────┐             │
│  │ Actor A │ ───────→  │ Actor B │             │
│  │ [State] │           │ [State] │             │
│  │ [Mailbox][Mailbox]│  └─────────┘           └────┬────┘             │
│       ↑                     │                   │
│       │        메시지        │ 새 Actor 생성     │
│       └─────────────────────┘                   │
│                              ↓                   │
│                        ┌─────────┐              │
│                        │ Actor C │              │
│                        │ [State] │              │
│                        └─────────┘              │
│                                                 │
│  규칙:1. 메시지로만 통신 (공유 상태 없음)2. 한 번에 하나의 메시지만 처리                  │
3. 다른 Actor를 생성할 수 있음                   │
4. 자신의 상태만 변경 가능                       │
└─────────────────────────────────────────────────┘

7.2 Erlang/Elixir Actor

%% Erlang - 카운터 Actor
-module(counter).
-export([start/0, increment/1, get/1, loop/1]).

start() ->
    spawn(fun() -> loop(0) end).

increment(Pid) ->
    Pid ! {increment, 1}.

get(Pid) ->
    Pid ! {get, self()},
    receive
        {count, Value} -> Value
    after 5000 ->
        timeout
    end.

loop(Count) ->
    receive
        {increment, N} ->
            loop(Count + N);
        {get, From} ->
            From ! {count, Count},
            loop(Count);
        stop ->
            ok
    end.

%% 사용 예:
%% Pid = counter:start().
%% counter:increment(Pid).
%% counter:increment(Pid).
%% counter:get(Pid). %% => 2

7.3 Akka (Scala) Actor

import akka.actor.typed.{ActorRef, ActorSystem, Behavior}
import akka.actor.typed.scaladsl.Behaviors

// 메시지 정의
sealed trait Command
case class Deposit(amount: Double) extends Command
case class Withdraw(amount: Double) extends Command
case class GetBalance(replyTo: ActorRef[Balance]) extends Command
case class Balance(amount: Double)

// 은행 계좌 Actor
object BankAccount {
  def apply(balance: Double = 0.0): Behavior[Command] =
    Behaviors.receive { (context, message) =>
      message match {
        case Deposit(amount) =>
          val newBalance = balance + amount
          context.log.info(s"Deposited $amount, balance: $newBalance")
          BankAccount(newBalance)

        case Withdraw(amount) =>
          if (amount <= balance) {
            val newBalance = balance - amount
            context.log.info(s"Withdrew $amount, balance: $newBalance")
            BankAccount(newBalance)
          } else {
            context.log.warn(s"Insufficient funds: $balance < $amount")
            Behaviors.same
          }

        case GetBalance(replyTo) =>
          replyTo ! Balance(balance)
          Behaviors.same
      }
    }
}

// Supervision (장애 복구)
object BankSupervisor {
  def apply(): Behavior[Command] =
    Behaviors.supervise(BankAccount())
      .onFailure(SupervisorStrategy.restart)
}

7.4 Actor 모델의 장점과 단점

장점단점
공유 상태 없음 (경쟁 조건 방지)디버깅이 어려울 수 있음
위치 투명성 (분산 가능)메시지 순서 보장 제한적
장애 격리 (let it crash)메일박스 오버플로우 가능
확장성이 좋음동기 요청-응답 패턴이 복잡

8. CSP: Go의 채널 기반 동시성

8.1 채널 기본

package main

import (
    "fmt"
    "time"
)

func main() {
    // 버퍼 없는 채널 (동기)
    ch := make(chan string)

    go func() {
        time.Sleep(time.Second)
        ch <- "Hello from goroutine!"
    }()

    msg := <-ch // 메시지가 올 때까지 블로킹
    fmt.Println(msg)

    // 버퍼 있는 채널 (비동기)
    buffered := make(chan int, 3)
    buffered <- 1
    buffered <- 2
    buffered <- 3
    // buffered <- 4 // 버퍼가 꽉 차서 블로킹!

    fmt.Println(<-buffered) // 1
    fmt.Println(<-buffered) // 2
}

8.2 Fan-In / Fan-Out 패턴

package main

import (
    "fmt"
    "math/rand"
    "sync"
    "time"
)

// Fan-Out: 하나의 입력을 여러 워커에게 분배
func fanOut(input <-chan int, workers int) []<-chan int {
    channels := make([]<-chan int, workers)
    for i := 0; i < workers; i++ {
        ch := make(chan int)
        channels[i] = ch
        go func(workerID int, out chan<- int) {
            defer close(out)
            for val := range input {
                // 처리 시뮬레이션
                time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
                out <- val * val
                fmt.Printf("  Worker %d processed %d\n", workerID, val)
            }
        }(i, ch)
    }
    return channels
}

// Fan-In: 여러 채널의 결과를 하나로 합침
func fanIn(channels ...<-chan int) <-chan int {
    var wg sync.WaitGroup
    merged := make(chan int)

    for _, ch := range channels {
        wg.Add(1)
        go func(c <-chan int) {
            defer wg.Done()
            for val := range c {
                merged <- val
            }
        }(ch)
    }

    go func() {
        wg.Wait()
        close(merged)
    }()

    return merged
}

func main() {
    // 입력 채널
    input := make(chan int)
    go func() {
        defer close(input)
        for i := 1; i <= 20; i++ {
            input <- i
        }
    }()

    // Fan-Out: 4개 워커
    workerChannels := fanOut(input, 4)

    // Fan-In: 결과 합침
    results := fanIn(workerChannels...)

    // 결과 수집
    var sum int
    for result := range results {
        sum += result
    }
    fmt.Printf("Sum of squares: %d\n", sum)
}

8.3 Select 문

package main

import (
    "fmt"
    "time"
)

func main() {
    ch1 := make(chan string)
    ch2 := make(chan string)

    go func() {
        time.Sleep(1 * time.Second)
        ch1 <- "from channel 1"
    }()

    go func() {
        time.Sleep(2 * time.Second)
        ch2 <- "from channel 2"
    }()

    // select: 여러 채널 중 준비된 것을 선택
    for i := 0; i < 2; i++ {
        select {
        case msg := <-ch1:
            fmt.Println("Received", msg)
        case msg := <-ch2:
            fmt.Println("Received", msg)
        case <-time.After(3 * time.Second):
            fmt.Println("Timeout!")
        }
    }

    // 논블로킹 select
    ch := make(chan int, 1)
    select {
    case val := <-ch:
        fmt.Println("Got:", val)
    default:
        fmt.Println("Channel empty, moving on")
    }
}

9. 동기화 프리미티브

9.1 Mutex와 RWLock

package main

import (
    "fmt"
    "sync"
)

// Mutex: 상호 배제
type SafeCounter struct {
    mu sync.Mutex
    v  map[string]int
}

func (c *SafeCounter) Inc(key string) {
    c.mu.Lock()
    defer c.mu.Unlock()
    c.v[key]++
}

func (c *SafeCounter) Value(key string) int {
    c.mu.Lock()
    defer c.mu.Unlock()
    return c.v[key]
}

// RWMutex: 읽기는 동시에, 쓰기는 배타적
type SafeCache struct {
    mu    sync.RWMutex
    store map[string]string
}

func (c *SafeCache) Get(key string) (string, bool) {
    c.mu.RLock()         // 읽기 잠금 (여러 고루틴 동시 읽기 가능)
    defer c.mu.RUnlock()
    val, ok := c.store[key]
    return val, ok
}

func (c *SafeCache) Set(key, value string) {
    c.mu.Lock()          // 쓰기 잠금 (배타적)
    defer c.mu.Unlock()
    c.store[key] = value
}

func main() {
    counter := SafeCounter{v: make(map[string]int)}
    var wg sync.WaitGroup
    for i := 0; i < 1000; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            counter.Inc("key")
        }()
    }
    wg.Wait()
    fmt.Println("Final count:", counter.Value("key")) // 정확히 1000
}

9.2 Semaphore

import asyncio

async def worker(sem: asyncio.Semaphore, task_id: int):
    async with sem:  # 최대 3개만 동시 실행
        print(f"Task {task_id}: 시작 (슬롯 획득)")
        await asyncio.sleep(1)
        print(f"Task {task_id}: 완료 (슬롯 반환)")

async def main():
    sem = asyncio.Semaphore(3)  # 동시 실행 3개로 제한
    tasks = [worker(sem, i) for i in range(10)]
    await asyncio.gather(*tasks)

asyncio.run(main())

9.3 Condition Variable

import threading
import time
import random

class BoundedBuffer:
    def __init__(self, capacity: int):
        self.buffer = []
        self.capacity = capacity
        self.lock = threading.Lock()
        self.not_full = threading.Condition(self.lock)
        self.not_empty = threading.Condition(self.lock)

    def produce(self, item):
        with self.not_full:
            while len(self.buffer) >= self.capacity:
                self.not_full.wait()  # 버퍼가 비워질 때까지 대기
            self.buffer.append(item)
            self.not_empty.notify()  # 소비자에게 알림

    def consume(self):
        with self.not_empty:
            while len(self.buffer) == 0:
                self.not_empty.wait()  # 아이템이 들어올 때까지 대기
            item = self.buffer.pop(0)
            self.not_full.notify()  # 생산자에게 알림
            return item

buf = BoundedBuffer(5)

def producer():
    for i in range(20):
        buf.produce(i)
        print(f"Produced: {i}")
        time.sleep(random.uniform(0.01, 0.1))

def consumer(name):
    for _ in range(10):
        item = buf.consume()
        print(f"{name} consumed: {item}")
        time.sleep(random.uniform(0.05, 0.15))

t1 = threading.Thread(target=producer)
t2 = threading.Thread(target=consumer, args=("C1",))
t3 = threading.Thread(target=consumer, args=("C2",))
t1.start(); t2.start(); t3.start()
t1.join(); t2.join(); t3.join()

9.4 Atomic 연산

import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

public class AtomicDemo {
    // AtomicLong - 잠금 없는 카운터
    private static final AtomicLong counter = new AtomicLong(0);

    // AtomicReference - 잠금 없는 참조 업데이트
    private static final AtomicReference<String> config =
        new AtomicReference<>("default");

    public static void main(String[] args) throws InterruptedException {
        // CAS (Compare-And-Swap) 루프
        Thread[] threads = new Thread[10];
        for (int i = 0; i < 10; i++) {
            threads[i] = new Thread(() -> {
                for (int j = 0; j < 10_000; j++) {
                    counter.incrementAndGet(); // 원자적 증가
                }
            });
            threads[i].start();
        }

        for (Thread t : threads) t.join();
        System.out.println("Counter: " + counter.get()); // 정확히 100,000

        // CAS 직접 사용
        boolean updated = config.compareAndSet("default", "production");
        System.out.println("Config updated: " + updated); // true
        System.out.println("Config value: " + config.get()); // production
    }
}

10. Lock-free 자료구조

10.1 CAS (Compare-And-Swap)

CAS는 lock-free 프로그래밍의 기초입니다. "현재 값이 예상 값과 같으면 새 값으로 교체"하는 원자적 연산입니다.

CAS 동작 원리
┌──────────────────────────────────────┐
CAS(address, expected, new_value)│                                      │
if *address == expected:*address = new_value             │
return true   // 성공            │
else:return false  // 실패 → 재시도   │
│                                      │
 (위 전체가 하드웨어 수준에서 원자적)└──────────────────────────────────────┘

10.2 ABA 문제

ABA 문제
Thread 1:A를 읽음  (일시 중단)
Thread 2: AB로 변경
Thread 3: BA로 변경
Thread 1: (재개) CAS(A, A, C) → 성공! (하지만 A는 원래의 A가 아님)

해결: 버전 카운터 추가
AtomicStampedReference (Java)
TaggedPointer (C/C++)

10.3 Lock-free 스택 (Treiber Stack)

import java.util.concurrent.atomic.AtomicReference;

public class LockFreeStack<T> {
    private static class Node<T> {
        final T value;
        Node<T> next;

        Node(T value) {
            this.value = value;
        }
    }

    private final AtomicReference<Node<T>> top = new AtomicReference<>(null);

    public void push(T value) {
        Node<T> newNode = new Node<>(value);
        Node<T> oldTop;
        do {
            oldTop = top.get();
            newNode.next = oldTop;
        } while (!top.compareAndSet(oldTop, newNode)); // CAS 루프
    }

    public T pop() {
        Node<T> oldTop;
        Node<T> newTop;
        do {
            oldTop = top.get();
            if (oldTop == null) return null;
            newTop = oldTop.next;
        } while (!top.compareAndSet(oldTop, newTop)); // CAS 루프
        return oldTop.value;
    }
}

11. 동시성 버그 패턴과 방지

11.1 데드락 (Deadlock)

데드락 조건 (4가지 모두 충족 시 발생)
1. 상호 배제 (Mutual Exclusion)
2. 점유 대기 (Hold and Wait)
3. 비선점 (No Preemption)
4. 순환 대기 (Circular Wait)

┌──────────┐          ┌──────────┐
Thread A │──Lock1──→│ Thread B│          │←──Lock2──│          │
 (Lock2 (Lock1│  대기중)  │          │  대기중)└──────────┘          └──────────┘
import threading
import time

lock_a = threading.Lock()
lock_b = threading.Lock()

# 데드락 발생 코드
def thread_1():
    with lock_a:
        print("Thread 1: Lock A 획득")
        time.sleep(0.1)
        with lock_b:  # Lock B 대기 → 데드락!
            print("Thread 1: Lock B 획득")

def thread_2():
    with lock_b:
        print("Thread 2: Lock B 획득")
        time.sleep(0.1)
        with lock_a:  # Lock A 대기 → 데드락!
            print("Thread 2: Lock A 획득")

# 해결: 잠금 순서 통일
def thread_1_fixed():
    with lock_a:     # 항상 A → B 순서
        print("Thread 1: Lock A 획득")
        time.sleep(0.1)
        with lock_b:
            print("Thread 1: Lock B 획득")

def thread_2_fixed():
    with lock_a:     # 항상 A → B 순서 (순환 대기 방지)
        print("Thread 2: Lock A 획득")
        time.sleep(0.1)
        with lock_b:
            print("Thread 2: Lock B 획득")

11.2 레이스 컨디션 (Race Condition)

package main

import (
    "fmt"
    "sync"
)

func main() {
    // 레이스 컨디션 예시
    counter := 0
    var wg sync.WaitGroup

    for i := 0; i < 1000; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            counter++ // DATA RACE! 읽기-수정-쓰기가 원자적이지 않음
        }()
    }
    wg.Wait()
    fmt.Println("Counter (unsafe):", counter) // 1000 미만일 수 있음

    // 해결 1: Mutex
    counter = 0
    var mu sync.Mutex
    for i := 0; i < 1000; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            mu.Lock()
            counter++
            mu.Unlock()
        }()
    }
    wg.Wait()
    fmt.Println("Counter (mutex):", counter) // 정확히 1000

    // 해결 2: 채널
    counter = 0
    ch := make(chan int, 1000)
    for i := 0; i < 1000; i++ {
        go func() {
            ch <- 1
        }()
    }
    for i := 0; i < 1000; i++ {
        counter += <-ch
    }
    fmt.Println("Counter (channel):", counter) // 정확히 1000
}

11.3 기아 상태와 우선순위 역전

기아 상태 (Starvation)
- 특정 스레드가 자원에 접근하지 못하고 무한 대기
- 해결: 공정한 잠금 (Fair Lock), 우선순위 에이징

우선순위 역전 (Priority Inversion)
┌─────────────────────────────────────┐
High Priority Task   ← 블로킹됨     │
 (Lock 대기)Medium Priority Task ← 실행 중      │
 (선점)Low Priority TaskLock 보유    │
│                                     │
│ 해결: Priority Inheritance Protocol│ 낮은 우선순위 태스크가 Lock 보유 시  │
│ 높은 우선순위를 일시적으로 상속      │
└─────────────────────────────────────┘

12. 구조적 동시성 (Structured Concurrency)

12.1 개념

구조적 동시성은 동시 작업의 수명을 코드 블록(스코프)에 묶는 패턴입니다. 스코프가 끝나면 모든 자식 작업이 완료(또는 취소)됨을 보장합니다.

비구조적 동시성            구조적 동시성
┌────────────────┐      ┌────────────────┐
│ start task A   │      │ scope {│ start task B   │      │   task A... (누출?)    │      │   task B│ forget task A? │      │ } // A,B 완료  │
└────────────────┘      └────────────────┘

12.2 Python TaskGroup (3.11+)

import asyncio

async def fetch(name: str, delay: float) -> str:
    await asyncio.sleep(delay)
    if name == "failing":
        raise ValueError(f"{name} failed!")
    return f"{name}: done"

async def main():
    # TaskGroup: 구조적 동시성
    try:
        async with asyncio.TaskGroup() as tg:
            task1 = tg.create_task(fetch("api-1", 0.5))
            task2 = tg.create_task(fetch("api-2", 1.0))
            task3 = tg.create_task(fetch("api-3", 0.8))
        # 여기 도달 시 모든 태스크 완료 보장
        print(task1.result(), task2.result(), task3.result())
    except* ValueError as eg:
        # ExceptionGroup으로 에러 처리
        for exc in eg.exceptions:
            print(f"Error: {exc}")

asyncio.run(main())

12.3 Java 21 Structured Concurrency

import java.util.concurrent.StructuredTaskScope;
import java.util.concurrent.Future;

public class StructuredConcurrencyDemo {

    record User(String name) {}
    record Order(String id) {}

    static User fetchUser(String userId) throws Exception {
        Thread.sleep(500);
        return new User("Alice");
    }

    static Order fetchOrder(String orderId) throws Exception {
        Thread.sleep(800);
        return new Order("ORD-123");
    }

    public static void main(String[] args) throws Exception {
        // ShutdownOnFailure: 하나라도 실패하면 나머지 취소
        try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
            Future<User> userFuture = scope.fork(() -> fetchUser("user-1"));
            Future<Order> orderFuture = scope.fork(() -> fetchOrder("order-1"));

            scope.join();           // 모든 태스크 완료 대기
            scope.throwIfFailed();  // 실패 시 예외 전파

            // 여기 도달 시 모든 결과 사용 가능
            User user = userFuture.resultNow();
            Order order = orderFuture.resultNow();
            System.out.printf("User: %s, Order: %s%n", user.name(), order.id());
        }
    }
}

13. 언어별 동시성 모델 비교

차원GoRustJava 21PythonKotlinJavaScriptErlang/ElixirC#
모델CSPOwnership + asyncVirtual ThreadasyncioCoroutineEvent LoopActorTask
경량 단위goroutineFutureVirtual ThreadcoroutinecoroutinePromiseprocessTask
스케줄링M:N (GMP)런타임 의존M:N1:1 (싱글)M:N1:1 (싱글)선점적 M:NM:N
공유 상태channelownershipsynchronizedasyncio.LockMutex없음 (싱글)없음 (메시지)lock
에러 전파panic/recoverResult예외예외예외rejectlink/monitor예외
취소contextdropinterruptcancelJob.cancelAbortControllerexitCancelToken
구조적 동시성errgroup제한적StructuredTaskScopeTaskGroupcoroutineScope없음supervisor없음
CPU-boundgoroutinethread/rayonthreadmultiprocessingDispatchers.DefaultWorker ThreadprocessParallel.ForEach
메모리 안전GC컴파일 타임GCGCGCGCGCGC
데드락 방지채널 설계컴파일 타임주의 필요주의 필요주의 필요해당 없음메시지 기반주의 필요
성능높음매우 높음높음보통높음I/O만높음 (BEAM)높음
학습 곡선낮음높음보통낮음보통낮음보통보통

14. 실전 패턴: Rate Limiter

package main

import (
    "context"
    "fmt"
    "sync"
    "time"
)

// Token Bucket Rate Limiter
type RateLimiter struct {
    tokens     chan struct{}
    refillRate time.Duration
    ctx        context.Context
    cancel     context.CancelFunc
}

func NewRateLimiter(maxTokens int, refillRate time.Duration) *RateLimiter {
    ctx, cancel := context.WithCancel(context.Background())
    rl := &RateLimiter{
        tokens:     make(chan struct{}, maxTokens),
        refillRate: refillRate,
        ctx:        ctx,
        cancel:     cancel,
    }

    // 토큰 채우기
    for i := 0; i < maxTokens; i++ {
        rl.tokens <- struct{}{}
    }

    // 주기적 리필
    go func() {
        ticker := time.NewTicker(refillRate)
        defer ticker.Stop()
        for {
            select {
            case <-ticker.C:
                select {
                case rl.tokens <- struct{}{}:
                default: // 이미 꽉 참
                }
            case <-ctx.Done():
                return
            }
        }
    }()

    return rl
}

func (rl *RateLimiter) Acquire(ctx context.Context) error {
    select {
    case <-rl.tokens:
        return nil
    case <-ctx.Done():
        return ctx.Err()
    }
}

func (rl *RateLimiter) Close() {
    rl.cancel()
}

func main() {
    limiter := NewRateLimiter(5, 200*time.Millisecond)
    defer limiter.Close()

    var wg sync.WaitGroup
    for i := 0; i < 20; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
            defer cancel()

            if err := limiter.Acquire(ctx); err != nil {
                fmt.Printf("Request %d: timed out\n", id)
                return
            }
            fmt.Printf("Request %d: processed at %s\n", id,
                time.Now().Format("15:04:05.000"))
        }(i)
    }
    wg.Wait()
}

15. 성능 최적화 팁

15.1 적절한 동시성 모델 선택

워크로드 유형에 따른 선택

I/O-bound (네트워크, 디스크):
  → async/await, coroutine, goroutine
  → 이벤트 루프 기반이 효율적
  → 스레드 풀은 과도한 컨텍스트 스위칭 유발

CPU-bound (계산, 변환):
OS 스레드 (Python: multiprocessing)
Rust: rayon 크레이트
Go: goroutine (자동 병렬화)

혼합:
  → 별도 스레드 풀/디스패처 분리
Kotlin: Dispatchers.IO vs Dispatchers.Default
Python: asyncio + ProcessPoolExecutor

15.2 컨텍스트 스위칭 최소화

import asyncio
import concurrent.futures
import time

# CPU-bound 작업을 별도 프로세스에서 실행
def cpu_heavy_task(n: int) -> int:
    """소수 계산 (CPU-bound)"""
    count = 0
    for i in range(2, n):
        if all(i % j != 0 for j in range(2, int(i**0.5) + 1)):
            count += 1
    return count

async def main():
    loop = asyncio.get_event_loop()

    # ProcessPoolExecutor로 CPU-bound 작업 분리
    with concurrent.futures.ProcessPoolExecutor(max_workers=4) as pool:
        start = time.perf_counter()
        results = await asyncio.gather(
            loop.run_in_executor(pool, cpu_heavy_task, 100_000),
            loop.run_in_executor(pool, cpu_heavy_task, 100_000),
            loop.run_in_executor(pool, cpu_heavy_task, 100_000),
            loop.run_in_executor(pool, cpu_heavy_task, 100_000),
        )
        elapsed = time.perf_counter() - start
        print(f"4 tasks completed in {elapsed:.2f}s: {results}")

asyncio.run(main())

16. 퀴즈

Q1. 동시성과 병렬성의 핵심 차이는?

동시성은 프로그램의 구조적 특성으로, 여러 작업을 논리적으로 동시에 다룰 수 있는 설계입니다. 병렬성은 실행의 특성으로, 여러 작업이 물리적으로 동시에 실행되는 것입니다. 동시성은 싱글 코어에서도 가능하지만, 병렬성은 멀티코어가 필요합니다.

Q2. Goroutine의 GMP 모델에서 G, M, P는 각각 무엇인가?
  • G (Goroutine): 실행할 작업 단위. 경량 스레드.
  • M (Machine): OS 스레드. 실제 실행을 담당.
  • P (Processor): 논리적 프로세서. G를 M에 매핑하는 스케줄러. GOMAXPROCS 값으로 수를 결정.

P는 로컬 run queue를 가지며, work stealing으로 부하를 균등 분배합니다.

Q3. 데드락이 발생하려면 어떤 4가지 조건이 모두 충족되어야 하는가?
  1. 상호 배제 (Mutual Exclusion): 자원을 한 번에 하나의 프로세스만 사용
  2. 점유 대기 (Hold and Wait): 자원을 점유한 채 다른 자원을 대기
  3. 비선점 (No Preemption): 이미 할당된 자원을 강제로 빼앗을 수 없음
  4. 순환 대기 (Circular Wait): 프로세스 간 순환 형태의 대기 관계

하나라도 깨면 데드락이 발생하지 않습니다. 가장 일반적인 해결책은 잠금 순서를 통일하여 순환 대기를 방지하는 것입니다.

Q4. Actor 모델과 CSP의 핵심 차이는?
  • Actor 모델: 비동기 메시지 전달. 각 Actor는 고유한 메일박스를 가짐. 수신자를 직접 지정. Erlang, Akka.
  • CSP: 동기 채널 통신. 채널이 독립적인 엔티티. 송신자와 수신자가 서로를 모름. Go의 goroutine + channel.

Actor는 "누구에게 보낼지"를 알고, CSP는 "어느 채널에 보낼지"를 알 뿐입니다.

Q5. 구조적 동시성이 해결하는 문제는?

구조적 동시성은 다음 문제를 해결합니다:

  1. 작업 누출 (Task Leaking): fire-and-forget으로 시작한 작업이 잊혀지는 문제
  2. 에러 전파: 자식 작업의 에러가 부모에게 전달되지 않는 문제
  3. 취소 전파: 부모가 취소되어도 자식이 계속 실행되는 문제
  4. 수명 관리: 스코프가 끝났는데 작업이 계속 실행되는 문제

Java 21의 StructuredTaskScope, Python의 TaskGroup, Kotlin의 coroutineScope가 대표적입니다.


17. 참고 자료

  1. Rob Pike - "Concurrency is not Parallelism" (Go Blog)
  2. Java 21 Virtual Threads - JEP 444
  3. Java 21 Structured Concurrency - JEP 453
  4. Python asyncio 공식 문서
  5. Go 공식 문서 - Effective Go (Concurrency)
  6. Kotlin Coroutines 공식 가이드
  7. Rust async-book (The Async Book)
  8. Akka Documentation - Actor Model
  9. Erlang/OTP - Processes
  10. "The Art of Multiprocessor Programming" - Maurice Herlihy, Nir Shavit
  11. "Designing Data-Intensive Applications" - Martin Kleppmann (Chapter 8)
  12. "Concurrency in Go" - Katherine Cox-Buday (O'Reilly)
  13. tokio.rs - Rust 비동기 런타임
  14. libuv - Node.js Event Loop 구현체

현재 단락 (1/1131)

"Concurrency is about dealing with lots of things at once. Parallelism is about doing lots of things...

작성 글자: 0원문 글자: 27,392작성 단락: 0/1131