Concurrency & Parallel Programming Complete Guide 2025: Async/Await, Thread, Goroutine, Actor Model

Author: Youngju Kim (@fjvbn20031)
1. Concurrency vs Parallelism: Rob Pike's Distinction
"Concurrency is about dealing with lots of things at once. Parallelism is about doing lots of things at once." -- Rob Pike
This single sentence captures the essential difference between concurrency and parallelism.
1.1 Conceptual Distinction
Concurrency is a structural property of a program. It means designing a program so that it can logically handle multiple tasks at the same time. Whether they actually execute simultaneously is irrelevant.
Parallelism is a property of execution. It means multiple tasks physically running at the same time. The classic example is multiple threads running simultaneously on a multi-core CPU.
Concurrency - Structure
┌─────────────────────────────────────────┐
│ Task A: ██░░██░░░░██░░ │
│ Task B: ░░██░░██░░░░██ │
│ Task C: ░░░░░░░░██░░░░██ │
│ ─────────────────→ Time │
│ (Interleaved on a single core) │
└─────────────────────────────────────────┘
Parallelism - Execution
┌─────────────────────────────────────────┐
│ Core 1: ████████████████ │
│ Core 2: ████████████████ │
│ Core 3: ████████████████ │
│ ─────────────────→ Time │
│ (Simultaneous on multiple cores) │
└─────────────────────────────────────────┘
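The parallelism side of the diagram can be made concrete with Python's `ProcessPoolExecutor`, which spreads CPU-bound work across cores in separate processes (a minimal sketch; the prime-counting workload is made up for the example):

```python
from concurrent.futures import ProcessPoolExecutor

def count_primes(limit: int) -> int:
    """CPU-bound work: count primes below `limit` by trial division."""
    count = 0
    for n in range(2, limit):
        if all(n % d for d in range(2, int(n ** 0.5) + 1)):
            count += 1
    return count

if __name__ == "__main__":
    # Parallelism: each task can run on its own core in a separate process
    with ProcessPoolExecutor() as pool:
        results = list(pool.map(count_primes, [10_000, 10_000, 10_000]))
    print(results)  # [1229, 1229, 1229] - there are 1229 primes below 10,000
```

Unlike the interleaved single-core picture, the three calls here physically execute at the same time (given at least three cores).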
1.2 Real-World Analogy
| Scenario | Concurrent | Parallel |
|---|---|---|
| 1 chef alternating between 3 dishes | Yes | No |
| 3 chefs each cooking 1 dish simultaneously | Yes | Yes |
| 1 chef cooking only 1 dish | No | No |
1.3 Why the Distinction Matters
Pursuing parallelism without concurrency leads to exploding complexity. The correct approach is to first design proper concurrent structure, then leverage parallel execution for performance.
# Concurrent structure (asyncio) - efficient even on a single thread
import asyncio

async def fetch_data(url: str) -> str:
    print(f"Fetching {url}...")
    await asyncio.sleep(1)  # I/O wait simulation
    return f"Data from {url}"

async def main():
    # Concurrency: structurally handling 3 requests at the same time
    results = await asyncio.gather(
        fetch_data("https://api.example.com/users"),
        fetch_data("https://api.example.com/posts"),
        fetch_data("https://api.example.com/comments"),
    )
    for r in results:
        print(r)

asyncio.run(main())
# Total ~1 second (not 3) - the power of concurrency
2. Threading Models: From OS Threads to Virtual Threads
2.1 OS Threads (Kernel Threads)
Threads managed directly by the operating system. High creation cost (approximately 1MB stack) and context switching overhead.
// Java - OS threads
public class ThreadExample {
    public static void main(String[] args) throws InterruptedException {
        Thread t1 = new Thread(() -> {
            for (int i = 0; i < 5; i++) {
                System.out.println("Thread 1: " + i);
                try { Thread.sleep(100); } catch (InterruptedException e) { break; }
            }
        });
        Thread t2 = new Thread(() -> {
            for (int i = 0; i < 5; i++) {
                System.out.println("Thread 2: " + i);
                try { Thread.sleep(100); } catch (InterruptedException e) { break; }
            }
        });
        t1.start();
        t2.start();
        t1.join();
        t2.join();
        System.out.println("Both threads completed");
    }
}
2.2 User Threads (Green Threads)
Lightweight threads managed by the runtime. Multiple user threads are mapped onto OS threads.
Thread Model Comparison
┌───────────────────────────────────────────────┐
│ 1:1 (OS Thread) N:1 (Green Thread) │
│ ┌──┐ ┌──┐ ┌──┐ ┌──┐┌──┐┌──┐┌──┐┌──┐ │
│ │UT│ │UT│ │UT│ │GT││GT││GT││GT││GT│ │
│ └┬─┘ └┬─┘ └┬─┘ └┬─┘└┬─┘└┬─┘└┬─┘└┬─┘ │
│ │ │ │ └──┬──┘ │ └──┬──┘ │
│ ┌┴─┐ ┌┴─┐ ┌┴─┐ ┌┴─┐ │ │ │
│ │KT│ │KT│ │KT│ │KT│ │ │ │
│ └──┘ └──┘ └──┘ └──┘ │ │ │
│ │ │ │
│ M:N (Hybrid) │
│ ┌──┐┌──┐┌──┐┌──┐┌──┐┌──┐ │
│ │GT││GT││GT││GT││GT││GT│ │
│ └┬─┘└┬─┘└┬─┘└┬─┘└┬─┘└┬─┘ │
│ └─┬─┘ └──┬──┘ └──┬──┘ │
│ ┌┴─┐ ┌┴─┐ ┌┴─┐ │
│ │KT│ │KT│ │KT│ │
│ └──┘ └──┘ └──┘ │
└───────────────────────────────────────────────┘
2.3 Java 21 Virtual Threads
Virtual threads, officially introduced in Java 21, implement the M:N model. You can create millions of virtual threads.
// Java 21 Virtual Threads
import java.util.concurrent.Executors;
import java.time.Duration;

public class VirtualThreadDemo {
    public static void main(String[] args) throws Exception {
        // Can create 1 million virtual threads!
        try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
            for (int i = 0; i < 1_000_000; i++) {
                final int taskId = i;
                executor.submit(() -> {
                    try {
                        Thread.sleep(Duration.ofSeconds(1));
                        if (taskId % 100_000 == 0) {
                            System.out.println("Task " + taskId + " done on " +
                                    Thread.currentThread());
                        }
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
        }
        System.out.println("All tasks completed");
    }
}
2.4 Thread Model Comparison
| Property | OS Thread | Green Thread | Virtual Thread (Java 21) |
|---|---|---|---|
| Stack Size | ~1MB | ~KB | ~Few KB (dynamic) |
| Creation Cost | High (syscall) | Low | Very Low |
| Concurrent Count | Thousands | Millions | Millions |
| Preemption | OS preemptive | Cooperative | Cooperative (yields on I/O) |
| I/O Blocking | Blocks thread | Risk of blocking all | Unmounts from carrier thread |
3. Coroutines: The Core of Lightweight Concurrency
3.1 Stackful vs Stackless Coroutines
Stackful Coroutine (e.g., Lua, Go's goroutine)
┌──────────────────────────────────┐
│ Has its own stack │
│ Can suspend anywhere in stack │
│ Higher memory usage │
│ Can yield from nested calls │
└──────────────────────────────────┘
Stackless Coroutine (e.g., Python, Kotlin, Rust)
┌──────────────────────────────────┐
│ State stored on heap │
│ Can only suspend at top level │
│ Lower memory usage │
│ Compiler transforms to state machine │
└──────────────────────────────────┘
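The "compiler transforms to state machine" line can be made concrete with a hand-rolled sketch of what a stackless coroutine roughly desugars to (illustrative only — not CPython's actual implementation; the class and its states are invented for the example):

```python
# Each `await` point becomes a state; "locals" live in the object
# (on the heap), not on a call stack.
class FetchStateMachine:
    START, AWAITING, DONE = 0, 1, 2

    def __init__(self, url: str):
        self.url = url            # locals stored on the heap
        self.state = self.START
        self.result = None

    def resume(self) -> bool:
        """Drive the machine one step; returns True when finished."""
        if self.state == self.START:
            self.state = self.AWAITING  # suspend at the 'await' point
            return False                # yield control to the scheduler
        if self.state == self.AWAITING:
            self.result = f"Data from {self.url}"  # work after resumption
            self.state = self.DONE
        return True

sm = FetchStateMachine("https://api.example.com/users")
while not sm.resume():
    pass                              # a real event loop would park here
print(sm.result)  # Data from https://api.example.com/users
```

Because suspension is only possible where the transform inserted a state boundary, a stackless coroutine cannot yield from inside an ordinary nested function call — exactly the limitation noted above.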
3.2 Python asyncio Coroutines
import asyncio
import aiohttp
import time

async def fetch_url(session: aiohttp.ClientSession, url: str) -> dict:
    """Asynchronous HTTP request"""
    async with session.get(url) as response:
        return {
            "url": url,
            "status": response.status,
            "length": len(await response.text()),
        }

async def main():
    urls = [
        "https://httpbin.org/delay/1",
        "https://httpbin.org/delay/1",
        "https://httpbin.org/delay/1",
        "https://httpbin.org/delay/1",
        "https://httpbin.org/delay/1",
    ]
    start = time.perf_counter()
    async with aiohttp.ClientSession() as session:
        # Execute 5 requests concurrently
        tasks = [fetch_url(session, url) for url in urls]
        results = await asyncio.gather(*tasks)
    elapsed = time.perf_counter() - start
    print(f"5 requests completed: {elapsed:.2f}s")  # ~1s (not 5s)
    for r in results:
        print(f"  {r['url']}: status={r['status']}, length={r['length']}")

asyncio.run(main())
3.3 Kotlin Coroutines
import kotlinx.coroutines.*
import kotlin.system.measureTimeMillis

suspend fun fetchData(id: Int): String {
    delay(1000) // Async wait (no thread blocking)
    return "Result-$id"
}

fun main() = runBlocking {
    val time = measureTimeMillis {
        // Structured concurrency: coroutineScope manages all child coroutines
        coroutineScope {
            val results = (1..10).map { id ->
                async(Dispatchers.IO) {
                    fetchData(id)
                }
            }
            results.awaitAll().forEach { println(it) }
        }
    }
    println("Completed in ${time}ms") // ~1000ms
}
4. Goroutines and Go's Concurrency Model
4.1 Goroutine Internal Structure
Go's runtime uses M:N scheduling. Three concepts are central: G (Goroutine), M (Machine/OS Thread), and P (Processor).
Go Runtime Scheduler (GMP Model)
┌─────────────────────────────────────────────┐
│ │
│ ┌──────┐ ┌──────┐ ┌──────┐ ┌──────┐ │
│ │ G1 │ │ G2 │ │ G3 │ │ G4 │ ... │
│ └──┬───┘ └──┬───┘ └──┬───┘ └──┬───┘ │
│ │ │ │ │ │
│ ┌──┴────────┴──┐ ┌──┴────────┴──┐ │
│ │ P (Local │ │ P (Local │ │
│ │ Queue) │ │ Queue) │ │
│ └──────┬───────┘ └──────┬───────┘ │
│ │ │ │
│ ┌──────┴───────┐ ┌──────┴───────┐ │
│ │ M (OS Thread)│ │ M (OS Thread)│ │
│ └──────────────┘ └──────────────┘ │
│ │
│ Global Run Queue: [G5, G6, G7, ...] │
└─────────────────────────────────────────────┘
GOMAXPROCS = number of P's (default: CPU core count)
4.2 Basic Goroutine Usage
package main

import (
    "fmt"
    "sync"
    "time"
)

func worker(id int, wg *sync.WaitGroup) {
    defer wg.Done()
    fmt.Printf("Worker %d starting\n", id)
    time.Sleep(time.Second)
    fmt.Printf("Worker %d done\n", id)
}

func main() {
    var wg sync.WaitGroup
    for i := 1; i <= 5; i++ {
        wg.Add(1)
        go worker(i, &wg) // goroutine creation - ~2KB initial stack
    }
    wg.Wait()
    fmt.Println("All workers completed")
}
4.3 Work Stealing
When a P's local queue is empty, it steals goroutines from another P's queue. This distributes load evenly.
Work Stealing Process
┌─────────────┐ ┌─────────────┐
│ P1 (busy) │ │ P2 (idle) │
│ [G1,G2,G3] │ ──→ │ [] │
│ │steal│ │
│ [G1,G2] │ ──→ │ [G3] │
└─────────────┘ └─────────────┘
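The stealing step can be simulated with plain Python deques (a toy model, not Go's actual runtime): owners take work from the front of their own queue, while an idle processor steals from the back of the busiest peer's queue — the classic work-stealing deque discipline that reduces contention between owner and thief.

```python
from collections import deque

def run_with_stealing(queues: list[deque]) -> list[list[str]]:
    """Toy work-stealing loop: each processor drains its own queue from
    the front; when empty, it steals from the back of the busiest peer."""
    executed = [[] for _ in queues]
    while any(queues):
        for pid, q in enumerate(queues):
            if q:
                executed[pid].append(q.popleft())       # owner: front
            else:
                victim = max(queues, key=len)           # busiest peer
                if victim:
                    executed[pid].append(victim.pop())  # thief: back
    return executed

queues = [deque(["G1", "G2", "G3", "G4"]), deque()]  # P1 busy, P2 idle
done = run_with_stealing(queues)
print(done)  # [['G1', 'G2'], ['G4', 'G3']] - P2 stole from the back
```

Note how the load ends up evenly split even though all goroutines started on one processor.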
5. Async/Await Pattern Comparison
5.1 JavaScript Promise and Async/Await
// Promise chaining vs async/await

// Promise chaining approach
function fetchUserData(userId) {
  let user; // captured here so the final .then can see it
  return fetch(`/api/users/${userId}`)
    .then(response => response.json())
    .then(fetchedUser => {
      user = fetchedUser;
      return fetch(`/api/posts?author=${user.id}`);
    })
    .then(response => response.json())
    .then(posts => ({ user, posts }))
    .catch(error => {
      console.error('Error:', error);
      throw error;
    });
}

// Async/await approach (much more readable)
async function fetchUserDataAsync(userId) {
  try {
    const userResponse = await fetch(`/api/users/${userId}`);
    const user = await userResponse.json();
    const postsResponse = await fetch(`/api/posts?author=${user.id}`);
    const posts = await postsResponse.json();
    return { user, posts };
  } catch (error) {
    console.error('Error:', error);
    throw error;
  }
}

// Parallel execution
async function fetchMultipleUsers(userIds) {
  const promises = userIds.map(id => fetchUserDataAsync(id));
  return Promise.all(promises); // All requests run concurrently
}

// Promise.allSettled - continues even if some fail
async function fetchWithPartialFailure(urls) {
  const results = await Promise.allSettled(
    urls.map(url => fetch(url).then(r => r.json()))
  );
  const successes = results
    .filter(r => r.status === 'fulfilled')
    .map(r => r.value);
  const failures = results
    .filter(r => r.status === 'rejected')
    .map(r => r.reason);
  return { successes, failures };
}
5.2 Rust Async
// crates: tokio, reqwest, futures
use futures::future::join_all;

#[derive(Debug)]
struct ApiResponse {
    url: String,
    status: u16,
    body_length: usize,
}

async fn fetch_url(client: &reqwest::Client, url: &str) -> Result<ApiResponse, reqwest::Error> {
    let response = client.get(url).send().await?;
    let status = response.status().as_u16();
    let body = response.text().await?;
    Ok(ApiResponse {
        url: url.to_string(),
        status,
        body_length: body.len(),
    })
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let client = reqwest::Client::new();
    let urls = vec![
        "https://httpbin.org/get",
        "https://httpbin.org/ip",
        "https://httpbin.org/user-agent",
    ];
    // Execute all requests concurrently
    let futures: Vec<_> = urls.iter()
        .map(|url| fetch_url(&client, url))
        .collect();
    let results = join_all(futures).await;
    for result in results {
        match result {
            Ok(resp) => println!("{}: status={}, length={}", resp.url, resp.status, resp.body_length),
            Err(e) => eprintln!("Error: {}", e),
        }
    }
    Ok(())
}
5.3 C# Task-Based Asynchrony
using System;
using System.Net.Http;
using System.Threading;
using System.Threading.Tasks;
using System.Collections.Generic;
using System.Linq;

public class AsyncDemo
{
    private static readonly HttpClient _client = new HttpClient();

    public static async Task<string> FetchDataAsync(string url)
    {
        var response = await _client.GetAsync(url);
        response.EnsureSuccessStatusCode();
        return await response.Content.ReadAsStringAsync();
    }

    public static async Task Main()
    {
        var urls = new List<string>
        {
            "https://httpbin.org/get",
            "https://httpbin.org/ip",
            "https://httpbin.org/user-agent",
        };
        // Parallel execution with WhenAll
        var tasks = urls.Select(url => FetchDataAsync(url));
        var results = await Task.WhenAll(tasks);
        for (int i = 0; i < urls.Count; i++)
        {
            Console.WriteLine($"URL: {urls[i]}, Length: {results[i].Length}");
        }

        // Timeout pattern
        using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
        try
        {
            var data = await _client.GetStringAsync(urls[0], cts.Token);
            Console.WriteLine($"Got data: {data.Length} chars");
        }
        catch (OperationCanceledException)
        {
            Console.WriteLine("Request timed out!");
        }
    }
}
5.4 Async/Await Language Comparison
| Property | JavaScript | Python | Rust | C# | Kotlin |
|---|---|---|---|---|---|
| Runtime | V8 Event Loop | asyncio | tokio/async-std | CLR Thread Pool | Dispatchers |
| Future Type | Promise | Coroutine | Future (lazy) | Task | Deferred |
| Cancellation | AbortController | Task.cancel() | drop | CancellationToken | Job.cancel() |
| Streams | AsyncIterator | async for | Stream trait | IAsyncEnumerable | Flow |
| Error Handling | try/catch | try/except | Result | try/catch | try/catch |
| Parallel Exec | Promise.all | gather | join! | Task.WhenAll | awaitAll |
| Zero-cost | No | No | Yes | No | No |
6. Event Loop Deep Dive
6.1 Node.js libuv Event Loop
Node.js Event Loop Phases
┌───────────────────────────────────┐
│ ┌──────────────┐ │
│ ┌────│ timers │────┐ │
│ │ └──────────────┘ │ │
│ │ ┌──────────────┐ │ │
│ │ │ pending I/O │ │ │
│ │ └──────────────┘ │ │
│ │ ┌──────────────┐ │ │
│ │ │ idle/prepare │ │ │
│ │ └──────────────┘ │ │
│ │ ┌──────────────┐ │ │
│ │ │ poll │◄───┘ │
│ │ └──────────────┘ │
│ │ ┌──────────────┐ │
│ │ │ check │ │
│ │ │ (setImmediate)│ │
│ │ └──────────────┘ │
│ │ ┌──────────────┐ │
│ └────│close callbacks│─────────│
│ └──────────────┘ │
│ │
│ Between each phase: microtask │
│ queue (Promise, queueMicrotask) │
└───────────────────────────────────┘
// Understanding Event Loop execution order
console.log('1. Synchronous code');
setTimeout(() => console.log('2. setTimeout (timers)'), 0);
setImmediate(() => console.log('3. setImmediate (check)'));
Promise.resolve().then(() => console.log('4. Promise (microtask)'));
process.nextTick(() => console.log('5. nextTick (microtask - highest priority)'));
console.log('6. End of synchronous code');
// Typical output order:
// 1. Synchronous code
// 6. End of synchronous code
// 5. nextTick (microtask - highest priority)
// 4. Promise (microtask)
// 2. setTimeout (timers)
// 3. setImmediate (check)
// Note: outside an I/O callback, the relative order of setTimeout(0) and
// setImmediate is not guaranteed; inside an I/O callback, setImmediate
// always runs first.
6.2 Python asyncio Event Loop
import asyncio
import time

async def cpu_bound_simulation(name: str, duration: float):
    """CPU-bound work simulation"""
    print(f"[{name}] Starting")
    # Without an await to yield control, the event loop would be blocked
    await asyncio.sleep(duration)
    print(f"[{name}] Complete ({duration}s)")
    return f"{name}: done"

async def producer(queue: asyncio.Queue, n: int):
    """Producer: add items to the queue"""
    for i in range(n):
        await asyncio.sleep(0.1)
        item = f"item-{i}"
        await queue.put(item)
        print(f"  Produced: {item}")
    await queue.put(None)  # Sentinel value

async def consumer(queue: asyncio.Queue, name: str):
    """Consumer: consume items from the queue"""
    while True:
        item = await queue.get()
        if item is None:
            await queue.put(None)  # Pass sentinel to other consumers
            break
        await asyncio.sleep(0.2)  # Processing simulation
        print(f"  {name} consumed: {item}")
        queue.task_done()

async def main():
    # 1. Concurrent execution
    start = time.perf_counter()
    results = await asyncio.gather(
        cpu_bound_simulation("TaskA", 1.0),
        cpu_bound_simulation("TaskB", 0.5),
        cpu_bound_simulation("TaskC", 0.8),
    )
    elapsed = time.perf_counter() - start
    print(f"Total time: {elapsed:.2f}s")  # ~1s
    print(f"Results: {results}")

    # 2. Producer-consumer pattern
    print("\n--- Producer-Consumer ---")
    queue = asyncio.Queue(maxsize=5)
    await asyncio.gather(
        producer(queue, 10),
        consumer(queue, "Consumer-1"),
        consumer(queue, "Consumer-2"),
    )

asyncio.run(main())
7. Actor Model: Concurrency Without Shared State
7.1 Actor Model Concepts
Actor Model
┌─────────────────────────────────────────────────┐
│ │
│ ┌─────────┐ message ┌─────────┐ │
│ │ Actor A │ ───────→ │ Actor B │ │
│ │ [State] │ │ [State] │ │
│ │ [Mailbox] │ [Mailbox] │
│ └─────────┘ └────┬────┘ │
│ ↑ │ │
│ │ message │ spawn new Actor │
│ └─────────────────────┘ │
│ ↓ │
│ ┌─────────┐ │
│ │ Actor C │ │
│ │ [State] │ │
│ └─────────┘ │
│ │
│ Rules: │
│ 1. Communicate only via messages (no shared state) │
│ 2. Process one message at a time │
│ 3. Can spawn new Actors │
│ 4. Can only modify own state │
└─────────────────────────────────────────────────┘
7.2 Erlang/Elixir Actor
%% Erlang - counter actor
-module(counter).
-export([start/0, increment/1, get/1, loop/1]).

start() ->
    spawn(fun() -> loop(0) end).

increment(Pid) ->
    Pid ! {increment, 1}.

get(Pid) ->
    Pid ! {get, self()},
    receive
        {count, Value} -> Value
    after 5000 ->
        timeout
    end.

loop(Count) ->
    receive
        {increment, N} ->
            loop(Count + N);
        {get, From} ->
            From ! {count, Count},
            loop(Count);
        stop ->
            ok
    end.

%% Usage:
%% Pid = counter:start().
%% counter:increment(Pid).
%% counter:increment(Pid).
%% counter:get(Pid). %% => 2
7.3 Akka (Scala) Actor
import akka.actor.typed.{ActorRef, Behavior, SupervisorStrategy}
import akka.actor.typed.scaladsl.Behaviors

// Message definitions
sealed trait Command
case class Deposit(amount: Double) extends Command
case class Withdraw(amount: Double) extends Command
case class GetBalance(replyTo: ActorRef[Balance]) extends Command
case class Balance(amount: Double)

// Bank account actor
object BankAccount {
  def apply(balance: Double = 0.0): Behavior[Command] =
    Behaviors.receive { (context, message) =>
      message match {
        case Deposit(amount) =>
          val newBalance = balance + amount
          context.log.info(s"Deposited $amount, balance: $newBalance")
          BankAccount(newBalance)
        case Withdraw(amount) =>
          if (amount <= balance) {
            val newBalance = balance - amount
            context.log.info(s"Withdrew $amount, balance: $newBalance")
            BankAccount(newBalance)
          } else {
            context.log.warn(s"Insufficient funds: $balance < $amount")
            Behaviors.same
          }
        case GetBalance(replyTo) =>
          replyTo ! Balance(balance)
          Behaviors.same
      }
    }
}

// Supervision (fault recovery)
object BankSupervisor {
  def apply(): Behavior[Command] =
    Behaviors.supervise(BankAccount())
      .onFailure[Exception](SupervisorStrategy.restart)
}
7.4 Actor Model Pros and Cons
| Pros | Cons |
|---|---|
| No shared state (prevents race conditions) | Can be difficult to debug |
| Location transparency (distributable) | Limited message ordering guarantees |
| Fault isolation (let it crash) | Potential mailbox overflow |
| Good scalability | Synchronous request-response is complex |
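The same mailbox rules can be sketched in Python, with an `asyncio.Queue` standing in for the mailbox (a minimal illustration, not a production actor library; the `CounterActor` class and its message names are invented for the example):

```python
import asyncio

class CounterActor:
    """Minimal actor: private state, a mailbox, one message at a time."""

    def __init__(self):
        self.count = 0                    # state only this actor mutates
        self.mailbox = asyncio.Queue()
        self._task = asyncio.create_task(self._run())

    async def _run(self):
        while True:
            msg, reply = await self.mailbox.get()  # one message at a time
            if msg == "increment":
                self.count += 1
            elif msg == "get":
                reply.set_result(self.count)
            elif msg == "stop":
                return

    async def send(self, msg):
        reply = asyncio.get_running_loop().create_future()
        await self.mailbox.put((msg, reply))
        return reply if msg == "get" else None

async def main():
    actor = CounterActor()
    await actor.send("increment")
    await actor.send("increment")
    reply = await actor.send("get")
    print(await reply)  # 2
    await actor.send("stop")
    await actor._task

asyncio.run(main())
```

Because the mailbox is FIFO and `_run` handles one message at a time, no lock is needed around `self.count` — the actor's sequential processing is the synchronization.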
8. CSP: Go's Channel-Based Concurrency
8.1 Channel Basics
package main

import (
    "fmt"
    "time"
)

func main() {
    // Unbuffered channel (synchronous)
    ch := make(chan string)
    go func() {
        time.Sleep(time.Second)
        ch <- "Hello from goroutine!"
    }()
    msg := <-ch // Blocks until a message arrives
    fmt.Println(msg)

    // Buffered channel (asynchronous)
    buffered := make(chan int, 3)
    buffered <- 1
    buffered <- 2
    buffered <- 3
    // buffered <- 4 // Would block - buffer full!
    fmt.Println(<-buffered) // 1
    fmt.Println(<-buffered) // 2
}
8.2 Fan-In / Fan-Out Pattern
package main

import (
    "fmt"
    "math/rand"
    "sync"
    "time"
)

// Fan-out: distribute one input to multiple workers
func fanOut(input <-chan int, workers int) []<-chan int {
    channels := make([]<-chan int, workers)
    for i := 0; i < workers; i++ {
        ch := make(chan int)
        channels[i] = ch
        go func(workerID int, out chan<- int) {
            defer close(out)
            for val := range input {
                // Processing simulation
                time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
                out <- val * val
                fmt.Printf("  Worker %d processed %d\n", workerID, val)
            }
        }(i, ch)
    }
    return channels
}

// Fan-in: merge results from multiple channels into one
func fanIn(channels ...<-chan int) <-chan int {
    var wg sync.WaitGroup
    merged := make(chan int)
    for _, ch := range channels {
        wg.Add(1)
        go func(c <-chan int) {
            defer wg.Done()
            for val := range c {
                merged <- val
            }
        }(ch)
    }
    go func() {
        wg.Wait()
        close(merged)
    }()
    return merged
}

func main() {
    // Input channel
    input := make(chan int)
    go func() {
        defer close(input)
        for i := 1; i <= 20; i++ {
            input <- i
        }
    }()
    // Fan-out: 4 workers
    workerChannels := fanOut(input, 4)
    // Fan-in: merge results
    results := fanIn(workerChannels...)
    // Collect results
    var sum int
    for result := range results {
        sum += result
    }
    fmt.Printf("Sum of squares: %d\n", sum)
}
8.3 Select Statement
package main

import (
    "fmt"
    "time"
)

func main() {
    ch1 := make(chan string)
    ch2 := make(chan string)

    go func() {
        time.Sleep(1 * time.Second)
        ch1 <- "from channel 1"
    }()
    go func() {
        time.Sleep(2 * time.Second)
        ch2 <- "from channel 2"
    }()

    // select: choose whichever channel is ready
    for i := 0; i < 2; i++ {
        select {
        case msg := <-ch1:
            fmt.Println("Received", msg)
        case msg := <-ch2:
            fmt.Println("Received", msg)
        case <-time.After(3 * time.Second):
            fmt.Println("Timeout!")
        }
    }

    // Non-blocking select
    ch := make(chan int, 1)
    select {
    case val := <-ch:
        fmt.Println("Got:", val)
    default:
        fmt.Println("Channel empty, moving on")
    }
}
9. Synchronization Primitives
9.1 Mutex and RWLock
package main

import (
    "fmt"
    "sync"
)

// Mutex: mutual exclusion
type SafeCounter struct {
    mu sync.Mutex
    v  map[string]int
}

func (c *SafeCounter) Inc(key string) {
    c.mu.Lock()
    defer c.mu.Unlock()
    c.v[key]++
}

func (c *SafeCounter) Value(key string) int {
    c.mu.Lock()
    defer c.mu.Unlock()
    return c.v[key]
}

// RWMutex: concurrent reads, exclusive writes
type SafeCache struct {
    mu    sync.RWMutex
    store map[string]string
}

func (c *SafeCache) Get(key string) (string, bool) {
    c.mu.RLock() // Read lock (multiple goroutines can read concurrently)
    defer c.mu.RUnlock()
    val, ok := c.store[key]
    return val, ok
}

func (c *SafeCache) Set(key, value string) {
    c.mu.Lock() // Write lock (exclusive)
    defer c.mu.Unlock()
    c.store[key] = value
}

func main() {
    counter := SafeCounter{v: make(map[string]int)}
    var wg sync.WaitGroup
    for i := 0; i < 1000; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            counter.Inc("key")
        }()
    }
    wg.Wait()
    fmt.Println("Final count:", counter.Value("key")) // Exactly 1000
}
9.2 Semaphore
import asyncio

async def worker(sem: asyncio.Semaphore, task_id: int):
    async with sem:  # Max 3 concurrent executions
        print(f"Task {task_id}: started (slot acquired)")
        await asyncio.sleep(1)
        print(f"Task {task_id}: completed (slot released)")

async def main():
    sem = asyncio.Semaphore(3)  # Limit to 3 concurrent
    tasks = [worker(sem, i) for i in range(10)]
    await asyncio.gather(*tasks)

asyncio.run(main())
9.3 Condition Variable
import threading
import time
import random

class BoundedBuffer:
    def __init__(self, capacity: int):
        self.buffer = []
        self.capacity = capacity
        self.lock = threading.Lock()
        self.not_full = threading.Condition(self.lock)
        self.not_empty = threading.Condition(self.lock)

    def produce(self, item):
        with self.not_full:
            while len(self.buffer) >= self.capacity:
                self.not_full.wait()  # Wait until the buffer has space
            self.buffer.append(item)
            self.not_empty.notify()  # Notify a consumer

    def consume(self):
        with self.not_empty:
            while len(self.buffer) == 0:
                self.not_empty.wait()  # Wait until an item is available
            item = self.buffer.pop(0)
            self.not_full.notify()  # Notify a producer
            return item

buf = BoundedBuffer(5)

def producer():
    for i in range(20):
        buf.produce(i)
        print(f"Produced: {i}")
        time.sleep(random.uniform(0.01, 0.1))

def consumer(name):
    for _ in range(10):
        item = buf.consume()
        print(f"{name} consumed: {item}")
        time.sleep(random.uniform(0.05, 0.15))

t1 = threading.Thread(target=producer)
t2 = threading.Thread(target=consumer, args=("C1",))
t3 = threading.Thread(target=consumer, args=("C2",))
t1.start(); t2.start(); t3.start()
t1.join(); t2.join(); t3.join()
9.4 Atomic Operations
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

public class AtomicDemo {
    // AtomicLong - lock-free counter
    private static final AtomicLong counter = new AtomicLong(0);

    // AtomicReference - lock-free reference update
    private static final AtomicReference<String> config =
            new AtomicReference<>("default");

    public static void main(String[] args) throws InterruptedException {
        // incrementAndGet is a CAS (Compare-And-Swap) loop internally
        Thread[] threads = new Thread[10];
        for (int i = 0; i < 10; i++) {
            threads[i] = new Thread(() -> {
                for (int j = 0; j < 10_000; j++) {
                    counter.incrementAndGet(); // Atomic increment
                }
            });
            threads[i].start();
        }
        for (Thread t : threads) t.join();
        System.out.println("Counter: " + counter.get()); // Exactly 100,000

        // Direct CAS usage
        boolean updated = config.compareAndSet("default", "production");
        System.out.println("Config updated: " + updated); // true
        System.out.println("Config value: " + config.get()); // production
    }
}
10. Lock-Free Data Structures
10.1 CAS (Compare-And-Swap)
CAS is the foundation of lock-free programming. It is an atomic operation that says "if the current value equals the expected value, replace it with the new value."
CAS Operation
┌──────────────────────────────────────┐
│ CAS(address, expected, new_value) │
│ │
│ if *address == expected: │
│ *address = new_value │
│ return true // success │
│ else: │
│ return false // fail -> retry │
│ │
│ (entire operation is hardware-atomic)│
└──────────────────────────────────────┘
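Python exposes no user-level CAS instruction, but the retry-loop pattern can be emulated with a lock standing in for the hardware atomicity (a teaching sketch only — real CAS is a single CPU instruction; the `EmulatedAtomic` class is invented for the example):

```python
import threading

class EmulatedAtomic:
    """CAS emulated with a lock; only the CAS itself is 'atomic' here.
    Callers still use the lock-free retry pattern shown above."""

    def __init__(self, value):
        self._value = value
        self._lock = threading.Lock()

    def load(self):
        return self._value

    def compare_and_swap(self, expected, new_value) -> bool:
        with self._lock:               # stands in for hardware atomicity
            if self._value == expected:
                self._value = new_value
                return True
            return False               # caller must retry

counter = EmulatedAtomic(0)

def increment(n: int):
    for _ in range(n):
        while True:                    # classic CAS retry loop
            current = counter.load()
            if counter.compare_and_swap(current, current + 1):
                break

threads = [threading.Thread(target=increment, args=(10_000,)) for _ in range(4)]
for t in threads: t.start()
for t in threads: t.join()
print(counter.load())  # 40000 - no lost updates
```

A failed CAS simply means another thread got there first; the loop reloads the current value and tries again instead of blocking.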
10.2 The ABA Problem
ABA Problem
Thread 1: reads value A -> (suspended)
Thread 2: changes A -> B
Thread 3: changes B -> A
Thread 1: (resumes) CAS(A, A, C) -> success! (but A is not the original A)
Solution: add version counter
AtomicStampedReference (Java)
TaggedPointer (C/C++)
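The version-counter fix can be sketched by CAS-ing on a (value, stamp) pair, mirroring the idea behind Java's AtomicStampedReference (illustrative Python, emulating atomicity with a lock; the `StampedRef` class is invented for the example):

```python
import threading

class StampedRef:
    """CAS on a (value, stamp) pair: even if the value returns to A,
    the stamp has advanced, so a stale CAS fails - defeating ABA."""

    def __init__(self, value):
        self._pair = (value, 0)        # (value, version stamp)
        self._lock = threading.Lock()

    def get(self):
        return self._pair

    def compare_and_set(self, exp_value, new_value, exp_stamp) -> bool:
        with self._lock:
            value, stamp = self._pair
            if value == exp_value and stamp == exp_stamp:
                self._pair = (new_value, stamp + 1)
                return True
            return False

ref = StampedRef("A")
value, stamp = ref.get()              # Thread 1 reads (A, 0), then stalls

ref.compare_and_set("A", "B", 0)      # Thread 2: A -> B  (stamp becomes 1)
ref.compare_and_set("B", "A", 1)      # Thread 3: B -> A  (stamp becomes 2)

# Thread 1 resumes: a plain CAS on the value alone would succeed,
# but the stamped CAS sees the stale stamp and fails
print(ref.compare_and_set("A", "C", stamp))  # False - ABA detected
```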
10.3 Lock-Free Stack (Treiber Stack)
import java.util.concurrent.atomic.AtomicReference;

public class LockFreeStack<T> {
    private static class Node<T> {
        final T value;
        Node<T> next;

        Node(T value) {
            this.value = value;
        }
    }

    private final AtomicReference<Node<T>> top = new AtomicReference<>(null);

    public void push(T value) {
        Node<T> newNode = new Node<>(value);
        Node<T> oldTop;
        do {
            oldTop = top.get();
            newNode.next = oldTop;
        } while (!top.compareAndSet(oldTop, newNode)); // CAS loop
    }

    public T pop() {
        Node<T> oldTop;
        Node<T> newTop;
        do {
            oldTop = top.get();
            if (oldTop == null) return null;
            newTop = oldTop.next;
        } while (!top.compareAndSet(oldTop, newTop)); // CAS loop
        // Note: Java's GC prevents node reuse while a reference is held,
        // which is why this pop does not suffer from the ABA problem
        return oldTop.value;
    }
}
11. Concurrency Bug Patterns and Prevention
11.1 Deadlock
Deadlock Conditions (all 4 must be met)
1. Mutual Exclusion
2. Hold and Wait
3. No Preemption
4. Circular Wait
┌──────────┐ ┌──────────┐
│ Thread A │──Lock1──→│ Thread B │
│ │←──Lock2──│ │
│ (waiting │ │ (waiting │
│ Lock2) │ │ Lock1) │
└──────────┘ └──────────┘
import threading
import time

lock_a = threading.Lock()
lock_b = threading.Lock()

# Code that causes deadlock
def thread_1():
    with lock_a:
        print("Thread 1: Acquired Lock A")
        time.sleep(0.1)
        with lock_b:  # Waiting for Lock B -> deadlock!
            print("Thread 1: Acquired Lock B")

def thread_2():
    with lock_b:
        print("Thread 2: Acquired Lock B")
        time.sleep(0.1)
        with lock_a:  # Waiting for Lock A -> deadlock!
            print("Thread 2: Acquired Lock A")

# Solution: consistent lock ordering
def thread_1_fixed():
    with lock_a:  # Always A -> B order
        print("Thread 1: Acquired Lock A")
        time.sleep(0.1)
        with lock_b:
            print("Thread 1: Acquired Lock B")

def thread_2_fixed():
    with lock_a:  # Always A -> B order (prevents circular wait)
        print("Thread 2: Acquired Lock A")
        time.sleep(0.1)
        with lock_b:
            print("Thread 2: Acquired Lock B")
11.2 Race Condition
package main

import (
    "fmt"
    "sync"
)

func main() {
    // Race condition example
    counter := 0
    var wg sync.WaitGroup
    for i := 0; i < 1000; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            counter++ // DATA RACE! read-modify-write is not atomic
        }()
    }
    wg.Wait()
    fmt.Println("Counter (unsafe):", counter) // May be less than 1000

    // Solution 1: Mutex
    counter = 0
    var mu sync.Mutex
    for i := 0; i < 1000; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            mu.Lock()
            counter++
            mu.Unlock()
        }()
    }
    wg.Wait()
    fmt.Println("Counter (mutex):", counter) // Exactly 1000

    // Solution 2: Channel
    counter = 0
    ch := make(chan int, 1000)
    for i := 0; i < 1000; i++ {
        go func() {
            ch <- 1
        }()
    }
    for i := 0; i < 1000; i++ {
        counter += <-ch
    }
    fmt.Println("Counter (channel):", counter) // Exactly 1000
}
11.3 Starvation and Priority Inversion
Starvation
- A specific thread waits indefinitely for resource access
- Solution: Fair Lock, Priority Aging
Priority Inversion
┌─────────────────────────────────────┐
│ High Priority Task <- blocked │
│ | (waiting for Lock) │
│ Medium Priority Task <- running │
│ | (preempts) │
│ Low Priority Task <- holds Lock │
│ │
│ Solution: Priority Inheritance │
│ Low priority task temporarily │
│ inherits high priority when holding │
│ a lock needed by high priority task │
└─────────────────────────────────────┘
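The "Fair Lock" remedy for starvation can be sketched as a ticket lock: threads are served in strict arrival order, so no thread waits forever (illustrative Python; the `TicketLock` class is invented for the example, and production code would reach for existing fair-lock primitives):

```python
import threading

class TicketLock:
    """FIFO fair lock: take a ticket, wait until it is called.
    Strict arrival order means no thread can be starved."""

    def __init__(self):
        self._next_ticket = 0
        self._now_serving = 0
        self._cond = threading.Condition()

    def acquire(self):
        with self._cond:
            my_ticket = self._next_ticket
            self._next_ticket += 1
            while self._now_serving != my_ticket:
                self._cond.wait()

    def release(self):
        with self._cond:
            self._now_serving += 1
            self._cond.notify_all()  # wake waiters; the next ticket proceeds

lock = TicketLock()
counter = 0

def worker():
    global counter
    for _ in range(1_000):
        lock.acquire()
        counter += 1                 # critical section, FIFO order
        lock.release()

threads = [threading.Thread(target=worker) for _ in range(4)]
for t in threads: t.start()
for t in threads: t.join()
print(counter)  # 4000
```

Mutual exclusion here comes from the ticket discipline itself: only the thread whose ticket equals `_now_serving` is in the critical section, and everyone else is queued behind it.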
12. Structured Concurrency
12.1 Concept
Structured concurrency is a pattern that ties the lifetime of concurrent operations to a code block (scope). When the scope ends, all child tasks are guaranteed to be complete (or cancelled).
Unstructured Concurrency Structured Concurrency
┌────────────────┐ ┌────────────────┐
│ start task A │ │ scope { │
│ start task B │ │ task A │
│ ... (leaked?) │ │ task B │
│ forget task A? │ │ } // A,B done │
└────────────────┘ └────────────────┘
12.2 Python TaskGroup (3.11+)
import asyncio

async def fetch(name: str, delay: float) -> str:
    await asyncio.sleep(delay)
    if name == "failing":
        raise ValueError(f"{name} failed!")
    return f"{name}: done"

async def main():
    # TaskGroup: structured concurrency
    try:
        async with asyncio.TaskGroup() as tg:
            task1 = tg.create_task(fetch("api-1", 0.5))
            task2 = tg.create_task(fetch("api-2", 1.0))
            task3 = tg.create_task(fetch("api-3", 0.8))
        # All tasks guaranteed complete when we reach here
        print(task1.result(), task2.result(), task3.result())
    except* ValueError as eg:
        # Error handling with ExceptionGroup
        for exc in eg.exceptions:
            print(f"Error: {exc}")

asyncio.run(main())
12.3 Java 21 Structured Concurrency
import java.util.concurrent.StructuredTaskScope;

public class StructuredConcurrencyDemo {
    record User(String name) {}
    record Order(String id) {}

    static User fetchUser(String userId) throws Exception {
        Thread.sleep(500);
        return new User("Alice");
    }

    static Order fetchOrder(String orderId) throws Exception {
        Thread.sleep(800);
        return new Order("ORD-123");
    }

    public static void main(String[] args) throws Exception {
        // ShutdownOnFailure: cancel remaining subtasks if any fails
        // (preview API in Java 21 - compile with --enable-preview)
        try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
            var userTask = scope.fork(() -> fetchUser("user-1"));   // Subtask<User>
            var orderTask = scope.fork(() -> fetchOrder("order-1")); // Subtask<Order>
            scope.join();           // Wait for all subtasks
            scope.throwIfFailed();  // Propagate failure
            // All results available when we reach here
            User user = userTask.get();
            Order order = orderTask.get();
            System.out.printf("User: %s, Order: %s%n", user.name(), order.id());
        }
    }
}
13. Language Concurrency Model Comparison
| Dimension | Go | Rust | Java 21 | Python | Kotlin | JavaScript | Erlang/Elixir | C# |
|---|---|---|---|---|---|---|---|---|
| Model | CSP | Ownership + async | Virtual Thread | asyncio | Coroutine | Event Loop | Actor | Task |
| Lightweight Unit | goroutine | Future | Virtual Thread | coroutine | coroutine | Promise | process | Task |
| Scheduling | M:N (GMP) | Runtime-dependent | M:N | 1:1 (single) | M:N | 1:1 (single) | Preemptive M:N | M:N |
| Shared State | channel | ownership | synchronized | asyncio.Lock | Mutex | none (single) | none (message) | lock |
| Error Propagation | panic/recover | Result | exception | exception | exception | reject | link/monitor | exception |
| Cancellation | context | drop | interrupt | cancel | Job.cancel | AbortController | exit | CancelToken |
| Structured Conc. | errgroup | limited | StructuredTaskScope | TaskGroup | coroutineScope | none | supervisor | none |
| CPU-bound | goroutine | thread/rayon | thread | multiprocessing | Dispatchers.Default | Worker Thread | process | Parallel.ForEach |
| Memory Safety | GC | compile-time | GC | GC | GC | GC | GC | GC |
| Deadlock Prev. | channel design | compile-time | care needed | care needed | care needed | N/A | message-based | care needed |
| Performance | high | very high | high | moderate | high | I/O only | high (BEAM) | high |
| Learning Curve | low | high | medium | low | medium | low | medium | medium |
14. Practical Pattern: Rate Limiter
package main

import (
    "context"
    "fmt"
    "sync"
    "time"
)

// Token Bucket Rate Limiter
type RateLimiter struct {
    tokens     chan struct{}
    refillRate time.Duration
    ctx        context.Context
    cancel     context.CancelFunc
}

func NewRateLimiter(maxTokens int, refillRate time.Duration) *RateLimiter {
    ctx, cancel := context.WithCancel(context.Background())
    rl := &RateLimiter{
        tokens:     make(chan struct{}, maxTokens),
        refillRate: refillRate,
        ctx:        ctx,
        cancel:     cancel,
    }
    // Fill tokens
    for i := 0; i < maxTokens; i++ {
        rl.tokens <- struct{}{}
    }
    // Periodic refill
    go func() {
        ticker := time.NewTicker(refillRate)
        defer ticker.Stop()
        for {
            select {
            case <-ticker.C:
                select {
                case rl.tokens <- struct{}{}:
                default: // Already full
                }
            case <-ctx.Done():
                return
            }
        }
    }()
    return rl
}

func (rl *RateLimiter) Acquire(ctx context.Context) error {
    select {
    case <-rl.tokens:
        return nil
    case <-ctx.Done():
        return ctx.Err()
    }
}

func (rl *RateLimiter) Close() {
    rl.cancel()
}

func main() {
    limiter := NewRateLimiter(5, 200*time.Millisecond)
    defer limiter.Close()
    var wg sync.WaitGroup
    for i := 0; i < 20; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
            defer cancel()
            if err := limiter.Acquire(ctx); err != nil {
                fmt.Printf("Request %d: timed out\n", id)
                return
            }
            fmt.Printf("Request %d: processed at %s\n", id,
                time.Now().Format("15:04:05.000"))
        }(i)
    }
    wg.Wait()
}
15. Performance Optimization Tips
15.1 Choosing the Right Concurrency Model
Selection by Workload Type
I/O-bound (network, disk):
-> async/await, coroutine, goroutine
-> Event loop-based is most efficient
-> Thread pools cause excessive context switching
CPU-bound (computation, transformation):
-> OS threads (Python: multiprocessing)
-> Rust: rayon crate
-> Go: goroutines (automatic parallelization)
Mixed:
-> Separate thread pools/dispatchers
-> Kotlin: Dispatchers.IO vs Dispatchers.Default
-> Python: asyncio + ProcessPoolExecutor
15.2 Minimizing Context Switching
import asyncio
import concurrent.futures
import time

# Execute CPU-bound work in separate processes
def cpu_heavy_task(n: int) -> int:
    """Prime counting (CPU-bound)"""
    count = 0
    for i in range(2, n):
        if all(i % j != 0 for j in range(2, int(i**0.5) + 1)):
            count += 1
    return count

async def main():
    loop = asyncio.get_running_loop()  # get_event_loop() is deprecated inside coroutines
    # Offload CPU-bound work to a ProcessPoolExecutor
    with concurrent.futures.ProcessPoolExecutor(max_workers=4) as pool:
        start = time.perf_counter()
        results = await asyncio.gather(
            loop.run_in_executor(pool, cpu_heavy_task, 100_000),
            loop.run_in_executor(pool, cpu_heavy_task, 100_000),
            loop.run_in_executor(pool, cpu_heavy_task, 100_000),
            loop.run_in_executor(pool, cpu_heavy_task, 100_000),
        )
        elapsed = time.perf_counter() - start
        print(f"4 tasks completed in {elapsed:.2f}s: {results}")

if __name__ == "__main__":  # required for ProcessPoolExecutor on spawn platforms
    asyncio.run(main())
16. Quiz
Q1. What is the key difference between concurrency and parallelism?
Concurrency is a structural property of a program -- designing it so multiple tasks can be logically handled at the same time. Parallelism is a property of execution -- multiple tasks physically running at the same time. Concurrency is possible on a single core, but parallelism requires multiple cores.
Q2. In Go's GMP model, what are G, M, and P?
- G (Goroutine): The unit of work to execute. A lightweight thread.
- M (Machine): An OS thread. Responsible for actual execution.
- P (Processor): A logical processor. The scheduler that maps G to M. The number is determined by GOMAXPROCS.
P has a local run queue and uses work stealing to distribute load evenly.
Q3. What 4 conditions must all be met for deadlock to occur?
- Mutual Exclusion: Only one process can use a resource at a time
- Hold and Wait: Holding a resource while waiting for another
- No Preemption: Resources cannot be forcibly taken from a process
- Circular Wait: Circular chain of processes each waiting for the next
Breaking any one prevents deadlock. The most common solution is consistent lock ordering to prevent circular wait.
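Consistent lock ordering can be shown in a few lines of Python. This sketch is illustrative (the lock names and transfer function are not from any real API): both threads acquire the two locks in the same global order, so no circular wait can form.

```python
import threading

lock_a = threading.Lock()
lock_b = threading.Lock()

def transfer(amount: int, results: list[int]) -> None:
    # Every thread acquires locks in the SAME global order (a before b),
    # which breaks the circular-wait condition and prevents deadlock.
    with lock_a:
        with lock_b:
            results.append(amount)

results: list[int] = []
t1 = threading.Thread(target=transfer, args=(1, results))
t2 = threading.Thread(target=transfer, args=(2, results))
t1.start(); t2.start()
t1.join(); t2.join()
print(sorted(results))  # [1, 2] -- both threads completed, no deadlock
```

If one thread instead took lock_b first, each thread could hold one lock while waiting for the other, satisfying all four conditions at once.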
Q4. What is the core difference between the Actor model and CSP?
- Actor Model: Asynchronous message passing. Each Actor has its own mailbox. Sender specifies the receiver directly. Erlang, Akka.
- CSP: Synchronous channel communication. Channels are independent entities. Sender and receiver are unaware of each other. Go's goroutine + channel.
Actors know "who to send to," while CSP knows "which channel to send to."
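The mailbox distinction can be sketched in Python with threads and queues. The Actor class below is a toy, not a real actor framework; it only illustrates that an actor owns its mailbox, while a CSP channel is a standalone value shared by anonymous parties.

```python
import queue
import threading

class Actor:
    """Toy actor: a private mailbox drained by one worker thread."""

    def __init__(self) -> None:
        self.mailbox: "queue.Queue[str | None]" = queue.Queue()  # actor-owned
        self.log: list[str] = []
        self._thread = threading.Thread(target=self._run)
        self._thread.start()

    def _run(self) -> None:
        while (msg := self.mailbox.get()) is not None:  # None = poison pill
            self.log.append(msg)

    def send(self, msg: str) -> None:
        # The sender names the receiver directly -- "who to send to".
        self.mailbox.put(msg)

    def stop(self) -> None:
        self.mailbox.put(None)
        self._thread.join()

# CSP, by contrast: the channel is an independent value, and sender and
# receiver only share the channel -- "which channel to send to".
chan: "queue.Queue[str]" = queue.Queue()
chan.put("ping")  # anonymous sender; any reader of `chan` may receive

actor = Actor()
actor.send("hello")
actor.send("world")
actor.stop()
print(actor.log, chan.get())  # ['hello', 'world'] ping
```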
Q5. What problems does structured concurrency solve?
Structured concurrency solves:
- Task Leaking: Fire-and-forget tasks that are forgotten
- Error Propagation: Child task errors not reaching the parent
- Cancellation Propagation: Children continuing after parent cancellation
- Lifetime Management: Tasks continuing after their scope has ended
Key implementations include Java 21's StructuredTaskScope, Python's TaskGroup, and Kotlin's coroutineScope.
17. References
- Rob Pike - "Concurrency is not Parallelism" (Go Blog)
- Java 21 Virtual Threads - JEP 444
- Java 21 Structured Concurrency - JEP 453
- Python asyncio Official Documentation
- Go Official Documentation - Effective Go (Concurrency)
- Kotlin Coroutines Official Guide
- Rust async-book (The Async Book)
- Akka Documentation - Actor Model
- Erlang/OTP - Processes
- "The Art of Multiprocessor Programming" - Maurice Herlihy, Nir Shavit
- "Designing Data-Intensive Applications" - Martin Kleppmann (Chapter 8)
- "Concurrency in Go" - Katherine Cox-Buday (O'Reilly)
- tokio.rs - Rust Async Runtime
- libuv - Node.js Event Loop Implementation