Skip to content
Published on

Python バックエンド 2025 完全ガイド:FastAPI vs Django、非同期処理、AIサービス構築まで

Authors

はじめに

Pythonは2025年現在、バックエンド開発言語として独自の地位を築いています。特にAI/MLサービスの爆発的な成長に伴い、FastAPIはGitHubスター80Kを突破し、最も急速に成長しているWebフレームワークとなりました。Djangoも4.2 LTSと5.0の非同期サポート強化で、エンタープライズ市場で依然として強力です。

本記事では、FastAPIとDjangoを中心に、非同期処理、データベース、タスクキュー、AIサービス構築、プロダクションデプロイまで、Pythonバックエンドの全てを網羅します。面接質問とクイズも含まれており、キャリア準備にも活用できます。


1. Pythonバックエンドランドスケープ 2025

なぜPythonバックエンドなのか?

Pythonがバックエンド開発で圧倒的な理由は三つあります。

  1. AI/MLエコシステムとの自然な統合 — PyTorch、TensorFlow、LangChainなどのAIライブラリをそのまま使用可能
  2. 迅速なプロトタイピングと生産性 — 簡潔な文法、豊富なライブラリ、型ヒントで安全性まで
  3. 巨大なコミュニティ — PyPIパッケージ50万以上、StackOverflow回答が豊富

主要フレームワーク現況

フレームワークGitHub Stars最新バージョン核心特徴
FastAPI80K+0.115+非同期、自動ドキュメント、Pydantic
Django82K+5.1フルスタック、ORM、Admin
Flask69K+3.1マイクロ、柔軟性、シンプル
Litestar5K+2.xFastAPI代替、パフォーマンス優先

2. FastAPI vs Django vs Flask 比較

パフォーマンス比較(requests/sec — TechEmpower基準)

項目FastAPIDjangoFlask
JSONシリアライゼーション~15,000~3,500~4,000
単一DBクエリ~12,000~3,000~3,500
複数DBクエリ~3,500~1,200~1,400
非同期I/Oネイティブ5.0+ 部分サポート非サポート

選択基準

FastAPIを選ぶ場合:
  - 新しいAPIサービスを構築する時
  - AI/MLモデルサービングが必要な時
  - 高い並行性が要求される時
  - マイクロサービスアーキテクチャの時

Djangoを選ぶ場合:
  - 管理パネルが必要な時
  - 迅速なCRUDアプリ開発が必要な時
  - フルスタックWebアプリケーションの時
  - チームにDjango経験が豊富な時

Flaskを選ぶ場合:
  - 極めてシンプルなAPIの時
  - レガシーシステムのメンテナンスの時
  - 学習目的の時

エコシステム比較

機能FastAPIDjangoFlask
ORMSQLAlchemyDjango ORMSQLAlchemy
認証カスタム / FastAPI-Usersdjango-allauthFlask-Login
管理画面SQLAdmindjango-adminFlask-Admin
APIドキュメント自動(Swagger/ReDoc)DRF SpectacularFlask-RESTX
WebSocketネイティブChannelsFlask-SocketIO
非同期ネイティブ部分サポート非サポート

3. FastAPI ディープダイブ

3.1 基本構造とルーティング

from fastapi import FastAPI, HTTPException, status
from pydantic import BaseModel, Field
from datetime import datetime

app = FastAPI(
    title="My API",
    version="1.0.0",
    docs_url="/docs",
    redoc_url="/redoc",
)

class UserCreate(BaseModel):
    username: str = Field(..., min_length=3, max_length=50)
    email: str = Field(..., pattern=r"^[\w\.\-]+@[\w\.\-]+\.\w+$")
    age: int = Field(..., ge=0, le=150)

class UserResponse(BaseModel):
    id: int
    username: str
    email: str
    created_at: datetime

    model_config = {"from_attributes": True}

@app.post("/users", response_model=UserResponse, status_code=status.HTTP_201_CREATED)
async def create_user(user: UserCreate):
    return UserResponse(
        id=1,
        username=user.username,
        email=user.email,
        created_at=datetime.now(),
    )

@app.get("/users/{user_id}", response_model=UserResponse)
async def get_user(user_id: int):
    if user_id <= 0:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="User not found",
        )
    return UserResponse(id=user_id, username="test", email="t@t.com", created_at=datetime.now())

