Skip to content

필사 모드: 동시성 & 병렬 프로그래밍 완전 가이드 2025: Async/Await, Thread, Goroutine, Actor 모델

한국어
0%
정확도 0%
💡 왼쪽 원문을 읽으면서 오른쪽에 따라 써보세요. Tab 키로 힌트를 받을 수 있습니다.
원문 렌더가 준비되기 전까지 텍스트 가이드로 표시합니다.

목차

1. 동시성 vs 병렬성: Rob Pike의 구분

"Concurrency is about dealing with lots of things at once. Parallelism is about doing lots of things at once." -- Rob Pike

이 한 문장이 동시성과 병렬성의 핵심 차이를 요약합니다.

1.1 개념적 구분

**동시성(Concurrency)** 은 프로그램의 **구조적 특성**입니다. 여러 작업을 논리적으로 동시에 다룰 수 있도록 프로그램을 설계하는 것입니다. 실제로 동시에 실행되는지는 중요하지 않습니다.

**병렬성(Parallelism)** 은 **실행의 특성**입니다. 여러 작업이 물리적으로 동시에 실행되는 것을 의미합니다. 멀티코어 CPU에서 여러 스레드가 동시에 동작하는 것이 대표적입니다.

동시성 (Concurrency) - 구조

┌─────────────────────────────────────────┐

│ Task A: ██░░██░░░░██░░ │

│ Task B: ░░██░░██░░░░██ │

│ Task C: ░░░░░░░░██░░░░██ │

│ ─────────────────→ 시간 │

│ (단일 코어에서 번갈아 실행) │

└─────────────────────────────────────────┘

병렬성 (Parallelism) - 실행

┌─────────────────────────────────────────┐

│ Core 1: ████████████████ │

│ Core 2: ████████████████ │

│ Core 3: ████████████████ │

│ ─────────────────→ 시간 │

│ (다중 코어에서 동시에 실행) │

└─────────────────────────────────────────┘

1.2 실생활 비유

| 상황 | 동시성 | 병렬성 |

|------|--------|--------|

| 요리사 1명이 3개 요리를 번갈아 만듦 | O | X |

| 요리사 3명이 각각 1개씩 동시에 만듦 | O | O |

| 요리사 1명이 1개 요리만 만듦 | X | X |

1.3 왜 구분이 중요한가

동시성 없이 병렬성만 추구하면 복잡도가 폭발합니다. 올바른 동시성 구조를 먼저 설계한 뒤, 병렬 실행으로 성능을 끌어올리는 것이 정석입니다.

동시성 구조 (asyncio) - 단일 스레드에서도 효율적

async def fetch_data(url: str) -> str:

print(f"Fetching {url}...")

await asyncio.sleep(1) # I/O 대기 시뮬레이션

return f"Data from {url}"

async def main():

동시성: 3개 요청을 구조적으로 동시에 다룸

results = await asyncio.gather(

fetch_data("https://api.example.com/users"),

fetch_data("https://api.example.com/posts"),

fetch_data("https://api.example.com/comments"),

)

for r in results:

print(r)

asyncio.run(main())

총 ~1초 (3초가 아닌) - 동시성의 힘

2. 스레딩 모델: OS 스레드부터 가상 스레드까지

2.1 OS 스레드 (Kernel Thread)

운영체제가 직접 관리하는 스레드입니다. 생성 비용이 높고 (약 1MB 스택), 컨텍스트 스위칭 오버헤드가 존재합니다.

// Java - OS 스레드

public class ThreadExample {

public static void main(String[] args) throws InterruptedException {

Thread t1 = new Thread(() -> {

for (int i = 0; i < 5; i++) {

System.out.println("Thread 1: " + i);

try { Thread.sleep(100); } catch (InterruptedException e) { break; }

}

});

Thread t2 = new Thread(() -> {

for (int i = 0; i < 5; i++) {

System.out.println("Thread 2: " + i);

try { Thread.sleep(100); } catch (InterruptedException e) { break; }

}

});

t1.start();

t2.start();

t1.join();

t2.join();

System.out.println("Both threads completed");

}

}

2.2 유저 스레드 (User Thread / Green Thread)

런타임이 관리하는 경량 스레드입니다. OS 스레드 위에 다수의 유저 스레드를 매핑합니다.

스레드 모델 비교

┌───────────────────────────────────────────────┐

│ 1:1 (OS Thread) N:1 (Green Thread) │

│ ┌──┐ ┌──┐ ┌──┐ ┌──┐┌──┐┌──┐┌──┐┌──┐ │

│ │UT│ │UT│ │UT│ │GT││GT││GT││GT││GT│ │

│ └┬─┘ └┬─┘ └┬─┘ └┬─┘└┬─┘└┬─┘└┬─┘└┬─┘ │

│ │ │ │ └──┬──┘ │ └──┬──┘ │

│ ┌┴─┐ ┌┴─┐ ┌┴─┐ ┌┴─┐ │ │ │

│ │KT│ │KT│ │KT│ │KT│ │ │ │

│ └──┘ └──┘ └──┘ └──┘ │ │ │

│ │ │ │

│ M:N (Hybrid) │

│ ┌──┐┌──┐┌──┐┌──┐┌──┐┌──┐ │

│ │GT││GT││GT││GT││GT││GT│ │

│ └┬─┘└┬─┘└┬─┘└┬─┘└┬─┘└┬─┘ │

│ └─┬─┘ └──┬──┘ └──┬──┘ │

│ ┌┴─┐ ┌┴─┐ ┌┴─┐ │

│ │KT│ │KT│ │KT│ │

│ └──┘ └──┘ └──┘ │

└───────────────────────────────────────────────┘

2.3 Java 21 가상 스레드 (Virtual Thread)

Java 21에서 정식 도입된 가상 스레드는 M:N 모델을 구현합니다. 수백만 개의 가상 스레드를 생성할 수 있습니다.

