Python Backend 2025 Complete Guide: FastAPI vs Django, Async Patterns, and AI Service Architecture

Introduction

Python remains one of the most widely used backend languages in 2025. As AI/ML services have grown, FastAPI has passed 80K GitHub stars and become one of the fastest-growing web frameworks. Django remains strong in the enterprise market, with 4.2 LTS and expanded async support in 5.0.

This guide covers Python backend development end to end: FastAPI and Django, async processing, databases, task queues, AI service architecture, and production deployment. It closes with interview questions and a quiz for career preparation.


1. Python Backend Landscape 2025

Why Python Backend?

Python dominates backend development for three reasons:

  1. Natural integration with AI/ML ecosystem — Use PyTorch, TensorFlow, LangChain directly
  2. Rapid prototyping and productivity — Clean syntax, rich libraries, type hints for safety
  3. Massive community — 500K+ PyPI packages, abundant StackOverflow answers

Framework Overview

| Framework | GitHub Stars | Latest Version | Key Features |
| --- | --- | --- | --- |
| FastAPI | 80K+ | 0.115+ | Async, auto-docs, Pydantic |
| Django | 82K+ | 5.1 | Full-stack, ORM, Admin |
| Flask | 69K+ | 3.1 | Micro, flexible, simple |
| Litestar | 5K+ | 2.x | FastAPI alternative, performance-first |

2. FastAPI vs Django vs Flask Comparison

Performance Comparison (requests/sec — TechEmpower benchmarks)

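| Metric | FastAPI | Django | Flask |
| --- | --- | --- | --- |
| JSON serialization | ~15,000 | ~3,500 | ~4,000 |
| Single DB query | ~12,000 | ~3,000 | ~3,500 |
| Multiple DB queries | ~3,500 | ~1,200 | ~1,400 |
| Async I/O | Native | Partial (async views since 3.1, async ORM API since 4.1) | Limited (async views since 2.0, still served over WSGI) |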

Selection Criteria

Choose FastAPI when:
  - Building new API services
  - AI/ML model serving is needed
  - High concurrency is required
  - Microservice architecture

Choose Django when:
  - Admin panel is needed
  - Rapid CRUD app development
  - Full-stack web application
  - Team has strong Django experience

Choose Flask when:
  - Extremely simple APIs
  - Legacy system maintenance
  - Learning purposes

Ecosystem Comparison

| Feature | FastAPI | Django | Flask |
| --- | --- | --- | --- |
| ORM | SQLAlchemy | Django ORM | SQLAlchemy |
| Auth | Custom / FastAPI-Users | django-allauth | Flask-Login |
| Admin | SQLAdmin | django-admin | Flask-Admin |
| API Docs | Auto (Swagger/ReDoc) | drf-spectacular | Flask-RESTX |
| WebSocket | Native | Channels | Flask-SocketIO |
| Async | Native | Partial | Limited (async views only) |

3. FastAPI Deep Dive

3.1 Basic Structure and Routing

from fastapi import FastAPI, HTTPException, status
from pydantic import BaseModel, Field
from datetime import datetime

app = FastAPI(
    title="My API",
    version="1.0.0",
    docs_url="/docs",
    redoc_url="/redoc",
)

class UserCreate(BaseModel):
    username: str = Field(..., min_length=3, max_length=50)
    email: str = Field(..., pattern=r"^[\w\.\-]+@[\w\.\-]+\.\w+$")
    age: int = Field(..., ge=0, le=150)

class UserResponse(BaseModel):
    id: int
    username: str
    email: str
    created_at: datetime

    model_config = {"from_attributes": True}

@app.post("/users", response_model=UserResponse, status_code=status.HTTP_201_CREATED)
async def create_user(user: UserCreate):
    return UserResponse(
        id=1,
        username=user.username,
        email=user.email,
        created_at=datetime.now(),
    )

@app.get("/users/{user_id}", response_model=UserResponse)
async def get_user(user_id: int):
    if user_id <= 0:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="User not found",
        )
    return UserResponse(id=user_id, username="test", email="t@t.com", created_at=datetime.now())

3.2 Pydantic v2 Data Validation

Pydantic v2 moved its validation core to pydantic-core, which is written in Rust. The project reports validation that is 5-50x faster than v1.

from pydantic import BaseModel, Field, field_validator, model_validator
from typing import Optional
from enum import Enum

class Role(str, Enum):
    ADMIN = "admin"
    USER = "user"
    MODERATOR = "moderator"

class UserProfile(BaseModel):
    name: str = Field(..., min_length=1, max_length=100)
    role: Role = Role.USER
    tags: list[str] = Field(default_factory=list, max_length=10)
    metadata: Optional[dict[str, str]] = None

    @field_validator("name")
    @classmethod
    def name_must_not_be_empty(cls, v: str) -> str:
        if not v.strip():
            raise ValueError("Name cannot be whitespace only")
        return v.strip()

    @model_validator(mode="after")
    def validate_admin_tags(self):
        if self.role == Role.ADMIN and not self.tags:
            raise ValueError("Admin must have at least one tag")
        return self

    model_config = {
        "json_schema_extra": {
            "examples": [
                {"name": "Alice", "role": "admin", "tags": ["ops"]}
            ]
        }
    }

3.3 Dependency Injection

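FastAPI's Depends() lets a handler declare what it needs (a DB session, the current user, a role check), and the framework resolves and injects those values on every request.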

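from collections.abc import AsyncIterator
from typing import Annotated

from fastapi import Depends, Header, HTTPException
from sqlalchemy.ext.asyncio import AsyncSession

# `async_session` is the async_sessionmaker from section 5.1; `User` and
# `verify_token` are application code defined elsewhere.

# DB session dependency
async def get_db() -> AsyncIterator[AsyncSession]:
    # `async with` closes the session once the request is done.
    async with async_session() as session:
        yield session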

# Auth dependency
async def get_current_user(
    authorization: str = Header(...),
    db: AsyncSession = Depends(get_db),
) -> User:
    token = authorization.replace("Bearer ", "")
    user = await verify_token(token, db)
    if not user:
        raise HTTPException(status_code=401, detail="Invalid token")
    return user

# Role verification dependency
def require_role(required_role: str):
    async def role_checker(user: User = Depends(get_current_user)):
        if user.role != required_role:
            raise HTTPException(status_code=403, detail="Insufficient permissions")
        return user
    return role_checker

# Usage
@app.get("/admin/users")
async def list_users(
    admin: Annotated[User, Depends(require_role("admin"))],
    db: Annotated[AsyncSession, Depends(get_db)],
):
    return await get_all_users(db)

3.4 Middleware and Error Handling

from fastapi import Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
from starlette.middleware.base import BaseHTTPMiddleware
import time
import logging

logger = logging.getLogger(__name__)

# CORS middleware
app.add_middleware(
    CORSMiddleware,
    allow_origins=["https://myapp.com"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Custom middleware — request logging
class RequestLoggingMiddleware(BaseHTTPMiddleware):
    async def dispatch(self, request: Request, call_next):
        start = time.perf_counter()
        response = await call_next(request)
        elapsed = time.perf_counter() - start
        logger.info(
            f"method={request.method} path={request.url.path} "
            f"status={response.status_code} duration={elapsed:.3f}s"
        )
        return response

app.add_middleware(RequestLoggingMiddleware)

# Global exception handler
@app.exception_handler(ValueError)
async def value_error_handler(request: Request, exc: ValueError):
    return JSONResponse(
        status_code=422,
        content={"detail": str(exc), "type": "validation_error"},
    )

3.5 WebSocket and Background Tasks

import asyncio

from fastapi import WebSocket, WebSocketDisconnect, BackgroundTasks

# WebSocket chat
class ConnectionManager:
    def __init__(self):
        self.active: list[WebSocket] = []

    async def connect(self, ws: WebSocket):
        await ws.accept()
        self.active.append(ws)

    def disconnect(self, ws: WebSocket):
        self.active.remove(ws)

    async def broadcast(self, message: str):
        for conn in self.active:
            await conn.send_text(message)

manager = ConnectionManager()

@app.websocket("/ws/chat")
async def websocket_chat(ws: WebSocket):
    await manager.connect(ws)
    try:
        while True:
            data = await ws.receive_text()
            await manager.broadcast(f"User: {data}")
    except WebSocketDisconnect:
        manager.disconnect(ws)

# Background tasks
async def send_notification(email: str, message: str):
    await asyncio.sleep(2)
    logger.info(f"Sent notification to {email}")

@app.post("/orders")
async def create_order(order: OrderCreate, background_tasks: BackgroundTasks):
    result = await save_order(order)
    background_tasks.add_task(send_notification, order.email, "Order confirmed!")
    return result

4. Django Deep Dive

4.1 Django ORM and Models

# models.py
from django.db import models
from django.core.validators import MinValueValidator

class Category(models.Model):
    name = models.CharField(max_length=100, unique=True)
    slug = models.SlugField(unique=True)

    class Meta:
        verbose_name_plural = "categories"
        ordering = ["name"]

    def __str__(self):
        return self.name

class Product(models.Model):
    name = models.CharField(max_length=200)
    category = models.ForeignKey(Category, on_delete=models.CASCADE, related_name="products")
    price = models.DecimalField(max_digits=10, decimal_places=2, validators=[MinValueValidator(0)])
    stock = models.PositiveIntegerField(default=0)
    is_active = models.BooleanField(default=True)
    created_at = models.DateTimeField(auto_now_add=True)
    updated_at = models.DateTimeField(auto_now=True)

    class Meta:
        indexes = [
            models.Index(fields=["category", "-created_at"]),
            models.Index(fields=["price"]),
        ]

# Query examples — solving N+1 problem
products = Product.objects.select_related("category").filter(is_active=True)

# Aggregation
from django.db.models import Avg, Count, Q
stats = Category.objects.annotate(
    product_count=Count("products"),
    avg_price=Avg("products__price"),
    active_count=Count("products", filter=Q(products__is_active=True)),
)

4.2 Django Admin Customization

# admin.py
from django.contrib import admin
from .models import Product, Category

@admin.register(Product)
class ProductAdmin(admin.ModelAdmin):
    list_display = ["name", "category", "price", "stock", "is_active", "created_at"]
    list_filter = ["is_active", "category", "created_at"]
    search_fields = ["name", "category__name"]
    list_editable = ["price", "stock", "is_active"]
    readonly_fields = ["created_at", "updated_at"]
    list_per_page = 25

    actions = ["activate_products", "deactivate_products"]

    @admin.action(description="Activate selected products")
    def activate_products(self, request, queryset):
        updated = queryset.update(is_active=True)
        self.message_user(request, f"{updated} products activated.")

    @admin.action(description="Deactivate selected products")
    def deactivate_products(self, request, queryset):
        updated = queryset.update(is_active=False)
        self.message_user(request, f"{updated} products deactivated.")

4.3 Django REST Framework (DRF)

# serializers.py
from rest_framework import serializers
from .models import Product, Category

class CategorySerializer(serializers.ModelSerializer):
    product_count = serializers.IntegerField(read_only=True)

    class Meta:
        model = Category
        fields = ["id", "name", "slug", "product_count"]

class ProductSerializer(serializers.ModelSerializer):
    category_name = serializers.CharField(source="category.name", read_only=True)

    class Meta:
        model = Product
        fields = ["id", "name", "category", "category_name", "price", "stock", "is_active"]

# views.py
from rest_framework import viewsets, filters
from django_filters.rest_framework import DjangoFilterBackend

class ProductViewSet(viewsets.ModelViewSet):
    queryset = Product.objects.select_related("category").filter(is_active=True)
    serializer_class = ProductSerializer
    filter_backends = [DjangoFilterBackend, filters.SearchFilter, filters.OrderingFilter]
    filterset_fields = ["category", "is_active"]
    search_fields = ["name"]
    ordering_fields = ["price", "created_at"]
    ordering = ["-created_at"]

4.4 Django 5.0 Async Views

# Django 5.0+ async views
from django.http import JsonResponse

from .models import Product

async def async_product_list(request):
    # QuerySets support async iteration since Django 4.1, so no sync_to_async
    # wrapper is needed here; keep it for code paths without an async API.
    products = [
        p async for p in
        Product.objects.select_related("category").filter(is_active=True)[:20]
    ]
    data = [
        {"id": p.id, "name": p.name, "price": str(p.price)}
        for p in products
    ]
    return JsonResponse({"products": data})

# Django Channels — WebSocket
from channels.generic.websocket import AsyncJsonWebsocketConsumer

class ChatConsumer(AsyncJsonWebsocketConsumer):
    async def connect(self):
        self.room = self.scope["url_route"]["kwargs"]["room"]
        await self.channel_layer.group_add(self.room, self.channel_name)
        await self.accept()

    async def disconnect(self, close_code):
        await self.channel_layer.group_discard(self.room, self.channel_name)

    async def receive_json(self, content):
        await self.channel_layer.group_send(
            self.room,
            {"type": "chat.message", "message": content["message"]},
        )

    async def chat_message(self, event):
        await self.send_json({"message": event["message"]})

5. Database: SQLAlchemy 2.0 + Alembic

5.1 SQLAlchemy 2.0 Async Models

from sqlalchemy import String, Integer, ForeignKey, DateTime, func
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, relationship
from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker, AsyncSession
from datetime import datetime

class Base(DeclarativeBase):
    pass

class User(Base):
    __tablename__ = "users"

    id: Mapped[int] = mapped_column(primary_key=True)
    username: Mapped[str] = mapped_column(String(50), unique=True, index=True)
    email: Mapped[str] = mapped_column(String(255), unique=True)
    created_at: Mapped[datetime] = mapped_column(DateTime, server_default=func.now())
    posts: Mapped[list["Post"]] = relationship(back_populates="author", lazy="selectin")

class Post(Base):
    __tablename__ = "posts"

    id: Mapped[int] = mapped_column(primary_key=True)
    title: Mapped[str] = mapped_column(String(200))
    content: Mapped[str] = mapped_column(String)
    author_id: Mapped[int] = mapped_column(ForeignKey("users.id"))
    author: Mapped["User"] = relationship(back_populates="posts")

# Async engine setup
engine = create_async_engine(
    "postgresql+asyncpg://user:pass@localhost/db",
    pool_size=20,
    max_overflow=10,
    pool_pre_ping=True,
    echo=False,
)

async_session = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)

5.2 Async CRUD Patterns

from sqlalchemy import select, update
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import selectinload

class UserRepository:
    def __init__(self, session: AsyncSession):
        self.session = session

    async def get_by_id(self, user_id: int) -> User | None:
        result = await self.session.execute(
            select(User).where(User.id == user_id)
        )
        return result.scalar_one_or_none()

    async def list_with_posts(self, skip: int = 0, limit: int = 20) -> list[User]:
        result = await self.session.execute(
            select(User)
            .options(selectinload(User.posts))
            .offset(skip)
            .limit(limit)
            .order_by(User.created_at.desc())
        )
        return list(result.scalars().all())

    async def create(self, username: str, email: str) -> User:
        user = User(username=username, email=email)
        self.session.add(user)
        await self.session.commit()
        await self.session.refresh(user)
        return user

    async def update_email(self, user_id: int, new_email: str) -> bool:
        result = await self.session.execute(
            update(User).where(User.id == user_id).values(email=new_email)
        )
        await self.session.commit()
        return result.rowcount > 0

5.3 Alembic Migrations

# Initialize
alembic init alembic

# Generate migration
alembic revision --autogenerate -m "add users table"

# Apply migration
alembic upgrade head

# Rollback
alembic downgrade -1

6. Async Patterns (async/await)

6.1 asyncio Basics

import asyncio
import aiohttp

# Concurrent API calls
async def fetch_url(session: aiohttp.ClientSession, url: str) -> dict:
    async with session.get(url) as response:
        return await response.json()

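async def fetch_all(urls: list[str]) -> list[dict | BaseException]:
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_url(session, url) for url in urls]
        # return_exceptions=True puts failures in the result list instead of
        # raising, so callers must check each item.
        return await asyncio.gather(*tasks, return_exceptions=True)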

# Execute
urls = [
    "https://api.example.com/users",
    "https://api.example.com/products",
    "https://api.example.com/orders",
]
results = asyncio.run(fetch_all(urls))

6.2 Semaphore for Concurrency Limiting

import asyncio

import aiohttp

# Maximum 10 concurrent executions
semaphore = asyncio.Semaphore(10)

async def rate_limited_fetch(session, url):
    async with semaphore:
        async with session.get(url) as response:
            return await response.json()

async def fetch_many(urls: list[str]):
    async with aiohttp.ClientSession() as session:
        tasks = [rate_limited_fetch(session, url) for url in urls]
        return await asyncio.gather(*tasks)

6.3 Sync vs Async — When to Use What

Async is better for:
  - Many external API calls
  - Frequent database I/O
  - WebSocket / real-time communication
  - Heavy file I/O

Sync is better for:
  - CPU-intensive tasks (image processing, ML inference)
  - Simple CRUD operations
  - Legacy libraries without async support
  - When concurrent.futures.ProcessPoolExecutor is more appropriate

6.4 CPU-Bound Processing with concurrent.futures

from concurrent.futures import ProcessPoolExecutor
import asyncio

def cpu_heavy_task(data: bytes) -> bytes:
    import hashlib
    return hashlib.sha256(data).digest()

async def process_files(files: list[bytes]) -> list[bytes]:
    loop = asyncio.get_running_loop()
    with ProcessPoolExecutor(max_workers=4) as executor:
        tasks = [
            loop.run_in_executor(executor, cpu_heavy_task, f)
            for f in files
        ]
        return await asyncio.gather(*tasks)

7. Celery + Redis Task Queue

7.1 Basic Setup

# celery_app.py
from celery import Celery

app = Celery(
    "myapp",
    broker="redis://localhost:6379/0",
    backend="redis://localhost:6379/1",
)

app.conf.update(
    task_serializer="json",
    accept_content=["json"],
    result_serializer="json",
    timezone="UTC",
    enable_utc=True,
    task_track_started=True,
    task_acks_late=True,
    worker_prefetch_multiplier=1,
    task_reject_on_worker_lost=True,
)

7.2 Task Definition and Error Handling

from celery import shared_task
from celery.utils.log import get_task_logger

logger = get_task_logger(__name__)

@shared_task(
    bind=True,
    max_retries=3,
    default_retry_delay=60,
    autoretry_for=(ConnectionError, TimeoutError),
    retry_backoff=True,
    retry_jitter=True,
)
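def send_email_task(self, to: str, subject: str, body: str):
    # send_email is the application's own mail helper (defined elsewhere).
    # ConnectionError and TimeoutError are retried by autoretry_for above; a
    # manual `except Exception: self.retry()` would also retry genuine bugs.
    try:
        send_email(to, subject, body)
    except Exception as exc:
        logger.error(f"Email failed (attempt {self.request.retries + 1}): {exc}")
        raise
    logger.info(f"Email sent to {to}")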

@shared_task(bind=True)
def process_ai_inference(self, model_name: str, input_data: dict):
    self.update_state(state="PROCESSING", meta={"progress": 0})
    model = load_model(model_name)
    self.update_state(state="PROCESSING", meta={"progress": 50})
    result = model.predict(input_data)
    self.update_state(state="PROCESSING", meta={"progress": 100})
    return result

7.3 Periodic Tasks (Celery Beat)

from celery.schedules import crontab

app.conf.beat_schedule = {
    "cleanup-expired-sessions": {
        "task": "tasks.cleanup_sessions",
        "schedule": crontab(hour=2, minute=0),  # Daily at 2 AM
    },
    "sync-external-data": {
        "task": "tasks.sync_data",
        "schedule": 300.0,  # Every 5 minutes
    },
    "weekly-report": {
        "task": "tasks.generate_report",
        "schedule": crontab(hour=9, minute=0, day_of_week="mon"),
    },
}

7.4 Monitoring with Flower

# Run Flower
celery -A myapp flower --port=5555

# Flower features:
# - Real-time worker status monitoring
# - Task progress tracking
# - Failed task retry
# - Worker pool size adjustment

8. AI Service Architecture: FastAPI + LangChain

8.1 LangChain + FastAPI Integration

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
import asyncio

app = FastAPI()

llm = ChatOpenAI(model="gpt-4o", temperature=0.7, streaming=True)

prompt = ChatPromptTemplate.from_messages([
    ("system", "You are a helpful assistant."),
    ("human", "{question}"),
])

chain = prompt | llm | StrOutputParser()

@app.post("/chat")
async def chat(question: str):
    response = await chain.ainvoke({"question": question})
    return {"answer": response}

@app.post("/chat/stream")
async def chat_stream(question: str):
    async def generate():
        async for chunk in chain.astream({"question": question}):
            yield f"data: {chunk}\n\n"
        yield "data: [DONE]\n\n"

    return StreamingResponse(generate(), media_type="text/event-stream")

8.2 RAG Endpoint

from langchain_community.vectorstores import Chroma
from langchain_openai import OpenAIEmbeddings

# Vector store initialization
embeddings = OpenAIEmbeddings()
vectorstore = Chroma(persist_directory="./chroma_db", embedding_function=embeddings)
retriever = vectorstore.as_retriever(search_kwargs={"k": 5})

rag_prompt = ChatPromptTemplate.from_messages([
    ("system", "Answer based on the following context:\n{context}"),
    ("human", "{question}"),
])

@app.post("/rag/query")
async def rag_query(question: str):
    docs = await retriever.ainvoke(question)
    context = "\n".join([doc.page_content for doc in docs])

    response = await (rag_prompt | llm | StrOutputParser()).ainvoke(
        {"context": context, "question": question}
    )

    return {
        "answer": response,
        "sources": [{"content": d.page_content, "metadata": d.metadata} for d in docs],
    }

8.3 Model Serving Pattern

import asyncio
from contextlib import asynccontextmanager

import torch
from fastapi import FastAPI

ml_models = {}

@asynccontextmanager
async def lifespan(app: FastAPI):
    # Load models on startup
    ml_models["sentiment"] = load_sentiment_model()
    ml_models["ner"] = load_ner_model()
    yield
    # Cleanup on shutdown
    ml_models.clear()
    torch.cuda.empty_cache()

app = FastAPI(lifespan=lifespan)

@app.post("/predict/sentiment")
async def predict_sentiment(text: str):
    model = ml_models["sentiment"]
    loop = asyncio.get_running_loop()
    result = await loop.run_in_executor(None, model.predict, text)
    return {"sentiment": result}

9. Testing

9.1 pytest + httpx (FastAPI)

import pytest
from httpx import AsyncClient, ASGITransport
from main import app

@pytest.fixture
async def client():
    transport = ASGITransport(app=app)
    async with AsyncClient(transport=transport, base_url="http://test") as ac:
        yield ac

@pytest.mark.anyio
async def test_create_user(client: AsyncClient):
    response = await client.post("/users", json={
        "username": "testuser",
        "email": "test@example.com",
        "age": 25,
    })
    assert response.status_code == 201
    data = response.json()
    assert data["username"] == "testuser"

@pytest.mark.anyio
async def test_get_user_not_found(client: AsyncClient):
    response = await client.get("/users/0")
    assert response.status_code == 404

@pytest.mark.anyio
async def test_create_user_invalid_email(client: AsyncClient):
    response = await client.post("/users", json={
        "username": "testuser",
        "email": "invalid-email",
        "age": 25,
    })
    assert response.status_code == 422

9.2 Test Data with factory_boy

import factory
from factory.alchemy import SQLAlchemyModelFactory

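class UserFactory(SQLAlchemyModelFactory):
    class Meta:
        model = User
        # Meta.sqlalchemy_session must also point at a synchronous Session
        # (or scoped_session) created by the test setup.
        sqlalchemy_session_persistence = "commit"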

    username = factory.Sequence(lambda n: f"user_{n}")
    email = factory.LazyAttribute(lambda o: f"{o.username}@example.com")

class PostFactory(SQLAlchemyModelFactory):
    class Meta:
        model = Post
        sqlalchemy_session_persistence = "commit"

    title = factory.Faker("sentence")
    content = factory.Faker("paragraph")
    author = factory.SubFactory(UserFactory)

9.3 Coverage Configuration

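# pyproject.toml
[tool.pytest.ini_options]
testpaths = ["tests"]
# asyncio_mode is a pytest-asyncio option; drop it if the suite runs through
# the anyio plugin (@pytest.mark.anyio) as in 9.1.
asyncio_mode = "auto"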

[tool.coverage.run]
source = ["app"]
omit = ["tests/*", "alembic/*"]

[tool.coverage.report]
fail_under = 80
show_missing = true

10. Production Deployment

10.1 Uvicorn + Gunicorn

# Development
uvicorn main:app --reload --host 0.0.0.0 --port 8000

# Production — Gunicorn + Uvicorn Workers
gunicorn main:app \
  -w 4 \
  -k uvicorn.workers.UvicornWorker \
  --bind 0.0.0.0:8000 \
  --timeout 120 \
  --graceful-timeout 30 \
  --access-logfile - \
  --error-logfile -

10.2 Docker Multi-Stage Build

# Stage 1: Build
FROM python:3.12-slim AS builder

WORKDIR /app
# Poetry 2.x no longer bundles the export command; the plugin provides it.
RUN pip install --no-cache-dir poetry poetry-plugin-export

COPY pyproject.toml poetry.lock ./
RUN poetry export -f requirements.txt --output requirements.txt --without-hashes
RUN pip install --no-cache-dir --prefix=/install -r requirements.txt

# Stage 2: Runtime
FROM python:3.12-slim

WORKDIR /app
COPY --from=builder /install /usr/local
COPY . .

RUN adduser --disabled-password --gecos "" appuser
USER appuser

EXPOSE 8000

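# urlopen raises on connection errors and non-2xx responses, so the check
# fails when the app is unhealthy (and it needs no third-party package).
HEALTHCHECK --interval=30s --timeout=3s --start-period=5s --retries=3 \
  CMD python -c "import urllib.request; urllib.request.urlopen('http://localhost:8000/health', timeout=2)"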

CMD ["gunicorn", "main:app", "-w", "4", "-k", "uvicorn.workers.UvicornWorker", "--bind", "0.0.0.0:8000"]

10.3 Structured Logging

import structlog
import logging

structlog.configure(
    processors=[
        structlog.contextvars.merge_contextvars,
        structlog.processors.add_log_level,
        structlog.processors.TimeStamper(fmt="iso"),
        structlog.processors.StackInfoRenderer(),
        structlog.processors.format_exc_info,
        structlog.processors.JSONRenderer(),
    ],
    wrapper_class=structlog.make_filtering_bound_logger(logging.INFO),
    logger_factory=structlog.PrintLoggerFactory(),
)

logger = structlog.get_logger()

async def create_order(order: OrderCreate):
    logger.info("order_created", order_id=order.id, user_id=order.user_id, amount=order.total)

10.4 Health Check Endpoints

from datetime import datetime, timezone

from fastapi import Depends, FastAPI, HTTPException
from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncSession

# `app` and `get_db` are the application and session dependency defined earlier.

@app.get("/health")
async def health_check():
    return {
        "status": "healthy",
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "version": "1.0.0",
    }

@app.get("/health/ready")
async def readiness_check(db: AsyncSession = Depends(get_db)):
    try:
        await db.execute(text("SELECT 1"))
    except Exception:
        raise HTTPException(status_code=503, detail="database unavailable")

    return {"status": "ready", "database": "connected"}

11. Interview Questions (15)

Q1. What is the biggest difference between FastAPI and Flask?

FastAPI is built on ASGI with native async/await, validates requests automatically through Pydantic, and generates OpenAPI docs without extra setup. Flask is WSGI-based: async views are supported since 2.0, but each request still occupies a worker, and validation and API docs require separate libraries. FastAPI also has a built-in dependency injection system driven by type hints.

Q2. How does Python's GIL affect web server performance?

The GIL allows only one thread to execute Python bytecode at a time. Blocking I/O calls (network, DB) release the GIL while they wait, so threads still overlap on I/O-bound work, and async/await sidesteps the issue by multiplexing I/O on a single thread. CPU-bound work should use multiprocessing or multiple Gunicorn workers.
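
To illustrate the I/O-bound case, here is a minimal standard-library sketch. It assumes `time.sleep` is a fair stand-in for a blocking network or database call; `fake_io` is a hypothetical name.

```python
import time
from concurrent.futures import ThreadPoolExecutor

def fake_io(task_id: int) -> int:
    # time.sleep releases the GIL while waiting, as blocking socket reads do.
    time.sleep(0.2)
    return task_id

start = time.perf_counter()
with ThreadPoolExecutor(max_workers=4) as pool:
    results = list(pool.map(fake_io, range(4)))
elapsed = time.perf_counter() - start

# The four waits overlap, so the total is roughly one sleep interval, not four.
print(results, f"{elapsed:.2f}s")
```

Replacing the sleep with a pure-Python computation would remove most of the overlap, which is why CPU-bound work is pushed to separate processes.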

Q3. What are the key differences between Pydantic v1 and v2?

v2 was rewritten with Rust-based pydantic-core for 5-50x speed improvement. Key changes: model_validate() replaces parse_obj(), model_config dict replaces Config class, field_validator replaces validator, and new model_serializer.

Q4. How do you solve the N+1 problem in Django ORM?

Use select_related() for ForeignKey/OneToOneField JOIN queries and prefetch_related() for ManyToManyField or reverse relations. Monitor query counts with django-debug-toolbar.

Q5. How does FastAPI Depends() work internally?

FastAPI resolves the dependency graph for each request. Each Depends() recursively resolves sub-dependencies, and generator dependencies (yield) execute cleanup code after the response. Dependencies are cached within the same request.
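
The toy resolver below sketches those three behaviours (recursive resolution, per-request caching, cleanup after the handler) using only the standard library. It is not FastAPI's implementation: `Dep`, `resolve`, and `handle_request` are hypothetical names, and real FastAPI also handles async dependencies, request parameters, and overrides.

```python
import inspect
from contextlib import ExitStack

class Dep:
    """Marker standing in for Depends(); purely illustrative."""
    def __init__(self, provider):
        self.provider = provider

def resolve(func, cache, stack):
    """Call func after recursively resolving every Dep default."""
    kwargs = {}
    for name, param in inspect.signature(func).parameters.items():
        if isinstance(param.default, Dep):
            provider = param.default.provider
            if provider not in cache:                  # per-request cache
                value = resolve(provider, cache, stack)
                if inspect.isgenerator(value):         # yield-style dependency
                    gen = value
                    value = next(gen)                  # run up to the yield
                    stack.callback(lambda g=gen: next(g, None))  # cleanup later
                cache[provider] = value
            kwargs[name] = cache[provider]
    return func(**kwargs)

events = []

def get_db():
    events.append("open")
    try:
        yield "session"
    finally:
        events.append("close")

def get_user(db=Dep(get_db)):
    return f"user via {db}"

def endpoint(user=Dep(get_user), db=Dep(get_db)):
    events.append("handler")
    return user, db

def handle_request():
    # A fresh cache and cleanup stack per request.
    with ExitStack() as stack:
        return resolve(endpoint, {}, stack)

result = handle_request()
print(result, events)
```

`get_db` is requested twice (directly and through `get_user`) but opened once, and its cleanup runs only after the handler has returned.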

Q6. What is the difference between ASGI and WSGI?

WSGI is a synchronous interface using one thread per request. ASGI is an asynchronous interface capable of handling thousands of concurrent connections in a single process, supporting WebSocket, HTTP/2, and Server-Sent Events (SSE).
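
The two interfaces are easiest to compare side by side. The sketch below defines a minimal app for each and drives it with tiny stand-in "servers" (`call_wsgi` and `call_asgi` are hypothetical helpers, not part of any framework), so it runs with the standard library alone.

```python
import asyncio

def wsgi_app(environ, start_response):
    # WSGI: one synchronous call per request; the return value is the body.
    start_response("200 OK", [("Content-Type", "text/plain")])
    return [b"hello from wsgi"]

async def asgi_app(scope, receive, send):
    # ASGI: a coroutine that exchanges event dicts with the server.
    assert scope["type"] == "http"
    await send({"type": "http.response.start", "status": 200,
                "headers": [(b"content-type", b"text/plain")]})
    await send({"type": "http.response.body", "body": b"hello from asgi"})

def call_wsgi(app):
    captured = {}
    def start_response(status, headers):
        captured["status"] = status
    environ = {"REQUEST_METHOD": "GET", "PATH_INFO": "/"}
    body = b"".join(app(environ, start_response))
    return captured["status"], body

async def call_asgi(app):
    events = []
    async def receive():
        return {"type": "http.request", "body": b"", "more_body": False}
    async def send(event):
        events.append(event)
    await app({"type": "http", "method": "GET", "path": "/"}, receive, send)
    return events[0]["status"], events[1]["body"]

wsgi_result = call_wsgi(wsgi_app)
asgi_result = asyncio.run(call_asgi(asgi_app))
print(wsgi_result, asgi_result)
```

Because the ASGI app awaits `receive` and `send`, the server can interleave many connections on one event loop, and the same message-passing shape extends to WebSocket scopes.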

Q7. What is the session management strategy for SQLAlchemy?

Per-request session pattern is common. In FastAPI, sessions are injected via Depends() and auto-closed with yield. Important settings include connection pooling (pool_size, max_overflow), expire_on_commit=False, and scoped_session usage.

Q8. How do you handle task failures in Celery?

Use autoretry_for for specific exceptions, max_retries for limits, retry_backoff for exponential backoff, and task_acks_late for ACK after completion. Failed tasks can be sent to a Dead Letter Queue or trigger on_failure callbacks.
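
As a rough illustration of what exponential backoff with jitter computes, here is a standalone sketch. It is not Celery's code; `backoff_delay` and its defaults are assumptions chosen for the example.

```python
import random

def backoff_delay(retries: int, base: float = 1.0, cap: float = 600.0,
                  jitter: bool = True) -> float:
    """Seconds to wait before retry number `retries` (0-based)."""
    delay = min(cap, base * (2 ** retries))
    if jitter:
        # Full jitter: pick a random point in [0, delay] so that many
        # failing tasks do not all retry at the same instant.
        delay = random.uniform(0, delay)
    return delay

schedule = [backoff_delay(n, jitter=False) for n in range(5)]
print(schedule)  # [1.0, 2.0, 4.0, 8.0, 16.0]
```

The cap keeps late retries from waiting indefinitely, and the jitter spreads load on a recovering downstream service.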

Q9. How do you efficiently handle file uploads in FastAPI?

UploadFile uses spooling instead of loading entirely into memory. Large files should use chunked reads and stream directly to object storage like S3. Post-processing should be separated into background tasks or Celery.
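
The chunked-read idea can be sketched without a web framework. Here `io.BytesIO` objects stand in for the uploaded file and for object storage, and `stream_copy` is a hypothetical helper.

```python
import hashlib
import io

def stream_copy(src, dst, chunk_size: int = 64 * 1024) -> tuple[int, str]:
    """Copy src to dst in fixed-size chunks; return (bytes copied, SHA-256 hex)."""
    digest = hashlib.sha256()
    total = 0
    # Only one chunk is held in memory at a time.
    while chunk := src.read(chunk_size):
        dst.write(chunk)
        digest.update(chunk)
        total += len(chunk)
    return total, digest.hexdigest()

payload = b"x" * 200_000
upload = io.BytesIO(payload)   # stands in for the uploaded file
storage = io.BytesIO()         # stands in for object storage
size, checksum = stream_copy(upload, storage)
print(size, checksum[:12])
```

With FastAPI's UploadFile the read is a coroutine, so the loop body becomes `chunk = await file.read(chunk_size)` inside an async function.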

Q10. What are the caveats of using async views in Django?

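Django's ORM has exposed an async API (async iteration and a-prefixed methods such as aget()) since 4.1, but transactions and some third-party code are still sync-only and need sync_to_async() wrappers. Middleware must also be async-compatible, and each switch between sync and async code adds overhead. An ASGI server (Daphne, Uvicorn) is required.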

Q11. What is the difference between concurrency and parallelism in Python?

Concurrency (asyncio) alternates between tasks. Parallelism (multiprocessing) executes tasks simultaneously. I/O-bound tasks benefit from concurrency; CPU-bound tasks benefit from parallelism.
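
The "alternates between tasks" part can be made visible with a short sketch: two coroutines share one thread and hand control back to the event loop at every `await`. The `worker` and `main` names are illustrative.

```python
import asyncio

async def worker(name: str, steps: int, log: list[str]) -> None:
    for i in range(steps):
        log.append(f"{name}{i}")
        await asyncio.sleep(0)  # yield control to the event loop

async def main() -> list[str]:
    log: list[str] = []
    await asyncio.gather(worker("a", 2, log), worker("b", 2, log))
    return log

order = asyncio.run(main())
print(order)  # ['a0', 'b0', 'a1', 'b1']
```

Neither worker runs at the same instant as the other. They take turns, which is enough to hide I/O waits but does nothing for CPU-bound work.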

Q12. What is the middleware execution order in FastAPI?

Middleware follows the onion model. Requests pass through middleware in reverse registration order, and responses return in registration order. Typical order: CORS, authentication, logging.
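
A framework-free sketch of the onion model, assuming each newly registered middleware wraps everything registered before it (`make_middleware` and `build_app` are hypothetical helpers):

```python
def make_middleware(name, log):
    def middleware(handler):
        def wrapped(request):
            log.append(f"{name}:request")
            response = handler(request)
            log.append(f"{name}:response")
            return response
        return wrapped
    return middleware

def build_app(handler, middlewares):
    # Each middleware wraps what was registered before it,
    # so the last one registered ends up outermost.
    for mw in middlewares:
        handler = mw(handler)
    return handler

log = []
app = build_app(
    lambda request: "ok",
    [make_middleware("first", log), make_middleware("second", log)],
)
response = app("GET /")
print(log)
# ['second:request', 'first:request', 'first:response', 'second:response']
```

The request reaches the most recently registered layer first, and the response unwinds through the layers in registration order.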

Q13. How do you safely perform large-scale Alembic migrations?

Separate data migrations from schema migrations, and use online DDL tools (for example pt-online-schema-change on MySQL) for large tables. Apply the expand-and-contract pattern for zero-downtime migrations, and always prepare and test rollback scripts.

Q14. How do you debug memory leaks in Python web apps?

Use tracemalloc for memory allocation tracking, objgraph for reference graph analysis. memray is suitable for production profiling. Common causes: circular references, growing global caches, unremoved event handlers.
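
A minimal tracemalloc sketch of the "growing global cache" case. The `cache` list and `handle_request` function are hypothetical stand-ins for application code.

```python
import tracemalloc

cache = []  # hypothetical module-level cache that is never evicted

def handle_request(request_id: int) -> None:
    cache.append(bytes(10_000))  # each "request" leaves about 10 KB behind

tracemalloc.start()
before = tracemalloc.take_snapshot()
for request_id in range(100):
    handle_request(request_id)
after = tracemalloc.take_snapshot()
tracemalloc.stop()

# Differences are grouped by source line, largest change first.
stats = after.compare_to(before, "lineno")
for stat in stats[:3]:
    print(stat)
```

The line that appends to `cache` should dominate the report. In a real service, snapshots would be taken some time apart under normal traffic.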

Q15. FastAPI vs Litestar — which should you choose for new projects?

FastAPI has the larger community and ecosystem (80K+ stars). Litestar positions itself as performance-first and ships a built-in DI container. Weigh team experience, ecosystem dependencies, and measured performance requirements; FastAPI is the safer choice in most cases.


12. Quiz

Q1. What happens when you define a FastAPI endpoint with a sync function (def)?

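FastAPI runs sync endpoints in a thread pool. An async def endpoint runs directly on the event loop, while a regular def endpoint is dispatched to a worker thread (through Starlette's run_in_threadpool, which uses AnyIO). A blocking sync endpoint therefore does not stall the event loop, although concurrency is bounded by the size of the thread pool.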
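
The effect can be reproduced with plain asyncio. In this sketch, which illustrates the idea and is not FastAPI's code, a heartbeat task counts how often the event loop gets to run while a blocking function executes either on the loop thread or in a worker thread. All names are illustrative.

```python
import asyncio
import time

def blocking_handler() -> str:
    time.sleep(0.2)  # stands in for a sync DB driver or other blocking call
    return "done"

async def heartbeat(ticks: list[int]) -> None:
    # Records a tick every 20 ms for as long as the event loop is free.
    while True:
        ticks.append(1)
        await asyncio.sleep(0.02)

async def serve(offload: bool) -> int:
    ticks: list[int] = []
    beat = asyncio.create_task(heartbeat(ticks))
    await asyncio.sleep(0)  # let the heartbeat start
    if offload:
        await asyncio.to_thread(blocking_handler)  # runs in a worker thread
    else:
        blocking_handler()                         # runs on the event loop thread
    beat.cancel()
    return len(ticks)

ticks_blocked = asyncio.run(serve(offload=False))
ticks_offloaded = asyncio.run(serve(offload=True))
print(ticks_blocked, ticks_offloaded)
```

When the blocking call runs on the loop thread, the heartbeat gets a single tick. When it is offloaded, the loop keeps ticking, which is what keeps other requests moving.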

Q2. What is the difference between Pydantic model_validate() and model_construct()?

model_validate() validates all fields and raises ValidationError for invalid data. model_construct() skips validation and creates the model directly. Use it for already-validated data (from DB queries) for better performance.

Q3. What is the SQL difference between Django select_related() and prefetch_related()?

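select_related() uses a SQL JOIN to fetch related objects in a single query (ForeignKey, OneToOne). prefetch_related() runs a separate query per relation and joins the results in Python (ManyToMany, reverse relations). Neither replaces the other: select_related() saves a round trip for single-valued relations, while prefetch_related() avoids the row duplication a JOIN would produce for multi-valued ones.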
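
The two query shapes can be shown with the standard-library sqlite3 module. This sketches the SQL pattern only, not what Django emits verbatim, and the table and column names are made up for the example.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE category (id INTEGER PRIMARY KEY, name TEXT);
    CREATE TABLE product (id INTEGER PRIMARY KEY, name TEXT, category_id INTEGER);
    INSERT INTO category VALUES (1, 'books'), (2, 'games');
    INSERT INTO product VALUES (1, 'novel', 1), (2, 'atlas', 1), (3, 'chess', 2);
""")

# select_related style: one JOIN query for a forward relation (product -> category).
joined = conn.execute(
    "SELECT p.name, c.name FROM product p "
    "JOIN category c ON c.id = p.category_id ORDER BY p.id"
).fetchall()

# prefetch_related style: two queries stitched together in Python
# (category -> its products, a reverse relation).
categories = conn.execute("SELECT id, name FROM category ORDER BY id").fetchall()
ids = [category_id for category_id, _ in categories]
placeholders = ",".join("?" * len(ids))
rows = conn.execute(
    f"SELECT category_id, name FROM product "
    f"WHERE category_id IN ({placeholders}) ORDER BY id",
    ids,
).fetchall()

names = dict(categories)
by_category = {name: [] for _, name in categories}
for category_id, product_name in rows:
    by_category[names[category_id]].append(product_name)

print(joined)
print(by_category)
```

Both approaches use a fixed number of queries regardless of row count, which is what removes the N+1 pattern.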

Q4. What is the difference between asyncio.gather() and asyncio.TaskGroup()?

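asyncio.gather() does not cancel the remaining tasks when one fails: by default the first exception propagates to the caller while the others keep running, and with return_exceptions=True the exceptions are returned in the result list. TaskGroup (Python 3.11+) cancels all remaining tasks when one fails and raises an ExceptionGroup, providing structured concurrency. TaskGroup is safer and more predictable.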

Q5. What does task_acks_late=True mean in Celery and what are the trade-offs?

By default, Celery sends ACK when receiving a task. task_acks_late=True sends ACK after task completion. Advantage: tasks are redelivered on worker crash. Disadvantage: tasks may execute twice, requiring idempotency.
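
One common way to get idempotency is to record which task IDs have already been applied. The sketch below keeps that record in memory for brevity; `charge_order`, `processed`, and `ledger` are hypothetical, and a real worker would keep the record in Redis or the database, updated atomically with the side effect.

```python
processed: set[str] = set()          # IDs of tasks whose effect was applied
ledger: list[tuple[str, int]] = []   # the side effect we must not repeat

def charge_order(task_id: str, order_id: str, amount: int) -> bool:
    """Apply the charge once; return False when the task was already handled."""
    if task_id in processed:
        return False
    ledger.append((order_id, amount))
    processed.add(task_id)
    return True

first = charge_order("task-42", "order-7", 1999)
redelivered = charge_order("task-42", "order-7", 1999)  # same task delivered again
print(first, redelivered, ledger)
```

A redelivery after a worker crash then becomes a no-op instead of a double charge.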