3.2 Pydantic v2 データバリデーション

Pydantic v2はRustベースのpydantic-coreで書き直され、5〜50倍のパフォーマンス向上を達成しました。

from pydantic import BaseModel, Field, field_validator, model_validator
from typing import Optional
from enum import Enum

class Role(str, Enum):
    ADMIN = "admin"
    USER = "user"
    MODERATOR = "moderator"

class UserProfile(BaseModel):
    name: str = Field(..., min_length=1, max_length=100)
    role: Role = Role.USER
    tags: list[str] = Field(default_factory=list, max_length=10)
    metadata: Optional[dict[str, str]] = None

    @field_validator("name")
    @classmethod
    def name_must_not_be_empty(cls, v: str) -> str:
        if not v.strip():
            raise ValueError("Name cannot be whitespace only")
        return v.strip()

    @model_validator(mode="after")
    def validate_admin_tags(self):
        if self.role == Role.ADMIN and not self.tags:
            raise ValueError("Admin must have at least one tag")
        return self

    model_config = {
        "json_schema_extra": {
            "examples": [
                {"name": "Alice", "role": "admin", "tags": ["ops"]}
            ]
        }
    }

3.3 依存性注入(Dependency Injection)

FastAPIのDepends()は強力なDIシステムを提供します。

from fastapi import Depends, Header, HTTPException
from sqlalchemy.ext.asyncio import AsyncSession
from typing import Annotated

# DBセッション依存性
async def get_db() -> AsyncSession:
    async with async_session_maker() as session:
        try:
            yield session
        finally:
            await session.close()

# 認証依存性
async def get_current_user(
    authorization: str = Header(...),
    db: AsyncSession = Depends(get_db),
) -> User:
    token = authorization.replace("Bearer ", "")
    user = await verify_token(token, db)
    if not user:
        raise HTTPException(status_code=401, detail="Invalid token")
    return user

# 権限検証依存性
def require_role(required_role: str):
    async def role_checker(user: User = Depends(get_current_user)):
        if user.role != required_role:
            raise HTTPException(status_code=403, detail="Insufficient permissions")
        return user
    return role_checker

# 使用例
@app.get("/admin/users")
async def list_users(
    admin: Annotated[User, Depends(require_role("admin"))],
    db: Annotated[AsyncSession, Depends(get_db)],
):
    return await get_all_users(db)

3.4 ミドルウェアとエラーハンドリング

from fastapi import Request
from fastapi.middleware.cors import CORSMiddleware
from starlette.middleware.base import BaseHTTPMiddleware
import time
import logging

logger = logging.getLogger(__name__)