// Java 21 Virtual Threads

public class VirtualThreadDemo {

public static void main(String[] args) throws Exception {

// 100만 개의 가상 스레드 생성 가능!

try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {

for (int i = 0; i < 1_000_000; i++) {

final int taskId = i;

executor.submit(() -> {

try {

Thread.sleep(Duration.ofSeconds(1));

if (taskId % 100_000 == 0) {

System.out.println("Task " + taskId + " done on " +

Thread.currentThread());

}

} catch (InterruptedException e) {

Thread.currentThread().interrupt();

}

});

}

}

System.out.println("All tasks completed");

}

}

2.4 스레드 모델 비교

| 특성 | OS 스레드 | Green Thread | Virtual Thread (Java 21) |

|------|----------|-------------|------------------------|

| 스택 크기 | ~1MB | ~KB | ~수 KB (동적) |

| 생성 비용 | 높음 (syscall) | 낮음 | 매우 낮음 |

| 동시 개수 | 수천 | 수백만 | 수백만 |

| 선점 | OS 선점 | 협력적 | 협력적 (I/O 시 양보) |

| I/O 블로킹 | 스레드 블로킹 | 전체 블로킹 위험 | 자동 언마운트 |

3. 코루틴: 경량 동시성의 핵심

3.1 Stackful vs Stackless 코루틴

Stackful Coroutine (예: Lua, Go의 goroutine)

┌──────────────────────────────┐

│ 자체 스택을 가짐 │

│ 스택 어디서든 일시중단 가능 │

│ 메모리 사용 높음 │

│ 중첩 함수 호출 중에도 양보 가능 │

└──────────────────────────────┘

Stackless Coroutine (예: Python, Kotlin, Rust)

┌──────────────────────────────┐

│ 상태를 힙에 저장 │

│ 최상위에서만 일시중단 가능 │

│ 메모리 사용 낮음 │

│ 컴파일러가 상태 머신으로 변환 │

└──────────────────────────────┘

3.2 Python asyncio 코루틴

async def fetch_url(session: aiohttp.ClientSession, url: str) -> dict:

"""비동기 HTTP 요청"""

async with session.get(url) as response:

return {

"url": url,

"status": response.status,

"length": len(await response.text()),

}

async def main():

urls = [

"https://httpbin.org/delay/1",

"https://httpbin.org/delay/1",

"https://httpbin.org/delay/1",

"https://httpbin.org/delay/1",

"https://httpbin.org/delay/1",

]

start = time.perf_counter()

async with aiohttp.ClientSession() as session:

5개 요청을 동시에 실행

tasks = [fetch_url(session, url) for url in urls]

results = await asyncio.gather(*tasks)

elapsed = time.perf_counter() - start

print(f"5개 요청 완료: {elapsed:.2f}초") # ~1초 (5초가 아닌)

for r in results:

print(f" {r['url']}: status={r['status']}, length={r['length']}")

asyncio.run(main())

3.3 Kotlin 코루틴

suspend fun fetchData(id: Int): String {

delay(1000) // 비동기 대기 (스레드 블로킹 없음)

return "Result-$id"

}

fun main() = runBlocking {

val time = measureTimeMillis {

// 구조적 동시성: coroutineScope가 모든 자식 코루틴 관리

coroutineScope {

val results = (1..10).map { id ->

async(Dispatchers.IO) {

fetchData(id)

}

}

results.awaitAll().forEach { println(it) }

}

}

println("Completed in ${time}ms") // ~1000ms

}

4. Goroutine과 Go의 동시성 모델

4.1 Goroutine 내부 구조

Go 런타임은 M:N 스케줄링을 사용합니다. G(Goroutine), M(Machine/OS Thread), P(Processor) 세 가지 개념이 핵심입니다.

Go 런타임 스케줄러 (GMP 모델)

┌─────────────────────────────────────────────┐

│ │

│ ┌──────┐ ┌──────┐ ┌──────┐ ┌──────┐ │

│ │ G1 │ │ G2 │ │ G3 │ │ G4 │ ... │

│ └──┬───┘ └──┬───┘ └──┬───┘ └──┬───┘ │

│ │ │ │ │ │

│ ┌──┴────────┴──┐ ┌──┴────────┴──┐ │

│ │ P (Local │ │ P (Local │ │

│ │ Queue) │ │ Queue) │ │

│ └──────┬───────┘ └──────┬───────┘ │

│ │ │ │

│ ┌──────┴───────┐ ┌──────┴───────┐ │

│ │ M (OS Thread)│ │ M (OS Thread)│ │

│ └──────────────┘ └──────────────┘ │

│ │

│ Global Run Queue: [G5, G6, G7, ...] │

└─────────────────────────────────────────────┘

GOMAXPROCS = P의 수 (기본값: CPU 코어 수)

4.2 Goroutine 기본 사용

package main

"fmt"

"sync"

"time"

)

func worker(id int, wg *sync.WaitGroup) {

defer wg.Done()

fmt.Printf("Worker %d starting\n", id)

time.Sleep(time.Second)

fmt.Printf("Worker %d done\n", id)

}

func main() {

var wg sync.WaitGroup

for i := 1; i <= 5; i++ {

wg.Add(1)

go worker(i, &wg) // goroutine 생성 - 약 2KB 스택

}

wg.Wait()

fmt.Println("All workers completed")

}

4.3 Work Stealing

P의 로컬 큐가 비면 다른 P의 큐에서 goroutine을 훔쳐옵니다. 이를 통해 부하를 균등하게 분배합니다.

Work Stealing 과정

┌─────────────┐ ┌─────────────┐

│ P1 (busy) │ │ P2 (idle) │

│ [G1,G2,G3] │ ──→ │ [] │

│ │steal│ │

│ [G1,G2] │ ──→ │ [G3] │

