Python Best Practices & Advanced Patterns Guide — Write Python Like a Pro

Table of Contents

  1. Project Structure
  2. Type Hints
  3. Decorator Patterns
  4. Context Managers
  5. Generators and Iterators
  6. Async Programming
  7. Data Classes
  8. Design Patterns
  9. Performance Optimization
  10. Testing

1. Project Structure

1.1 The src Layout

The src layout is the modern standard for Python projects. Placing source code under a src/ directory prevents accidental imports from an uninstalled package.

my-project/
  src/
    my_package/
      __init__.py
      core.py
      utils.py
  tests/
    test_core.py
    test_utils.py
  pyproject.toml
  README.md

1.2 pyproject.toml

setup.py and setup.cfg are legacy. Consolidate build system, dependencies, and tool configuration into a single pyproject.toml.

[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"

[project]
name = "my-package"
version = "0.1.0"
description = "My awesome package"
requires-python = ">=3.11"
dependencies = [
    "httpx>=0.27",
    "pydantic>=2.0",
]

[project.optional-dependencies]
dev = [
    "pytest>=8.0",
    "mypy>=1.8",
    "ruff>=0.3",
]

[tool.ruff]
line-length = 88
target-version = "py311"

[tool.mypy]
strict = true
python_version = "3.11"

1.3 Virtual Environments - venv vs uv

venv (built-in):

python -m venv .venv
source .venv/bin/activate
pip install -e ".[dev]"

uv (Rust-based, extremely fast):

uv venv
uv pip install -e ".[dev]"
# Or use lockfile-based installation
uv sync

uv reports speedups of 10 to 100 times over pip in its benchmarks, and it is dramatically faster in everyday use. For any new project, uv is strongly recommended.


2. Type Hints

2.1 Basic Type Hints

Since Python 3.9 the built-in collection types (list, dict) can be used directly as generic annotations, and Python 3.10 added the | union operator, so most annotations no longer need the typing module.

# Python 3.10+ - no typing import needed
def greet(name: str) -> str:
    return f"Hello, {name}!"

def process_items(items: list[str]) -> dict[str, int]:
    return {item: len(item) for item in items}

# Union types use the | operator
def parse_value(value: str | int | None) -> str:
    if value is None:
        return "empty"
    return str(value)

2.2 TypeVar and Generic

Use these when building generic functions and classes.

from typing import TypeVar, Generic

T = TypeVar("T")

class Stack(Generic[T]):
    def __init__(self) -> None:
        self._items: list[T] = []

    def push(self, item: T) -> None:
        self._items.append(item)

    def pop(self) -> T:
        if not self._items:
            raise IndexError("Stack is empty")
        return self._items.pop()

    def peek(self) -> T:
        if not self._items:
            raise IndexError("Stack is empty")
        return self._items[-1]

# Usage
int_stack: Stack[int] = Stack()
int_stack.push(42)
str_stack: Stack[str] = Stack()
str_stack.push("hello")
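TypeVar works for plain functions as well as classes. A minimal sketch (the first helper is illustrative, not part of the Stack example): the return type is tied to the element type of the argument, so callers get precise inference.

```python
from typing import TypeVar

T = TypeVar("T")

# A generic function: the checker links the return type to the input's element type
def first(items: list[T]) -> T:
    if not items:
        raise IndexError("empty list")
    return items[0]

print(first([1, 2, 3]))   # 1 -- inferred as int
print(first(["a", "b"]))  # 'a' -- inferred as str
```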

2.3 Protocol - Structural Subtyping

Express duck typing through the type system. Define interfaces without requiring inheritance.

from typing import Protocol, runtime_checkable

@runtime_checkable
class Drawable(Protocol):
    def draw(self) -> str: ...

class Circle:
    def draw(self) -> str:
        return "Drawing circle"

class Square:
    def draw(self) -> str:
        return "Drawing square"

def render(shape: Drawable) -> None:
    print(shape.draw())

# Circle does not inherit Drawable, but it has draw(), so it is compatible
render(Circle())  # OK
render(Square())  # OK
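Because Drawable is decorated with @runtime_checkable, the structural check also works at runtime via isinstance. A small self-contained sketch reusing the classes above:

```python
from typing import Protocol, runtime_checkable

@runtime_checkable
class Drawable(Protocol):
    def draw(self) -> str: ...

class Circle:
    def draw(self) -> str:
        return "Drawing circle"

# isinstance checks for the *presence* of draw(), not for inheritance
print(isinstance(Circle(), Drawable))  # True
print(isinstance("text", Drawable))    # False -- str has no draw()
```

Note that runtime isinstance only verifies that the methods exist, not their signatures; full signature checking is still the type checker's job.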

2.4 Practical mypy Configuration

[tool.mypy]
strict = true
python_version = "3.11"
warn_return_any = true
warn_unused_configs = true
disallow_untyped_defs = true

[[tool.mypy.overrides]]
module = "tests.*"
disallow_untyped_defs = false

# Check the whole project
mypy src/
# Check a specific file
mypy src/my_package/core.py

3. Decorator Patterns

3.1 Using functools.wraps

Always use functools.wraps when writing decorators. It preserves the original function's name and docstring.

import functools
import time
from typing import Callable, ParamSpec, TypeVar

P = ParamSpec("P")
R = TypeVar("R")

def timer(func: Callable[P, R]) -> Callable[P, R]:
    """Decorator that measures function execution time"""
    @functools.wraps(func)
    def wrapper(*args: P.args, **kwargs: P.kwargs) -> R:
        start = time.perf_counter()
        result = func(*args, **kwargs)
        elapsed = time.perf_counter() - start
        print(f"{func.__name__} took {elapsed:.4f}s")
        return result
    return wrapper

@timer
def slow_function(n: int) -> int:
    """Sum up to n"""
    return sum(range(n))

slow_function(1_000_000)
# Output: slow_function took 0.0312s

3.2 Parameterized Decorators

def retry(max_attempts: int = 3, delay: float = 1.0):
    """Decorator that retries on failure"""
    def decorator(func: Callable[P, R]) -> Callable[P, R]:
        @functools.wraps(func)
        def wrapper(*args: P.args, **kwargs: P.kwargs) -> R:
            last_exception: Exception | None = None
            for attempt in range(1, max_attempts + 1):
                try:
                    return func(*args, **kwargs)
                except Exception as e:
                    last_exception = e
                    print(f"Attempt {attempt} failed: {e}")
                    if attempt < max_attempts:
                        time.sleep(delay)
            assert last_exception is not None  # loop always ran at least once
            raise last_exception
        return wrapper
    return decorator

@retry(max_attempts=5, delay=2.0)
def fetch_data(url: str) -> dict:
    # Network request logic
    ...

3.3 Class-Based Decorators

class CacheResult:
    """Class decorator that caches results"""
    def __init__(self, ttl_seconds: int = 300):
        self.ttl = ttl_seconds
        self.cache: dict[str, tuple[float, object]] = {}

    def __call__(self, func: Callable[P, R]) -> Callable[P, R]:
        @functools.wraps(func)
        def wrapper(*args: P.args, **kwargs: P.kwargs) -> R:
            key = str(args) + str(kwargs)
            now = time.time()
            if key in self.cache:
                cached_time, cached_result = self.cache[key]
                if now - cached_time < self.ttl:
                    return cached_result  # type: ignore
            result = func(*args, **kwargs)
            self.cache[key] = (now, result)
            return result
        return wrapper

@CacheResult(ttl_seconds=60)
def expensive_computation(x: int) -> int:
    time.sleep(2)  # Simulating expensive operation
    return x ** 2

4. Context Managers

4.1 How the with Statement Works

Context managers automatically handle resource acquisition and release. They implement the __enter__ and __exit__ magic methods.

class DatabaseConnection:
    def __init__(self, connection_string: str):
        self.connection_string = connection_string
        self.connection = None

    def __enter__(self):
        print(f"Connecting to {self.connection_string}")
        self.connection = self._connect()
        return self.connection

    def __exit__(self, exc_type, exc_val, exc_tb):
        if self.connection:
            self.connection.close()
            print("Connection closed")
        # Returning False propagates the exception
        return False

    def _connect(self):
        # Actual connection logic
        ...

with DatabaseConnection("postgresql://localhost/mydb") as conn:
    conn.execute("SELECT 1")
# Connection is automatically closed when the block exits

4.2 The contextmanager Decorator

For simple context managers, use contextlib.contextmanager.

from contextlib import contextmanager
import os
import shutil

@contextmanager
def temporary_directory(path: str):
    """Create a temporary directory and remove it after use"""
    os.makedirs(path, exist_ok=True)
    try:
        yield path
    finally:
        shutil.rmtree(path)

with temporary_directory("/tmp/work") as tmpdir:
    # Work with tmpdir
    print(f"Working in {tmpdir}")
# Directory is automatically removed when the block ends
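For real code, note that the standard library already ships this exact pattern as tempfile.TemporaryDirectory, which also chooses a safe, unique path for you:

```python
import tempfile

# Creates a uniquely named directory and deletes it when the block exits
with tempfile.TemporaryDirectory() as tmpdir:
    print(f"Working in {tmpdir}")
# The directory no longer exists here
```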

4.3 Async Context Managers

from contextlib import asynccontextmanager
import aiohttp

@asynccontextmanager
async def http_session():
    """Manage aiohttp session lifecycle"""
    session = aiohttp.ClientSession()
    try:
        yield session
    finally:
        await session.close()

async def fetch_url(url: str) -> str:
    async with http_session() as session:
        async with session.get(url) as response:
            return await response.text()

5. Generators and Iterators

5.1 Generator Basics

Generators use lazy evaluation for memory-efficient processing.

# List: materializes all ten million values in memory at once
numbers_list = [x ** 2 for x in range(10_000_000)]  # the list object alone is ~80MB, plus the int objects it holds

# Generator: produces values one at a time, using near-constant memory
numbers_gen = (x ** 2 for x in range(10_000_000))  # a couple hundred bytes, regardless of length

def read_large_file(filepath: str, chunk_size: int = 8192):
    """Read a large file in chunks"""
    with open(filepath, "r") as f:
        while True:
            chunk = f.read(chunk_size)
            if not chunk:
                break
            yield chunk

# Process multi-GB files without memory concerns
for chunk in read_large_file("huge_log.txt"):
    process(chunk)

5.2 yield from

Use this to compose or delegate generators.

from typing import Iterator

def flatten(nested: list) -> Iterator:
    """Flatten a nested list"""
    for item in nested:
        if isinstance(item, list):
            yield from flatten(item)
        else:
            yield item

data = [1, [2, 3], [4, [5, 6]], 7]
print(list(flatten(data)))
# Output: [1, 2, 3, 4, 5, 6, 7]

def chain(*iterables):
    """Chain multiple iterables into one"""
    for iterable in iterables:
        yield from iterable

result = list(chain([1, 2], [3, 4], [5, 6]))
# Output: [1, 2, 3, 4, 5, 6]
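Before hand-rolling a chain helper, note that the standard library already provides it as itertools.chain:

```python
import itertools

# Equivalent to the hand-written chain() above
result = list(itertools.chain([1, 2], [3, 4], [5, 6]))
print(result)  # [1, 2, 3, 4, 5, 6]
```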

5.3 Building Pipelines with Generators

import csv
from typing import Iterator

def read_csv_rows(filename: str) -> Iterator[dict]:
    """Read CSV file row by row"""
    with open(filename) as f:
        reader = csv.DictReader(f)
        yield from reader

def filter_active(rows: Iterator[dict]) -> Iterator[dict]:
    """Filter active users only"""
    for row in rows:
        if row["status"] == "active":
            yield row

def extract_emails(rows: Iterator[dict]) -> Iterator[str]:
    """Extract email field"""
    for row in rows:
        yield row["email"]

# Pipeline: file -> filter -> transform
pipeline = extract_emails(filter_active(read_csv_rows("users.csv")))
for email in pipeline:
    send_newsletter(email)

6. Async Programming

6.1 asyncio Basics

import asyncio

async def fetch_data(url: str, delay: float) -> str:
    print(f"Fetching {url}...")
    await asyncio.sleep(delay)  # Simulating network request
    return f"Data from {url}"

async def main():
    # Sequential execution: 3 seconds
    result1 = await fetch_data("api/users", 1.0)
    result2 = await fetch_data("api/posts", 1.0)
    result3 = await fetch_data("api/comments", 1.0)

    # Concurrent execution: all three overlap, about 1 second total
    results = await asyncio.gather(
        fetch_data("api/users", 1.0),
        fetch_data("api/posts", 1.0),
        fetch_data("api/comments", 1.0),
    )
    print(results)

asyncio.run(main())

6.2 Async HTTP with aiohttp

import aiohttp
import asyncio

async def fetch_url(session: aiohttp.ClientSession, url: str) -> dict:
    async with session.get(url) as response:
        return await response.json()

async def fetch_all_users(user_ids: list[int]) -> list[dict]:
    async with aiohttp.ClientSession() as session:
        tasks = [
            fetch_url(session, f"https://api.example.com/users/{uid}")
            for uid in user_ids
        ]
        return await asyncio.gather(*tasks)

# Fetch 100 user profiles in parallel
users = asyncio.run(fetch_all_users(list(range(1, 101))))

6.3 Limiting Concurrency with Semaphore

async def rate_limited_fetch(
    urls: list[str],
    max_concurrent: int = 10,
) -> list[str]:
    """Limit the number of concurrent requests"""
    semaphore = asyncio.Semaphore(max_concurrent)

    async def fetch_with_limit(session: aiohttp.ClientSession, url: str) -> str:
        async with semaphore:
            async with session.get(url) as resp:
                return await resp.text()

    async with aiohttp.ClientSession() as session:
        tasks = [fetch_with_limit(session, url) for url in urls]
        return await asyncio.gather(*tasks)

6.4 Event Loop and TaskGroup (Python 3.11+)

async def main():
    async with asyncio.TaskGroup() as tg:
        task1 = tg.create_task(fetch_data("api/users", 1.0))
        task2 = tg.create_task(fetch_data("api/posts", 1.0))
        task3 = tg.create_task(fetch_data("api/comments", 1.0))

    # All tasks are complete by this point
    print(task1.result(), task2.result(), task3.result())

TaskGroup, introduced in Python 3.11, offers better error handling than asyncio.gather. If any task raises an exception, the remaining tasks are automatically cancelled.


7. Data Classes

7.1 dataclass

from dataclasses import dataclass, field

@dataclass
class User:
    name: str
    email: str
    age: int
    tags: list[str] = field(default_factory=list)
    is_active: bool = True

    def display_name(self) -> str:
        return f"{self.name} ({self.email})"

user = User(name="Kim", email="kim@example.com", age=30)
print(user)
# User(name='Kim', email='kim@example.com', age=30, tags=[], is_active=True)

Immutable dataclass:

@dataclass(frozen=True)
class Point:
    x: float
    y: float

p = Point(1.0, 2.0)
# p.x = 3.0  # Raises FrozenInstanceError!
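Frozen instances can still be "modified" functionally: dataclasses.replace builds a new instance with selected fields changed, leaving the original untouched.

```python
from dataclasses import dataclass, replace

@dataclass(frozen=True)
class Point:
    x: float
    y: float

p = Point(1.0, 2.0)
moved = replace(p, x=3.0)  # new instance; p itself is unchanged
print(moved)  # Point(x=3.0, y=2.0)
print(p)      # Point(x=1.0, y=2.0)
```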

7.2 NamedTuple

A lightweight, fast choice for immutable data.

from typing import NamedTuple

class Coordinate(NamedTuple):
    latitude: float
    longitude: float
    altitude: float = 0.0

coord = Coordinate(37.5665, 126.9780)
lat, lng, alt = coord  # Supports unpacking
print(coord.latitude)  # Attribute access

7.3 Pydantic - Data Validation

For external input, Pydantic is the best choice. It provides automatic validation, serialization, and documentation.

from pydantic import BaseModel, EmailStr, Field, field_validator

class UserCreate(BaseModel):
    name: str = Field(min_length=1, max_length=100)
    email: EmailStr
    age: int = Field(ge=0, le=150)
    tags: list[str] = []

    @field_validator("name")
    @classmethod
    def name_must_be_capitalized(cls, v: str) -> str:
        if not v[0].isupper():
            raise ValueError("Name must start with uppercase")
        return v

# Valid input
user = UserCreate(name="Kim", email="kim@example.com", age=30)
print(user.model_dump_json())

# Invalid input -> ValidationError
try:
    bad_user = UserCreate(name="", email="not-an-email", age=-5)
except Exception as e:
    print(e)

7.4 When to Use What

Scenario                     Recommendation
Internal data transfer       dataclass
Immutable value objects      NamedTuple or frozen dataclass
API I/O, external data       Pydantic BaseModel
Config file parsing          Pydantic BaseSettings (the pydantic-settings package in v2)
DB models                    SQLAlchemy + dataclass or Pydantic

8. Design Patterns

8.1 Singleton - The Pythonic Way

class DatabasePool:
    _instance: "DatabasePool | None" = None

    def __new__(cls) -> "DatabasePool":
        if cls._instance is None:
            cls._instance = super().__new__(cls)
            cls._instance._initialize()
        return cls._instance

    def _initialize(self) -> None:
        self.connections: list = []
        print("Pool initialized")

# Always returns the same instance
pool1 = DatabasePool()
pool2 = DatabasePool()
assert pool1 is pool2

An even simpler approach - module-level variable:

# db.py
_pool = None

def get_pool():
    global _pool
    if _pool is None:
        _pool = create_pool()
    return _pool
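A third variant uses functools.cache: a cached zero-argument factory memoizes a single instance without any global statement (Pool here is a stand-in for a real pool class):

```python
from functools import cache

class Pool:  # stand-in for a real connection pool
    pass

@cache  # a zero-argument cached factory always returns the same object
def get_pool() -> Pool:
    return Pool()

print(get_pool() is get_pool())  # True
```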

8.2 Factory Pattern

from abc import ABC, abstractmethod

class Notification(ABC):
    @abstractmethod
    def send(self, message: str) -> None: ...

class EmailNotification(Notification):
    def send(self, message: str) -> None:
        print(f"Email: {message}")

class SlackNotification(Notification):
    def send(self, message: str) -> None:
        print(f"Slack: {message}")

class SMSNotification(Notification):
    def send(self, message: str) -> None:
        print(f"SMS: {message}")

def create_notification(channel: str) -> Notification:
    factories: dict[str, type[Notification]] = {
        "email": EmailNotification,
        "slack": SlackNotification,
        "sms": SMSNotification,
    }
    if channel not in factories:
        raise ValueError(f"Unknown channel: {channel}")
    return factories[channel]()

notif = create_notification("slack")
notif.send("Hello!")

8.3 Observer Pattern

from typing import Callable

class EventEmitter:
    def __init__(self) -> None:
        self._listeners: dict[str, list[Callable]] = {}

    def on(self, event: str, callback: Callable) -> None:
        self._listeners.setdefault(event, []).append(callback)

    def emit(self, event: str, *args, **kwargs) -> None:
        for callback in self._listeners.get(event, []):
            callback(*args, **kwargs)

# Usage
emitter = EventEmitter()
emitter.on("user_created", lambda user: print(f"Welcome, {user}!"))
emitter.on("user_created", lambda user: send_email(user))
emitter.emit("user_created", "Kim")

8.4 Strategy Pattern

from typing import Protocol

class SortStrategy(Protocol):
    def sort(self, data: list[int]) -> list[int]: ...

class BubbleSort:
    def sort(self, data: list[int]) -> list[int]:
        arr = data.copy()
        n = len(arr)
        for i in range(n):
            for j in range(0, n - i - 1):
                if arr[j] > arr[j + 1]:
                    arr[j], arr[j + 1] = arr[j + 1], arr[j]
        return arr

class QuickSort:
    def sort(self, data: list[int]) -> list[int]:
        if len(data) <= 1:
            return data
        pivot = data[len(data) // 2]
        left = [x for x in data if x < pivot]
        middle = [x for x in data if x == pivot]
        right = [x for x in data if x > pivot]
        return self.sort(left) + middle + self.sort(right)

class Sorter:
    def __init__(self, strategy: SortStrategy) -> None:
        self._strategy = strategy

    def sort(self, data: list[int]) -> list[int]:
        return self._strategy.sort(data)

# Swap strategies at runtime
sorter = Sorter(QuickSort())
print(sorter.sort([3, 1, 4, 1, 5, 9]))

9. Performance Optimization

9.1 Profiling with cProfile

import cProfile
import pstats

def expensive_function():
    total = 0
    for i in range(1_000_000):
        total += i ** 2
    return total

# Run profiler
profiler = cProfile.Profile()
profiler.enable()
expensive_function()
profiler.disable()

# Print results
stats = pstats.Stats(profiler)
stats.sort_stats("cumulative")
stats.print_stats(10)

Line-by-line analysis with line_profiler:

pip install line_profiler
kernprof -l -v my_script.py

# The @profile decorator is injected by kernprof at runtime -- no import needed
@profile
def process_data(data: list[int]) -> list[int]:
    result = []                      # Nearly free
    for item in data:                # Loop overhead
        if item % 2 == 0:            # Condition check
            result.append(item * 2)  # Actual work
    return result

9.2 Memoization with functools.lru_cache

from functools import lru_cache

@lru_cache(maxsize=128)
def fibonacci(n: int) -> int:
    if n < 2:
        return n
    return fibonacci(n - 1) + fibonacci(n - 2)

# Without cache: O(2^n) -> extremely slow
# With cache: O(n) -> instant
print(fibonacci(100))

# Check cache statistics
print(fibonacci.cache_info())
# CacheInfo(hits=98, misses=101, maxsize=128, currsize=101)

9.3 List Comprehension vs for Loop

import timeit

# for loop - slower
def squares_loop(n: int) -> list[int]:
    result = []
    for i in range(n):
        result.append(i ** 2)
    return result

# List comprehension - faster (C-level optimization)
def squares_comp(n: int) -> list[int]:
    return [i ** 2 for i in range(n)]

# Benchmark
n = 100_000
t_loop = timeit.timeit(lambda: squares_loop(n), number=100)
t_comp = timeit.timeit(lambda: squares_comp(n), number=100)
print(f"Loop: {t_loop:.3f}s, Comprehension: {t_comp:.3f}s")
# Comprehension is typically 20-30% faster

9.4 Additional Performance Tips

# 1. Use join() for string concatenation
words = ["hello", "world", "python"]
result = " ".join(words)  # Much faster than "+" operator

# 2. Use set for membership tests
valid_ids = set(range(10000))
if 42 in valid_ids:  # O(1) - faster than list's O(n)
    pass

# 3. Leverage collections
from collections import Counter, defaultdict

# Frequency counting
word_count = Counter(["apple", "banana", "apple", "cherry", "apple"])
print(word_count.most_common(2))  # [('apple', 3), ('banana', 1)]

# Default dictionary
grouped = defaultdict(list)
for item in [("A", 1), ("B", 2), ("A", 3)]:
    grouped[item[0]].append(item[1])

10. Testing

10.1 pytest Basics

# tests/test_calculator.py
def add(a: int, b: int) -> int:
    return a + b

def test_add_positive():
    assert add(2, 3) == 5

def test_add_negative():
    assert add(-1, -1) == -2

def test_add_zero():
    assert add(0, 0) == 0

# Run tests
pytest tests/ -v
# With coverage
pytest --cov=src --cov-report=term-missing

10.2 Fixtures

import pytest

@pytest.fixture
def sample_users() -> list[dict]:
    return [
        {"name": "Kim", "age": 30},
        {"name": "Lee", "age": 25},
        {"name": "Park", "age": 35},
    ]

@pytest.fixture
def db_connection():
    conn = create_connection()
    yield conn  # Test runs here
    conn.close()  # Cleanup code

def test_user_count(sample_users):
    assert len(sample_users) == 3

def test_oldest_user(sample_users):
    oldest = max(sample_users, key=lambda u: u["age"])
    assert oldest["name"] == "Park"

10.3 Parametrize

Run the same test with multiple inputs.

@pytest.mark.parametrize("input_val, expected", [
    ("hello", 5),
    ("", 0),
    ("python", 6),
    ("   ", 3),
])
def test_string_length(input_val: str, expected: int):
    assert len(input_val) == expected

@pytest.mark.parametrize("a, b, expected", [
    (2, 3, 5),
    (-1, 1, 0),
    (0, 0, 0),
    (100, 200, 300),
])
def test_add(a: int, b: int, expected: int):
    assert add(a, b) == expected

10.4 Mocking

Isolate external dependencies for testing.

from unittest.mock import Mock, patch, AsyncMock

class UserService:
    def __init__(self, api_client):
        self.api = api_client

    def get_user(self, user_id: int) -> dict:
        response = self.api.get(f"/users/{user_id}")
        return response.json()

def test_get_user():
    # Create mock API client
    mock_api = Mock()
    mock_api.get.return_value.json.return_value = {
        "id": 1,
        "name": "Kim",
    }

    service = UserService(mock_api)
    user = service.get_user(1)

    assert user["name"] == "Kim"
    mock_api.get.assert_called_once_with("/users/1")

# Mocking with patch
@patch("my_module.requests.get")
def test_fetch_data(mock_get):
    mock_get.return_value.status_code = 200
    mock_get.return_value.json.return_value = {"data": "test"}

    result = fetch_data("https://api.example.com")
    assert result["data"] == "test"

10.5 Async Tests

import pytest

# Requires the pytest-asyncio plugin: pip install pytest-asyncio
@pytest.mark.asyncio
async def test_async_fetch():
    result = await fetch_data("https://api.example.com/users/1")
    assert "name" in result

@pytest.mark.asyncio
async def test_concurrent_requests():
    results = await asyncio.gather(
        fetch_data("url1"),
        fetch_data("url2"),
    )
    assert len(results) == 2

Wrapping Up

Here are the core principles for writing great Python.

  1. Use type hints - They serve as documentation and bug prevention
  2. Use pyproject.toml - Consolidate all configuration into one file
  3. Manage resources with context managers - The with statement is a core Python idiom
  4. Save memory with generators - Essential for processing large datasets
  5. Speed up I/O-bound work with asyncio - Effective for network and file operations
  6. Validate external data with Pydantic - Guarantee runtime safety
  7. Don't overuse patterns - Keep it Pythonic and concise
  8. Profile first, optimize later - Base decisions on measurements, not guesses
  9. Write tests with pytest - Make full use of fixtures and parametrize
  10. uv + ruff + mypy is the most efficient Python toolchain as of 2026

Python is far more than a scripting language. With the right patterns and tools, it is a reliable choice for large-scale production systems.