# CORSミドルウェア
app.add_middleware(
    CORSMiddleware,
    allow_origins=["https://myapp.com"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# カスタムミドルウェア — リクエストロギング
class RequestLoggingMiddleware(BaseHTTPMiddleware):
    async def dispatch(self, request: Request, call_next):
        start = time.perf_counter()
        response = await call_next(request)
        elapsed = time.perf_counter() - start
        logger.info(
            f"method={request.method} path={request.url.path} "
            f"status={response.status_code} duration={elapsed:.3f}s"
        )
        return response

app.add_middleware(RequestLoggingMiddleware)

# グローバル例外ハンドラ
@app.exception_handler(ValueError)
async def value_error_handler(request: Request, exc: ValueError):
    return JSONResponse(
        status_code=422,
        content={"detail": str(exc), "type": "validation_error"},
    )

3.5 WebSocketとバックグラウンドタスク

from fastapi import WebSocket, WebSocketDisconnect, BackgroundTasks

# WebSocketチャット
class ConnectionManager:
    def __init__(self):
        self.active: list[WebSocket] = []

    async def connect(self, ws: WebSocket):
        await ws.accept()
        self.active.append(ws)

    def disconnect(self, ws: WebSocket):
        self.active.remove(ws)

    async def broadcast(self, message: str):
        for conn in self.active:
            await conn.send_text(message)

manager = ConnectionManager()

@app.websocket("/ws/chat")
async def websocket_chat(ws: WebSocket):
    await manager.connect(ws)
    try:
        while True:
            data = await ws.receive_text()
            await manager.broadcast(f"User: {data}")
    except WebSocketDisconnect:
        manager.disconnect(ws)

# バックグラウンドタスク
async def send_notification(email: str, message: str):
    await asyncio.sleep(2)
    logger.info(f"Sent notification to {email}")

@app.post("/orders")
async def create_order(order: OrderCreate, background_tasks: BackgroundTasks):
    result = await save_order(order)
    background_tasks.add_task(send_notification, order.email, "Order confirmed!")
    return result

4. Django ディープダイブ

4.1 Django ORMとモデル

# models.py
from django.db import models
from django.core.validators import MinValueValidator

class Category(models.Model):
    name = models.CharField(max_length=100, unique=True)
    slug = models.SlugField(unique=True)

    class Meta:
        verbose_name_plural = "categories"
        ordering = ["name"]

    def __str__(self):
        return self.name

class Product(models.Model):
    name = models.CharField(max_length=200)
    category = models.ForeignKey(Category, on_delete=models.CASCADE, related_name="products")
    price = models.DecimalField(max_digits=10, decimal_places=2, validators=[MinValueValidator(0)])
    stock = models.PositiveIntegerField(default=0)
    is_active = models.BooleanField(default=True)
    created_at = models.DateTimeField(auto_now_add=True)
    updated_at = models.DateTimeField(auto_now=True)

    class Meta:
        indexes = [
            models.Index(fields=["category", "-created_at"]),
            models.Index(fields=["price"]),
        ]

# クエリ例 — N+1問題の解決
products = Product.objects.select_related("category").filter(is_active=True)

# 集計
from django.db.models import Avg, Count, Q
stats = Category.objects.annotate(
    product_count=Count("products"),
    avg_price=Avg("products__price"),
    active_count=Count("products", filter=Q(products__is_active=True)),
)

4.2 Django Admin カスタマイズ

# admin.py
from django.contrib import admin
from .models import Product, Category

@admin.register(Product)
class ProductAdmin(admin.ModelAdmin):
    list_display = ["name", "category", "price", "stock", "is_active", "created_at"]
    list_filter = ["is_active", "category", "created_at"]
    search_fields = ["name", "category__name"]
    list_editable = ["price", "stock", "is_active"]
    readonly_fields = ["created_at", "updated_at"]
    list_per_page = 25

    actions = ["activate_products", "deactivate_products"]

    @admin.action(description="Activate selected products")
    def activate_products(self, request, queryset):
        updated = queryset.update(is_active=True)
        self.message_user(request, f"{updated} products activated.")

    @admin.action(description="Deactivate selected products")
    def deactivate_products(self, request, queryset):
        updated = queryset.update(is_active=False)
        self.message_user(request, f"{updated} products deactivated.")

4.3 Django REST Framework (DRF)

# serializers.py
from rest_framework import serializers
from .models import Product, Category

class CategorySerializer(serializers.ModelSerializer):
    product_count = serializers.IntegerField(read_only=True)

    class Meta:
        model = Category
        fields = ["id", "name", "slug", "product_count"]

class ProductSerializer(serializers.ModelSerializer):
    category_name = serializers.CharField(source="category.name", read_only=True)

    class Meta:
        model = Product
        fields = ["id", "name", "category", "category_name", "price", "stock", "is_active"]

# views.py
from rest_framework import viewsets, filters
from django_filters.rest_framework import DjangoFilterBackend

class ProductViewSet(viewsets.ModelViewSet):
    queryset = Product.objects.select_related("category").filter(is_active=True)
    serializer_class = ProductSerializer
    filter_backends = [DjangoFilterBackend, filters.SearchFilter, filters.OrderingFilter]
    filterset_fields = ["category", "is_active"]
    search_fields = ["name"]
    ordering_fields = ["price", "created_at"]
    ordering = ["-created_at"]

4.4 Django 5.0 非同期ビュー

# Django 5.0+ 非同期ビュー
from django.http import JsonResponse
from asgiref.sync import sync_to_async

async def async_product_list(request):
    # ORM呼び出しはsync_to_asyncでラップが必要
    products = await sync_to_async(list)(
        Product.objects.select_related("category").filter(is_active=True)[:20]
    )
    data = [
        {"id": p.id, "name": p.name, "price": str(p.price)}
        for p in products
    ]
    return JsonResponse({"products": data})

# Django Channels — WebSocket
from channels.generic.websocket import AsyncJsonWebSocketConsumer

class ChatConsumer(AsyncJsonWebSocketConsumer):
    async def connect(self):
        self.room = self.scope["url_route"]["kwargs"]["room"]
        await self.channel_layer.group_add(self.room, self.channel_name)
        await self.accept()

    async def disconnect(self, close_code):
        await self.channel_layer.group_discard(self.room, self.channel_name)

    async def receive_json(self, content):
        await self.channel_layer.group_send(
            self.room,
            {"type": "chat.message", "message": content["message"]},
        )

    async def chat_message(self, event):
        await self.send_json({"message": event["message"]})

5. データベース:SQLAlchemy 2.0 + Alembic

5.1 SQLAlchemy 2.0 非同期モデル

from sqlalchemy import String, Integer, ForeignKey, DateTime, func
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, relationship
from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker, AsyncSession
from datetime import datetime

class Base(DeclarativeBase):
    pass

class User(Base):
    __tablename__ = "users"

    id: Mapped[int] = mapped_column(primary_key=True)
    username: Mapped[str] = mapped_column(String(50), unique=True, index=True)
    email: Mapped[str] = mapped_column(String(255), unique=True)
    created_at: Mapped[datetime] = mapped_column(DateTime, server_default=func.now())
    posts: Mapped[list["Post"]] = relationship(back_populates="author", lazy="selectin")

class Post(Base):
    __tablename__ = "posts"

    id: Mapped[int] = mapped_column(primary_key=True)
    title: Mapped[str] = mapped_column(String(200))
    content: Mapped[str] = mapped_column(String)
    author_id: Mapped[int] = mapped_column(ForeignKey("users.id"))
    author: Mapped["User"] = relationship(back_populates="posts")

# 非同期エンジン設定
engine = create_async_engine(
    "postgresql+asyncpg://user:pass@localhost/db",
    pool_size=20,
    max_overflow=10,
    pool_pre_ping=True,
    echo=False,
)

async_session = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)

5.2 非同期CRUDパターン

from sqlalchemy import select, update, delete
from sqlalchemy.ext.asyncio import AsyncSession

class UserRepository:
    def __init__(self, session: AsyncSession):
        self.session = session

    async def get_by_id(self, user_id: int) -> User | None:
        result = await self.session.execute(
            select(User).where(User.id == user_id)
        )
        return result.scalar_one_or_none()

    async def list_with_posts(self, skip: int = 0, limit: int = 20) -> list[User]:
        result = await self.session.execute(
            select(User)
            .options(selectinload(User.posts))
            .offset(skip)
            .limit(limit)
            .order_by(User.created_at.desc())
        )
        return list(result.scalars().all())

    async def create(self, username: str, email: str) -> User:
        user = User(username=username, email=email)
        self.session.add(user)
        await self.session.commit()
        await self.session.refresh(user)
        return user

    async def update_email(self, user_id: int, new_email: str) -> bool:
        result = await self.session.execute(
            update(User).where(User.id == user_id).values(email=new_email)
        )
        await self.session.commit()
        return result.rowcount > 0

5.3 Alembic マイグレーション

# 初期化
alembic init alembic

# マイグレーション生成
alembic revision --autogenerate -m "add users table"

# マイグレーション適用
alembic upgrade head

# ロールバック
alembic downgrade -1

6. 非同期パターン(async/await)

6.1 asyncio 基本

import asyncio
import aiohttp

# 複数API同時呼び出し
async def fetch_url(session: aiohttp.ClientSession, url: str) -> dict:
    async with session.get(url) as response:
        return await response.json()

async def fetch_all(urls: list[str]) -> list[dict]:
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_url(session, url) for url in urls]
        return await asyncio.gather(*tasks, return_exceptions=True)