└─────────────┘ └─────────────┘

5. Async/Await 패턴 비교

5.1 JavaScript Promise와 Async/Await

// Promise 체이닝 vs Async/Await

// Promise 체이닝 방식

function fetchUserData(userId) {

return fetch(`/api/users/${userId}`)

.then(response => response.json())

.then(user => fetch(`/api/posts?author=${user.id}`))

.then(response => response.json())

.then(posts => {

return { user, posts };

})

.catch(error => {

console.error('Error:', error);

throw error;

});

}

// Async/Await 방식 (훨씬 읽기 좋음)

async function fetchUserDataAsync(userId) {

try {

const userResponse = await fetch(`/api/users/${userId}`);

const user = await userResponse.json();

const postsResponse = await fetch(`/api/posts?author=${user.id}`);

const posts = await postsResponse.json();

return { user, posts };

} catch (error) {

console.error('Error:', error);

throw error;

}

}

// 병렬 실행

async function fetchMultipleUsers(userIds) {

const promises = userIds.map(id => fetchUserDataAsync(id));

return Promise.all(promises); // 모두 병렬 실행

}

// Promise.allSettled - 일부 실패해도 계속

async function fetchWithPartialFailure(urls) {

const results = await Promise.allSettled(

urls.map(url => fetch(url).then(r => r.json()))

);

const successes = results

.filter(r => r.status === 'fulfilled')

.map(r => r.value);

const failures = results

.filter(r => r.status === 'rejected')

.map(r => r.reason);

return { successes, failures };

}

5.2 Rust Async

use tokio;

use reqwest;

use futures::future::join_all;

#[derive(Debug)]

struct ApiResponse {

url: String,

status: u16,

body_length: usize,

}

async fn fetch_url(client: &reqwest::Client, url: &str) -> Result<ApiResponse, reqwest::Error> {

let response = client.get(url).send().await?;

let status = response.status().as_u16();

let body = response.text().await?;

Ok(ApiResponse {

url: url.to_string(),

status,

body_length: body.len(),

})

}

#[tokio::main]

async fn main() -> Result<(), Box<dyn std::error::Error>> {

let client = reqwest::Client::new();

let urls = vec![

"https://httpbin.org/get",

"https://httpbin.org/ip",

"https://httpbin.org/user-agent",

];

// 모든 요청을 동시에 실행

let futures: Vec<_> = urls.iter()

.map(|url| fetch_url(&client, url))

.collect();

let results = join_all(futures).await;

for result in results {

match result {

Ok(resp) => println!("{}: status={}, length={}", resp.url, resp.status, resp.body_length),

Err(e) => eprintln!("Error: {}", e),

}

}

Ok(())

}

5.3 C# Task 기반 비동기

using System;

using System.Net.Http;

using System.Threading.Tasks;

using System.Collections.Generic;

using System.Linq;

public class AsyncDemo

{

private static readonly HttpClient _client = new HttpClient();

public static async Task<string> FetchDataAsync(string url)

{

var response = await _client.GetAsync(url);

response.EnsureSuccessStatusCode();

return await response.Content.ReadAsStringAsync();

}

public static async Task Main()

{

var urls = new List<string>

{

"https://httpbin.org/get",

"https://httpbin.org/ip",

"https://httpbin.org/user-agent",

};

// 병렬 실행 with WhenAll

var tasks = urls.Select(url => FetchDataAsync(url));

var results = await Task.WhenAll(tasks);

for (int i = 0; i < urls.Count; i++)

{

Console.WriteLine($"URL: {urls[i]}, Length: {results[i].Length}");

}

// 타임아웃 패턴

using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5));

try

{

var data = await _client.GetStringAsync(urls[0], cts.Token);

Console.WriteLine($"Got data: {data.Length} chars");

}

catch (OperationCanceledException)

{

Console.WriteLine("Request timed out!");

}

}

}

5.4 Async/Await 언어 비교

| 특성 | JavaScript | Python | Rust | C# | Kotlin |

|------|-----------|--------|------|----|--------|

| 런타임 | V8 Event Loop | asyncio | tokio/async-std | CLR Thread Pool | Dispatchers |

| Future 타입 | Promise | Coroutine | Future (lazy) | Task | Deferred |

| 취소 | AbortController | Task.cancel() | drop | CancellationToken | Job.cancel() |

| 스트림 | AsyncIterator | async for | Stream trait | IAsyncEnumerable | Flow |

| 에러 처리 | try/catch | try/except | Result | try/catch | try/catch |

| 병렬 실행 | Promise.all | gather | join! | Task.WhenAll | awaitAll |

| Zero-cost | X | X | O | X | X |

6. Event Loop 심층 분석

6.1 Node.js libuv Event Loop

Node.js Event Loop 단계

┌───────────────────────────────────┐

│ ┌──────────────┐ │

│ ┌────│ timers │────┐ │

│ │ └──────────────┘ │ │

│ │ ┌──────────────┐ │ │

│ │ │ pending I/O │ │ │

│ │ └──────────────┘ │ │

│ │ ┌──────────────┐ │ │

│ │ │ idle/prepare │ │ │

│ │ └──────────────┘ │ │

│ │ ┌──────────────┐ │ │

│ │ │ poll │◄───┘ │

│ │ └──────────────┘ │

│ │ ┌──────────────┐ │

│ │ │ check │ │

│ │ │ (setImmediate)│ │

│ │ └──────────────┘ │

│ │ ┌──────────────┐ │

│ └────│close callbacks│─────────│

│ └──────────────┘ │

│ │

│ 각 단계 사이: microtask queue │

│ (Promise, queueMicrotask) │

└───────────────────────────────────┘

// Event Loop 동작 순서 이해

console.log('1. 동기 코드');

setTimeout(() => console.log('2. setTimeout (timers)'), 0);

