Python Backend 2025 Complete Guide: FastAPI vs Django, Async Patterns, and AI Service Architecture
Author: Youngju Kim (@fjvbn20031)
Introduction
Python holds an unparalleled position as a backend language in 2025. With the explosive growth of AI/ML services, FastAPI has surpassed 80K GitHub stars and become one of the fastest-growing web frameworks. Django also remains strong in the enterprise market with the 4.2 LTS release and the enhanced async support added in 5.0.
This guide covers everything about Python backend development, from FastAPI and Django to async processing, databases, task queues, AI service architecture, and production deployment. Interview questions and quizzes are also included for career preparation.
1. Python Backend Landscape 2025
Why Python Backend?
Python dominates backend development for three reasons:
- Natural integration with AI/ML ecosystem — Use PyTorch, TensorFlow, LangChain directly
- Rapid prototyping and productivity — Clean syntax, rich libraries, type hints for safety
- Massive community — 500K+ PyPI packages, abundant StackOverflow answers
Framework Overview
| Framework | GitHub Stars | Latest Version | Key Features |
|---|---|---|---|
| FastAPI | 80K+ | 0.115+ | Async, auto-docs, Pydantic |
| Django | 82K+ | 5.1 | Full-stack, ORM, Admin |
| Flask | 69K+ | 3.1 | Micro, flexible, simple |
| Litestar | 5K+ | 2.x | FastAPI alternative, performance-first |
2. FastAPI vs Django vs Flask Comparison
Performance Comparison (approximate requests/sec; TechEmpower-style benchmark figures, indicative of relative order rather than absolute throughput)
| Metric | FastAPI | Django | Flask |
|---|---|---|---|
| JSON serialization | ~15,000 | ~3,500 | ~4,000 |
| Single DB query | ~12,000 | ~3,000 | ~3,500 |
| Multiple DB queries | ~3,500 | ~1,200 | ~1,400 |
| Async I/O | Native | Partial (5.0+) | Limited (async views on WSGI) |
Selection Criteria
Choose FastAPI when:
- Building new API services
- AI/ML model serving is needed
- High concurrency is required
- Microservice architecture
Choose Django when:
- Admin panel is needed
- Rapid CRUD app development
- Full-stack web application
- Team has strong Django experience
Choose Flask when:
- Extremely simple APIs
- Legacy system maintenance
- Learning purposes
Ecosystem Comparison
| Feature | FastAPI | Django | Flask |
|---|---|---|---|
| ORM | SQLAlchemy | Django ORM | SQLAlchemy |
| Auth | Custom / FastAPI-Users | django-allauth | Flask-Login |
| Admin | SQLAdmin | django-admin | Flask-Admin |
| API Docs | Auto (Swagger/ReDoc) | DRF Spectacular | Flask-RESTX |
| WebSocket | Native | Channels | Flask-SocketIO |
| Async | Native | Partial | Limited |
3. FastAPI Deep Dive
3.1 Basic Structure and Routing
```python
from datetime import datetime

from fastapi import FastAPI, HTTPException, status
from pydantic import BaseModel, Field

app = FastAPI(
    title="My API",
    version="1.0.0",
    docs_url="/docs",
    redoc_url="/redoc",
)

class UserCreate(BaseModel):
    username: str = Field(..., min_length=3, max_length=50)
    email: str = Field(..., pattern=r"^[\w\.\-]+@[\w\.\-]+\.\w+$")
    age: int = Field(..., ge=0, le=150)

class UserResponse(BaseModel):
    id: int
    username: str
    email: str
    created_at: datetime

    model_config = {"from_attributes": True}

@app.post("/users", response_model=UserResponse, status_code=status.HTTP_201_CREATED)
async def create_user(user: UserCreate):
    return UserResponse(
        id=1,
        username=user.username,
        email=user.email,
        created_at=datetime.now(),
    )

@app.get("/users/{user_id}", response_model=UserResponse)
async def get_user(user_id: int):
    if user_id <= 0:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="User not found",
        )
    return UserResponse(id=user_id, username="test", email="t@t.com", created_at=datetime.now())
```
3.2 Pydantic v2 Data Validation
Pydantic v2 was rewritten with Rust-based pydantic-core, achieving 5-50x performance improvements.
```python
from enum import Enum
from typing import Optional

from pydantic import BaseModel, Field, field_validator, model_validator

class Role(str, Enum):
    ADMIN = "admin"
    USER = "user"
    MODERATOR = "moderator"

class UserProfile(BaseModel):
    name: str = Field(..., min_length=1, max_length=100)
    role: Role = Role.USER
    tags: list[str] = Field(default_factory=list, max_length=10)
    metadata: Optional[dict[str, str]] = None

    @field_validator("name")
    @classmethod
    def name_must_not_be_empty(cls, v: str) -> str:
        if not v.strip():
            raise ValueError("Name cannot be whitespace only")
        return v.strip()

    @model_validator(mode="after")
    def validate_admin_tags(self):
        if self.role == Role.ADMIN and not self.tags:
            raise ValueError("Admin must have at least one tag")
        return self

    model_config = {
        "json_schema_extra": {
            "examples": [
                {"name": "Alice", "role": "admin", "tags": ["ops"]}
            ]
        }
    }
```
3.3 Dependency Injection
FastAPI's Depends() provides a powerful DI system.
```python
from typing import Annotated

from fastapi import Depends, Header, HTTPException
from sqlalchemy.ext.asyncio import AsyncSession

# DB session dependency
async def get_db() -> AsyncSession:
    async with async_session_maker() as session:
        try:
            yield session
        finally:
            await session.close()

# Auth dependency
async def get_current_user(
    authorization: str = Header(...),
    db: AsyncSession = Depends(get_db),
) -> User:
    token = authorization.replace("Bearer ", "")
    user = await verify_token(token, db)
    if not user:
        raise HTTPException(status_code=401, detail="Invalid token")
    return user

# Role verification dependency
def require_role(required_role: str):
    async def role_checker(user: User = Depends(get_current_user)):
        if user.role != required_role:
            raise HTTPException(status_code=403, detail="Insufficient permissions")
        return user
    return role_checker

# Usage
@app.get("/admin/users")
async def list_users(
    admin: Annotated[User, Depends(require_role("admin"))],
    db: Annotated[AsyncSession, Depends(get_db)],
):
    return await get_all_users(db)
```
3.4 Middleware and Error Handling
```python
import logging
import time

from fastapi import Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
from starlette.middleware.base import BaseHTTPMiddleware

logger = logging.getLogger(__name__)

# CORS middleware
app.add_middleware(
    CORSMiddleware,
    allow_origins=["https://myapp.com"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Custom middleware: request logging
class RequestLoggingMiddleware(BaseHTTPMiddleware):
    async def dispatch(self, request: Request, call_next):
        start = time.perf_counter()
        response = await call_next(request)
        elapsed = time.perf_counter() - start
        logger.info(
            f"method={request.method} path={request.url.path} "
            f"status={response.status_code} duration={elapsed:.3f}s"
        )
        return response

app.add_middleware(RequestLoggingMiddleware)

# Global exception handler
@app.exception_handler(ValueError)
async def value_error_handler(request: Request, exc: ValueError):
    return JSONResponse(
        status_code=422,
        content={"detail": str(exc), "type": "validation_error"},
    )
```
3.5 WebSocket and Background Tasks
```python
import asyncio

from fastapi import BackgroundTasks, WebSocket, WebSocketDisconnect

# WebSocket chat
class ConnectionManager:
    def __init__(self):
        self.active: list[WebSocket] = []

    async def connect(self, ws: WebSocket):
        await ws.accept()
        self.active.append(ws)

    def disconnect(self, ws: WebSocket):
        self.active.remove(ws)

    async def broadcast(self, message: str):
        for conn in self.active:
            await conn.send_text(message)

manager = ConnectionManager()

@app.websocket("/ws/chat")
async def websocket_chat(ws: WebSocket):
    await manager.connect(ws)
    try:
        while True:
            data = await ws.receive_text()
            await manager.broadcast(f"User: {data}")
    except WebSocketDisconnect:
        manager.disconnect(ws)

# Background tasks
async def send_notification(email: str, message: str):
    await asyncio.sleep(2)
    logger.info(f"Sent notification to {email}")

@app.post("/orders")
async def create_order(order: OrderCreate, background_tasks: BackgroundTasks):
    result = await save_order(order)
    background_tasks.add_task(send_notification, order.email, "Order confirmed!")
    return result
```
4. Django Deep Dive
4.1 Django ORM and Models
```python
# models.py
from django.core.validators import MinValueValidator
from django.db import models

class Category(models.Model):
    name = models.CharField(max_length=100, unique=True)
    slug = models.SlugField(unique=True)

    class Meta:
        verbose_name_plural = "categories"
        ordering = ["name"]

    def __str__(self):
        return self.name

class Product(models.Model):
    name = models.CharField(max_length=200)
    category = models.ForeignKey(Category, on_delete=models.CASCADE, related_name="products")
    price = models.DecimalField(max_digits=10, decimal_places=2, validators=[MinValueValidator(0)])
    stock = models.PositiveIntegerField(default=0)
    is_active = models.BooleanField(default=True)
    created_at = models.DateTimeField(auto_now_add=True)
    updated_at = models.DateTimeField(auto_now=True)

    class Meta:
        indexes = [
            models.Index(fields=["category", "-created_at"]),
            models.Index(fields=["price"]),
        ]

# Query example: solving the N+1 problem
products = Product.objects.select_related("category").filter(is_active=True)

# Aggregation
from django.db.models import Avg, Count, Q

stats = Category.objects.annotate(
    product_count=Count("products"),
    avg_price=Avg("products__price"),
    active_count=Count("products", filter=Q(products__is_active=True)),
)
```
4.2 Django Admin Customization
```python
# admin.py
from django.contrib import admin

from .models import Category, Product

@admin.register(Product)
class ProductAdmin(admin.ModelAdmin):
    list_display = ["name", "category", "price", "stock", "is_active", "created_at"]
    list_filter = ["is_active", "category", "created_at"]
    search_fields = ["name", "category__name"]
    list_editable = ["price", "stock", "is_active"]
    readonly_fields = ["created_at", "updated_at"]
    list_per_page = 25
    actions = ["activate_products", "deactivate_products"]

    @admin.action(description="Activate selected products")
    def activate_products(self, request, queryset):
        updated = queryset.update(is_active=True)
        self.message_user(request, f"{updated} products activated.")

    @admin.action(description="Deactivate selected products")
    def deactivate_products(self, request, queryset):
        updated = queryset.update(is_active=False)
        self.message_user(request, f"{updated} products deactivated.")
```
4.3 Django REST Framework (DRF)
```python
# serializers.py
from rest_framework import serializers

from .models import Category, Product

class CategorySerializer(serializers.ModelSerializer):
    product_count = serializers.IntegerField(read_only=True)

    class Meta:
        model = Category
        fields = ["id", "name", "slug", "product_count"]

class ProductSerializer(serializers.ModelSerializer):
    category_name = serializers.CharField(source="category.name", read_only=True)

    class Meta:
        model = Product
        fields = ["id", "name", "category", "category_name", "price", "stock", "is_active"]

# views.py
from django_filters.rest_framework import DjangoFilterBackend
from rest_framework import filters, viewsets

class ProductViewSet(viewsets.ModelViewSet):
    queryset = Product.objects.select_related("category").filter(is_active=True)
    serializer_class = ProductSerializer
    filter_backends = [DjangoFilterBackend, filters.SearchFilter, filters.OrderingFilter]
    filterset_fields = ["category", "is_active"]
    search_fields = ["name"]
    ordering_fields = ["price", "created_at"]
    ordering = ["-created_at"]
```
4.4 Django 5.0 Async Views
```python
# Django 5.0+ async views
from asgiref.sync import sync_to_async
from django.http import JsonResponse

async def async_product_list(request):
    # ORM calls must be wrapped with sync_to_async
    products = await sync_to_async(list)(
        Product.objects.select_related("category").filter(is_active=True)[:20]
    )
    data = [
        {"id": p.id, "name": p.name, "price": str(p.price)}
        for p in products
    ]
    return JsonResponse({"products": data})

# Django Channels: WebSocket
from channels.generic.websocket import AsyncJsonWebSocketConsumer

class ChatConsumer(AsyncJsonWebSocketConsumer):
    async def connect(self):
        self.room = self.scope["url_route"]["kwargs"]["room"]
        await self.channel_layer.group_add(self.room, self.channel_name)
        await self.accept()

    async def disconnect(self, close_code):
        await self.channel_layer.group_discard(self.room, self.channel_name)

    async def receive_json(self, content):
        await self.channel_layer.group_send(
            self.room,
            {"type": "chat.message", "message": content["message"]},
        )

    async def chat_message(self, event):
        await self.send_json({"message": event["message"]})
```
5. Database: SQLAlchemy 2.0 + Alembic
5.1 SQLAlchemy 2.0 Async Models
```python
from datetime import datetime

from sqlalchemy import DateTime, ForeignKey, String, func
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, relationship

class Base(DeclarativeBase):
    pass

class User(Base):
    __tablename__ = "users"

    id: Mapped[int] = mapped_column(primary_key=True)
    username: Mapped[str] = mapped_column(String(50), unique=True, index=True)
    email: Mapped[str] = mapped_column(String(255), unique=True)
    created_at: Mapped[datetime] = mapped_column(DateTime, server_default=func.now())
    posts: Mapped[list["Post"]] = relationship(back_populates="author", lazy="selectin")

class Post(Base):
    __tablename__ = "posts"

    id: Mapped[int] = mapped_column(primary_key=True)
    title: Mapped[str] = mapped_column(String(200))
    content: Mapped[str] = mapped_column(String)
    author_id: Mapped[int] = mapped_column(ForeignKey("users.id"))
    author: Mapped["User"] = relationship(back_populates="posts")

# Async engine setup
engine = create_async_engine(
    "postgresql+asyncpg://user:pass@localhost/db",
    pool_size=20,
    max_overflow=10,
    pool_pre_ping=True,
    echo=False,
)
async_session = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)
```
5.2 Async CRUD Patterns
```python
from sqlalchemy import select, update
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import selectinload

class UserRepository:
    def __init__(self, session: AsyncSession):
        self.session = session

    async def get_by_id(self, user_id: int) -> User | None:
        result = await self.session.execute(
            select(User).where(User.id == user_id)
        )
        return result.scalar_one_or_none()

    async def list_with_posts(self, skip: int = 0, limit: int = 20) -> list[User]:
        result = await self.session.execute(
            select(User)
            .options(selectinload(User.posts))
            .offset(skip)
            .limit(limit)
            .order_by(User.created_at.desc())
        )
        return list(result.scalars().all())

    async def create(self, username: str, email: str) -> User:
        user = User(username=username, email=email)
        self.session.add(user)
        await self.session.commit()
        await self.session.refresh(user)
        return user

    async def update_email(self, user_id: int, new_email: str) -> bool:
        result = await self.session.execute(
            update(User).where(User.id == user_id).values(email=new_email)
        )
        await self.session.commit()
        return result.rowcount > 0
```
5.3 Alembic Migrations
```bash
# Initialize
alembic init alembic

# Generate a migration
alembic revision --autogenerate -m "add users table"

# Apply migrations
alembic upgrade head

# Roll back one revision
alembic downgrade -1
```
6. Async Patterns (async/await)
6.1 asyncio Basics
```python
import asyncio

import aiohttp

# Concurrent API calls
async def fetch_url(session: aiohttp.ClientSession, url: str) -> dict:
    async with session.get(url) as response:
        return await response.json()

async def fetch_all(urls: list[str]) -> list[dict]:
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_url(session, url) for url in urls]
        return await asyncio.gather(*tasks, return_exceptions=True)

# Execute
urls = [
    "https://api.example.com/users",
    "https://api.example.com/products",
    "https://api.example.com/orders",
]
results = asyncio.run(fetch_all(urls))
```
6.2 Semaphore for Concurrency Limiting
```python
import asyncio

import aiohttp

# Allow at most 10 concurrent requests
semaphore = asyncio.Semaphore(10)

async def rate_limited_fetch(session: aiohttp.ClientSession, url: str) -> dict:
    async with semaphore:
        async with session.get(url) as response:
            return await response.json()

async def fetch_many(urls: list[str]) -> list[dict]:
    async with aiohttp.ClientSession() as session:
        tasks = [rate_limited_fetch(session, url) for url in urls]
        return await asyncio.gather(*tasks)
```
6.3 Sync vs Async — When to Use What
Async is better for:
- Many external API calls
- Frequent database I/O
- WebSocket / real-time communication
- Heavy file I/O
Sync is better for:
- CPU-intensive tasks (image processing, ML inference)
- Simple CRUD operations
- Legacy libraries without async support
- When concurrent.futures.ProcessPoolExecutor is more appropriate
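The contrast is easy to see with a small standard-library benchmark; `fake_io` below is a hypothetical stand-in for a network or DB call:

```python
import asyncio
import time

async def fake_io(delay: float) -> float:
    # Stands in for a network or DB call; awaiting releases the event loop.
    await asyncio.sleep(delay)
    return delay

async def main() -> float:
    start = time.perf_counter()
    # Ten 0.1s "calls" overlap instead of summing to one second.
    await asyncio.gather(*(fake_io(0.1) for _ in range(10)))
    return time.perf_counter() - start

elapsed = asyncio.run(main())
print(f"10 concurrent 0.1s waits finished in {elapsed:.2f}s")
```

A CPU-bound loop in place of `asyncio.sleep` would show no such speedup, which is exactly what pushes that kind of work toward process pools.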
6.4 CPU-Bound Processing with concurrent.futures
```python
import asyncio
import hashlib
from concurrent.futures import ProcessPoolExecutor

def cpu_heavy_task(data: bytes) -> bytes:
    return hashlib.sha256(data).digest()

async def process_files(files: list[bytes]) -> list[bytes]:
    loop = asyncio.get_running_loop()  # preferred over get_event_loop() in modern Python
    with ProcessPoolExecutor(max_workers=4) as executor:
        tasks = [
            loop.run_in_executor(executor, cpu_heavy_task, f)
            for f in files
        ]
        return await asyncio.gather(*tasks)
```
7. Celery + Redis Task Queue
7.1 Basic Setup
```python
# celery_app.py
from celery import Celery

app = Celery(
    "myapp",
    broker="redis://localhost:6379/0",
    backend="redis://localhost:6379/1",
)

app.conf.update(
    task_serializer="json",
    accept_content=["json"],
    result_serializer="json",
    timezone="UTC",
    enable_utc=True,
    task_track_started=True,
    task_acks_late=True,
    worker_prefetch_multiplier=1,
    task_reject_on_worker_lost=True,
)
```
7.2 Task Definition and Error Handling
```python
from celery import shared_task
from celery.utils.log import get_task_logger

logger = get_task_logger(__name__)

@shared_task(
    bind=True,
    max_retries=3,
    default_retry_delay=60,
    autoretry_for=(ConnectionError, TimeoutError),  # auto-retry on these
    retry_backoff=True,   # exponential backoff
    retry_jitter=True,    # randomize delays to avoid thundering herds
)
def send_email_task(self, to: str, subject: str, body: str):
    try:
        send_email(to, subject, body)
        logger.info(f"Email sent to {to}")
    except Exception as exc:
        # Manual retry covers exceptions not listed in autoretry_for
        logger.error(f"Email failed: {exc}")
        raise self.retry(exc=exc)

@shared_task(bind=True)
def process_ai_inference(self, model_name: str, input_data: dict):
    self.update_state(state="PROCESSING", meta={"progress": 0})
    model = load_model(model_name)
    self.update_state(state="PROCESSING", meta={"progress": 50})
    result = model.predict(input_data)
    self.update_state(state="PROCESSING", meta={"progress": 100})
    return result
```
7.3 Periodic Tasks (Celery Beat)
```python
from celery.schedules import crontab

app.conf.beat_schedule = {
    "cleanup-expired-sessions": {
        "task": "tasks.cleanup_sessions",
        "schedule": crontab(hour=2, minute=0),  # daily at 2 AM
    },
    "sync-external-data": {
        "task": "tasks.sync_data",
        "schedule": 300.0,  # every 5 minutes
    },
    "weekly-report": {
        "task": "tasks.generate_report",
        "schedule": crontab(hour=9, minute=0, day_of_week="mon"),
    },
}
```
7.4 Monitoring with Flower
```bash
# Run Flower
celery -A myapp flower --port=5555
```

Flower provides:
- Real-time worker status monitoring
- Task progress tracking
- Retrying failed tasks
- Adjusting worker pool size
8. AI Service Architecture: FastAPI + LangChain
8.1 LangChain + FastAPI Integration
```python
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from langchain.prompts import ChatPromptTemplate
from langchain.schema import StrOutputParser
from langchain_openai import ChatOpenAI

app = FastAPI()

llm = ChatOpenAI(model="gpt-4o", temperature=0.7, streaming=True)
prompt = ChatPromptTemplate.from_messages([
    ("system", "You are a helpful assistant."),
    ("human", "{question}"),
])
chain = prompt | llm | StrOutputParser()

@app.post("/chat")
async def chat(question: str):
    response = await chain.ainvoke({"question": question})
    return {"answer": response}

@app.post("/chat/stream")
async def chat_stream(question: str):
    async def generate():
        async for chunk in chain.astream({"question": question}):
            yield f"data: {chunk}\n\n"
        yield "data: [DONE]\n\n"
    return StreamingResponse(generate(), media_type="text/event-stream")
```
8.2 RAG Endpoint
```python
from langchain_community.vectorstores import Chroma
from langchain_openai import OpenAIEmbeddings

# Vector store initialization
embeddings = OpenAIEmbeddings()
vectorstore = Chroma(persist_directory="./chroma_db", embedding_function=embeddings)
retriever = vectorstore.as_retriever(search_kwargs={"k": 5})

rag_prompt = ChatPromptTemplate.from_messages([
    ("system", "Answer based on the following context:\n{context}"),
    ("human", "{question}"),
])

@app.post("/rag/query")
async def rag_query(question: str):
    docs = await retriever.ainvoke(question)
    context = "\n".join([doc.page_content for doc in docs])
    response = await (rag_prompt | llm | StrOutputParser()).ainvoke(
        {"context": context, "question": question}
    )
    return {
        "answer": response,
        "sources": [{"content": d.page_content, "metadata": d.metadata} for d in docs],
    }
```
8.3 Model Serving Pattern
```python
import asyncio
from contextlib import asynccontextmanager

import torch
from fastapi import FastAPI

ml_models = {}

@asynccontextmanager
async def lifespan(app: FastAPI):
    # Load models on startup
    ml_models["sentiment"] = load_sentiment_model()
    ml_models["ner"] = load_ner_model()
    yield
    # Cleanup on shutdown
    ml_models.clear()
    torch.cuda.empty_cache()

app = FastAPI(lifespan=lifespan)

@app.post("/predict/sentiment")
async def predict_sentiment(text: str):
    model = ml_models["sentiment"]
    loop = asyncio.get_running_loop()
    result = await loop.run_in_executor(None, model.predict, text)
    return {"sentiment": result}
```
9. Testing
9.1 pytest + httpx (FastAPI)
```python
import pytest
from httpx import ASGITransport, AsyncClient

from main import app

@pytest.fixture
async def client():
    transport = ASGITransport(app=app)
    async with AsyncClient(transport=transport, base_url="http://test") as ac:
        yield ac

@pytest.mark.anyio
async def test_create_user(client: AsyncClient):
    response = await client.post("/users", json={
        "username": "testuser",
        "email": "test@example.com",
        "age": 25,
    })
    assert response.status_code == 201
    data = response.json()
    assert data["username"] == "testuser"

@pytest.mark.anyio
async def test_get_user_not_found(client: AsyncClient):
    response = await client.get("/users/0")
    assert response.status_code == 404

@pytest.mark.anyio
async def test_create_user_invalid_email(client: AsyncClient):
    response = await client.post("/users", json={
        "username": "testuser",
        "email": "invalid-email",
        "age": 25,
    })
    assert response.status_code == 422
```
9.2 Test Data with factory_boy
```python
import factory
from factory.alchemy import SQLAlchemyModelFactory

class UserFactory(SQLAlchemyModelFactory):
    class Meta:
        model = User
        sqlalchemy_session_persistence = "commit"

    username = factory.Sequence(lambda n: f"user_{n}")
    email = factory.LazyAttribute(lambda o: f"{o.username}@example.com")

class PostFactory(SQLAlchemyModelFactory):
    class Meta:
        model = Post
        sqlalchemy_session_persistence = "commit"

    title = factory.Faker("sentence")
    content = factory.Faker("paragraph")
    author = factory.SubFactory(UserFactory)
```
9.3 Coverage Configuration
```toml
# pyproject.toml
[tool.pytest.ini_options]
testpaths = ["tests"]
asyncio_mode = "auto"

[tool.coverage.run]
source = ["app"]
omit = ["tests/*", "alembic/*"]

[tool.coverage.report]
fail_under = 80
show_missing = true
```
10. Production Deployment
10.1 Uvicorn + Gunicorn
```bash
# Development
uvicorn main:app --reload --host 0.0.0.0 --port 8000

# Production: Gunicorn with Uvicorn workers
gunicorn main:app \
  -w 4 \
  -k uvicorn.workers.UvicornWorker \
  --bind 0.0.0.0:8000 \
  --timeout 120 \
  --graceful-timeout 30 \
  --access-logfile - \
  --error-logfile -
```
10.2 Docker Multi-Stage Build
```dockerfile
# Stage 1: Build
FROM python:3.12-slim AS builder
WORKDIR /app
RUN pip install --no-cache-dir poetry
COPY pyproject.toml poetry.lock ./
RUN poetry export -f requirements.txt --output requirements.txt --without-hashes
RUN pip install --no-cache-dir --prefix=/install -r requirements.txt

# Stage 2: Runtime
FROM python:3.12-slim
WORKDIR /app
# Copy installed packages from the build stage
COPY --from=builder /install /usr/local
COPY . .
RUN adduser --disabled-password --gecos "" appuser
USER appuser
EXPOSE 8000
HEALTHCHECK \
  CMD python -c "import httpx; httpx.get('http://localhost:8000/health')"
CMD ["gunicorn", "main:app", "-w", "4", "-k", "uvicorn.workers.UvicornWorker", "--bind", "0.0.0.0:8000"]
```
10.3 Structured Logging
```python
import logging

import structlog

structlog.configure(
    processors=[
        structlog.contextvars.merge_contextvars,
        structlog.processors.add_log_level,
        structlog.processors.TimeStamper(fmt="iso"),
        structlog.processors.StackInfoRenderer(),
        structlog.processors.format_exc_info,
        structlog.processors.JSONRenderer(),
    ],
    wrapper_class=structlog.make_filtering_bound_logger(logging.INFO),
    logger_factory=structlog.PrintLoggerFactory(),
)

logger = structlog.get_logger()

async def create_order(order: OrderCreate):
    logger.info("order_created", order_id=order.id, user_id=order.user_id, amount=order.total)
```
10.4 Health Check Endpoints
```python
from datetime import datetime, timezone

from fastapi import Depends, HTTPException
from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncSession

@app.get("/health")
async def health_check():
    return {
        "status": "healthy",
        "timestamp": datetime.now(timezone.utc).isoformat(),  # utcnow() is deprecated in 3.12
        "version": "1.0.0",
    }

@app.get("/health/ready")
async def readiness_check(db: AsyncSession = Depends(get_db)):
    try:
        await db.execute(text("SELECT 1"))
    except Exception:
        raise HTTPException(status_code=503, detail="database disconnected")
    return {"status": "ready", "database": "connected"}
```
11. Interview Questions (15)
Q1. What is the biggest difference between FastAPI and Flask?
FastAPI supports native async/await, provides automatic request validation via Pydantic, and generates OpenAPI docs automatically. Flask is synchronous and requires separate libraries for these features. FastAPI also has a built-in dependency injection system using type hints.
Q2. How does Python's GIL affect web server performance?
The GIL restricts only one thread to execute Python bytecode at a time. However, for I/O-bound tasks (network, DB), the GIL is released, making async/await effective. CPU-bound work should use multiprocessing or multiple Gunicorn workers.
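A quick standard-library sketch of the I/O case; `time.sleep` stands in for a blocking network wait, which releases the GIL the same way:

```python
import threading
import time

def io_wait() -> None:
    # time.sleep releases the GIL, just as socket or DB driver waits do.
    time.sleep(0.2)

start = time.perf_counter()
threads = [threading.Thread(target=io_wait) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()
elapsed = time.perf_counter() - start
# Four 0.2s waits overlap: total is roughly 0.2s, not 0.8s.
print(f"elapsed: {elapsed:.2f}s")
```

Swapping the sleep for a tight CPU loop would erase the overlap, which is why CPU-bound work needs processes instead of threads.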
Q3. What are the key differences between Pydantic v1 and v2?
v2 was rewritten with Rust-based pydantic-core for 5-50x speed improvement. Key changes: model_validate() replaces parse_obj(), model_config dict replaces Config class, field_validator replaces validator, and new model_serializer.
Q4. How do you solve the N+1 problem in Django ORM?
Use select_related() for ForeignKey/OneToOneField JOIN queries and prefetch_related() for ManyToManyField or reverse relations. Monitor query counts with django-debug-toolbar.
Q5. How does FastAPI Depends() work internally?
FastAPI resolves the dependency graph for each request. Each Depends() recursively resolves sub-dependencies, and generator dependencies (yield) execute cleanup code after the response. Dependencies are cached within the same request.
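The generator-dependency behavior can be mimicked in plain Python; the names below are illustrative, not FastAPI internals:

```python
# FastAPI advances a "yield" dependency once to get the injected value,
# then resumes it after the response so cleanup code runs.
events = []

def get_session():
    events.append("open")
    try:
        yield "session"          # value injected into the endpoint
    finally:
        events.append("close")   # runs after the response is sent

def handle_request():
    dep = get_session()
    session = next(dep)          # resolve the dependency
    events.append(f"handler uses {session}")
    try:
        next(dep)                # resume generator -> cleanup runs
    except StopIteration:
        pass

handle_request()
print(events)  # → ['open', 'handler uses session', 'close']
```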
Q6. What is the difference between ASGI and WSGI?
WSGI is a synchronous interface: each request occupies a worker thread or process for its full duration. ASGI is an asynchronous interface capable of handling thousands of concurrent connections in a single process, and it supports WebSocket, HTTP/2, and Server-Sent Events (SSE).
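To make the interface concrete, here is a bare ASGI application driven by hand with stub `receive`/`send` callables (standard library only, no server required):

```python
import asyncio

# An ASGI application is just an async callable taking three arguments;
# this is the entire interface a server like Uvicorn speaks.
async def asgi_app(scope, receive, send):
    assert scope["type"] == "http"
    await send({
        "type": "http.response.start",
        "status": 200,
        "headers": [(b"content-type", b"text/plain")],
    })
    await send({"type": "http.response.body", "body": b"hello"})

# Stub channel: collect outgoing messages instead of writing to a socket.
sent: list[dict] = []

async def receive():
    return {"type": "http.request", "body": b"", "more_body": False}

async def send(message: dict):
    sent.append(message)

asyncio.run(asgi_app({"type": "http"}, receive, send))
print(sent)
```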
Q7. What is the session management strategy for SQLAlchemy?
Per-request session pattern is common. In FastAPI, sessions are injected via Depends() and auto-closed with yield. Important settings include connection pooling (pool_size, max_overflow), expire_on_commit=False, and scoped_session usage.
Q8. How do you handle task failures in Celery?
Use autoretry_for for specific exceptions, max_retries for limits, retry_backoff for exponential backoff, and task_acks_late for ACK after completion. Failed tasks can be sent to a Dead Letter Queue or trigger on_failure callbacks.
Q9. How do you efficiently handle file uploads in FastAPI?
UploadFile uses spooling instead of loading entirely into memory. Large files should use chunked reads and stream directly to object storage like S3. Post-processing should be separated into background tasks or Celery.
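The chunked-read idea in a standard-library sketch; `CHUNK_SIZE` and `hash_stream` are illustrative names, and the same loop shape applies to `await file.read(n)` on an `UploadFile`:

```python
import hashlib
import io

CHUNK_SIZE = 1024 * 1024  # 1 MiB

def hash_stream(stream: io.BufferedIOBase) -> str:
    # Read fixed-size chunks so memory stays flat regardless of file size.
    digest = hashlib.sha256()
    while chunk := stream.read(CHUNK_SIZE):
        digest.update(chunk)
    return digest.hexdigest()

checksum = hash_stream(io.BytesIO(b"x" * (3 * CHUNK_SIZE + 17)))
print(checksum)
```

In a real endpoint, each chunk would be forwarded to object storage (e.g. an S3 multipart upload) rather than hashed.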
Q10. What are the caveats of using async views in Django?
Django's ORM is only partially async: since 4.1 there are async query methods (aget(), afirst(), async iteration), but much of the stack still needs sync_to_async() wrappers. Middleware must also be async-compatible, mixing sync and async layers adds adapter overhead, and an ASGI server (Daphne, Uvicorn) is required.
Q11. What is the difference between concurrency and parallelism in Python?
Concurrency (asyncio) alternates between tasks. Parallelism (multiprocessing) executes tasks simultaneously. I/O-bound tasks benefit from concurrency; CPU-bound tasks benefit from parallelism.
Q12. What is the middleware execution order in FastAPI?
Middleware follows the onion model. Requests pass through middleware in reverse registration order, and responses return in registration order. Typical order: CORS, authentication, logging.
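A toy, framework-free version of the onion model (all names illustrative): the last wrapper applied is the outermost layer, mirroring how the most recently registered middleware sees the request first.

```python
order = []

def endpoint(request):
    order.append("endpoint")
    return "response"

def make_middleware(name, next_handler):
    # Each middleware wraps the next handler, onion-style.
    def handler(request):
        order.append(f"{name}:request")
        result = next_handler(request)
        order.append(f"{name}:response")
        return result
    return handler

handler = endpoint
for name in ["logging", "auth", "cors"]:  # "cors" registered last -> outermost
    handler = make_middleware(name, handler)

handler("req")
print(order)
# → ['cors:request', 'auth:request', 'logging:request', 'endpoint',
#    'logging:response', 'auth:response', 'cors:response']
```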
Q13. How do you safely perform large-scale Alembic migrations?
Separate data migrations from schema migrations, use online DDL tools (pt-online-schema-change). Apply the expand-and-contract pattern for zero-downtime migrations. Always prepare rollback scripts.
Q14. How do you debug memory leaks in Python web apps?
Use tracemalloc for memory allocation tracking, objgraph for reference graph analysis. memray is suitable for production profiling. Common causes: circular references, growing global caches, unremoved event handlers.
Q15. FastAPI vs Litestar — which should you choose for new projects?
FastAPI has a larger community and ecosystem (80K+ stars). Litestar offers better performance and a built-in DI container. Consider team experience, ecosystem dependencies, and performance requirements. FastAPI is the safer choice in most cases.
12. Quiz
Q1. What happens when you define a FastAPI endpoint with a sync function (def)?
FastAPI runs sync endpoint functions in a thread pool. async def runs directly on the event loop, while plain def is dispatched to a worker thread (via AnyIO's thread pool). This means sync functions do not block other requests, up to the pool's capacity.
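A standard-library sketch of the same mechanism; FastAPI actually uses AnyIO's thread pool, but the effect matches this `run_in_executor` demo:

```python
import asyncio
import threading

def blocking_work() -> str:
    # Stands in for a plain `def` endpoint body.
    return threading.current_thread().name

async def main() -> tuple[str, str]:
    loop = asyncio.get_running_loop()
    # None -> the default ThreadPoolExecutor: work leaves the loop's thread.
    pooled = await loop.run_in_executor(None, blocking_work)
    return threading.current_thread().name, pooled

loop_thread, worker_thread = asyncio.run(main())
print(loop_thread, worker_thread)
```

The two printed thread names differ, showing the sync body ran off the event loop's thread.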
Q2. What is the difference between Pydantic model_validate() and model_construct()?
model_validate() validates all fields and raises ValidationError for invalid data. model_construct() skips validation and creates the model directly. Use it for already-validated data (from DB queries) for better performance.
Q3. What is the SQL difference between Django select_related() and prefetch_related()?
select_related() uses SQL JOIN to fetch related objects in a single query (ForeignKey, OneToOne). prefetch_related() runs separate queries and joins in Python (ManyToMany, reverse relations). select_related() is generally more efficient.
Q4. What is the difference between asyncio.gather() and asyncio.TaskGroup()?
asyncio.gather() continues running remaining tasks even if one fails (with return_exceptions=True). Python 3.11's TaskGroup cancels all remaining tasks when one fails, providing structured concurrency. TaskGroup is safer and more predictable.
Q5. What does task_acks_late=True mean in Celery and what are the trade-offs?
By default, Celery sends ACK when receiving a task. task_acks_late=True sends ACK after task completion. Advantage: tasks are redelivered on worker crash. Disadvantage: tasks may execute twice, requiring idempotency.
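A minimal idempotency guard, in-memory for illustration only; a real worker would use an atomic store such as Redis SET NX keyed by task id:

```python
processed: set[str] = set()
charges: list[int] = []

def charge_card(task_id: str, amount: int) -> bool:
    # Idempotency guard: a redelivered task with the same id is a no-op.
    if task_id in processed:
        return False
    processed.add(task_id)
    charges.append(amount)  # the real side effect (payment API call, etc.)
    return True

first = charge_card("task-42", 100)
second = charge_card("task-42", 100)  # redelivery after a worker crash
print(first, second, charges)  # → True False [100]
```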
References
- FastAPI Official Documentation
- Django Official Documentation
- Pydantic v2 Migration Guide
- SQLAlchemy 2.0 Documentation
- Celery Official Documentation
- LangChain Documentation
- Uvicorn Configuration Guide
- Python asyncio Documentation
- Django REST Framework
- Alembic Migration Guide
- structlog Documentation
- pytest-asyncio
- Flower Monitoring
- Understanding Python GIL
- ASGI Specification