# 実行
urls = [
    "https://api.example.com/users",
    "https://api.example.com/products",
    "https://api.example.com/orders",
]
results = asyncio.run(fetch_all(urls))

6.2 セマフォによる並行性制限

import asyncio

# 同時に最大10個のみ実行
semaphore = asyncio.Semaphore(10)

async def rate_limited_fetch(session, url):
    async with semaphore:
        async with session.get(url) as response:
            return await response.json()

6.3 同期 vs 非同期 — いつ何を使うべきか

非同期が良い場合:
  - 外部API呼び出しが多い場合
  - データベースI/Oが頻繁な場合
  - WebSocket / リアルタイム通信
  - ファイルI/Oが多い場合

同期が良い場合:
  - CPU集約的タスク(画像処理、ML推論)
  - シンプルなCRUD操作
  - レガシーライブラリが非同期をサポートしない場合
  - concurrent.futures.ProcessPoolExecutorの使用が適切な場合

6.4 concurrent.futuresでCPUバウンド処理

from concurrent.futures import ProcessPoolExecutor
import asyncio

def cpu_heavy_task(data: bytes) -> bytes:
    import hashlib
    return hashlib.sha256(data).digest()

async def process_files(files: list[bytes]) -> list[bytes]:
    loop = asyncio.get_event_loop()
    with ProcessPoolExecutor(max_workers=4) as executor:
        tasks = [
            loop.run_in_executor(executor, cpu_heavy_task, f)
            for f in files
        ]
        return await asyncio.gather(*tasks)