setImmediate(() => console.log('3. setImmediate (check)'));

Promise.resolve().then(() => console.log('4. Promise (microtask)'));

process.nextTick(() => console.log('5. nextTick (microtask - 최우선)'));

console.log('6. 동기 코드 끝');

// 출력 순서:

// 1. 동기 코드

// 6. 동기 코드 끝

// 5. nextTick (microtask - 최우선)

// 4. Promise (microtask)

// 2. setTimeout (timers)

// 3. setImmediate (check)

6.2 Python asyncio Event Loop

async def cpu_bound_simulation(name: str, duration: float):

"""CPU-bound 작업 시뮬레이션"""

print(f"[{name}] 시작")

await로 양보하지 않으면 이벤트 루프를 블로킹

await asyncio.sleep(duration)

print(f"[{name}] 완료 ({duration}초)")

return f"{name}: done"

async def producer(queue: asyncio.Queue, n: int):

"""생산자: 큐에 아이템 추가"""

for i in range(n):

await asyncio.sleep(0.1)

item = f"item-{i}"

await queue.put(item)

print(f" Produced: {item}")

await queue.put(None) # 종료 신호

async def consumer(queue: asyncio.Queue, name: str):

"""소비자: 큐에서 아이템 소비"""

while True:

item = await queue.get()

if item is None:

await queue.put(None) # 다른 소비자도 종료

break

await asyncio.sleep(0.2) # 처리 시뮬레이션

print(f" {name} consumed: {item}")

queue.task_done()

async def main():

1. 동시 실행

start = time.perf_counter()

results = await asyncio.gather(

cpu_bound_simulation("TaskA", 1.0),

cpu_bound_simulation("TaskB", 0.5),

cpu_bound_simulation("TaskC", 0.8),

)

elapsed = time.perf_counter() - start

print(f"총 시간: {elapsed:.2f}초") # ~1초

print(f"결과: {results}")

2. 생산자-소비자 패턴

print("\n--- 생산자-소비자 ---")

queue = asyncio.Queue(maxsize=5)

await asyncio.gather(

producer(queue, 10),

consumer(queue, "Consumer-1"),

consumer(queue, "Consumer-2"),

)

asyncio.run(main())

7. Actor 모델: 공유 상태 없는 동시성

7.1 Actor 모델 개념

Actor 모델

┌─────────────────────────────────────────────────┐

│ │

│ ┌─────────┐ 메시지 ┌─────────┐ │

│ │ Actor A │ ───────→ │ Actor B │ │

│ │ [State] │ │ [State] │ │

│ │ [Mailbox] │ [Mailbox] │

│ └─────────┘ └────┬────┘ │

│ ↑ │ │

│ │ 메시지 │ 새 Actor 생성 │

│ └─────────────────────┘ │

│ ↓ │

│ ┌─────────┐ │

│ │ Actor C │ │

│ │ [State] │ │

│ └─────────┘ │

│ │

│ 규칙: │

│ 1. 메시지로만 통신 (공유 상태 없음) │

│ 2. 한 번에 하나의 메시지만 처리 │

│ 3. 다른 Actor를 생성할 수 있음 │

│ 4. 자신의 상태만 변경 가능 │

└─────────────────────────────────────────────────┘

7.2 Erlang/Elixir Actor

%% Erlang - 카운터 Actor

-module(counter).

-export([start/0, increment/1, get/1, loop/1]).

start() ->

spawn(fun() -> loop(0) end).

increment(Pid) ->

Pid ! {increment, 1}.

get(Pid) ->

Pid ! {get, self()},

receive

{count, Value} -> Value

after 5000 ->

timeout

end.

loop(Count) ->

receive

{increment, N} ->

loop(Count + N);

{get, From} ->

From ! {count, Count},

loop(Count);

stop ->

ok

end.

%% 사용 예:

%% Pid = counter:start().

%% counter:increment(Pid).

%% counter:increment(Pid).

%% counter:get(Pid). %% => 2

7.3 Akka (Scala) Actor

// 메시지 정의

sealed trait Command

case class Deposit(amount: Double) extends Command

case class Withdraw(amount: Double) extends Command

case class GetBalance(replyTo: ActorRef[Balance]) extends Command

case class Balance(amount: Double)

// 은행 계좌 Actor

object BankAccount {

def apply(balance: Double = 0.0): Behavior[Command] =

Behaviors.receive { (context, message) =>

message match {

case Deposit(amount) =>

val newBalance = balance + amount

context.log.info(s"Deposited $amount, balance: $newBalance")

BankAccount(newBalance)

case Withdraw(amount) =>

if (amount <= balance) {

val newBalance = balance - amount

context.log.info(s"Withdrew $amount, balance: $newBalance")

BankAccount(newBalance)

} else {

context.log.warn(s"Insufficient funds: $balance < $amount")

Behaviors.same

}

case GetBalance(replyTo) =>

replyTo ! Balance(balance)

Behaviors.same

}

}

}

// Supervision (장애 복구)

object BankSupervisor {

def apply(): Behavior[Command] =

Behaviors.supervise(BankAccount())

.onFailure(SupervisorStrategy.restart)

}

7.4 Actor 모델의 장점과 단점

| 장점 | 단점 |

|------|------|

| 공유 상태 없음 (경쟁 조건 방지) | 디버깅이 어려울 수 있음 |

| 위치 투명성 (분산 가능) | 메시지 순서 보장 제한적 |

| 장애 격리 (let it crash) | 메일박스 오버플로우 가능 |

| 확장성이 좋음 | 동기 요청-응답 패턴이 복잡 |

8. CSP: Go의 채널 기반 동시성

8.1 채널 기본

package main

"fmt"

"time"

)

