Concurrency & Parallel Programming Complete Guide 2025: Async/Await, Thread, Goroutine, Actor Model


1. Concurrency vs Parallelism: Rob Pike's Distinction

"Concurrency is about dealing with lots of things at once. Parallelism is about doing lots of things at once." -- Rob Pike

This single sentence captures the essential difference between concurrency and parallelism.

1.1 Conceptual Distinction

Concurrency is a structural property of a program. It means designing a program so that it can logically handle multiple tasks at the same time. Whether they actually execute simultaneously is irrelevant.

Parallelism is a property of execution. It means multiple tasks physically running at the same time. The classic example is multiple threads running simultaneously on a multi-core CPU.

Concurrency - Structure
┌─────────────────────────────────────────┐
│ Task A: ██░░██░░░░██░░                  │
│ Task B: ░░██░░██░░░░██                  │
│ Task C: ░░░░░░░░██░░░░██                │
│          ─────────────────→ Time        │
│  (Interleaved on a single core)         │
└─────────────────────────────────────────┘

Parallelism - Execution
┌─────────────────────────────────────────┐
│ Core 1: ████████████████                │
│ Core 2: ████████████████                │
│ Core 3: ████████████████                │
│          ─────────────────→ Time        │
│  (Simultaneous on multiple cores)       │
└─────────────────────────────────────────┘

1.2 Real-World Analogy

| Scenario                                   | Concurrent | Parallel |
| ------------------------------------------ | ---------- | -------- |
| 1 chef alternating between 3 dishes        | Yes        | No       |
| 3 chefs each cooking 1 dish simultaneously | Yes        | Yes      |
| 1 chef cooking only 1 dish                 | No         | No       |

1.3 Why the Distinction Matters

Pursuing parallelism without concurrency leads to exploding complexity. The correct approach is to first design proper concurrent structure, then leverage parallel execution for performance.

# Concurrent structure (asyncio) - efficient even on single thread
import asyncio

async def fetch_data(url: str) -> str:
    print(f"Fetching {url}...")
    await asyncio.sleep(1)  # I/O wait simulation
    return f"Data from {url}"

async def main():
    # Concurrency: structurally handling 3 requests simultaneously
    results = await asyncio.gather(
        fetch_data("https://api.example.com/users"),
        fetch_data("https://api.example.com/posts"),
        fetch_data("https://api.example.com/comments"),
    )
    for r in results:
        print(r)

asyncio.run(main())
# Total ~1 second (not 3) - the power of concurrency

2. Threading Models: From OS Threads to Virtual Threads

2.1 OS Threads (Kernel Threads)

Threads managed directly by the operating system. High creation cost (approximately 1MB stack) and context switching overhead.

// Java - OS Threads
public class ThreadExample {
    public static void main(String[] args) throws InterruptedException {
        Thread t1 = new Thread(() -> {
            for (int i = 0; i < 5; i++) {
                System.out.println("Thread 1: " + i);
                try { Thread.sleep(100); } catch (InterruptedException e) { break; }
            }
        });

        Thread t2 = new Thread(() -> {
            for (int i = 0; i < 5; i++) {
                System.out.println("Thread 2: " + i);
                try { Thread.sleep(100); } catch (InterruptedException e) { break; }
            }
        });

        t1.start();
        t2.start();
        t1.join();
        t2.join();
        System.out.println("Both threads completed");
    }
}

2.2 User Threads (Green Threads)

Lightweight threads managed by the runtime. Multiple user threads are mapped onto OS threads.

Thread Model Comparison
┌───────────────────────────────────────────────┐
│ 1:1 (OS Thread)        N:1 (Green Thread)     │
│ ┌──┐ ┌──┐ ┌──┐         ┌──┐┌──┐┌──┐┌──┐┌──┐   │
│ │UT│ │UT│ │UT│         │GT││GT││GT││GT││GT│   │
│ └┬─┘ └┬─┘ └┬─┘         └┬─┘└┬─┘└┬─┘└┬─┘└┬─┘   │
│  │    │    │             └───┴──┬──┴───┘      │
│ ┌┴─┐ ┌┴─┐ ┌┴─┐                ┌┴─┐            │
│ │KT│ │KT│ │KT│                │KT│            │
│ └──┘ └──┘ └──┘                └──┘            │
│                                               │
│ M:N (Hybrid)                                  │
│ ┌──┐┌──┐┌──┐┌──┐┌──┐┌──┐                      │
│ │GT││GT││GT││GT││GT││GT│                      │
│ └┬─┘└┬─┘└┬─┘└┬─┘└┬─┘└┬─┘                      │
│  └─┬─┘ └──┬──┘ └──┬──┘                        │
│   ┌┴─┐   ┌┴─┐   ┌┴─┐                          │
│   │KT│   │KT│   │KT│                          │
│   └──┘   └──┘   └──┘                          │
└───────────────────────────────────────────────┘
(UT = user thread, GT = green thread, KT = kernel thread)

2.3 Java 21 Virtual Threads

Virtual threads, officially introduced in Java 21, implement the M:N model. You can create millions of virtual threads.

// Java 21 Virtual Threads
import java.util.concurrent.Executors;
import java.time.Duration;

public class VirtualThreadDemo {
    public static void main(String[] args) throws Exception {
        // Can create 1 million virtual threads!
        try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
            for (int i = 0; i < 1_000_000; i++) {
                final int taskId = i;
                executor.submit(() -> {
                    try {
                        Thread.sleep(Duration.ofSeconds(1));
                        if (taskId % 100_000 == 0) {
                            System.out.println("Task " + taskId + " done on " +
                                Thread.currentThread());
                        }
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
        }
        System.out.println("All tasks completed");
    }
}

2.4 Thread Model Comparison

| Property         | OS Thread      | Green Thread         | Virtual Thread (Java 21)     |
| ---------------- | -------------- | -------------------- | ---------------------------- |
| Stack size       | ~1MB           | ~KB                  | A few KB (grows on demand)   |
| Creation cost    | High (syscall) | Low                  | Very low                     |
| Concurrent count | Thousands      | Millions             | Millions                     |
| Preemption       | OS preemptive  | Cooperative          | Cooperative (yields on I/O)  |
| I/O blocking     | Blocks thread  | Risk of blocking all | Unmounts from carrier thread |

3. Coroutines: The Core of Lightweight Concurrency

3.1 Stackful vs Stackless Coroutines

Stackful Coroutine (e.g., Lua, Go's goroutine)
┌──────────────────────────────────┐
Has its own stack                │
Can suspend anywhere in stack    │
Higher memory usage              │
Can yield from nested calls      │
└──────────────────────────────────┘

Stackless Coroutine (e.g., Python, Kotlin, Rust)
┌──────────────────────────────────┐
State stored on heap             │
Can only suspend at top level    │
Lower memory usage               │
Compiler transforms to state machine │
└──────────────────────────────────┘
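The stackless restriction is easy to see in Python: `await` is only legal directly inside an `async def` body, so an ordinary function sitting on the call stack can never suspend the coroutine that called it. A minimal illustration (function names are made up for the example):

```python
import asyncio

async def leaf() -> int:
    # Suspension happens here, in an async function's own body...
    await asyncio.sleep(0)
    return 42

def plain_helper() -> str:
    # ...but an ordinary function on the call stack cannot suspend.
    # Uncommenting the next line is a SyntaxError outside `async def`:
    # await asyncio.sleep(0)
    return "no suspension possible here"

async def main():
    print(plain_helper())
    print(await leaf())

asyncio.run(main())
```

A stackful coroutine (a goroutine, say) has no such restriction: any function it calls may block or yield, because the whole stack is suspendable.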

3.2 Python asyncio Coroutines

import asyncio
import aiohttp
import time

async def fetch_url(session: aiohttp.ClientSession, url: str) -> dict:
    """Asynchronous HTTP request"""
    async with session.get(url) as response:
        return {
            "url": url,
            "status": response.status,
            "length": len(await response.text()),
        }

async def main():
    urls = [
        "https://httpbin.org/delay/1",
        "https://httpbin.org/delay/1",
        "https://httpbin.org/delay/1",
        "https://httpbin.org/delay/1",
        "https://httpbin.org/delay/1",
    ]

    start = time.perf_counter()
    async with aiohttp.ClientSession() as session:
        # Execute 5 requests concurrently
        tasks = [fetch_url(session, url) for url in urls]
        results = await asyncio.gather(*tasks)

    elapsed = time.perf_counter() - start
    print(f"5 requests completed: {elapsed:.2f}s")  # ~1s (not 5s)

    for r in results:
        print(f"  {r['url']}: status={r['status']}, length={r['length']}")

asyncio.run(main())

3.3 Kotlin Coroutines

import kotlinx.coroutines.*
import kotlin.system.measureTimeMillis

suspend fun fetchData(id: Int): String {
    delay(1000) // Async wait (no thread blocking)
    return "Result-$id"
}

fun main() = runBlocking {
    val time = measureTimeMillis {
        // Structured concurrency: coroutineScope manages all child coroutines
        coroutineScope {
            val results = (1..10).map { id ->
                async(Dispatchers.IO) {
                    fetchData(id)
                }
            }
            results.awaitAll().forEach { println(it) }
        }
    }
    println("Completed in ${time}ms") // ~1000ms
}

4. Goroutines and Go's Concurrency Model

4.1 Goroutine Internal Structure

Go's runtime uses M:N scheduling. Three concepts are central: G (Goroutine), M (Machine/OS Thread), and P (Processor).

Go Runtime Scheduler (GMP Model)
┌─────────────────────────────────────────────┐
│                                             │
│  ┌──────┐ ┌──────┐ ┌──────┐ ┌──────┐        │
│  │  G1  │ │  G2  │ │  G3  │ │ G4...│        │
│  └──┬───┘ └──┬───┘ └──┬───┘ └──┬───┘        │
│     │        │        │        │            │
│  ┌──┴────────┴──┐  ┌──┴────────┴──┐         │
│  │   P (Local   │  │   P (Local   │         │
│  │    Queue)    │  │    Queue)    │         │
│  └──────┬───────┘  └──────┬───────┘         │
│         │                 │                 │
│  ┌──────┴───────┐  ┌──────┴───────┐         │
│  │ M (OS Thread)│  │ M (OS Thread)│         │
│  └──────────────┘  └──────────────┘         │
│                                             │
│  Global Run Queue: [G5, G6, G7, ...]        │
└─────────────────────────────────────────────┘

GOMAXPROCS = number of P's (default: CPU core count)

4.2 Basic Goroutine Usage

package main

import (
    "fmt"
    "sync"
    "time"
)

func worker(id int, wg *sync.WaitGroup) {
    defer wg.Done()
    fmt.Printf("Worker %d starting\n", id)
    time.Sleep(time.Second)
    fmt.Printf("Worker %d done\n", id)
}

func main() {
    var wg sync.WaitGroup

    for i := 1; i <= 5; i++ {
        wg.Add(1)
        go worker(i, &wg) // goroutine creation - ~2KB stack
    }

    wg.Wait()
    fmt.Println("All workers completed")
}

4.3 Work Stealing

When a P's local queue is empty, it steals goroutines from another P's queue. This distributes load evenly.

Work Stealing Process
┌─────────────┐         ┌─────────────┐
│ P1 (busy)   │         │ P2 (idle)   │
│ [G1,G2,G3]  │  steal  │ []          │
│             │  ────→  │             │
│ [G1,G2]     │         │ [G3]        │
└─────────────┘         └─────────────┘
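The stealing decision itself can be sketched in a few lines of Python. This is a toy model (all names are hypothetical, and it has none of the lock-free deques or randomized victim scanning of the real Go scheduler); it only shows the policy: pop from your own queue's tail, and when it is empty, steal from the head of another worker's queue:

```python
from collections import deque
import random

def run_workers(tasks, n_workers=4, seed=0):
    """Toy work-stealing model: worker 0 starts with all the work,
    idle workers steal from a random victim's queue head."""
    rng = random.Random(seed)
    queues = [deque() for _ in range(n_workers)]
    for t in tasks:
        queues[0].append(t)  # deliberately uneven initial load
    done = [[] for _ in range(n_workers)]
    remaining = len(tasks)
    while remaining:
        for wid in range(n_workers):
            q = queues[wid]
            if q:
                task = q.pop()  # LIFO pop from own queue (cache-friendly)
            else:
                victim = rng.randrange(n_workers)
                if not queues[victim]:
                    continue    # nothing to steal this round
                task = queues[victim].popleft()  # FIFO steal from victim
            done[wid].append(task)
            remaining -= 1
    return done

results = run_workers(list(range(20)))
print([len(r) for r in results])  # tasks processed per worker
```

Even with all 20 tasks initially on one queue, stealing spreads the work across the workers, which is exactly the load-balancing effect the Go runtime relies on.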

5. Async/Await Pattern Comparison

5.1 JavaScript Promise and Async/Await

// Promise chaining vs Async/Await
// Promise chaining approach
function fetchUserData(userId) {
  return fetch(`/api/users/${userId}`)
    .then(response => response.json())
    .then(user =>
      // Nest the second chain so `user` stays in scope for the final result
      fetch(`/api/posts?author=${user.id}`)
        .then(response => response.json())
        .then(posts => ({ user, posts }))
    )
    .catch(error => {
      console.error('Error:', error);
      throw error;
    });
}

// Async/Await approach (much more readable)
async function fetchUserDataAsync(userId) {
  try {
    const userResponse = await fetch(`/api/users/${userId}`);
    const user = await userResponse.json();

    const postsResponse = await fetch(`/api/posts?author=${user.id}`);
    const posts = await postsResponse.json();

    return { user, posts };
  } catch (error) {
    console.error('Error:', error);
    throw error;
  }
}

// Parallel execution
async function fetchMultipleUsers(userIds) {
  const promises = userIds.map(id => fetchUserDataAsync(id));
  return Promise.all(promises); // All execute in parallel
}

// Promise.allSettled - continues even if some fail
async function fetchWithPartialFailure(urls) {
  const results = await Promise.allSettled(
    urls.map(url => fetch(url).then(r => r.json()))
  );

  const successes = results
    .filter(r => r.status === 'fulfilled')
    .map(r => r.value);
  const failures = results
    .filter(r => r.status === 'rejected')
    .map(r => r.reason);

  return { successes, failures };
}

5.2 Rust Async

use tokio;
use reqwest;
use futures::future::join_all;

#[derive(Debug)]
struct ApiResponse {
    url: String,
    status: u16,
    body_length: usize,
}

async fn fetch_url(client: &reqwest::Client, url: &str) -> Result<ApiResponse, reqwest::Error> {
    let response = client.get(url).send().await?;
    let status = response.status().as_u16();
    let body = response.text().await?;

    Ok(ApiResponse {
        url: url.to_string(),
        status,
        body_length: body.len(),
    })
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let client = reqwest::Client::new();
    let urls = vec![
        "https://httpbin.org/get",
        "https://httpbin.org/ip",
        "https://httpbin.org/user-agent",
    ];

    // Execute all requests concurrently
    let futures: Vec<_> = urls.iter()
        .map(|url| fetch_url(&client, url))
        .collect();

    let results = join_all(futures).await;

    for result in results {
        match result {
            Ok(resp) => println!("{}: status={}, length={}", resp.url, resp.status, resp.body_length),
            Err(e) => eprintln!("Error: {}", e),
        }
    }

    Ok(())
}

5.3 C# Task-Based Asynchrony

using System;
using System.Net.Http;
using System.Threading;
using System.Threading.Tasks;
using System.Collections.Generic;
using System.Linq;

public class AsyncDemo
{
    private static readonly HttpClient _client = new HttpClient();

    public static async Task<string> FetchDataAsync(string url)
    {
        var response = await _client.GetAsync(url);
        response.EnsureSuccessStatusCode();
        return await response.Content.ReadAsStringAsync();
    }

    public static async Task Main()
    {
        var urls = new List<string>
        {
            "https://httpbin.org/get",
            "https://httpbin.org/ip",
            "https://httpbin.org/user-agent",
        };

        // Parallel execution with WhenAll
        var tasks = urls.Select(url => FetchDataAsync(url));
        var results = await Task.WhenAll(tasks);

        for (int i = 0; i < urls.Count; i++)
        {
            Console.WriteLine($"URL: {urls[i]}, Length: {results[i].Length}");
        }

        // Timeout pattern
        using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
        try
        {
            var data = await _client.GetStringAsync(urls[0], cts.Token);
            Console.WriteLine($"Got data: {data.Length} chars");
        }
        catch (OperationCanceledException)
        {
            Console.WriteLine("Request timed out!");
        }
    }
}

5.4 Async/Await Language Comparison

| Property       | JavaScript      | Python        | Rust            | C#                | Kotlin       |
| -------------- | --------------- | ------------- | --------------- | ----------------- | ------------ |
| Runtime        | V8 event loop   | asyncio       | tokio/async-std | CLR thread pool   | Dispatchers  |
| Future type    | Promise         | Coroutine     | Future (lazy)   | Task              | Deferred     |
| Cancellation   | AbortController | Task.cancel() | drop            | CancellationToken | Job.cancel() |
| Streams        | AsyncIterator   | async for     | Stream trait    | IAsyncEnumerable  | Flow         |
| Error handling | try/catch       | try/except    | Result          | try/catch         | try/catch    |
| Parallel exec  | Promise.all     | gather        | join!           | Task.WhenAll      | awaitAll     |
| Zero-cost      | No              | No            | Yes             | No                | No           |
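The cancellation column deserves a concrete example. In Python, `Task.cancel()` raises `CancelledError` inside the task at its next suspension point, giving it a chance to clean up before the exception propagates. A small sketch (function names are made up for the example):

```python
import asyncio

async def slow_job() -> str:
    try:
        await asyncio.sleep(10)
        return "finished"
    except asyncio.CancelledError:
        print("slow_job: cleanup on cancellation")
        raise  # re-raise so the task is actually marked cancelled

async def main():
    task = asyncio.create_task(slow_job())
    await asyncio.sleep(0.1)   # let the task start and suspend
    task.cancel()              # request cancellation
    try:
        await task
    except asyncio.CancelledError:
        print("main: task was cancelled")
    print("cancelled flag:", task.cancelled())

asyncio.run(main())
```

Swallowing `CancelledError` without re-raising is a common bug: the task then completes "normally" and `task.cancelled()` stays false.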

6. Event Loop Deep Dive

6.1 Node.js libuv Event Loop

Node.js Event Loop Phases
┌───────────────────────────────────┐
│         ┌───────────────┐         │
│    ┌───→│    timers     │         │
│    │    └───────┬───────┘         │
│    │    ┌───────┴───────┐         │
│    │    │  pending I/O  │         │
│    │    └───────┬───────┘         │
│    │    ┌───────┴───────┐         │
│    │    │ idle/prepare  │         │
│    │    └───────┬───────┘         │
│    │    ┌───────┴───────┐         │
│    │    │     poll      │         │
│    │    └───────┬───────┘         │
│    │    ┌───────┴───────┐         │
│    │    │     check     │         │
│    │    │ (setImmediate)│         │
│    │    └───────┬───────┘         │
│    │    ┌───────┴────────┐        │
│    └────┤close callbacks │        │
│         └────────────────┘        │
│                                   │
│  Between each phase: microtask    │
│  queue (Promise, queueMicrotask)  │
└───────────────────────────────────┘

// Understanding Event Loop execution order
console.log('1. Synchronous code');

setTimeout(() => console.log('2. setTimeout (timers)'), 0);

setImmediate(() => console.log('3. setImmediate (check)'));

Promise.resolve().then(() => console.log('4. Promise (microtask)'));

process.nextTick(() => console.log('5. nextTick (microtask - highest priority)'));

console.log('6. End of synchronous code');

// Output order:
// 1. Synchronous code
// 6. End of synchronous code
// 5. nextTick (microtask - highest priority)
// 4. Promise (microtask)
// 2/3. setTimeout vs setImmediate: when called from the main module,
//      their relative order is nondeterministic (it depends on whether
//      the 0ms timer is ready when the loop starts); inside an I/O
//      callback, setImmediate always fires first

6.2 Python asyncio Event Loop

import asyncio
import time

async def cpu_bound_simulation(name: str, duration: float):
    """Simulated work: asyncio.sleep yields control to the event loop.
    (Truly CPU-bound code would block the loop - offload it with
    run_in_executor or a process pool instead.)"""
    print(f"[{name}] Starting")
    await asyncio.sleep(duration)
    print(f"[{name}] Complete ({duration}s)")
    return f"{name}: done"

async def producer(queue: asyncio.Queue, n: int):
    """Producer: add items to queue"""
    for i in range(n):
        await asyncio.sleep(0.1)
        item = f"item-{i}"
        await queue.put(item)
        print(f"  Produced: {item}")
    await queue.put(None)  # Sentinel value

async def consumer(queue: asyncio.Queue, name: str):
    """Consumer: consume items from queue"""
    while True:
        item = await queue.get()
        if item is None:
            await queue.put(None)  # Pass sentinel to other consumers
            break
        await asyncio.sleep(0.2)  # Processing simulation
        print(f"  {name} consumed: {item}")
        queue.task_done()

async def main():
    # 1. Concurrent execution
    start = time.perf_counter()
    results = await asyncio.gather(
        cpu_bound_simulation("TaskA", 1.0),
        cpu_bound_simulation("TaskB", 0.5),
        cpu_bound_simulation("TaskC", 0.8),
    )
    elapsed = time.perf_counter() - start
    print(f"Total time: {elapsed:.2f}s")  # ~1s
    print(f"Results: {results}")

    # 2. Producer-consumer pattern
    print("\n--- Producer-Consumer ---")
    queue = asyncio.Queue(maxsize=5)
    await asyncio.gather(
        producer(queue, 10),
        consumer(queue, "Consumer-1"),
        consumer(queue, "Consumer-2"),
    )

asyncio.run(main())

7. Actor Model: Concurrency Without Shared State

7.1 Actor Model Concepts

Actor Model
┌─────────────────────────────────────────────────┐
│                                                 │
│  ┌─────────┐  message  ┌─────────┐              │
│  │ Actor A │ ────────→ │ Actor B │              │
│  │ [State] │           │ [State] │              │
│  │ [Mailbox]           │ [Mailbox]              │
│  └────┬────┘           └────┬────┘              │
│       ↑                     │                   │
│       │      message        │ spawn new Actor   │
│       └─────────────────────┤                   │
│                             ↓                   │
│                        ┌─────────┐              │
│                        │ Actor C │              │
│                        │ [State] │              │
│                        └─────────┘              │
│                                                 │
│ Rules:                                          │
│ 1. Communicate only via messages                │
│    (no shared state)                            │
│ 2. Process one message at a time                │
│ 3. Can spawn new Actors                         │
│ 4. Can only modify own state                    │
└─────────────────────────────────────────────────┘
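The four rules above can be sketched in Python using an `asyncio.Queue` as the mailbox. This is a minimal illustration (the class and message shapes are made up for the example), not a real actor runtime with supervision or distribution:

```python
import asyncio

class CounterActor:
    """Minimal actor sketch: private state, a mailbox, one message at a time."""
    def __init__(self):
        self._count = 0                  # private state, never shared
        self._mailbox = asyncio.Queue()  # all communication goes through here

    async def run(self):
        while True:
            msg = await self._mailbox.get()  # rule 2: one message at a time
            if msg == "stop":
                return
            kind, payload = msg
            if kind == "increment":
                self._count += payload       # rule 4: only own state changes
            elif kind == "get":
                payload.set_result(self._count)  # reply via a future

    async def send(self, msg):
        await self._mailbox.put(msg)  # rule 1: messages only

async def main():
    actor = CounterActor()
    runner = asyncio.create_task(actor.run())
    await actor.send(("increment", 1))
    await actor.send(("increment", 2))
    reply = asyncio.get_running_loop().create_future()
    await actor.send(("get", reply))
    print("count:", await reply)  # count: 3
    await actor.send("stop")
    await runner

asyncio.run(main())
```

Because only the actor's own loop touches `_count`, no lock is needed even with many concurrent senders; that is the model's central bargain.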

7.2 Erlang/Elixir Actor

%% Erlang - Counter Actor
-module(counter).
-export([start/0, increment/1, get/1, loop/1]).

start() ->
    spawn(fun() -> loop(0) end).

increment(Pid) ->
    Pid ! {increment, 1}.

get(Pid) ->
    Pid ! {get, self()},
    receive
        {count, Value} -> Value
    after 5000 ->
        timeout
    end.

loop(Count) ->
    receive
        {increment, N} ->
            loop(Count + N);
        {get, From} ->
            From ! {count, Count},
            loop(Count);
        stop ->
            ok
    end.

%% Usage:
%% Pid = counter:start().
%% counter:increment(Pid).
%% counter:increment(Pid).
%% counter:get(Pid). %% => 2

7.3 Akka (Scala) Actor

import akka.actor.typed.{ActorRef, ActorSystem, Behavior, SupervisorStrategy}
import akka.actor.typed.scaladsl.Behaviors

// Message definitions
sealed trait Command
case class Deposit(amount: Double) extends Command
case class Withdraw(amount: Double) extends Command
case class GetBalance(replyTo: ActorRef[Balance]) extends Command
case class Balance(amount: Double)

// Bank Account Actor
object BankAccount {
  def apply(balance: Double = 0.0): Behavior[Command] =
    Behaviors.receive { (context, message) =>
      message match {
        case Deposit(amount) =>
          val newBalance = balance + amount
          context.log.info(s"Deposited $amount, balance: $newBalance")
          BankAccount(newBalance)

        case Withdraw(amount) =>
          if (amount <= balance) {
            val newBalance = balance - amount
            context.log.info(s"Withdrew $amount, balance: $newBalance")
            BankAccount(newBalance)
          } else {
            context.log.warn(s"Insufficient funds: $balance < $amount")
            Behaviors.same
          }

        case GetBalance(replyTo) =>
          replyTo ! Balance(balance)
          Behaviors.same
      }
    }
}

// Supervision (fault recovery)
object BankSupervisor {
  def apply(): Behavior[Command] =
    Behaviors.supervise(BankAccount())
      .onFailure(SupervisorStrategy.restart)
}

7.4 Actor Model Pros and Cons

| Pros                                       | Cons                                    |
| ------------------------------------------ | --------------------------------------- |
| No shared state (prevents race conditions) | Can be difficult to debug               |
| Location transparency (distributable)      | Limited message ordering guarantees     |
| Fault isolation ("let it crash")           | Potential mailbox overflow              |
| Good scalability                           | Synchronous request-response is complex |

8. CSP: Go's Channel-Based Concurrency

8.1 Channel Basics

package main

import (
    "fmt"
    "time"
)

func main() {
    // Unbuffered channel (synchronous)
    ch := make(chan string)

    go func() {
        time.Sleep(time.Second)
        ch <- "Hello from goroutine!"
    }()

    msg := <-ch // Blocks until message arrives
    fmt.Println(msg)

    // Buffered channel (asynchronous)
    buffered := make(chan int, 3)
    buffered <- 1
    buffered <- 2
    buffered <- 3
    // buffered <- 4 // Would block - buffer full!

    fmt.Println(<-buffered) // 1
    fmt.Println(<-buffered) // 2
}

8.2 Fan-In / Fan-Out Pattern

package main

import (
    "fmt"
    "math/rand"
    "sync"
    "time"
)

// Fan-Out: distribute one input to multiple workers
func fanOut(input <-chan int, workers int) []<-chan int {
    channels := make([]<-chan int, workers)
    for i := 0; i < workers; i++ {
        ch := make(chan int)
        channels[i] = ch
        go func(workerID int, out chan<- int) {
            defer close(out)
            for val := range input {
                // Processing simulation
                time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
                out <- val * val
                fmt.Printf("  Worker %d processed %d\n", workerID, val)
            }
        }(i, ch)
    }
    return channels
}

// Fan-In: merge results from multiple channels into one
func fanIn(channels ...<-chan int) <-chan int {
    var wg sync.WaitGroup
    merged := make(chan int)

    for _, ch := range channels {
        wg.Add(1)
        go func(c <-chan int) {
            defer wg.Done()
            for val := range c {
                merged <- val
            }
        }(ch)
    }

    go func() {
        wg.Wait()
        close(merged)
    }()

    return merged
}

func main() {
    // Input channel
    input := make(chan int)
    go func() {
        defer close(input)
        for i := 1; i <= 20; i++ {
            input <- i
        }
    }()

    // Fan-Out: 4 workers
    workerChannels := fanOut(input, 4)

    // Fan-In: merge results
    results := fanIn(workerChannels...)

    // Collect results
    var sum int
    for result := range results {
        sum += result
    }
    fmt.Printf("Sum of squares: %d\n", sum)
}

8.3 Select Statement

package main

import (
    "fmt"
    "time"
)

func main() {
    ch1 := make(chan string)
    ch2 := make(chan string)

    go func() {
        time.Sleep(1 * time.Second)
        ch1 <- "from channel 1"
    }()

    go func() {
        time.Sleep(2 * time.Second)
        ch2 <- "from channel 2"
    }()

    // select: choose whichever channel is ready
    for i := 0; i < 2; i++ {
        select {
        case msg := <-ch1:
            fmt.Println("Received", msg)
        case msg := <-ch2:
            fmt.Println("Received", msg)
        case <-time.After(3 * time.Second):
            fmt.Println("Timeout!")
        }
    }

    // Non-blocking select
    ch := make(chan int, 1)
    select {
    case val := <-ch:
        fmt.Println("Got:", val)
    default:
        fmt.Println("Channel empty, moving on")
    }
}

9. Synchronization Primitives

9.1 Mutex and RWLock

package main

import (
    "fmt"
    "sync"
)

// Mutex: mutual exclusion
type SafeCounter struct {
    mu sync.Mutex
    v  map[string]int
}

func (c *SafeCounter) Inc(key string) {
    c.mu.Lock()
    defer c.mu.Unlock()
    c.v[key]++
}

func (c *SafeCounter) Value(key string) int {
    c.mu.Lock()
    defer c.mu.Unlock()
    return c.v[key]
}

// RWMutex: concurrent reads, exclusive writes
type SafeCache struct {
    mu    sync.RWMutex
    store map[string]string
}

func (c *SafeCache) Get(key string) (string, bool) {
    c.mu.RLock()         // Read lock (multiple goroutines can read concurrently)
    defer c.mu.RUnlock()
    val, ok := c.store[key]
    return val, ok
}

func (c *SafeCache) Set(key, value string) {
    c.mu.Lock()          // Write lock (exclusive)
    defer c.mu.Unlock()
    c.store[key] = value
}

func main() {
    counter := SafeCounter{v: make(map[string]int)}
    var wg sync.WaitGroup
    for i := 0; i < 1000; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            counter.Inc("key")
        }()
    }
    wg.Wait()
    fmt.Println("Final count:", counter.Value("key")) // Exactly 1000
}

9.2 Semaphore

import asyncio

async def worker(sem: asyncio.Semaphore, task_id: int):
    async with sem:  # Max 3 concurrent executions
        print(f"Task {task_id}: started (slot acquired)")
        await asyncio.sleep(1)
        print(f"Task {task_id}: completed (slot released)")

async def main():
    sem = asyncio.Semaphore(3)  # Limit to 3 concurrent
    tasks = [worker(sem, i) for i in range(10)]
    await asyncio.gather(*tasks)

asyncio.run(main())

9.3 Condition Variable

import threading
import time
import random

class BoundedBuffer:
    def __init__(self, capacity: int):
        self.buffer = []
        self.capacity = capacity
        self.lock = threading.Lock()
        self.not_full = threading.Condition(self.lock)
        self.not_empty = threading.Condition(self.lock)

    def produce(self, item):
        with self.not_full:
            while len(self.buffer) >= self.capacity:
                self.not_full.wait()  # Wait until buffer has space
            self.buffer.append(item)
            self.not_empty.notify()  # Notify consumer

    def consume(self):
        with self.not_empty:
            while len(self.buffer) == 0:
                self.not_empty.wait()  # Wait until item available
            item = self.buffer.pop(0)
            self.not_full.notify()  # Notify producer
            return item

buf = BoundedBuffer(5)

def producer():
    for i in range(20):
        buf.produce(i)
        print(f"Produced: {i}")
        time.sleep(random.uniform(0.01, 0.1))

def consumer(name):
    for _ in range(10):
        item = buf.consume()
        print(f"{name} consumed: {item}")
        time.sleep(random.uniform(0.05, 0.15))

t1 = threading.Thread(target=producer)
t2 = threading.Thread(target=consumer, args=("C1",))
t3 = threading.Thread(target=consumer, args=("C2",))
t1.start(); t2.start(); t3.start()
t1.join(); t2.join(); t3.join()

9.4 Atomic Operations

import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

public class AtomicDemo {
    // AtomicLong - lock-free counter
    private static final AtomicLong counter = new AtomicLong(0);

    // AtomicReference - lock-free reference update
    private static final AtomicReference<String> config =
        new AtomicReference<>("default");

    public static void main(String[] args) throws InterruptedException {
        // CAS (Compare-And-Swap) loop
        Thread[] threads = new Thread[10];
        for (int i = 0; i < 10; i++) {
            threads[i] = new Thread(() -> {
                for (int j = 0; j < 10_000; j++) {
                    counter.incrementAndGet(); // Atomic increment
                }
            });
            threads[i].start();
        }

        for (Thread t : threads) t.join();
        System.out.println("Counter: " + counter.get()); // Exactly 100,000

        // Direct CAS usage
        boolean updated = config.compareAndSet("default", "production");
        System.out.println("Config updated: " + updated); // true
        System.out.println("Config value: " + config.get()); // production
    }
}

10. Lock-Free Data Structures

10.1 CAS (Compare-And-Swap)

CAS is the foundation of lock-free programming. It is an atomic operation that says "if the current value equals the expected value, replace it with the new value."

CAS Operation
┌──────────────────────────────────────┐
│ CAS(address, expected, new_value)    │
│                                      │
│ if *address == expected:             │
│     *address = new_value             │
│     return true   // success         │
│ else:                                │
│     return false  // fail -> retry   │
│                                      │
│ (entire operation is hardware-atomic)│
└──────────────────────────────────────┘
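Python exposes no user-level CAS instruction, so the following toy cell simulates one with a lock standing in for hardware atomicity. It only demonstrates the read/compute/CAS/retry protocol (all names are made up for the example), not genuine lock-free code:

```python
import threading

class ToyAtomic:
    """Toy CAS cell: the lock plays the role of hardware atomicity."""
    def __init__(self, value):
        self._value = value
        self._guard = threading.Lock()

    def load(self):
        with self._guard:
            return self._value

    def compare_and_swap(self, expected, new):
        # Atomically: replace only if the current value matches `expected`
        with self._guard:
            if self._value == expected:
                self._value = new
                return True
            return False

def increment(cell: ToyAtomic):
    # The canonical CAS retry loop: read, compute, attempt, retry on failure
    while True:
        old = cell.load()
        if cell.compare_and_swap(old, old + 1):
            return

counter = ToyAtomic(0)
threads = [threading.Thread(target=lambda: [increment(counter) for _ in range(1000)])
           for _ in range(4)]
for t in threads: t.start()
for t in threads: t.join()
print(counter.load())  # 4000 - no increment is ever lost
```

The retry loop is the key shape: a failed CAS simply means another thread won the race, so the loop re-reads and tries again instead of blocking.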

10.2 The ABA Problem

ABA Problem
Thread 1: reads value A -> (suspended)
Thread 2: changes A -> B
Thread 3: changes B -> A
Thread 1: (resumes) CAS(A, A, C) -> success! (but A is not the original A)

Solution: add a version counter
- AtomicStampedReference (Java)
- Tagged pointer (C/C++)
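A toy Python analogue of Java's `AtomicStampedReference` (with a lock again standing in for hardware atomicity) shows how the version stamp defeats the scenario above:

```python
import threading

class StampedRef:
    """Toy stamped reference: CAS succeeds only if BOTH the value and
    the version stamp match, so an A->B->A cycle is detectable."""
    def __init__(self, value):
        self._value, self._stamp = value, 0
        self._guard = threading.Lock()

    def get(self):
        with self._guard:
            return self._value, self._stamp

    def compare_and_set(self, exp_val, new_val, exp_stamp):
        with self._guard:
            if self._value == exp_val and self._stamp == exp_stamp:
                self._value, self._stamp = new_val, self._stamp + 1
                return True
            return False

ref = StampedRef("A")
val, stamp = ref.get()            # Thread 1 reads ("A", 0), then is "suspended"
ref.compare_and_set("A", "B", 0)  # Thread 2: A -> B (stamp becomes 1)
ref.compare_and_set("B", "A", 1)  # Thread 3: B -> A (stamp becomes 2)
# Thread 1 resumes: the value is "A" again, but the stamp has moved on
print(ref.compare_and_set(val, "C", stamp))  # False - ABA detected
```

A plain CAS would have succeeded here; the stamp is what records that the cell was touched in between.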

10.3 Lock-Free Stack (Treiber Stack)

import java.util.concurrent.atomic.AtomicReference;

public class LockFreeStack<T> {
    private static class Node<T> {
        final T value;
        Node<T> next;

        Node(T value) {
            this.value = value;
        }
    }

    private final AtomicReference<Node<T>> top = new AtomicReference<>(null);

    public void push(T value) {
        Node<T> newNode = new Node<>(value);
        Node<T> oldTop;
        do {
            oldTop = top.get();
            newNode.next = oldTop;
        } while (!top.compareAndSet(oldTop, newNode)); // CAS loop
    }

    public T pop() {
        Node<T> oldTop;
        Node<T> newTop;
        do {
            oldTop = top.get();
            if (oldTop == null) return null;
            newTop = oldTop.next;
        } while (!top.compareAndSet(oldTop, newTop)); // CAS loop
        return oldTop.value;
    }
}

11. Concurrency Bug Patterns and Prevention

11.1 Deadlock

Deadlock Conditions (all 4 must be met)
1. Mutual Exclusion
2. Hold and Wait
3. No Preemption
4. Circular Wait

┌──────────┐          ┌──────────┐
│ Thread A │──Lock1──→│ Thread B │
│          │←──Lock2──│          │
│ (waiting │          │ (waiting │
│  Lock2)  │          │  Lock1)  │
└──────────┘          └──────────┘

import threading
import time

lock_a = threading.Lock()
lock_b = threading.Lock()

# Code that causes deadlock
def thread_1():
    with lock_a:
        print("Thread 1: Acquired Lock A")
        time.sleep(0.1)
        with lock_b:  # Waiting for Lock B -> deadlock!
            print("Thread 1: Acquired Lock B")

def thread_2():
    with lock_b:
        print("Thread 2: Acquired Lock B")
        time.sleep(0.1)
        with lock_a:  # Waiting for Lock A -> deadlock!
            print("Thread 2: Acquired Lock A")

# Solution: consistent lock ordering
def thread_1_fixed():
    with lock_a:     # Always A -> B order
        print("Thread 1: Acquired Lock A")
        time.sleep(0.1)
        with lock_b:
            print("Thread 1: Acquired Lock B")

def thread_2_fixed():
    with lock_a:     # Always A -> B order (prevents circular wait)
        print("Thread 2: Acquired Lock A")
        time.sleep(0.1)
        with lock_b:
            print("Thread 2: Acquired Lock B")

11.2 Race Condition

package main

import (
    "fmt"
    "sync"
)

func main() {
    // Race condition example
    counter := 0
    var wg sync.WaitGroup

    for i := 0; i < 1000; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            counter++ // DATA RACE! read-modify-write is not atomic
        }()
    }
    wg.Wait()
    fmt.Println("Counter (unsafe):", counter) // May be less than 1000

    // Solution 1: Mutex
    counter = 0
    var mu sync.Mutex
    for i := 0; i < 1000; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            mu.Lock()
            counter++
            mu.Unlock()
        }()
    }
    wg.Wait()
    fmt.Println("Counter (mutex):", counter) // Exactly 1000

    // Solution 2: Channel
    counter = 0
    ch := make(chan int, 1000)
    for i := 0; i < 1000; i++ {
        go func() {
            ch <- 1
        }()
    }
    for i := 0; i < 1000; i++ {
        counter += <-ch
    }
    fmt.Println("Counter (channel):", counter) // Exactly 1000
}

11.3 Starvation and Priority Inversion

Starvation
- A specific thread waits indefinitely for resource access
- Solution: Fair Lock, Priority Aging

Priority Inversion
┌─────────────────────────────────────┐
│ High Priority Task   <- blocked     │
│   | (waiting for Lock)              │
│ Medium Priority Task <- running     │
│   | (preempts)                      │
│ Low Priority Task    <- holds Lock  │
│                                     │
│ Solution: Priority Inheritance      │
│ Low priority task temporarily       │
│ inherits high priority when holding │
│ a lock needed by high priority task │
└─────────────────────────────────────┘
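One way to realize the fair lock mentioned above is a ticket lock: each thread draws a monotonically increasing ticket and is served strictly in arrival order, so no thread can starve. A minimal sketch (the class name and API are illustrative, not a standard-library type):

```python
import threading
import itertools

class TicketLock:
    """FIFO (fair) lock: threads acquire in the order they arrived."""

    def __init__(self):
        self._next_ticket = itertools.count()  # ticket dispenser
        self._now_serving = 0                  # ticket currently allowed in
        self._cond = threading.Condition()

    def acquire(self):
        with self._cond:
            my_ticket = next(self._next_ticket)
            # Wait until it is exactly this thread's turn
            while my_ticket != self._now_serving:
                self._cond.wait()

    def release(self):
        with self._cond:
            self._now_serving += 1
            self._cond.notify_all()  # wake waiters so the next ticket proceeds

lock = TicketLock()
lock.acquire()
lock.release()
```

`threading.Lock` makes no fairness guarantee, so under heavy contention an unlucky thread can be bypassed repeatedly; the ticket discipline trades a little throughput for a hard bound on waiting.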

12. Structured Concurrency

12.1 Concept

Structured concurrency is a pattern that ties the lifetime of concurrent operations to a code block (scope). When the scope ends, all child tasks are guaranteed to be complete (or cancelled).

Unstructured Concurrency     Structured Concurrency
┌────────────────┐         ┌────────────────┐
│ start task A   │         │ scope {        │
│ start task B   │         │   task A       │
│ ... (leaked?)  │         │   task B       │
│ forget task A? │         │ } // A,B done  │
└────────────────┘         └────────────────┘

12.2 Python TaskGroup (3.11+)

import asyncio

async def fetch(name: str, delay: float) -> str:
    await asyncio.sleep(delay)
    if name == "failing":
        raise ValueError(f"{name} failed!")
    return f"{name}: done"

async def main():
    # TaskGroup: structured concurrency
    try:
        async with asyncio.TaskGroup() as tg:
            task1 = tg.create_task(fetch("api-1", 0.5))
            task2 = tg.create_task(fetch("api-2", 1.0))
            task3 = tg.create_task(fetch("api-3", 0.8))
        # All tasks guaranteed complete when we reach here
        print(task1.result(), task2.result(), task3.result())
    except* ValueError as eg:
        # Error handling with ExceptionGroup
        for exc in eg.exceptions:
            print(f"Error: {exc}")

asyncio.run(main())

12.3 Java 21 Structured Concurrency

import java.util.concurrent.StructuredTaskScope;
import java.util.function.Supplier;

public class StructuredConcurrencyDemo {

    record User(String name) {}
    record Order(String id) {}

    static User fetchUser(String userId) throws Exception {
        Thread.sleep(500);
        return new User("Alice");
    }

    static Order fetchOrder(String orderId) throws Exception {
        Thread.sleep(800);
        return new Order("ORD-123");
    }

    public static void main(String[] args) throws Exception {
        // ShutdownOnFailure: cancel remaining subtasks if any fails
        // (preview API in Java 21 -- compile and run with --enable-preview)
        try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
            // In Java 21 (JEP 453), fork returns a Subtask, which is a Supplier
            Supplier<User> user = scope.fork(() -> fetchUser("user-1"));
            Supplier<Order> order = scope.fork(() -> fetchOrder("order-1"));

            scope.join()            // Wait for all subtasks
                 .throwIfFailed();  // Propagate the first failure

            // All results available when we reach here
            System.out.printf("User: %s, Order: %s%n",
                user.get().name(), order.get().id());
        }
    }
}

13. Language Concurrency Model Comparison

| Dimension | Go | Rust | Java 21 | Python | Kotlin | JavaScript | Erlang/Elixir | C# |
|---|---|---|---|---|---|---|---|---|
| Model | CSP | Ownership + async | Virtual Thread | asyncio | Coroutine | Event Loop | Actor | Task |
| Lightweight Unit | goroutine | Future | Virtual Thread | coroutine | coroutine | Promise | process | Task |
| Scheduling | M:N (GMP) | Runtime-dependent | M:N | 1:1 (single) | M:N | 1:1 (single) | Preemptive M:N | M:N |
| Shared State | channel | ownership | synchronized | asyncio.Lock | Mutex | none (single) | none (message) | lock |
| Error Propagation | panic/recover | Result | exception | exception | exception | reject | link/monitor | exception |
| Cancellation | context | drop | interrupt | cancel | Job.cancel | AbortController | exit | CancelToken |
| Structured Conc. | errgroup | limited | StructuredTaskScope | TaskGroup | coroutineScope | none | supervisor | none |
| CPU-bound | goroutine | thread/rayon | thread | multiprocessing | Dispatchers.Default | Worker Thread | process | Parallel.ForEach |
| Memory Safety | GC | compile-time | GC | GC | GC | GC | GC | GC |
| Deadlock Prev. | channel design | care needed | care needed | care needed | care needed | N/A | message-based | care needed |
| Performance | high | very high | high | moderate | high | I/O only | high (BEAM) | high |
| Learning Curve | low | high | medium | low | medium | low | medium | medium |

Note: Rust's ownership system prevents data races at compile time, but deadlocks still compile and must be avoided by design.

14. Practical Pattern: Rate Limiter

package main

import (
    "context"
    "fmt"
    "sync"
    "time"
)

// Token Bucket Rate Limiter
type RateLimiter struct {
    tokens     chan struct{}
    refillRate time.Duration
    ctx        context.Context
    cancel     context.CancelFunc
}

func NewRateLimiter(maxTokens int, refillRate time.Duration) *RateLimiter {
    ctx, cancel := context.WithCancel(context.Background())
    rl := &RateLimiter{
        tokens:     make(chan struct{}, maxTokens),
        refillRate: refillRate,
        ctx:        ctx,
        cancel:     cancel,
    }

    // Fill tokens
    for i := 0; i < maxTokens; i++ {
        rl.tokens <- struct{}{}
    }

    // Periodic refill
    go func() {
        ticker := time.NewTicker(refillRate)
        defer ticker.Stop()
        for {
            select {
            case <-ticker.C:
                select {
                case rl.tokens <- struct{}{}:
                default: // Already full
                }
            case <-ctx.Done():
                return
            }
        }
    }()

    return rl
}

func (rl *RateLimiter) Acquire(ctx context.Context) error {
    select {
    case <-rl.tokens:
        return nil
    case <-ctx.Done():
        return ctx.Err()
    }
}

func (rl *RateLimiter) Close() {
    rl.cancel()
}

func main() {
    limiter := NewRateLimiter(5, 200*time.Millisecond)
    defer limiter.Close()

    var wg sync.WaitGroup
    for i := 0; i < 20; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
            defer cancel()

            if err := limiter.Acquire(ctx); err != nil {
                fmt.Printf("Request %d: timed out\n", id)
                return
            }
            fmt.Printf("Request %d: processed at %s\n", id,
                time.Now().Format("15:04:05.000"))
        }(i)
    }
    wg.Wait()
}

15. Performance Optimization Tips

15.1 Choosing the Right Concurrency Model

Selection by Workload Type

I/O-bound (network, disk):
  -> async/await, coroutine, goroutine
  -> Event loop-based is most efficient
  -> Thread pools cause excessive context switching

CPU-bound (computation, transformation):
  -> OS threads (Python: multiprocessing)
  -> Rust: rayon crate
  -> Go: goroutines (automatic parallelization)

Mixed:
  -> Separate thread pools/dispatchers
  -> Kotlin: Dispatchers.IO vs Dispatchers.Default
  -> Python: asyncio + ProcessPoolExecutor

15.2 Minimizing Context Switching

import asyncio
import concurrent.futures
import time

# Execute CPU-bound work in separate processes
def cpu_heavy_task(n: int) -> int:
    """Prime counting (CPU-bound)"""
    count = 0
    for i in range(2, n):
        if all(i % j != 0 for j in range(2, int(i**0.5) + 1)):
            count += 1
    return count

async def main():
    loop = asyncio.get_running_loop()

    # Separate CPU-bound work with ProcessPoolExecutor
    with concurrent.futures.ProcessPoolExecutor(max_workers=4) as pool:
        start = time.perf_counter()
        results = await asyncio.gather(
            loop.run_in_executor(pool, cpu_heavy_task, 100_000),
            loop.run_in_executor(pool, cpu_heavy_task, 100_000),
            loop.run_in_executor(pool, cpu_heavy_task, 100_000),
            loop.run_in_executor(pool, cpu_heavy_task, 100_000),
        )
        elapsed = time.perf_counter() - start
        print(f"4 tasks completed in {elapsed:.2f}s: {results}")

asyncio.run(main())

16. Quiz

Q1. What is the key difference between concurrency and parallelism?

Concurrency is a structural property of a program -- designing it so multiple tasks can be logically handled at the same time. Parallelism is a property of execution -- multiple tasks physically running at the same time. Concurrency is possible on a single core, but parallelism requires multiple cores.

Q2. In Go's GMP model, what are G, M, and P?
  • G (Goroutine): The unit of work to execute. A lightweight thread.
  • M (Machine): An OS thread. Responsible for actual execution.
  • P (Processor): A logical processor holding a local run queue of G's; an M must hold a P to execute them. The count is set by GOMAXPROCS.

P has a local run queue and uses work stealing to distribute load evenly.
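The work-stealing idea can be sketched in a few lines of Python. This is a toy single-threaded simulation of the policy only, not Go's actual scheduler: each worker pops from the tail of its own deque and, when its queue is empty, steals from the head of another worker's deque.

```python
import collections
import random

class Worker:
    """Holds a local run queue, like a P's local queue in the GMP model."""
    def __init__(self, tasks):
        self.queue = collections.deque(tasks)

def run_all(workers):
    """Simulate scheduling rounds until every local queue is drained."""
    done = []
    while any(w.queue for w in workers):
        for w in workers:
            if w.queue:
                task = w.queue.pop()  # own work: LIFO from the tail
            else:
                victims = [v for v in workers if v.queue]
                if not victims:
                    continue
                # Steal from a random victim's head (FIFO end),
                # minimizing contention with the victim's own tail pops
                task = random.choice(victims).queue.popleft()
            done.append(task)
    return done

print(sorted(run_all([Worker([1, 2, 3]), Worker([])])))  # → [1, 2, 3]
```

Popping one's own tail keeps recently spawned (cache-warm) work local, while stealing from the head takes the oldest, coarsest-grained work, which is the same asymmetry real work-stealing schedulers use.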

Q3. What 4 conditions must all be met for deadlock to occur?
  1. Mutual Exclusion: Only one process can use a resource at a time
  2. Hold and Wait: Holding a resource while waiting for another
  3. No Preemption: Resources cannot be forcibly taken from a process
  4. Circular Wait: Circular chain of processes each waiting for the next

Breaking any one prevents deadlock. The most common solution is consistent lock ordering to prevent circular wait.

Q4. What is the core difference between the Actor model and CSP?
  • Actor Model: Asynchronous message passing. Each Actor has its own mailbox. Sender specifies the receiver directly. Erlang, Akka.
  • CSP: Synchronous channel communication. Channels are independent entities. Sender and receiver are unaware of each other. Go's goroutine + channel.

Actors know "who to send to," while CSP knows "which channel to send to."
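The contrast can be sketched with queues (a toy illustration, not a real actor runtime or Go channel): an actor owns a private mailbox that senders must address directly, whereas a CSP channel is a free-standing object that either side can use without knowing the other.

```python
import queue
import threading

# Actor style: the actor owns its mailbox; senders name the actor.
class Actor:
    def __init__(self):
        self.mailbox = queue.Queue()  # private, asynchronous mailbox

    def send(self, msg):
        self.mailbox.put(msg)         # "who to send to": this actor

    def receive(self):
        return self.mailbox.get()

actor = Actor()
actor.send("hello actor")             # sender addresses the receiver directly
print(actor.receive())

# CSP style: the channel is an independent entity; endpoints are anonymous.
# (A capacity-1 Queue stands in for a channel; Python's stdlib has no
# true rendezvous queue like Go's unbuffered channel.)
channel = queue.Queue(maxsize=1)

def producer():
    channel.put("hello csp")          # sender names only the channel

threading.Thread(target=producer).start()
print(channel.get())                  # receiver names only the channel
```

In the actor version the topology lives in the sender's knowledge of receivers; in the CSP version it lives in which goroutines/threads happen to hold the same channel, which is why CSP pipelines are easy to rewire.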

Q5. What problems does structured concurrency solve?

Structured concurrency solves:

  1. Task Leaking: Fire-and-forget tasks that are forgotten
  2. Error Propagation: Child task errors not reaching the parent
  3. Cancellation Propagation: Children continuing after parent cancellation
  4. Lifetime Management: Tasks continuing after their scope has ended

Key implementations include Java 21's StructuredTaskScope, Python's TaskGroup, and Kotlin's coroutineScope.


17. References

  1. Rob Pike - "Concurrency is not Parallelism" (Go Blog)
  2. Java 21 Virtual Threads - JEP 444
  3. Java 21 Structured Concurrency - JEP 453
  4. Python asyncio Official Documentation
  5. Go Official Documentation - Effective Go (Concurrency)
  6. Kotlin Coroutines Official Guide
  7. Rust async-book (The Async Book)
  8. Akka Documentation - Actor Model
  9. Erlang/OTP - Processes
  10. "The Art of Multiprocessor Programming" - Maurice Herlihy, Nir Shavit
  11. "Designing Data-Intensive Applications" - Martin Kleppmann (Chapter 8)
  12. "Concurrency in Go" - Katherine Cox-Buday (O'Reilly)
  13. tokio.rs - Rust Async Runtime
  14. libuv - Node.js Event Loop Implementation