7. Celery + Redis タスクキュー

7.1 基本設定

# celery_app.py
from celery import Celery

app = Celery(
    "myapp",
    broker="redis://localhost:6379/0",
    backend="redis://localhost:6379/1",
)

app.conf.update(
    task_serializer="json",
    accept_content=["json"],
    result_serializer="json",
    timezone="Asia/Tokyo",
    enable_utc=True,
    task_track_started=True,
    task_acks_late=True,
    worker_prefetch_multiplier=1,
    task_reject_on_worker_lost=True,
)

7.2 タスク定義とエラーハンドリング

from celery import shared_task
from celery.utils.log import get_task_logger

logger = get_task_logger(__name__)

@shared_task(
    bind=True,
    max_retries=3,
    default_retry_delay=60,
    autoretry_for=(ConnectionError, TimeoutError),
    retry_backoff=True,
    retry_jitter=True,
)
def send_email_task(self, to: str, subject: str, body: str):
    try:
        send_email(to, subject, body)
        logger.info(f"Email sent to {to}")
    except Exception as exc:
        logger.error(f"Email failed: {exc}")
        raise self.retry(exc=exc)

@shared_task(bind=True)
def process_ai_inference(self, model_name: str, input_data: dict):
    self.update_state(state="PROCESSING", meta={"progress": 0})
    model = load_model(model_name)
    self.update_state(state="PROCESSING", meta={"progress": 50})
    result = model.predict(input_data)
    self.update_state(state="PROCESSING", meta={"progress": 100})
    return result

7.3 定期タスク(Celery Beat)

from celery.schedules import crontab

app.conf.beat_schedule = {
    "cleanup-expired-sessions": {
        "task": "tasks.cleanup_sessions",
        "schedule": crontab(hour=2, minute=0),  # 毎日午前2時
    },
    "sync-external-data": {
        "task": "tasks.sync_data",
        "schedule": 300.0,  # 5分ごと
    },
    "weekly-report": {
        "task": "tasks.generate_report",
        "schedule": crontab(hour=9, minute=0, day_of_week="mon"),
    },
}

8. AIサービス構築:FastAPI + LangChain

8.1 LangChain + FastAPI 統合

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from langchain_openai import ChatOpenAI
from langchain.prompts import ChatPromptTemplate
from langchain.schema import StrOutputParser

app = FastAPI()

llm = ChatOpenAI(model="gpt-4o", temperature=0.7, streaming=True)

prompt = ChatPromptTemplate.from_messages([
    ("system", "You are a helpful assistant. Answer in Japanese."),
    ("human", "{question}"),
])

chain = prompt | llm | StrOutputParser()

@app.post("/chat")
async def chat(question: str):
    response = await chain.ainvoke({"question": question})
    return {"answer": response}

@app.post("/chat/stream")
async def chat_stream(question: str):
    async def generate():
        async for chunk in chain.astream({"question": question}):
            yield f"data: {chunk}\n\n"
        yield "data: [DONE]\n\n"

    return StreamingResponse(generate(), media_type="text/event-stream")

8.2 RAG エンドポイント

from langchain_community.vectorstores import Chroma
from langchain_openai import OpenAIEmbeddings