func main() {

// 버퍼 없는 채널 (동기)

ch := make(chan string)

go func() {

time.Sleep(time.Second)

ch <- "Hello from goroutine!"

}()

msg := <-ch // 메시지가 올 때까지 블로킹

fmt.Println(msg)

// 버퍼 있는 채널 (비동기)

buffered := make(chan int, 3)

buffered <- 1

buffered <- 2

buffered <- 3

// buffered <- 4 // 버퍼가 꽉 차서 블로킹!

fmt.Println(<-buffered) // 1

fmt.Println(<-buffered) // 2

}

8.2 Fan-In / Fan-Out 패턴

package main

"fmt"

"math/rand"

"sync"

"time"

)

// Fan-Out: 하나의 입력을 여러 워커에게 분배

func fanOut(input <-chan int, workers int) []<-chan int {

channels := make([]<-chan int, workers)

for i := 0; i < workers; i++ {

ch := make(chan int)

channels[i] = ch

go func(workerID int, out chan<- int) {

defer close(out)

for val := range input {

// 처리 시뮬레이션

time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)

out <- val * val

fmt.Printf(" Worker %d processed %d\n", workerID, val)

}

}(i, ch)

}

return channels

}

// Fan-In: 여러 채널의 결과를 하나로 합침

func fanIn(channels ...<-chan int) <-chan int {

var wg sync.WaitGroup

merged := make(chan int)

for _, ch := range channels {

wg.Add(1)

go func(c <-chan int) {

defer wg.Done()

for val := range c {

merged <- val

}

}(ch)

}

go func() {

wg.Wait()

close(merged)

}()

return merged

}

func main() {

// 입력 채널

input := make(chan int)

go func() {

defer close(input)

for i := 1; i <= 20; i++ {

input <- i

}

}()

// Fan-Out: 4개 워커

workerChannels := fanOut(input, 4)

// Fan-In: 결과 합침

results := fanIn(workerChannels...)

// 결과 수집

var sum int

for result := range results {

sum += result

}

fmt.Printf("Sum of squares: %d\n", sum)

}

8.3 Select 문

package main

"fmt"

"time"

)

func main() {

ch1 := make(chan string)

ch2 := make(chan string)

go func() {

time.Sleep(1 * time.Second)

ch1 <- "from channel 1"

}()

go func() {

time.Sleep(2 * time.Second)

ch2 <- "from channel 2"

}()

// select: 여러 채널 중 준비된 것을 선택

for i := 0; i < 2; i++ {

select {

case msg := <-ch1:

fmt.Println("Received", msg)

case msg := <-ch2:

fmt.Println("Received", msg)

case <-time.After(3 * time.Second):

fmt.Println("Timeout!")

}

}

// 논블로킹 select

ch := make(chan int, 1)

select {

case val := <-ch:

fmt.Println("Got:", val)

default:

fmt.Println("Channel empty, moving on")

}

}

9. 동기화 프리미티브

9.1 Mutex와 RWLock

package main

"fmt"

"sync"

)

// Mutex: 상호 배제

type SafeCounter struct {

mu sync.Mutex

v map[string]int

}

func (c *SafeCounter) Inc(key string) {

c.mu.Lock()

defer c.mu.Unlock()

c.v[key]++

}

func (c *SafeCounter) Value(key string) int {

c.mu.Lock()

defer c.mu.Unlock()

return c.v[key]

}

// RWMutex: 읽기는 동시에, 쓰기는 배타적

type SafeCache struct {

mu sync.RWMutex

store map[string]string

}

func (c *SafeCache) Get(key string) (string, bool) {

c.mu.RLock() // 읽기 잠금 (여러 고루틴 동시 읽기 가능)

defer c.mu.RUnlock()

val, ok := c.store[key]

return val, ok

}

func (c *SafeCache) Set(key, value string) {

c.mu.Lock() // 쓰기 잠금 (배타적)

defer c.mu.Unlock()

c.store[key] = value

}

func main() {

counter := SafeCounter{v: make(map[string]int)}

var wg sync.WaitGroup

for i := 0; i < 1000; i++ {

wg.Add(1)

go func() {

defer wg.Done()

counter.Inc("key")

}()

}

wg.Wait()

fmt.Println("Final count:", counter.Value("key")) // 정확히 1000

}

9.2 Semaphore

async def worker(sem: asyncio.Semaphore, task_id: int):

async with sem: # 최대 3개만 동시 실행

print(f"Task {task_id}: 시작 (슬롯 획득)")

await asyncio.sleep(1)

print(f"Task {task_id}: 완료 (슬롯 반환)")

async def main():

sem = asyncio.Semaphore(3) # 동시 실행 3개로 제한

tasks = [worker(sem, i) for i in range(10)]

await asyncio.gather(*tasks)

asyncio.run(main())

9.3 Condition Variable

class BoundedBuffer:

def __init__(self, capacity: int):

self.buffer = []

self.capacity = capacity

self.lock = threading.Lock()

self.not_full = threading.Condition(self.lock)

self.not_empty = threading.Condition(self.lock)

def produce(self, item):

with self.not_full:

while len(self.buffer) >= self.capacity:

self.not_full.wait() # 버퍼가 비워질 때까지 대기

self.buffer.append(item)

self.not_empty.notify() # 소비자에게 알림

def consume(self):

with self.not_empty:

while len(self.buffer) == 0:

self.not_empty.wait() # 아이템이 들어올 때까지 대기

item = self.buffer.pop(0)

self.not_full.notify() # 생산자에게 알림

return item

buf = BoundedBuffer(5)

def producer():

for i in range(20):

buf.produce(i)

print(f"Produced: {i}")

time.sleep(random.uniform(0.01, 0.1))

def consumer(name):

for _ in range(10):

item = buf.consume()

print(f"{name} consumed: {item}")

time.sleep(random.uniform(0.05, 0.15))

t1 = threading.Thread(target=producer)

t2 = threading.Thread(target=consumer, args=("C1",))

