- Published on
동시성 & 병렬 프로그래밍 완전 가이드 2025: Async/Await, Thread, Goroutine, Actor 모델
- Authors

- Name
- Youngju Kim
- @fjvbn20031
목차
1. 동시성 vs 병렬성: Rob Pike의 구분
"Concurrency is about dealing with lots of things at once. Parallelism is about doing lots of things at once." -- Rob Pike
이 한 문장이 동시성과 병렬성의 핵심 차이를 요약합니다.
1.1 개념적 구분
동시성(Concurrency) 은 프로그램의 구조적 특성입니다. 여러 작업을 논리적으로 동시에 다룰 수 있도록 프로그램을 설계하는 것입니다. 실제로 동시에 실행되는지는 중요하지 않습니다.
병렬성(Parallelism) 은 실행의 특성입니다. 여러 작업이 물리적으로 동시에 실행되는 것을 의미합니다. 멀티코어 CPU에서 여러 스레드가 동시에 동작하는 것이 대표적입니다.
동시성 (Concurrency) - 구조
┌─────────────────────────────────────────┐
│ Task A: ██░░██░░░░██░░ │
│ Task B: ░░██░░██░░░░██ │
│ Task C: ░░░░░░░░██░░░░██ │
│ ─────────────────→ 시간 │
│ (단일 코어에서 번갈아 실행) │
└─────────────────────────────────────────┘
병렬성 (Parallelism) - 실행
┌─────────────────────────────────────────┐
│ Core 1: ████████████████ │
│ Core 2: ████████████████ │
│ Core 3: ████████████████ │
│ ─────────────────→ 시간 │
│ (다중 코어에서 동시에 실행) │
└─────────────────────────────────────────┘
1.2 실생활 비유
| 상황 | 동시성 | 병렬성 |
|---|---|---|
| 요리사 1명이 3개 요리를 번갈아 만듦 | O | X |
| 요리사 3명이 각각 1개씩 동시에 만듦 | O | O |
| 요리사 1명이 1개 요리만 만듦 | X | X |
1.3 왜 구분이 중요한가
동시성 없이 병렬성만 추구하면 복잡도가 폭발합니다. 올바른 동시성 구조를 먼저 설계한 뒤, 병렬 실행으로 성능을 끌어올리는 것이 정석입니다.
# 동시성 구조 (asyncio) - 단일 스레드에서도 효율적
import asyncio
async def fetch_data(url: str) -> str:
print(f"Fetching {url}...")
await asyncio.sleep(1) # I/O 대기 시뮬레이션
return f"Data from {url}"
async def main():
# 동시성: 3개 요청을 구조적으로 동시에 다룸
results = await asyncio.gather(
fetch_data("https://api.example.com/users"),
fetch_data("https://api.example.com/posts"),
fetch_data("https://api.example.com/comments"),
)
for r in results:
print(r)
asyncio.run(main())
# 총 ~1초 (3초가 아닌) - 동시성의 힘
2. 스레딩 모델: OS 스레드부터 가상 스레드까지
2.1 OS 스레드 (Kernel Thread)
운영체제가 직접 관리하는 스레드입니다. 생성 비용이 높고 (약 1MB 스택), 컨텍스트 스위칭 오버헤드가 존재합니다.
// Java - OS 스레드
public class ThreadExample {
public static void main(String[] args) throws InterruptedException {
Thread t1 = new Thread(() -> {
for (int i = 0; i < 5; i++) {
System.out.println("Thread 1: " + i);
try { Thread.sleep(100); } catch (InterruptedException e) { break; }
}
});
Thread t2 = new Thread(() -> {
for (int i = 0; i < 5; i++) {
System.out.println("Thread 2: " + i);
try { Thread.sleep(100); } catch (InterruptedException e) { break; }
}
});
t1.start();
t2.start();
t1.join();
t2.join();
System.out.println("Both threads completed");
}
}
2.2 유저 스레드 (User Thread / Green Thread)
런타임이 관리하는 경량 스레드입니다. OS 스레드 위에 다수의 유저 스레드를 매핑합니다.
스레드 모델 비교
┌───────────────────────────────────────────────┐
│ 1:1 (OS Thread) N:1 (Green Thread) │
│ ┌──┐ ┌──┐ ┌──┐ ┌──┐┌──┐┌──┐┌──┐┌──┐ │
│ │UT│ │UT│ │UT│ │GT││GT││GT││GT││GT│ │
│ └┬─┘ └┬─┘ └┬─┘ └┬─┘└┬─┘└┬─┘└┬─┘└┬─┘ │
│ │ │ │ └──┬──┘ │ └──┬──┘ │
│ ┌┴─┐ ┌┴─┐ ┌┴─┐ ┌┴─┐ │ │ │
│ │KT│ │KT│ │KT│ │KT│ │ │ │
│ └──┘ └──┘ └──┘ └──┘ │ │ │
│ │ │ │
│ M:N (Hybrid) │
│ ┌──┐┌──┐┌──┐┌──┐┌──┐┌──┐ │
│ │GT││GT││GT││GT││GT││GT│ │
│ └┬─┘└┬─┘└┬─┘└┬─┘└┬─┘└┬─┘ │
│ └─┬─┘ └──┬──┘ └──┬──┘ │
│ ┌┴─┐ ┌┴─┐ ┌┴─┐ │
│ │KT│ │KT│ │KT│ │
│ └──┘ └──┘ └──┘ │
└───────────────────────────────────────────────┘
2.3 Java 21 가상 스레드 (Virtual Thread)
Java 21에서 정식 도입된 가상 스레드는 M:N 모델을 구현합니다. 수백만 개의 가상 스레드를 생성할 수 있습니다.
// Java 21 Virtual Threads
import java.util.concurrent.Executors;
import java.time.Duration;
public class VirtualThreadDemo {
public static void main(String[] args) throws Exception {
// 100만 개의 가상 스레드 생성 가능!
try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
for (int i = 0; i < 1_000_000; i++) {
final int taskId = i;
executor.submit(() -> {
try {
Thread.sleep(Duration.ofSeconds(1));
if (taskId % 100_000 == 0) {
System.out.println("Task " + taskId + " done on " +
Thread.currentThread());
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
}
}
System.out.println("All tasks completed");
}
}
2.4 스레드 모델 비교
| 특성 | OS 스레드 | Green Thread | Virtual Thread (Java 21) |
|---|---|---|---|
| 스택 크기 | ~1MB | ~KB | ~수 KB (동적) |
| 생성 비용 | 높음 (syscall) | 낮음 | 매우 낮음 |
| 동시 개수 | 수천 | 수백만 | 수백만 |
| 선점 | OS 선점 | 협력적 | 협력적 (I/O 시 양보) |
| I/O 블로킹 | 스레드 블로킹 | 전체 블로킹 위험 | 자동 언마운트 |
3. 코루틴: 경량 동시성의 핵심
3.1 Stackful vs Stackless 코루틴
Stackful Coroutine (예: Lua, Go의 goroutine)
┌──────────────────────────────┐
│ 자체 스택을 가짐 │
│ 스택 어디서든 일시중단 가능 │
│ 메모리 사용 높음 │
│ 중첩 함수 호출 중에도 양보 가능 │
└──────────────────────────────┘
Stackless Coroutine (예: Python, Kotlin, Rust)
┌──────────────────────────────┐
│ 상태를 힙에 저장 │
│ 최상위에서만 일시중단 가능 │
│ 메모리 사용 낮음 │
│ 컴파일러가 상태 머신으로 변환 │
└──────────────────────────────┘
3.2 Python asyncio 코루틴
import asyncio
import aiohttp
import time
async def fetch_url(session: aiohttp.ClientSession, url: str) -> dict:
"""비동기 HTTP 요청"""
async with session.get(url) as response:
return {
"url": url,
"status": response.status,
"length": len(await response.text()),
}
async def main():
urls = [
"https://httpbin.org/delay/1",
"https://httpbin.org/delay/1",
"https://httpbin.org/delay/1",
"https://httpbin.org/delay/1",
"https://httpbin.org/delay/1",
]
start = time.perf_counter()
async with aiohttp.ClientSession() as session:
# 5개 요청을 동시에 실행
tasks = [fetch_url(session, url) for url in urls]
results = await asyncio.gather(*tasks)
elapsed = time.perf_counter() - start
print(f"5개 요청 완료: {elapsed:.2f}초") # ~1초 (5초가 아닌)
for r in results:
print(f" {r['url']}: status={r['status']}, length={r['length']}")
asyncio.run(main())
3.3 Kotlin 코루틴
import kotlinx.coroutines.*
import kotlin.system.measureTimeMillis
suspend fun fetchData(id: Int): String {
delay(1000) // 비동기 대기 (스레드 블로킹 없음)
return "Result-$id"
}
fun main() = runBlocking {
val time = measureTimeMillis {
// 구조적 동시성: coroutineScope가 모든 자식 코루틴 관리
coroutineScope {
val results = (1..10).map { id ->
async(Dispatchers.IO) {
fetchData(id)
}
}
results.awaitAll().forEach { println(it) }
}
}
println("Completed in ${time}ms") // ~1000ms
}
4. Goroutine과 Go의 동시성 모델
4.1 Goroutine 내부 구조
Go 런타임은 M:N 스케줄링을 사용합니다. G(Goroutine), M(Machine/OS Thread), P(Processor) 세 가지 개념이 핵심입니다.
Go 런타임 스케줄러 (GMP 모델)
┌─────────────────────────────────────────────┐
│ │
│ ┌──────┐ ┌──────┐ ┌──────┐ ┌──────┐ │
│ │ G1 │ │ G2 │ │ G3 │ │ G4 │ ... │
│ └──┬───┘ └──┬───┘ └──┬───┘ └──┬───┘ │
│ │ │ │ │ │
│ ┌──┴────────┴──┐ ┌──┴────────┴──┐ │
│ │ P (Local │ │ P (Local │ │
│ │ Queue) │ │ Queue) │ │
│ └──────┬───────┘ └──────┬───────┘ │
│ │ │ │
│ ┌──────┴───────┐ ┌──────┴───────┐ │
│ │ M (OS Thread)│ │ M (OS Thread)│ │
│ └──────────────┘ └──────────────┘ │
│ │
│ Global Run Queue: [G5, G6, G7, ...] │
└─────────────────────────────────────────────┘
GOMAXPROCS = P의 수 (기본값: CPU 코어 수)
4.2 Goroutine 기본 사용
package main
import (
"fmt"
"sync"
"time"
)
func worker(id int, wg *sync.WaitGroup) {
defer wg.Done()
fmt.Printf("Worker %d starting\n", id)
time.Sleep(time.Second)
fmt.Printf("Worker %d done\n", id)
}
func main() {
var wg sync.WaitGroup
for i := 1; i <= 5; i++ {
wg.Add(1)
go worker(i, &wg) // goroutine 생성 - 약 2KB 스택
}
wg.Wait()
fmt.Println("All workers completed")
}
4.3 Work Stealing
P의 로컬 큐가 비면 다른 P의 큐에서 goroutine을 훔쳐옵니다. 이를 통해 부하를 균등하게 분배합니다.
Work Stealing 과정
┌─────────────┐ ┌─────────────┐
│ P1 (busy) │ │ P2 (idle) │
│ [G1,G2,G3] │ ──→ │ [] │
│ │steal│ │
│ [G1,G2] │ ──→ │ [G3] │
└─────────────┘ └─────────────┘
5. Async/Await 패턴 비교
5.1 JavaScript Promise와 Async/Await
// Promise 체이닝 vs Async/Await
// Promise 체이닝 방식
function fetchUserData(userId) {
return fetch(`/api/users/${userId}`)
.then(response => response.json())
.then(user => fetch(`/api/posts?author=${user.id}`))
.then(response => response.json())
.then(posts => {
return { user, posts };
})
.catch(error => {
console.error('Error:', error);
throw error;
});
}
// Async/Await 방식 (훨씬 읽기 좋음)
async function fetchUserDataAsync(userId) {
try {
const userResponse = await fetch(`/api/users/${userId}`);
const user = await userResponse.json();
const postsResponse = await fetch(`/api/posts?author=${user.id}`);
const posts = await postsResponse.json();
return { user, posts };
} catch (error) {
console.error('Error:', error);
throw error;
}
}
// 병렬 실행
async function fetchMultipleUsers(userIds) {
const promises = userIds.map(id => fetchUserDataAsync(id));
return Promise.all(promises); // 모두 병렬 실행
}
// Promise.allSettled - 일부 실패해도 계속
async function fetchWithPartialFailure(urls) {
const results = await Promise.allSettled(
urls.map(url => fetch(url).then(r => r.json()))
);
const successes = results
.filter(r => r.status === 'fulfilled')
.map(r => r.value);
const failures = results
.filter(r => r.status === 'rejected')
.map(r => r.reason);
return { successes, failures };
}
5.2 Rust Async
use tokio;
use reqwest;
use futures::future::join_all;
#[derive(Debug)]
struct ApiResponse {
url: String,
status: u16,
body_length: usize,
}
async fn fetch_url(client: &reqwest::Client, url: &str) -> Result<ApiResponse, reqwest::Error> {
let response = client.get(url).send().await?;
let status = response.status().as_u16();
let body = response.text().await?;
Ok(ApiResponse {
url: url.to_string(),
status,
body_length: body.len(),
})
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let client = reqwest::Client::new();
let urls = vec![
"https://httpbin.org/get",
"https://httpbin.org/ip",
"https://httpbin.org/user-agent",
];
// 모든 요청을 동시에 실행
let futures: Vec<_> = urls.iter()
.map(|url| fetch_url(&client, url))
.collect();
let results = join_all(futures).await;
for result in results {
match result {
Ok(resp) => println!("{}: status={}, length={}", resp.url, resp.status, resp.body_length),
Err(e) => eprintln!("Error: {}", e),
}
}
Ok(())
}
5.3 C# Task 기반 비동기
using System;
using System.Net.Http;
using System.Threading.Tasks;
using System.Collections.Generic;
using System.Linq;
public class AsyncDemo
{
private static readonly HttpClient _client = new HttpClient();
public static async Task<string> FetchDataAsync(string url)
{
var response = await _client.GetAsync(url);
response.EnsureSuccessStatusCode();
return await response.Content.ReadAsStringAsync();
}
public static async Task Main()
{
var urls = new List<string>
{
"https://httpbin.org/get",
"https://httpbin.org/ip",
"https://httpbin.org/user-agent",
};
// 병렬 실행 with WhenAll
var tasks = urls.Select(url => FetchDataAsync(url));
var results = await Task.WhenAll(tasks);
for (int i = 0; i < urls.Count; i++)
{
Console.WriteLine($"URL: {urls[i]}, Length: {results[i].Length}");
}
// 타임아웃 패턴
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
try
{
var data = await _client.GetStringAsync(urls[0], cts.Token);
Console.WriteLine($"Got data: {data.Length} chars");
}
catch (OperationCanceledException)
{
Console.WriteLine("Request timed out!");
}
}
}
5.4 Async/Await 언어 비교
| 특성 | JavaScript | Python | Rust | C# | Kotlin |
|---|---|---|---|---|---|
| 런타임 | V8 Event Loop | asyncio | tokio/async-std | CLR Thread Pool | Dispatchers |
| Future 타입 | Promise | Coroutine | Future (lazy) | Task | Deferred |
| 취소 | AbortController | Task.cancel() | drop | CancellationToken | Job.cancel() |
| 스트림 | AsyncIterator | async for | Stream trait | IAsyncEnumerable | Flow |
| 에러 처리 | try/catch | try/except | Result | try/catch | try/catch |
| 병렬 실행 | Promise.all | gather | join! | Task.WhenAll | awaitAll |
| Zero-cost | X | X | O | X | X |
6. Event Loop 심층 분석
6.1 Node.js libuv Event Loop
Node.js Event Loop 단계
┌───────────────────────────────────┐
│ ┌──────────────┐ │
│ ┌────│ timers │────┐ │
│ │ └──────────────┘ │ │
│ │ ┌──────────────┐ │ │
│ │ │ pending I/O │ │ │
│ │ └──────────────┘ │ │
│ │ ┌──────────────┐ │ │
│ │ │ idle/prepare │ │ │
│ │ └──────────────┘ │ │
│ │ ┌──────────────┐ │ │
│ │ │ poll │◄───┘ │
│ │ └──────────────┘ │
│ │ ┌──────────────┐ │
│ │ │ check │ │
│ │ │ (setImmediate)│ │
│ │ └──────────────┘ │
│ │ ┌──────────────┐ │
│ └────│close callbacks│─────────│
│ └──────────────┘ │
│ │
│ 각 단계 사이: microtask queue │
│ (Promise, queueMicrotask) │
└───────────────────────────────────┘
// Event Loop 동작 순서 이해
console.log('1. 동기 코드');
setTimeout(() => console.log('2. setTimeout (timers)'), 0);
setImmediate(() => console.log('3. setImmediate (check)'));
Promise.resolve().then(() => console.log('4. Promise (microtask)'));
process.nextTick(() => console.log('5. nextTick (microtask - 최우선)'));
console.log('6. 동기 코드 끝');
// 출력 순서:
// 1. 동기 코드
// 6. 동기 코드 끝
// 5. nextTick (microtask - 최우선)
// 4. Promise (microtask)
// 2. setTimeout (timers)
// 3. setImmediate (check)
6.2 Python asyncio Event Loop
import asyncio
import time
async def cpu_bound_simulation(name: str, duration: float):
"""CPU-bound 작업 시뮬레이션"""
print(f"[{name}] 시작")
# await로 양보하지 않으면 이벤트 루프를 블로킹
await asyncio.sleep(duration)
print(f"[{name}] 완료 ({duration}초)")
return f"{name}: done"
async def producer(queue: asyncio.Queue, n: int):
"""생산자: 큐에 아이템 추가"""
for i in range(n):
await asyncio.sleep(0.1)
item = f"item-{i}"
await queue.put(item)
print(f" Produced: {item}")
await queue.put(None) # 종료 신호
async def consumer(queue: asyncio.Queue, name: str):
"""소비자: 큐에서 아이템 소비"""
while True:
item = await queue.get()
if item is None:
await queue.put(None) # 다른 소비자도 종료
break
await asyncio.sleep(0.2) # 처리 시뮬레이션
print(f" {name} consumed: {item}")
queue.task_done()
async def main():
# 1. 동시 실행
start = time.perf_counter()
results = await asyncio.gather(
cpu_bound_simulation("TaskA", 1.0),
cpu_bound_simulation("TaskB", 0.5),
cpu_bound_simulation("TaskC", 0.8),
)
elapsed = time.perf_counter() - start
print(f"총 시간: {elapsed:.2f}초") # ~1초
print(f"결과: {results}")
# 2. 생산자-소비자 패턴
print("\n--- 생산자-소비자 ---")
queue = asyncio.Queue(maxsize=5)
await asyncio.gather(
producer(queue, 10),
consumer(queue, "Consumer-1"),
consumer(queue, "Consumer-2"),
)
asyncio.run(main())
7. Actor 모델: 공유 상태 없는 동시성
7.1 Actor 모델 개념
Actor 모델
┌─────────────────────────────────────────────────┐
│ │
│ ┌─────────┐ 메시지 ┌─────────┐ │
│ │ Actor A │ ───────→ │ Actor B │ │
│ │ [State] │ │ [State] │ │
│ │ [Mailbox] │ [Mailbox] │
│ └─────────┘ └────┬────┘ │
│ ↑ │ │
│ │ 메시지 │ 새 Actor 생성 │
│ └─────────────────────┘ │
│ ↓ │
│ ┌─────────┐ │
│ │ Actor C │ │
│ │ [State] │ │
│ └─────────┘ │
│ │
│ 규칙: │
│ 1. 메시지로만 통신 (공유 상태 없음) │
│ 2. 한 번에 하나의 메시지만 처리 │
│ 3. 다른 Actor를 생성할 수 있음 │
│ 4. 자신의 상태만 변경 가능 │
└─────────────────────────────────────────────────┘
7.2 Erlang/Elixir Actor
%% Erlang - 카운터 Actor
-module(counter).
-export([start/0, increment/1, get/1, loop/1]).
start() ->
spawn(fun() -> loop(0) end).
increment(Pid) ->
Pid ! {increment, 1}.
get(Pid) ->
Pid ! {get, self()},
receive
{count, Value} -> Value
after 5000 ->
timeout
end.
loop(Count) ->
receive
{increment, N} ->
loop(Count + N);
{get, From} ->
From ! {count, Count},
loop(Count);
stop ->
ok
end.
%% 사용 예:
%% Pid = counter:start().
%% counter:increment(Pid).
%% counter:increment(Pid).
%% counter:get(Pid). %% => 2
7.3 Akka (Scala) Actor
import akka.actor.typed.{ActorRef, ActorSystem, Behavior}
import akka.actor.typed.scaladsl.Behaviors
// 메시지 정의
sealed trait Command
case class Deposit(amount: Double) extends Command
case class Withdraw(amount: Double) extends Command
case class GetBalance(replyTo: ActorRef[Balance]) extends Command
case class Balance(amount: Double)
// 은행 계좌 Actor
object BankAccount {
def apply(balance: Double = 0.0): Behavior[Command] =
Behaviors.receive { (context, message) =>
message match {
case Deposit(amount) =>
val newBalance = balance + amount
context.log.info(s"Deposited $amount, balance: $newBalance")
BankAccount(newBalance)
case Withdraw(amount) =>
if (amount <= balance) {
val newBalance = balance - amount
context.log.info(s"Withdrew $amount, balance: $newBalance")
BankAccount(newBalance)
} else {
context.log.warn(s"Insufficient funds: $balance < $amount")
Behaviors.same
}
case GetBalance(replyTo) =>
replyTo ! Balance(balance)
Behaviors.same
}
}
}
// Supervision (장애 복구)
object BankSupervisor {
def apply(): Behavior[Command] =
Behaviors.supervise(BankAccount())
.onFailure(SupervisorStrategy.restart)
}
7.4 Actor 모델의 장점과 단점
| 장점 | 단점 |
|---|---|
| 공유 상태 없음 (경쟁 조건 방지) | 디버깅이 어려울 수 있음 |
| 위치 투명성 (분산 가능) | 메시지 순서 보장 제한적 |
| 장애 격리 (let it crash) | 메일박스 오버플로우 가능 |
| 확장성이 좋음 | 동기 요청-응답 패턴이 복잡 |
8. CSP: Go의 채널 기반 동시성
8.1 채널 기본
package main
import (
"fmt"
"time"
)
func main() {
// 버퍼 없는 채널 (동기)
ch := make(chan string)
go func() {
time.Sleep(time.Second)
ch <- "Hello from goroutine!"
}()
msg := <-ch // 메시지가 올 때까지 블로킹
fmt.Println(msg)
// 버퍼 있는 채널 (비동기)
buffered := make(chan int, 3)
buffered <- 1
buffered <- 2
buffered <- 3
// buffered <- 4 // 버퍼가 꽉 차서 블로킹!
fmt.Println(<-buffered) // 1
fmt.Println(<-buffered) // 2
}
8.2 Fan-In / Fan-Out 패턴
package main
import (
"fmt"
"math/rand"
"sync"
"time"
)
// Fan-Out: 하나의 입력을 여러 워커에게 분배
func fanOut(input <-chan int, workers int) []<-chan int {
channels := make([]<-chan int, workers)
for i := 0; i < workers; i++ {
ch := make(chan int)
channels[i] = ch
go func(workerID int, out chan<- int) {
defer close(out)
for val := range input {
// 처리 시뮬레이션
time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
out <- val * val
fmt.Printf(" Worker %d processed %d\n", workerID, val)
}
}(i, ch)
}
return channels
}
// Fan-In: 여러 채널의 결과를 하나로 합침
func fanIn(channels ...<-chan int) <-chan int {
var wg sync.WaitGroup
merged := make(chan int)
for _, ch := range channels {
wg.Add(1)
go func(c <-chan int) {
defer wg.Done()
for val := range c {
merged <- val
}
}(ch)
}
go func() {
wg.Wait()
close(merged)
}()
return merged
}
func main() {
// 입력 채널
input := make(chan int)
go func() {
defer close(input)
for i := 1; i <= 20; i++ {
input <- i
}
}()
// Fan-Out: 4개 워커
workerChannels := fanOut(input, 4)
// Fan-In: 결과 합침
results := fanIn(workerChannels...)
// 결과 수집
var sum int
for result := range results {
sum += result
}
fmt.Printf("Sum of squares: %d\n", sum)
}
8.3 Select 문
package main
import (
"fmt"
"time"
)
func main() {
ch1 := make(chan string)
ch2 := make(chan string)
go func() {
time.Sleep(1 * time.Second)
ch1 <- "from channel 1"
}()
go func() {
time.Sleep(2 * time.Second)
ch2 <- "from channel 2"
}()
// select: 여러 채널 중 준비된 것을 선택
for i := 0; i < 2; i++ {
select {
case msg := <-ch1:
fmt.Println("Received", msg)
case msg := <-ch2:
fmt.Println("Received", msg)
case <-time.After(3 * time.Second):
fmt.Println("Timeout!")
}
}
// 논블로킹 select
ch := make(chan int, 1)
select {
case val := <-ch:
fmt.Println("Got:", val)
default:
fmt.Println("Channel empty, moving on")
}
}
9. 동기화 프리미티브
9.1 Mutex와 RWLock
package main
import (
"fmt"
"sync"
)
// Mutex: 상호 배제
type SafeCounter struct {
mu sync.Mutex
v map[string]int
}
func (c *SafeCounter) Inc(key string) {
c.mu.Lock()
defer c.mu.Unlock()
c.v[key]++
}
func (c *SafeCounter) Value(key string) int {
c.mu.Lock()
defer c.mu.Unlock()
return c.v[key]
}
// RWMutex: 읽기는 동시에, 쓰기는 배타적
type SafeCache struct {
mu sync.RWMutex
store map[string]string
}
func (c *SafeCache) Get(key string) (string, bool) {
c.mu.RLock() // 읽기 잠금 (여러 고루틴 동시 읽기 가능)
defer c.mu.RUnlock()
val, ok := c.store[key]
return val, ok
}
func (c *SafeCache) Set(key, value string) {
c.mu.Lock() // 쓰기 잠금 (배타적)
defer c.mu.Unlock()
c.store[key] = value
}
func main() {
counter := SafeCounter{v: make(map[string]int)}
var wg sync.WaitGroup
for i := 0; i < 1000; i++ {
wg.Add(1)
go func() {
defer wg.Done()
counter.Inc("key")
}()
}
wg.Wait()
fmt.Println("Final count:", counter.Value("key")) // 정확히 1000
}
9.2 Semaphore
import asyncio
async def worker(sem: asyncio.Semaphore, task_id: int):
async with sem: # 최대 3개만 동시 실행
print(f"Task {task_id}: 시작 (슬롯 획득)")
await asyncio.sleep(1)
print(f"Task {task_id}: 완료 (슬롯 반환)")
async def main():
sem = asyncio.Semaphore(3) # 동시 실행 3개로 제한
tasks = [worker(sem, i) for i in range(10)]
await asyncio.gather(*tasks)
asyncio.run(main())
9.3 Condition Variable
import threading
import time
import random
class BoundedBuffer:
def __init__(self, capacity: int):
self.buffer = []
self.capacity = capacity
self.lock = threading.Lock()
self.not_full = threading.Condition(self.lock)
self.not_empty = threading.Condition(self.lock)
def produce(self, item):
with self.not_full:
while len(self.buffer) >= self.capacity:
self.not_full.wait() # 버퍼가 비워질 때까지 대기
self.buffer.append(item)
self.not_empty.notify() # 소비자에게 알림
def consume(self):
with self.not_empty:
while len(self.buffer) == 0:
self.not_empty.wait() # 아이템이 들어올 때까지 대기
item = self.buffer.pop(0)
self.not_full.notify() # 생산자에게 알림
return item
buf = BoundedBuffer(5)
def producer():
for i in range(20):
buf.produce(i)
print(f"Produced: {i}")
time.sleep(random.uniform(0.01, 0.1))
def consumer(name):
for _ in range(10):
item = buf.consume()
print(f"{name} consumed: {item}")
time.sleep(random.uniform(0.05, 0.15))
t1 = threading.Thread(target=producer)
t2 = threading.Thread(target=consumer, args=("C1",))
t3 = threading.Thread(target=consumer, args=("C2",))
t1.start(); t2.start(); t3.start()
t1.join(); t2.join(); t3.join()
9.4 Atomic 연산
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
public class AtomicDemo {
// AtomicLong - 잠금 없는 카운터
private static final AtomicLong counter = new AtomicLong(0);
// AtomicReference - 잠금 없는 참조 업데이트
private static final AtomicReference<String> config =
new AtomicReference<>("default");
public static void main(String[] args) throws InterruptedException {
// CAS (Compare-And-Swap) 루프
Thread[] threads = new Thread[10];
for (int i = 0; i < 10; i++) {
threads[i] = new Thread(() -> {
for (int j = 0; j < 10_000; j++) {
counter.incrementAndGet(); // 원자적 증가
}
});
threads[i].start();
}
for (Thread t : threads) t.join();
System.out.println("Counter: " + counter.get()); // 정확히 100,000
// CAS 직접 사용
boolean updated = config.compareAndSet("default", "production");
System.out.println("Config updated: " + updated); // true
System.out.println("Config value: " + config.get()); // production
}
}
10. Lock-free 자료구조
10.1 CAS (Compare-And-Swap)
CAS는 lock-free 프로그래밍의 기초입니다. "현재 값이 예상 값과 같으면 새 값으로 교체"하는 원자적 연산입니다.
CAS 동작 원리
┌──────────────────────────────────────┐
│ CAS(address, expected, new_value) │
│ │
│ if *address == expected: │
│ *address = new_value │
│ return true // 성공 │
│ else: │
│ return false // 실패 → 재시도 │
│ │
│ (위 전체가 하드웨어 수준에서 원자적) │
└──────────────────────────────────────┘
10.2 ABA 문제
ABA 문제
Thread 1: 값 A를 읽음 → (일시 중단)
Thread 2: A → B로 변경
Thread 3: B → A로 변경
Thread 1: (재개) CAS(A, A, C) → 성공! (하지만 A는 원래의 A가 아님)
해결: 버전 카운터 추가
AtomicStampedReference (Java)
TaggedPointer (C/C++)
10.3 Lock-free 스택 (Treiber Stack)
import java.util.concurrent.atomic.AtomicReference;
public class LockFreeStack<T> {
private static class Node<T> {
final T value;
Node<T> next;
Node(T value) {
this.value = value;
}
}
private final AtomicReference<Node<T>> top = new AtomicReference<>(null);
public void push(T value) {
Node<T> newNode = new Node<>(value);
Node<T> oldTop;
do {
oldTop = top.get();
newNode.next = oldTop;
} while (!top.compareAndSet(oldTop, newNode)); // CAS 루프
}
public T pop() {
Node<T> oldTop;
Node<T> newTop;
do {
oldTop = top.get();
if (oldTop == null) return null;
newTop = oldTop.next;
} while (!top.compareAndSet(oldTop, newTop)); // CAS 루프
return oldTop.value;
}
}
11. 동시성 버그 패턴과 방지
11.1 데드락 (Deadlock)
데드락 조건 (4가지 모두 충족 시 발생)
1. 상호 배제 (Mutual Exclusion)
2. 점유 대기 (Hold and Wait)
3. 비선점 (No Preemption)
4. 순환 대기 (Circular Wait)
┌──────────┐ ┌──────────┐
│ Thread A │──Lock1──→│ Thread B │
│ │←──Lock2──│ │
│ (Lock2 │ │ (Lock1 │
│ 대기중) │ │ 대기중) │
└──────────┘ └──────────┘
import threading
import time
lock_a = threading.Lock()
lock_b = threading.Lock()
# 데드락 발생 코드
def thread_1():
with lock_a:
print("Thread 1: Lock A 획득")
time.sleep(0.1)
with lock_b: # Lock B 대기 → 데드락!
print("Thread 1: Lock B 획득")
def thread_2():
with lock_b:
print("Thread 2: Lock B 획득")
time.sleep(0.1)
with lock_a: # Lock A 대기 → 데드락!
print("Thread 2: Lock A 획득")
# 해결: 잠금 순서 통일
def thread_1_fixed():
with lock_a: # 항상 A → B 순서
print("Thread 1: Lock A 획득")
time.sleep(0.1)
with lock_b:
print("Thread 1: Lock B 획득")
def thread_2_fixed():
with lock_a: # 항상 A → B 순서 (순환 대기 방지)
print("Thread 2: Lock A 획득")
time.sleep(0.1)
with lock_b:
print("Thread 2: Lock B 획득")
11.2 레이스 컨디션 (Race Condition)
package main
import (
"fmt"
"sync"
)
func main() {
// 레이스 컨디션 예시
counter := 0
var wg sync.WaitGroup
for i := 0; i < 1000; i++ {
wg.Add(1)
go func() {
defer wg.Done()
counter++ // DATA RACE! 읽기-수정-쓰기가 원자적이지 않음
}()
}
wg.Wait()
fmt.Println("Counter (unsafe):", counter) // 1000 미만일 수 있음
// 해결 1: Mutex
counter = 0
var mu sync.Mutex
for i := 0; i < 1000; i++ {
wg.Add(1)
go func() {
defer wg.Done()
mu.Lock()
counter++
mu.Unlock()
}()
}
wg.Wait()
fmt.Println("Counter (mutex):", counter) // 정확히 1000
// 해결 2: 채널
counter = 0
ch := make(chan int, 1000)
for i := 0; i < 1000; i++ {
go func() {
ch <- 1
}()
}
for i := 0; i < 1000; i++ {
counter += <-ch
}
fmt.Println("Counter (channel):", counter) // 정확히 1000
}
11.3 기아 상태와 우선순위 역전
기아 상태 (Starvation)
- 특정 스레드가 자원에 접근하지 못하고 무한 대기
- 해결: 공정한 잠금 (Fair Lock), 우선순위 에이징
우선순위 역전 (Priority Inversion)
┌─────────────────────────────────────┐
│ High Priority Task ← 블로킹됨 │
│ ↓ (Lock 대기) │
│ Medium Priority Task ← 실행 중 │
│ ↓ (선점) │
│ Low Priority Task ← Lock 보유 │
│ │
│ 해결: Priority Inheritance Protocol │
│ 낮은 우선순위 태스크가 Lock 보유 시 │
│ 높은 우선순위를 일시적으로 상속 │
└─────────────────────────────────────┘
12. 구조적 동시성 (Structured Concurrency)
12.1 개념
구조적 동시성은 동시 작업의 수명을 코드 블록(스코프)에 묶는 패턴입니다. 스코프가 끝나면 모든 자식 작업이 완료(또는 취소)됨을 보장합니다.
비구조적 동시성 구조적 동시성
┌────────────────┐ ┌────────────────┐
│ start task A │ │ scope { │
│ start task B │ │ task A │
│ ... (누출?) │ │ task B │
│ forget task A? │ │ } // A,B 완료 │
└────────────────┘ └────────────────┘
12.2 Python TaskGroup (3.11+)
import asyncio
async def fetch(name: str, delay: float) -> str:
await asyncio.sleep(delay)
if name == "failing":
raise ValueError(f"{name} failed!")
return f"{name}: done"
async def main():
# TaskGroup: 구조적 동시성
try:
async with asyncio.TaskGroup() as tg:
task1 = tg.create_task(fetch("api-1", 0.5))
task2 = tg.create_task(fetch("api-2", 1.0))
task3 = tg.create_task(fetch("api-3", 0.8))
# 여기 도달 시 모든 태스크 완료 보장
print(task1.result(), task2.result(), task3.result())
except* ValueError as eg:
# ExceptionGroup으로 에러 처리
for exc in eg.exceptions:
print(f"Error: {exc}")
asyncio.run(main())
12.3 Java 21 Structured Concurrency
import java.util.concurrent.StructuredTaskScope;
import java.util.concurrent.Future;
public class StructuredConcurrencyDemo {
record User(String name) {}
record Order(String id) {}
static User fetchUser(String userId) throws Exception {
Thread.sleep(500);
return new User("Alice");
}
static Order fetchOrder(String orderId) throws Exception {
Thread.sleep(800);
return new Order("ORD-123");
}
public static void main(String[] args) throws Exception {
// ShutdownOnFailure: 하나라도 실패하면 나머지 취소
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
Future<User> userFuture = scope.fork(() -> fetchUser("user-1"));
Future<Order> orderFuture = scope.fork(() -> fetchOrder("order-1"));
scope.join(); // 모든 태스크 완료 대기
scope.throwIfFailed(); // 실패 시 예외 전파
// 여기 도달 시 모든 결과 사용 가능
User user = userFuture.resultNow();
Order order = orderFuture.resultNow();
System.out.printf("User: %s, Order: %s%n", user.name(), order.id());
}
}
}
13. 언어별 동시성 모델 비교
| 차원 | Go | Rust | Java 21 | Python | Kotlin | JavaScript | Erlang/Elixir | C# |
|---|---|---|---|---|---|---|---|---|
| 모델 | CSP | Ownership + async | Virtual Thread | asyncio | Coroutine | Event Loop | Actor | Task |
| 경량 단위 | goroutine | Future | Virtual Thread | coroutine | coroutine | Promise | process | Task |
| 스케줄링 | M:N (GMP) | 런타임 의존 | M:N | 1:1 (싱글) | M:N | 1:1 (싱글) | 선점적 M:N | M:N |
| 공유 상태 | channel | ownership | synchronized | asyncio.Lock | Mutex | 없음 (싱글) | 없음 (메시지) | lock |
| 에러 전파 | panic/recover | Result | 예외 | 예외 | 예외 | reject | link/monitor | 예외 |
| 취소 | context | drop | interrupt | cancel | Job.cancel | AbortController | exit | CancelToken |
| 구조적 동시성 | errgroup | 제한적 | StructuredTaskScope | TaskGroup | coroutineScope | 없음 | supervisor | 없음 |
| CPU-bound | goroutine | thread/rayon | thread | multiprocessing | Dispatchers.Default | Worker Thread | process | Parallel.ForEach |
| 메모리 안전 | GC | 컴파일 타임 | GC | GC | GC | GC | GC | GC |
| 데드락 방지 | 채널 설계 | 컴파일 타임 | 주의 필요 | 주의 필요 | 주의 필요 | 해당 없음 | 메시지 기반 | 주의 필요 |
| 성능 | 높음 | 매우 높음 | 높음 | 보통 | 높음 | I/O만 | 높음 (BEAM) | 높음 |
| 학습 곡선 | 낮음 | 높음 | 보통 | 낮음 | 보통 | 낮음 | 보통 | 보통 |
14. 실전 패턴: Rate Limiter
package main
import (
"context"
"fmt"
"sync"
"time"
)
// Token Bucket Rate Limiter
type RateLimiter struct {
tokens chan struct{}
refillRate time.Duration
ctx context.Context
cancel context.CancelFunc
}
func NewRateLimiter(maxTokens int, refillRate time.Duration) *RateLimiter {
ctx, cancel := context.WithCancel(context.Background())
rl := &RateLimiter{
tokens: make(chan struct{}, maxTokens),
refillRate: refillRate,
ctx: ctx,
cancel: cancel,
}
// 토큰 채우기
for i := 0; i < maxTokens; i++ {
rl.tokens <- struct{}{}
}
// 주기적 리필
go func() {
ticker := time.NewTicker(refillRate)
defer ticker.Stop()
for {
select {
case <-ticker.C:
select {
case rl.tokens <- struct{}{}:
default: // 이미 꽉 참
}
case <-ctx.Done():
return
}
}
}()
return rl
}
func (rl *RateLimiter) Acquire(ctx context.Context) error {
select {
case <-rl.tokens:
return nil
case <-ctx.Done():
return ctx.Err()
}
}
func (rl *RateLimiter) Close() {
rl.cancel()
}
func main() {
limiter := NewRateLimiter(5, 200*time.Millisecond)
defer limiter.Close()
var wg sync.WaitGroup
for i := 0; i < 20; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()
if err := limiter.Acquire(ctx); err != nil {
fmt.Printf("Request %d: timed out\n", id)
return
}
fmt.Printf("Request %d: processed at %s\n", id,
time.Now().Format("15:04:05.000"))
}(i)
}
wg.Wait()
}
15. 성능 최적화 팁
15.1 적절한 동시성 모델 선택
워크로드 유형에 따른 선택
I/O-bound (네트워크, 디스크):
→ async/await, coroutine, goroutine
→ 이벤트 루프 기반이 효율적
→ 스레드 풀은 과도한 컨텍스트 스위칭 유발
CPU-bound (계산, 변환):
→ OS 스레드 (Python: multiprocessing)
→ Rust: rayon 크레이트
→ Go: goroutine (자동 병렬화)
혼합:
→ 별도 스레드 풀/디스패처 분리
→ Kotlin: Dispatchers.IO vs Dispatchers.Default
→ Python: asyncio + ProcessPoolExecutor
15.2 컨텍스트 스위칭 최소화
import asyncio
import concurrent.futures
import time
# CPU-bound 작업을 별도 프로세스에서 실행
def cpu_heavy_task(n: int) -> int:
"""소수 계산 (CPU-bound)"""
count = 0
for i in range(2, n):
if all(i % j != 0 for j in range(2, int(i**0.5) + 1)):
count += 1
return count
async def main():
loop = asyncio.get_event_loop()
# ProcessPoolExecutor로 CPU-bound 작업 분리
with concurrent.futures.ProcessPoolExecutor(max_workers=4) as pool:
start = time.perf_counter()
results = await asyncio.gather(
loop.run_in_executor(pool, cpu_heavy_task, 100_000),
loop.run_in_executor(pool, cpu_heavy_task, 100_000),
loop.run_in_executor(pool, cpu_heavy_task, 100_000),
loop.run_in_executor(pool, cpu_heavy_task, 100_000),
)
elapsed = time.perf_counter() - start
print(f"4 tasks completed in {elapsed:.2f}s: {results}")
asyncio.run(main())
16. 퀴즈
Q1. 동시성과 병렬성의 핵심 차이는?
동시성은 프로그램의 구조적 특성으로, 여러 작업을 논리적으로 동시에 다룰 수 있는 설계입니다. 병렬성은 실행의 특성으로, 여러 작업이 물리적으로 동시에 실행되는 것입니다. 동시성은 싱글 코어에서도 가능하지만, 병렬성은 멀티코어가 필요합니다.
Q2. Goroutine의 GMP 모델에서 G, M, P는 각각 무엇인가?
- G (Goroutine): 실행할 작업 단위. 경량 스레드.
- M (Machine): OS 스레드. 실제 실행을 담당.
- P (Processor): 논리적 프로세서. G를 M에 매핑하는 스케줄러. GOMAXPROCS 값으로 수를 결정.
P는 로컬 run queue를 가지며, work stealing으로 부하를 균등 분배합니다.
Q3. 데드락이 발생하려면 어떤 4가지 조건이 모두 충족되어야 하는가?
- 상호 배제 (Mutual Exclusion): 자원을 한 번에 하나의 프로세스만 사용
- 점유 대기 (Hold and Wait): 자원을 점유한 채 다른 자원을 대기
- 비선점 (No Preemption): 이미 할당된 자원을 강제로 빼앗을 수 없음
- 순환 대기 (Circular Wait): 프로세스 간 순환 형태의 대기 관계
하나라도 깨면 데드락이 발생하지 않습니다. 가장 일반적인 해결책은 잠금 순서를 통일하여 순환 대기를 방지하는 것입니다.
Q4. Actor 모델과 CSP의 핵심 차이는?
- Actor 모델: 비동기 메시지 전달. 각 Actor는 고유한 메일박스를 가짐. 수신자를 직접 지정. Erlang, Akka.
- CSP: 동기 채널 통신. 채널이 독립적인 엔티티. 송신자와 수신자가 서로를 모름. Go의 goroutine + channel.
Actor는 "누구에게 보낼지"를 알고, CSP는 "어느 채널에 보낼지"를 알 뿐입니다.
Q5. 구조적 동시성이 해결하는 문제는?
구조적 동시성은 다음 문제를 해결합니다:
- 작업 누출 (Task Leaking): fire-and-forget으로 시작한 작업이 잊혀지는 문제
- 에러 전파: 자식 작업의 에러가 부모에게 전달되지 않는 문제
- 취소 전파: 부모가 취소되어도 자식이 계속 실행되는 문제
- 수명 관리: 스코프가 끝났는데 작업이 계속 실행되는 문제
Java 21의 StructuredTaskScope, Python의 TaskGroup, Kotlin의 coroutineScope가 대표적입니다.
17. 참고 자료
- Rob Pike - "Concurrency is not Parallelism" (Go Blog)
- Java 21 Virtual Threads - JEP 444
- Java 21 Structured Concurrency - JEP 453
- Python asyncio 공식 문서
- Go 공식 문서 - Effective Go (Concurrency)
- Kotlin Coroutines 공식 가이드
- Rust async-book (The Async Book)
- Akka Documentation - Actor Model
- Erlang/OTP - Processes
- "The Art of Multiprocessor Programming" - Maurice Herlihy, Nir Shavit
- "Designing Data-Intensive Applications" - Martin Kleppmann (Chapter 8)
- "Concurrency in Go" - Katherine Cox-Buday (O'Reilly)
- tokio.rs - Rust 비동기 런타임
- libuv - Node.js Event Loop 구현체