embeddings = OpenAIEmbeddings()
vectorstore = Chroma(persist_directory="./chroma_db", embedding_function=embeddings)
retriever = vectorstore.as_retriever(search_kwargs={"k": 5})

rag_prompt = ChatPromptTemplate.from_messages([
    ("system", "Answer based on the following context:\n{context}"),
    ("human", "{question}"),
])

@app.post("/rag/query")
async def rag_query(question: str):
    docs = await retriever.ainvoke(question)
    context = "\n".join([doc.page_content for doc in docs])

    response = await (rag_prompt | llm | StrOutputParser()).ainvoke(
        {"context": context, "question": question}
    )

    return {
        "answer": response,
        "sources": [{"content": d.page_content, "metadata": d.metadata} for d in docs],
    }

8.3 モデルサービングパターン

from contextlib import asynccontextmanager
import torch

ml_models = {}

@asynccontextmanager
async def lifespan(app: FastAPI):
    ml_models["sentiment"] = load_sentiment_model()
    ml_models["ner"] = load_ner_model()
    yield
    ml_models.clear()
    torch.cuda.empty_cache()

app = FastAPI(lifespan=lifespan)

@app.post("/predict/sentiment")
async def predict_sentiment(text: str):
    model = ml_models["sentiment"]
    loop = asyncio.get_event_loop()
    result = await loop.run_in_executor(None, model.predict, text)
    return {"sentiment": result}

9. テスティング

9.1 pytest + httpx(FastAPI)

import pytest
from httpx import AsyncClient, ASGITransport
from main import app

@pytest.fixture
async def client():
    transport = ASGITransport(app=app)
    async with AsyncClient(transport=transport, base_url="http://test") as ac:
        yield ac

@pytest.mark.anyio
async def test_create_user(client: AsyncClient):
    response = await client.post("/users", json={
        "username": "testuser",
        "email": "test@example.com",
        "age": 25,
    })
    assert response.status_code == 201
    data = response.json()
    assert data["username"] == "testuser"

@pytest.mark.anyio
async def test_get_user_not_found(client: AsyncClient):
    response = await client.get("/users/0")
    assert response.status_code == 404

9.2 factory_boyでテストデータ生成

import factory
from factory.alchemy import SQLAlchemyModelFactory

class UserFactory(SQLAlchemyModelFactory):
    class Meta:
        model = User
        sqlalchemy_session_persistence = "commit"

    username = factory.Sequence(lambda n: f"user_{n}")
    email = factory.LazyAttribute(lambda o: f"{o.username}@example.com")

class PostFactory(SQLAlchemyModelFactory):
    class Meta:
        model = Post
        sqlalchemy_session_persistence = "commit"

    title = factory.Faker("sentence")
    content = factory.Faker("paragraph")
    author = factory.SubFactory(UserFactory)

10. プロダクションデプロイ

10.1 Uvicorn + Gunicorn

# 開発
uvicorn main:app --reload --host 0.0.0.0 --port 8000

# プロダクション — Gunicorn + Uvicorn Workers
gunicorn main:app \
  -w 4 \
  -k uvicorn.workers.UvicornWorker \
  --bind 0.0.0.0:8000 \
  --timeout 120 \
  --graceful-timeout 30 \
  --access-logfile - \
  --error-logfile -

10.2 Docker マルチステージビルド

# Stage 1: ビルド
FROM python:3.12-slim AS builder

WORKDIR /app
RUN pip install --no-cache-dir poetry

COPY pyproject.toml poetry.lock ./
RUN poetry export -f requirements.txt --output requirements.txt --without-hashes
RUN pip install --no-cache-dir --prefix=/install -r requirements.txt

# Stage 2: ランタイム
FROM python:3.12-slim

WORKDIR /app
COPY --from=builder /install /usr/local
COPY . .

RUN adduser --disabled-password --gecos "" appuser
USER appuser

EXPOSE 8000

HEALTHCHECK --interval=30s --timeout=3s --start-period=5s --retries=3 \
  CMD python -c "import httpx; httpx.get('http://localhost:8000/health')"

CMD ["gunicorn", "main:app", "-w", "4", "-k", "uvicorn.workers.UvicornWorker", "--bind", "0.0.0.0:8000"]

10.3 構造化ロギング