t3 = threading.Thread(target=consumer, args=("C2",))

t1.start(); t2.start(); t3.start()

t1.join(); t2.join(); t3.join()

9.4 Atomic 연산

public class AtomicDemo {

// AtomicLong - 잠금 없는 카운터

private static final AtomicLong counter = new AtomicLong(0);

// AtomicReference - 잠금 없는 참조 업데이트

private static final AtomicReference<String> config =

new AtomicReference<>("default");

public static void main(String[] args) throws InterruptedException {

// CAS (Compare-And-Swap) 루프

Thread[] threads = new Thread[10];

for (int i = 0; i < 10; i++) {

threads[i] = new Thread(() -> {

for (int j = 0; j < 10_000; j++) {

counter.incrementAndGet(); // 원자적 증가

}

});

threads[i].start();

}

for (Thread t : threads) t.join();

System.out.println("Counter: " + counter.get()); // 정확히 100,000

// CAS 직접 사용

boolean updated = config.compareAndSet("default", "production");

System.out.println("Config updated: " + updated); // true

System.out.println("Config value: " + config.get()); // production

}

}

10. Lock-free 자료구조

10.1 CAS (Compare-And-Swap)

CAS는 lock-free 프로그래밍의 기초입니다. "현재 값이 예상 값과 같으면 새 값으로 교체"하는 원자적 연산입니다.

CAS 동작 원리

┌──────────────────────────────────────┐

│ CAS(address, expected, new_value) │

│ │

│ if *address == expected: │

│ *address = new_value │

│ return true // 성공 │

│ else: │

│ return false // 실패 → 재시도 │

│ │

│ (위 전체가 하드웨어 수준에서 원자적) │

└──────────────────────────────────────┘

10.2 ABA 문제

ABA 문제

Thread 1: 값 A를 읽음 → (일시 중단)

Thread 2: A → B로 변경

Thread 3: B → A로 변경

Thread 1: (재개) CAS(A, A, C) → 성공! (하지만 A는 원래의 A가 아님)

해결: 버전 카운터 추가

AtomicStampedReference (Java)

TaggedPointer (C/C++)

10.3 Lock-free 스택 (Treiber Stack)

public class LockFreeStack<T> {

private static class Node<T> {

final T value;

Node<T> next;

Node(T value) {

this.value = value;

}

}

private final AtomicReference<Node<T>> top = new AtomicReference<>(null);

public void push(T value) {

Node<T> newNode = new Node<>(value);

Node<T> oldTop;

do {

oldTop = top.get();

newNode.next = oldTop;

} while (!top.compareAndSet(oldTop, newNode)); // CAS 루프

}

public T pop() {

Node<T> oldTop;

Node<T> newTop;

do {

oldTop = top.get();

if (oldTop == null) return null;

newTop = oldTop.next;

} while (!top.compareAndSet(oldTop, newTop)); // CAS 루프

return oldTop.value;

}

}

11. 동시성 버그 패턴과 방지

11.1 데드락 (Deadlock)

데드락 조건 (4가지 모두 충족 시 발생)

1. 상호 배제 (Mutual Exclusion)

2. 점유 대기 (Hold and Wait)

3. 비선점 (No Preemption)

4. 순환 대기 (Circular Wait)

┌──────────┐ ┌──────────┐

│ Thread A │──Lock1──→│ Thread B │

│ │←──Lock2──│ │

│ (Lock2 │ │ (Lock1 │

│ 대기중) │ │ 대기중) │

└──────────┘ └──────────┘

lock_a = threading.Lock()

lock_b = threading.Lock()

데드락 발생 코드

def thread_1():

with lock_a:

print("Thread 1: Lock A 획득")

time.sleep(0.1)

with lock_b: # Lock B 대기 → 데드락!

print("Thread 1: Lock B 획득")

def thread_2():

with lock_b:

print("Thread 2: Lock B 획득")

time.sleep(0.1)

with lock_a: # Lock A 대기 → 데드락!

print("Thread 2: Lock A 획득")

해결: 잠금 순서 통일

def thread_1_fixed():

with lock_a: # 항상 A → B 순서

print("Thread 1: Lock A 획득")

time.sleep(0.1)

with lock_b:

print("Thread 1: Lock B 획득")

def thread_2_fixed():

with lock_a: # 항상 A → B 순서 (순환 대기 방지)

print("Thread 2: Lock A 획득")

time.sleep(0.1)

with lock_b:

print("Thread 2: Lock B 획득")

11.2 레이스 컨디션 (Race Condition)

package main

"fmt"

"sync"

)

func main() {

// 레이스 컨디션 예시

counter := 0

var wg sync.WaitGroup

for i := 0; i < 1000; i++ {

wg.Add(1)

go func() {

defer wg.Done()

counter++ // DATA RACE! 읽기-수정-쓰기가 원자적이지 않음

}()

}

wg.Wait()

fmt.Println("Counter (unsafe):", counter) // 1000 미만일 수 있음

// 해결 1: Mutex

counter = 0

var mu sync.Mutex

for i := 0; i < 1000; i++ {

wg.Add(1)

go func() {

defer wg.Done()

mu.Lock()

counter++

mu.Unlock()

}()

}

wg.Wait()

fmt.Println("Counter (mutex):", counter) // 정확히 1000

// 해결 2: 채널

counter = 0

ch := make(chan int, 1000)

for i := 0; i < 1000; i++ {

go func() {

ch <- 1

}()

}

for i := 0; i < 1000; i++ {

counter += <-ch

}

fmt.Println("Counter (channel):", counter) // 정확히 1000

}

11.3 기아 상태와 우선순위 역전

기아 상태 (Starvation)

- 특정 스레드가 자원에 접근하지 못하고 무한 대기

- 해결: 공정한 잠금 (Fair Lock), 우선순위 에이징

우선순위 역전 (Priority Inversion)

┌─────────────────────────────────────┐

│ High Priority Task ← 블로킹됨 │

│ ↓ (Lock 대기) │

│ Medium Priority Task ← 실행 중 │

│ ↓ (선점) │

│ Low Priority Task ← Lock 보유 │

│ │

│ 해결: Priority Inheritance Protocol │

│ 낮은 우선순위 태스크가 Lock 보유 시 │

│ 높은 우선순위를 일시적으로 상속 │

└─────────────────────────────────────┘

12. 구조적 동시성 (Structured Concurrency)

12.1 개념

구조적 동시성은 동시 작업의 수명을 코드 블록(스코프)에 묶는 패턴입니다. 스코프가 끝나면 모든 자식 작업이 완료(또는 취소)됨을 보장합니다.

비구조적 동시성 구조적 동시성

┌────────────────┐ ┌────────────────┐

│ start task A │ │ scope { │

│ start task B │ │ task A │

│ ... (누출?) │ │ task B │

│ forget task A? │ │ } // A,B 완료 │

└────────────────┘ └────────────────┘

12.2 Python TaskGroup (3.11+)

async def fetch(name: str, delay: float) -> str:

await asyncio.sleep(delay)

if name == "failing":

raise ValueError(f"{name} failed!")

return f"{name}: done"

async def main():

TaskGroup: 구조적 동시성

try:

async with asyncio.TaskGroup() as tg:

task1 = tg.create_task(fetch("api-1", 0.5))

task2 = tg.create_task(fetch("api-2", 1.0))

task3 = tg.create_task(fetch("api-3", 0.8))

여기 도달 시 모든 태스크 완료 보장

print(task1.result(), task2.result(), task3.result())

except* ValueError as eg:

ExceptionGroup으로 에러 처리

for exc in eg.exceptions:

print(f"Error: {exc}")

asyncio.run(main())

12.3 Java 21 Structured Concurrency

public class StructuredConcurrencyDemo {

record User(String name) {}

record Order(String id) {}

static User fetchUser(String userId) throws Exception {

Thread.sleep(500);

return new User("Alice");

}

static Order fetchOrder(String orderId) throws Exception {

Thread.sleep(800);

return new Order("ORD-123");

}

public static void main(String[] args) throws Exception {

// ShutdownOnFailure: 하나라도 실패하면 나머지 취소

try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {

Future<User> userFuture = scope.fork(() -> fetchUser("user-1"));

Future<Order> orderFuture = scope.fork(() -> fetchOrder("order-1"));

scope.join(); // 모든 태스크 완료 대기

scope.throwIfFailed(); // 실패 시 예외 전파

// 여기 도달 시 모든 결과 사용 가능

User user = userFuture.resultNow();

Order order = orderFuture.resultNow();

System.out.printf("User: %s, Order: %s%n", user.name(), order.id());

}

}

}

13. 언어별 동시성 모델 비교

| 차원 | Go | Rust | Java 21 | Python | Kotlin | JavaScript | Erlang/Elixir | C# |

|------|-----|------|---------|--------|--------|-----------|--------------|-----|

| 모델 | CSP | Ownership + async | Virtual Thread | asyncio | Coroutine | Event Loop | Actor | Task |

| 경량 단위 | goroutine | Future | Virtual Thread | coroutine | coroutine | Promise | process | Task |

| 스케줄링 | M:N (GMP) | 런타임 의존 | M:N | 1:1 (싱글) | M:N | 1:1 (싱글) | 선점적 M:N | M:N |

| 공유 상태 | channel | ownership | synchronized | asyncio.Lock | Mutex | 없음 (싱글) | 없음 (메시지) | lock |

| 에러 전파 | panic/recover | Result | 예외 | 예외 | 예외 | reject | link/monitor | 예외 |

| 취소 | context | drop | interrupt | cancel | Job.cancel | AbortController | exit | CancelToken |

| 구조적 동시성 | errgroup | 제한적 | StructuredTaskScope | TaskGroup | coroutineScope | 없음 | supervisor | 없음 |

| CPU-bound | goroutine | thread/rayon | thread | multiprocessing | Dispatchers.Default | Worker Thread | process | Parallel.ForEach |

| 메모리 안전 | GC | 컴파일 타임 | GC | GC | GC | GC | GC | GC |

| 데드락 방지 | 채널 설계 | 컴파일 타임 | 주의 필요 | 주의 필요 | 주의 필요 | 해당 없음 | 메시지 기반 | 주의 필요 |

| 성능 | 높음 | 매우 높음 | 높음 | 보통 | 높음 | I/O만 | 높음 (BEAM) | 높음 |

| 학습 곡선 | 낮음 | 높음 | 보통 | 낮음 | 보통 | 낮음 | 보통 | 보통 |

14. 실전 패턴: Rate Limiter

package main

"context"

"fmt"

"sync"

"time"

)

// Token Bucket Rate Limiter

type RateLimiter struct {

tokens chan struct{}

refillRate time.Duration

ctx context.Context

cancel context.CancelFunc

}

func NewRateLimiter(maxTokens int, refillRate time.Duration) *RateLimiter {

ctx, cancel := context.WithCancel(context.Background())

rl := &RateLimiter{

tokens: make(chan struct{}, maxTokens),

refillRate: refillRate,

ctx: ctx,

cancel: cancel,

}

// 토큰 채우기

for i := 0; i < maxTokens; i++ {

rl.tokens <- struct{}{}

}

// 주기적 리필

go func() {

ticker := time.NewTicker(refillRate)

defer ticker.Stop()

for {

select {

case <-ticker.C:

select {

case rl.tokens <- struct{}{}:

default: // 이미 꽉 참

}

case <-ctx.Done():

return

}

}

}()

return rl

}

func (rl *RateLimiter) Acquire(ctx context.Context) error {

select {

case <-rl.tokens:

return nil

case <-ctx.Done():

return ctx.Err()

}

}