import structlog
import logging

structlog.configure(
    processors=[
        structlog.contextvars.merge_contextvars,
        structlog.processors.add_log_level,
        structlog.processors.TimeStamper(fmt="iso"),
        structlog.processors.StackInfoRenderer(),
        structlog.processors.format_exc_info,
        structlog.processors.JSONRenderer(),
    ],
    wrapper_class=structlog.make_filtering_bound_logger(logging.INFO),
    logger_factory=structlog.PrintLoggerFactory(),
)

logger = structlog.get_logger()

async def create_order(order: OrderCreate):
    logger.info("order_created", order_id=order.id, user_id=order.user_id, amount=order.total)

10.4 ヘルスチェックエンドポイント

from fastapi import FastAPI
from datetime import datetime

@app.get("/health")
async def health_check():
    return {
        "status": "healthy",
        "timestamp": datetime.utcnow().isoformat(),
        "version": "1.0.0",
    }

@app.get("/health/ready")
async def readiness_check(db: AsyncSession = Depends(get_db)):
    try:
        await db.execute(text("SELECT 1"))
        db_status = "connected"
    except Exception:
        db_status = "disconnected"
        raise HTTPException(status_code=503)

    return {"status": "ready", "database": db_status}

11. 面接質問 15選

Q1. FastAPIとFlaskの最大の違いは何ですか?

FastAPIはネイティブのasync/awaitをサポートし、Pydanticベースの自動リクエストバリデーションとOpenAPIドキュメントの自動生成を提供します。Flaskは同期ベースで、これらの機能には別途ライブラリが必要です。また、FastAPIは型ヒントを活用した依存性注入システムを内蔵しています。

Q2. PythonのGILがWebサーバーのパフォーマンスに与える影響は?

GIL(Global Interpreter Lock)は一度に1つのスレッドだけがPythonバイトコードを実行するよう制限します。しかし、I/Oバウンドタスク(ネットワーク、DB)ではGILが解放されるため、async/awaitが効果的です。CPUバウンドタスクはmultiprocessingやGunicornの複数ワーカーで解決します。

Q3. Pydantic v1とv2の主な違いは?

v2はRustベースのpydantic-coreで書き直され、5〜50倍高速です。主な変更:model_validate()(旧parse_obj())、model_config dict(旧Configクラス)、field_validatorデコレータ(旧validator)、model_serializer等。

Q4. Django ORMでN+1問題を解決する方法は?

select_related()でForeignKey/OneToOneFieldのJOINクエリを実行し、prefetch_related()でManyToManyFieldや逆方向リレーションを事前ロードします。django-debug-toolbarでクエリ数をモニタリングすると良いです。

Q5. FastAPIのDepends()は内部的にどう動作しますか?

FastAPIはリクエストごとに依存性グラフを解決します。各Depends()はサブ依存性を再帰的に解決し、ジェネレータ依存性(yield)はレスポンス後にクリーンアップコードを実行します。同一リクエスト内では同じ依存性がキャッシュされます。

Q6. ASGIとWSGIの違いは?

WSGIはリクエストごとに1スレッドを使用する同期インターフェースです。ASGIは1プロセスで数千の同時接続を処理できる非同期インターフェースで、WebSocket、HTTP/2、Server-Sent Events(SSE)をサポートします。

Q7. SQLAlchemyのセッション管理戦略は?

リクエストごとのセッションパターンが一般的です。FastAPIではDepends()でセッションを注入し、yieldで自動クローズします。connection pooling(pool_size、max_overflow)、expire_on_commit=False設定、scoped_sessionの使用が重要です。

Q8. Celeryでタスク失敗時の処理戦略は?

autoretry_forで特定例外に対する自動リトライ、max_retriesで最大回数制限、retry_backoffで指数バックオフ、task_acks_lateで完了後ACKを送信します。失敗時はDead Letter Queueに送るか、on_failureコールバックで通知を送れます。

Q9. FastAPIで大容量ファイルアップロードを効率的に処理する方法は?

UploadFileを使用するとメモリに全てロードせずスプーリングします。大容量ファイルはチャンクドリードで処理し、S3等のオブジェクトストレージに直接ストリーミングします。後処理はバックグラウンドタスクやCeleryで分離します。

Q10. Djangoでasync view使用時の注意事項は?