func (rl *RateLimiter) Close() {

rl.cancel()

}

func main() {

limiter := NewRateLimiter(5, 200*time.Millisecond)

defer limiter.Close()

var wg sync.WaitGroup

for i := 0; i < 20; i++ {

wg.Add(1)

go func(id int) {

defer wg.Done()

ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)

defer cancel()

if err := limiter.Acquire(ctx); err != nil {

fmt.Printf("Request %d: timed out\n", id)

return

}

fmt.Printf("Request %d: processed at %s\n", id,

time.Now().Format("15:04:05.000"))

}(i)

}

wg.Wait()

}

15. 성능 최적화 팁

15.1 적절한 동시성 모델 선택

워크로드 유형에 따른 선택

I/O-bound (네트워크, 디스크):

→ async/await, coroutine, goroutine

→ 이벤트 루프 기반이 효율적

→ 스레드 풀은 과도한 컨텍스트 스위칭 유발

CPU-bound (계산, 변환):

→ OS 스레드 (Python: multiprocessing)

→ Rust: rayon 크레이트

→ Go: goroutine (자동 병렬화)

혼합:

→ 별도 스레드 풀/디스패처 분리

→ Kotlin: Dispatchers.IO vs Dispatchers.Default

→ Python: asyncio + ProcessPoolExecutor

15.2 컨텍스트 스위칭 최소화

CPU-bound 작업을 별도 프로세스에서 실행

def cpu_heavy_task(n: int) -> int:

"""소수 계산 (CPU-bound)"""

count = 0

for i in range(2, n):

if all(i % j != 0 for j in range(2, int(i**0.5) + 1)):

count += 1

return count

async def main():

loop = asyncio.get_event_loop()

ProcessPoolExecutor로 CPU-bound 작업 분리

with concurrent.futures.ProcessPoolExecutor(max_workers=4) as pool:

start = time.perf_counter()

results = await asyncio.gather(

loop.run_in_executor(pool, cpu_heavy_task, 100_000),

loop.run_in_executor(pool, cpu_heavy_task, 100_000),

loop.run_in_executor(pool, cpu_heavy_task, 100_000),

loop.run_in_executor(pool, cpu_heavy_task, 100_000),

)

elapsed = time.perf_counter() - start

print(f"4 tasks completed in {elapsed:.2f}s: {results}")

asyncio.run(main())

16. 퀴즈

동시성은 프로그램의 **구조적** 특성으로, 여러 작업을 논리적으로 동시에 다룰 수 있는 설계입니다. 병렬성은 **실행의** 특성으로, 여러 작업이 물리적으로 동시에 실행되는 것입니다. 동시성은 싱글 코어에서도 가능하지만, 병렬성은 멀티코어가 필요합니다.

- **G (Goroutine)**: 실행할 작업 단위. 경량 스레드.

- **M (Machine)**: OS 스레드. 실제 실행을 담당.

- **P (Processor)**: 논리적 프로세서. G를 M에 매핑하는 스케줄러. GOMAXPROCS 값으로 수를 결정.

P는 로컬 run queue를 가지며, work stealing으로 부하를 균등 분배합니다.

1. **상호 배제 (Mutual Exclusion)**: 자원을 한 번에 하나의 프로세스만 사용

2. **점유 대기 (Hold and Wait)**: 자원을 점유한 채 다른 자원을 대기

3. **비선점 (No Preemption)**: 이미 할당된 자원을 강제로 빼앗을 수 없음

4. **순환 대기 (Circular Wait)**: 프로세스 간 순환 형태의 대기 관계

하나라도 깨면 데드락이 발생하지 않습니다. 가장 일반적인 해결책은 잠금 순서를 통일하여 순환 대기를 방지하는 것입니다.

- **Actor 모델**: 비동기 메시지 전달. 각 Actor는 고유한 메일박스를 가짐. 수신자를 직접 지정. Erlang, Akka.

- **CSP**: 동기 채널 통신. 채널이 독립적인 엔티티. 송신자와 수신자가 서로를 모름. Go의 goroutine + channel.

Actor는 "누구에게 보낼지"를 알고, CSP는 "어느 채널에 보낼지"를 알 뿐입니다.

구조적 동시성은 다음 문제를 해결합니다:

1. **작업 누출 (Task Leaking)**: fire-and-forget으로 시작한 작업이 잊혀지는 문제

2. **에러 전파**: 자식 작업의 에러가 부모에게 전달되지 않는 문제

3. **취소 전파**: 부모가 취소되어도 자식이 계속 실행되는 문제

4. **수명 관리**: 스코프가 끝났는데 작업이 계속 실행되는 문제

Java 21의 StructuredTaskScope, Python의 TaskGroup, Kotlin의 coroutineScope가 대표적입니다.

17. 참고 자료

1. Rob Pike - "Concurrency is not Parallelism" (Go Blog)

2. Java 21 Virtual Threads - JEP 444

3. Java 21 Structured Concurrency - JEP 453

4. Python asyncio 공식 문서

5. Go 공식 문서 - Effective Go (Concurrency)

6. Kotlin Coroutines 공식 가이드

7. Rust async-book (The Async Book)

8. Akka Documentation - Actor Model

9. Erlang/OTP - Processes

10. "The Art of Multiprocessor Programming" - Maurice Herlihy, Nir Shavit

11. "Designing Data-Intensive Applications" - Martin Kleppmann (Chapter 8)

12. "Concurrency in Go" - Katherine Cox-Buday (O'Reilly)

13. tokio.rs - Rust 비동기 런타임

14. libuv - Node.js Event Loop 구현체

현재 단락 (1/1116)

"Concurrency is about dealing with lots of things at once. Parallelism is about doing lots of things...

작성 글자: 0원문 글자: 27,082작성 단락: 0/1116