Django ORMはまだ完全な非同期をサポートしていないため、sync_to_async()でラップが必要です。ミドルウェアも非同期互換が必要で、混合使用時はパフォーマンスオーバーヘッドが発生する可能性があります。ASGIサーバー(Daphne、Uvicorn)を使用する必要があります。

Q11. Pythonにおける並行性と並列性の違いは?

並行性(concurrency)は複数タスクを交互に実行することで、asyncioが代表的です。並列性(parallelism)は複数タスクを同時に実行することで、multiprocessingが該当します。I/Oバウンドには並行性、CPUバウンドには並列性が適切です。

Q12. FastAPIのミドルウェア実行順序は?

ミドルウェアはオニオン(玉ねぎ)モデルに従います。リクエストは登録逆順でミドルウェアを通過し、レスポンスは登録順で戻ります。CORS、認証、ロギングの順で登録するのが一般的です。

Q13. Alembicで大規模マイグレーションを安全に実行する方法は?

データマイグレーションとスキーママイグレーションを分離し、オンラインDDL(pt-online-schema-change)を使用します。ダウンタイムなしのマイグレーションにはexpand-and-contractパターンを適用します。常にロールバックスクリプトを準備します。

Q14. PythonWebアプリのメモリリークをデバッグする方法は?

tracemallocでメモリ割り当てを追跡し、objgraphでオブジェクト参照グラフを分析します。memrayはプロダクションプロファイリングに適しています。主な原因:循環参照、グローバルキャッシュの増大、イベントハンドラの未削除。

Q15. FastAPI vs Litestar — 新プロジェクトでどちらを選ぶべきですか?

FastAPIはより大きなコミュニティとエコシステムを持っています(80K+ stars)。Litestarはより良いパフォーマンスと組み込みDIコンテナを提供します。チームの経験、エコシステム依存度、パフォーマンス要件を考慮する必要があります。大半の場合、FastAPIが安全な選択です。


12. クイズ

Q1. FastAPIで同期関数(def)でエンドポイントを定義するとどう実行されますか?

FastAPIは同期関数を別のスレッドプールで実行します。async defはイベントループで直接実行され、通常のdefrun_in_executor()を通じてスレッドプールで実行されます。そのため、同期関数も他のリクエストをブロックしません。

Q2. Pydanticのmodel_validate()とmodel_construct()の違いは?

model_validate()は全フィールドをバリデーションし、無効なデータに対してValidationErrorを発生させます。model_construct()はバリデーションをスキップして直接モデルを生成します。既にバリデーション済みのデータ(DBから取得)に使用するとパフォーマンスが向上します。

Q3. Djangoのselect_related()とprefetch_related()のSQL上の違いは?

select_related()はSQL JOINを使用して1つのクエリで関連オブジェクトを取得します(ForeignKey、OneToOne)。prefetch_related()は別々のクエリを実行しPythonでジョインします(ManyToMany、逆参照)。一般的にselect_related()がより効率的です。

Q4. asyncio.gather()とasyncio.TaskGroup()の違いは?

asyncio.gather()は1つが失敗しても残りが実行を続けます(return_exceptions=True時)。Python 3.11のTaskGroupは1つが失敗すると残り全てをキャンセルする構造化並行性を提供します。TaskGroupがより安全で予測可能です。

Q5. Celeryのtask_acks_late=Trueの意味とトレードオフは?

デフォルトではCeleryはタスク受信時に即座にACKを送信します。task_acks_late=Trueはタスク完了後にACKを送信します。利点:ワーカークラッシュ時にタスクが再配信されます。欠点:タスクが2回実行される可能性があるため、冪等性(idempotency)が必要です。


参考資料

  1. FastAPI 公式ドキュメント
  2. Django 公式ドキュメント
  3. Pydantic v2 マイグレーションガイド
  4. SQLAlchemy 2.0 ドキュメント
  5. Celery 公式ドキュメント
  6. LangChain ドキュメント
  7. Uvicorn 設定ガイド
  8. Python asyncio ドキュメント
  9. Django REST Framework
  10. Alembic マイグレーションガイド
  11. structlog ドキュメント
  12. pytest-asyncio
  13. Flower モニタリング
  14. Python GIL を理解する
  15. ASGI 仕様