- Published on
Python 백엔드 2025 완전 가이드: FastAPI vs Django, 비동기 처리, AI 서비스 구축까지
- Authors

- Name
- Youngju Kim
- @fjvbn20031
- 들어가며
- 1. Python 백엔드 랜드스케이프 2025
- 2. FastAPI vs Django vs Flask 비교
- 3. FastAPI 딥다이브
- 4. Django 딥다이브
- 5. 데이터베이스: SQLAlchemy 2.0 + Alembic
- 6. 비동기 패턴 (async/await)
- 7. Celery + Redis 비동기 작업 큐
- 8. AI 서비스 구축: FastAPI + LangChain
- 9. 테스팅
- 10. 프로덕션 배포
- 11. 면접 질문 15선
- Q1. FastAPI와 Flask의 가장 큰 차이는 무엇인가요?
- Q2. Python의 GIL이 웹 서버 성능에 미치는 영향은?
- Q3. Pydantic v1과 v2의 주요 차이는?
- Q4. Django ORM에서 N+1 문제를 해결하는 방법은?
- Q5. FastAPI의 Depends()가 내부적으로 어떻게 동작하나요?
- Q6. ASGI와 WSGI의 차이는?
- Q7. SQLAlchemy의 세션 관리 전략은?
- Q8. Celery에서 작업 실패 시 처리 전략은?
- Q9. FastAPI에서 파일 업로드를 효율적으로 처리하는 방법은?
- Q10. Django에서 async view 사용 시 주의사항은?
- Q11. Python에서 동시성과 병렬성의 차이는?
- Q12. FastAPI의 미들웨어 실행 순서는?
- Q13. Alembic으로 대규모 마이그레이션을 안전하게 수행하는 방법은?
- Q14. Python 웹 앱의 메모리 누수를 디버깅하는 방법은?
- Q15. FastAPI vs Litestar — 새 프로젝트에서 어떤 것을 선택해야 하나요?
- 12. 퀴즈
- 참고 자료
들어가며
Python은 2025년 현재 백엔드 개발 언어로서 독보적인 위치를 차지하고 있습니다. 특히 AI/ML 서비스의 폭발적 성장과 함께, FastAPI는 GitHub 스타 80K를 돌파하며 가장 빠르게 성장하는 웹 프레임워크가 되었습니다. Django 역시 4.2 LTS와 5.0의 async 지원 강화로 여전히 엔터프라이즈 시장에서 강력합니다.
이 글에서는 FastAPI와 Django를 중심으로, 비동기 처리, 데이터베이스, 작업 큐, AI 서비스 구축, 프로덕션 배포까지 Python 백엔드의 모든 것을 다룹니다. 면접 질문과 퀴즈도 포함되어 있어 커리어 준비에도 활용할 수 있습니다.
1. Python 백엔드 랜드스케이프 2025
왜 Python 백엔드인가?
Python은 세 가지 이유로 백엔드 개발에서 압도적입니다.
- AI/ML 생태계와의 자연스러운 통합 — PyTorch, TensorFlow, LangChain 등 AI 라이브러리를 그대로 사용 가능
- 빠른 프로토타이핑과 생산성 — 간결한 문법, 풍부한 라이브러리, 타입 힌트로 안전성까지
- 거대한 커뮤니티 — PyPI 패키지 50만 개 이상, StackOverflow 답변 풍부
주요 프레임워크 현황
| 프레임워크 | GitHub Stars | 최신 버전 | 핵심 특징 |
|---|---|---|---|
| FastAPI | 80K+ | 0.115+ | 비동기, 자동 문서화, Pydantic |
| Django | 82K+ | 5.1 | 풀스택, ORM, Admin |
| Flask | 69K+ | 3.1 | 마이크로, 유연성, 단순함 |
| Litestar | 5K+ | 2.x | FastAPI 대안, 성능 우선 |
2. FastAPI vs Django vs Flask 비교
성능 비교 (requests/sec — TechEmpower 기준)
| 항목 | FastAPI | Django | Flask |
|---|---|---|---|
| JSON 직렬화 | ~15,000 | ~3,500 | ~4,000 |
| 단일 DB 쿼리 | ~12,000 | ~3,000 | ~3,500 |
| 다중 DB 쿼리 | ~3,500 | ~1,200 | ~1,400 |
| 비동기 I/O | 네이티브 | 5.0+ 부분 지원 | 지원 안함 |
선택 기준
FastAPI를 선택하세요:
- 새로운 API 서비스를 빌드할 때
- AI/ML 모델 서빙이 필요할 때
- 높은 동시성이 요구될 때
- 마이크로서비스 아키텍처일 때
Django를 선택하세요:
- 관리자 패널이 필요할 때
- 빠른 CRUD 앱 개발이 필요할 때
- 풀스택 웹 애플리케이션일 때
- 팀에 Django 경험이 풍부할 때
Flask를 선택하세요:
- 극도로 단순한 API일 때
- 레거시 시스템 유지보수일 때
- 학습 목적일 때
에코시스템 비교
| 기능 | FastAPI | Django | Flask |
|---|---|---|---|
| ORM | SQLAlchemy | Django ORM | SQLAlchemy |
| 인증 | 직접 구현 / FastAPI-Users | django-allauth | Flask-Login |
| 관리자 | SQLAdmin | django-admin | Flask-Admin |
| API 문서 | 자동 (Swagger/ReDoc) | DRF Spectacular | Flask-RESTX |
| WebSocket | 네이티브 | Channels | Flask-SocketIO |
| 비동기 | 네이티브 | 부분 지원 | 미지원 |
3. FastAPI 딥다이브
3.1 기본 구조와 라우팅
from fastapi import FastAPI, HTTPException, status
from pydantic import BaseModel, Field
from datetime import datetime
app = FastAPI(
title="My API",
version="1.0.0",
docs_url="/docs",
redoc_url="/redoc",
)
class UserCreate(BaseModel):
username: str = Field(..., min_length=3, max_length=50)
email: str = Field(..., pattern=r"^[\w\.\-]+@[\w\.\-]+\.\w+$")
age: int = Field(..., ge=0, le=150)
class UserResponse(BaseModel):
id: int
username: str
email: str
created_at: datetime
model_config = {"from_attributes": True}
@app.post("/users", response_model=UserResponse, status_code=status.HTTP_201_CREATED)
async def create_user(user: UserCreate):
# 비즈니스 로직
return UserResponse(
id=1,
username=user.username,
email=user.email,
created_at=datetime.now(),
)
@app.get("/users/{user_id}", response_model=UserResponse)
async def get_user(user_id: int):
if user_id <= 0:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="User not found",
)
return UserResponse(id=user_id, username="test", email="t@t.com", created_at=datetime.now())
3.2 Pydantic v2 데이터 검증
Pydantic v2는 Rust 기반 pydantic-core로 재작성되어 5~50배 성능 향상을 달성했습니다.
from pydantic import BaseModel, Field, field_validator, model_validator
from typing import Optional
from enum import Enum
class Role(str, Enum):
ADMIN = "admin"
USER = "user"
MODERATOR = "moderator"
class UserProfile(BaseModel):
name: str = Field(..., min_length=1, max_length=100)
role: Role = Role.USER
tags: list[str] = Field(default_factory=list, max_length=10)
metadata: Optional[dict[str, str]] = None
@field_validator("name")
@classmethod
def name_must_not_be_empty(cls, v: str) -> str:
if not v.strip():
raise ValueError("Name cannot be whitespace only")
return v.strip()
@model_validator(mode="after")
def validate_admin_tags(self):
if self.role == Role.ADMIN and not self.tags:
raise ValueError("Admin must have at least one tag")
return self
model_config = {
"json_schema_extra": {
"examples": [
{"name": "Alice", "role": "admin", "tags": ["ops"]}
]
}
}
3.3 의존성 주입 (Dependency Injection)
FastAPI의 Depends()는 강력한 DI 시스템을 제공합니다.
from fastapi import Depends, Header, HTTPException
from sqlalchemy.ext.asyncio import AsyncSession
from typing import Annotated
# DB 세션 의존성
async def get_db() -> AsyncSession:
async with async_session_maker() as session:
try:
yield session
finally:
await session.close()
# 인증 의존성
async def get_current_user(
authorization: str = Header(...),
db: AsyncSession = Depends(get_db),
) -> User:
token = authorization.replace("Bearer ", "")
user = await verify_token(token, db)
if not user:
raise HTTPException(status_code=401, detail="Invalid token")
return user
# 권한 검증 의존성
def require_role(required_role: str):
async def role_checker(user: User = Depends(get_current_user)):
if user.role != required_role:
raise HTTPException(status_code=403, detail="Insufficient permissions")
return user
return role_checker
# 사용
@app.get("/admin/users")
async def list_users(
admin: Annotated[User, Depends(require_role("admin"))],
db: Annotated[AsyncSession, Depends(get_db)],
):
return await get_all_users(db)
3.4 미들웨어와 에러 핸들링
from fastapi import Request
from fastapi.middleware.cors import CORSMiddleware
from starlette.middleware.base import BaseHTTPMiddleware
import time
import logging
logger = logging.getLogger(__name__)
# CORS 미들웨어
app.add_middleware(
CORSMiddleware,
allow_origins=["https://myapp.com"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# 커스텀 미들웨어 — 요청 로깅
class RequestLoggingMiddleware(BaseHTTPMiddleware):
async def dispatch(self, request: Request, call_next):
start = time.perf_counter()
response = await call_next(request)
elapsed = time.perf_counter() - start
logger.info(
f"method={request.method} path={request.url.path} "
f"status={response.status_code} duration={elapsed:.3f}s"
)
return response
app.add_middleware(RequestLoggingMiddleware)
# 글로벌 예외 핸들러
@app.exception_handler(ValueError)
async def value_error_handler(request: Request, exc: ValueError):
return JSONResponse(
status_code=422,
content={"detail": str(exc), "type": "validation_error"},
)
3.5 WebSocket과 백그라운드 태스크
from fastapi import WebSocket, WebSocketDisconnect, BackgroundTasks
# WebSocket 채팅
class ConnectionManager:
def __init__(self):
self.active: list[WebSocket] = []
async def connect(self, ws: WebSocket):
await ws.accept()
self.active.append(ws)
def disconnect(self, ws: WebSocket):
self.active.remove(ws)
async def broadcast(self, message: str):
for conn in self.active:
await conn.send_text(message)
manager = ConnectionManager()
@app.websocket("/ws/chat")
async def websocket_chat(ws: WebSocket):
await manager.connect(ws)
try:
while True:
data = await ws.receive_text()
await manager.broadcast(f"User: {data}")
except WebSocketDisconnect:
manager.disconnect(ws)
# 백그라운드 태스크
async def send_notification(email: str, message: str):
# 이메일 발송 로직
await asyncio.sleep(2) # 시뮬레이션
logger.info(f"Sent notification to {email}")
@app.post("/orders")
async def create_order(order: OrderCreate, background_tasks: BackgroundTasks):
result = await save_order(order)
background_tasks.add_task(send_notification, order.email, "Order confirmed!")
return result
4. Django 딥다이브
4.1 Django ORM과 모델
# models.py
from django.db import models
from django.core.validators import MinValueValidator
class Category(models.Model):
name = models.CharField(max_length=100, unique=True)
slug = models.SlugField(unique=True)
class Meta:
verbose_name_plural = "categories"
ordering = ["name"]
def __str__(self):
return self.name
class Product(models.Model):
name = models.CharField(max_length=200)
category = models.ForeignKey(Category, on_delete=models.CASCADE, related_name="products")
price = models.DecimalField(max_digits=10, decimal_places=2, validators=[MinValueValidator(0)])
stock = models.PositiveIntegerField(default=0)
is_active = models.BooleanField(default=True)
created_at = models.DateTimeField(auto_now_add=True)
updated_at = models.DateTimeField(auto_now=True)
class Meta:
indexes = [
models.Index(fields=["category", "-created_at"]),
models.Index(fields=["price"]),
]
def __str__(self):
return self.name
# 쿼리 예시
# N+1 문제 해결
products = Product.objects.select_related("category").filter(is_active=True)
# 집계
from django.db.models import Avg, Count, Q
stats = Category.objects.annotate(
product_count=Count("products"),
avg_price=Avg("products__price"),
active_count=Count("products", filter=Q(products__is_active=True)),
)
4.2 Django Admin 커스터마이징
# admin.py
from django.contrib import admin
from .models import Product, Category
@admin.register(Product)
class ProductAdmin(admin.ModelAdmin):
list_display = ["name", "category", "price", "stock", "is_active", "created_at"]
list_filter = ["is_active", "category", "created_at"]
search_fields = ["name", "category__name"]
list_editable = ["price", "stock", "is_active"]
readonly_fields = ["created_at", "updated_at"]
list_per_page = 25
actions = ["activate_products", "deactivate_products"]
@admin.action(description="Activate selected products")
def activate_products(self, request, queryset):
updated = queryset.update(is_active=True)
self.message_user(request, f"{updated} products activated.")
@admin.action(description="Deactivate selected products")
def deactivate_products(self, request, queryset):
updated = queryset.update(is_active=False)
self.message_user(request, f"{updated} products deactivated.")
4.3 Django REST Framework (DRF)
# serializers.py
from rest_framework import serializers
from .models import Product, Category
class CategorySerializer(serializers.ModelSerializer):
product_count = serializers.IntegerField(read_only=True)
class Meta:
model = Category
fields = ["id", "name", "slug", "product_count"]
class ProductSerializer(serializers.ModelSerializer):
category_name = serializers.CharField(source="category.name", read_only=True)
class Meta:
model = Product
fields = ["id", "name", "category", "category_name", "price", "stock", "is_active"]
# views.py
from rest_framework import viewsets, filters
from django_filters.rest_framework import DjangoFilterBackend
class ProductViewSet(viewsets.ModelViewSet):
queryset = Product.objects.select_related("category").filter(is_active=True)
serializer_class = ProductSerializer
filter_backends = [DjangoFilterBackend, filters.SearchFilter, filters.OrderingFilter]
filterset_fields = ["category", "is_active"]
search_fields = ["name"]
ordering_fields = ["price", "created_at"]
ordering = ["-created_at"]
4.4 Django 5.0 비동기 뷰
# Django 5.0+ async views
from django.http import JsonResponse
from asgiref.sync import sync_to_async
async def async_product_list(request):
# ORM 호출은 sync_to_async로 감싸야 함
products = await sync_to_async(list)(
Product.objects.select_related("category").filter(is_active=True)[:20]
)
data = [
{"id": p.id, "name": p.name, "price": str(p.price)}
for p in products
]
return JsonResponse({"products": data})
# Django Channels — WebSocket
# consumers.py
from channels.generic.websocket import AsyncJsonWebSocketConsumer
class ChatConsumer(AsyncJsonWebSocketConsumer):
async def connect(self):
self.room = self.scope["url_route"]["kwargs"]["room"]
await self.channel_layer.group_add(self.room, self.channel_name)
await self.accept()
async def disconnect(self, close_code):
await self.channel_layer.group_discard(self.room, self.channel_name)
async def receive_json(self, content):
await self.channel_layer.group_send(
self.room,
{"type": "chat.message", "message": content["message"]},
)
async def chat_message(self, event):
await self.send_json({"message": event["message"]})
5. 데이터베이스: SQLAlchemy 2.0 + Alembic
5.1 SQLAlchemy 2.0 비동기 모델
from sqlalchemy import String, Integer, ForeignKey, DateTime, func
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, relationship
from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker, AsyncSession
from datetime import datetime
class Base(DeclarativeBase):
pass
class User(Base):
__tablename__ = "users"
id: Mapped[int] = mapped_column(primary_key=True)
username: Mapped[str] = mapped_column(String(50), unique=True, index=True)
email: Mapped[str] = mapped_column(String(255), unique=True)
created_at: Mapped[datetime] = mapped_column(DateTime, server_default=func.now())
posts: Mapped[list["Post"]] = relationship(back_populates="author", lazy="selectin")
class Post(Base):
__tablename__ = "posts"
id: Mapped[int] = mapped_column(primary_key=True)
title: Mapped[str] = mapped_column(String(200))
content: Mapped[str] = mapped_column(String)
author_id: Mapped[int] = mapped_column(ForeignKey("users.id"))
author: Mapped["User"] = relationship(back_populates="posts")
# 비동기 엔진 설정
engine = create_async_engine(
"postgresql+asyncpg://user:pass@localhost/db",
pool_size=20,
max_overflow=10,
pool_pre_ping=True,
echo=False,
)
async_session = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)
5.2 비동기 CRUD 패턴
from sqlalchemy import select, update, delete
from sqlalchemy.ext.asyncio import AsyncSession
class UserRepository:
def __init__(self, session: AsyncSession):
self.session = session
async def get_by_id(self, user_id: int) -> User | None:
result = await self.session.execute(
select(User).where(User.id == user_id)
)
return result.scalar_one_or_none()
async def get_by_username(self, username: str) -> User | None:
result = await self.session.execute(
select(User).where(User.username == username)
)
return result.scalar_one_or_none()
async def list_with_posts(self, skip: int = 0, limit: int = 20) -> list[User]:
result = await self.session.execute(
select(User)
.options(selectinload(User.posts))
.offset(skip)
.limit(limit)
.order_by(User.created_at.desc())
)
return list(result.scalars().all())
async def create(self, username: str, email: str) -> User:
user = User(username=username, email=email)
self.session.add(user)
await self.session.commit()
await self.session.refresh(user)
return user
async def update_email(self, user_id: int, new_email: str) -> bool:
result = await self.session.execute(
update(User).where(User.id == user_id).values(email=new_email)
)
await self.session.commit()
return result.rowcount > 0
5.3 Alembic 마이그레이션
# 초기화
alembic init alembic
# 마이그레이션 생성
alembic revision --autogenerate -m "add users table"
# 마이그레이션 적용
alembic upgrade head
# 롤백
alembic downgrade -1
# alembic/env.py — 비동기 설정
from sqlalchemy.ext.asyncio import create_async_engine
def run_migrations_online():
connectable = create_async_engine(get_url())
async def do_run():
async with connectable.connect() as connection:
await connection.run_sync(do_run_migrations)
asyncio.run(do_run())
6. 비동기 패턴 (async/await)
6.1 asyncio 기본
import asyncio
import aiohttp
# 여러 API 동시 호출
async def fetch_url(session: aiohttp.ClientSession, url: str) -> dict:
async with session.get(url) as response:
return await response.json()
async def fetch_all(urls: list[str]) -> list[dict]:
async with aiohttp.ClientSession() as session:
tasks = [fetch_url(session, url) for url in urls]
return await asyncio.gather(*tasks, return_exceptions=True)
# 실행
urls = [
"https://api.example.com/users",
"https://api.example.com/products",
"https://api.example.com/orders",
]
results = asyncio.run(fetch_all(urls))
6.2 세마포어로 동시성 제한
import asyncio
# 동시에 최대 10개만 실행
semaphore = asyncio.Semaphore(10)
async def rate_limited_fetch(session, url):
async with semaphore:
async with session.get(url) as response:
return await response.json()
async def fetch_many(urls: list[str]):
async with aiohttp.ClientSession() as session:
tasks = [rate_limited_fetch(session, url) for url in urls]
return await asyncio.gather(*tasks)
6.3 동기 vs 비동기 — 언제 무엇을 쓸까?
비동기가 좋은 경우:
- 외부 API 호출이 많은 경우
- 데이터베이스 I/O가 빈번한 경우
- WebSocket / 실시간 통신
- 파일 I/O가 많은 경우
동기가 나은 경우:
- CPU 집약적 작업 (이미지 처리, ML 추론)
- 간단한 CRUD 작업
- 레거시 라이브러리가 비동기를 지원하지 않는 경우
- concurrent.futures.ProcessPoolExecutor 사용이 나은 경우
6.4 concurrent.futures로 CPU 바운드 처리
from concurrent.futures import ProcessPoolExecutor
import asyncio
def cpu_heavy_task(data: bytes) -> bytes:
# CPU 집약적 작업 (이미지 리사이징 등)
import hashlib
return hashlib.sha256(data).digest()
async def process_files(files: list[bytes]) -> list[bytes]:
loop = asyncio.get_event_loop()
with ProcessPoolExecutor(max_workers=4) as executor:
tasks = [
loop.run_in_executor(executor, cpu_heavy_task, f)
for f in files
]
return await asyncio.gather(*tasks)
7. Celery + Redis 비동기 작업 큐
7.1 기본 설정
# celery_app.py
from celery import Celery
app = Celery(
"myapp",
broker="redis://localhost:6379/0",
backend="redis://localhost:6379/1",
)
app.conf.update(
task_serializer="json",
accept_content=["json"],
result_serializer="json",
timezone="Asia/Seoul",
enable_utc=True,
task_track_started=True,
task_acks_late=True, # 작업 완료 후 ACK
worker_prefetch_multiplier=1, # 한 번에 하나씩 가져옴
task_reject_on_worker_lost=True,
)
7.2 태스크 정의와 에러 핸들링
from celery import shared_task
from celery.utils.log import get_task_logger
logger = get_task_logger(__name__)
@shared_task(
bind=True,
max_retries=3,
default_retry_delay=60,
autoretry_for=(ConnectionError, TimeoutError),
retry_backoff=True,
retry_jitter=True,
)
def send_email_task(self, to: str, subject: str, body: str):
try:
send_email(to, subject, body)
logger.info(f"Email sent to {to}")
except Exception as exc:
logger.error(f"Email failed: {exc}")
raise self.retry(exc=exc)
@shared_task(bind=True)
def process_ai_inference(self, model_name: str, input_data: dict):
self.update_state(state="PROCESSING", meta={"progress": 0})
# 모델 로딩
model = load_model(model_name)
self.update_state(state="PROCESSING", meta={"progress": 50})
# 추론 실행
result = model.predict(input_data)
self.update_state(state="PROCESSING", meta={"progress": 100})
return result
7.3 주기적 작업 (Celery Beat)
from celery.schedules import crontab
app.conf.beat_schedule = {
"cleanup-expired-sessions": {
"task": "tasks.cleanup_sessions",
"schedule": crontab(hour=2, minute=0), # 매일 새벽 2시
},
"sync-external-data": {
"task": "tasks.sync_data",
"schedule": 300.0, # 5분마다
},
"weekly-report": {
"task": "tasks.generate_report",
"schedule": crontab(hour=9, minute=0, day_of_week="mon"),
},
}
7.4 Flower로 모니터링
# Flower 실행
celery -A myapp flower --port=5555
# Flower 기능:
# - 실시간 워커 상태 모니터링
# - 태스크 진행 상황 추적
# - 실패한 태스크 재시도
# - 워커 풀 크기 조절
8. AI 서비스 구축: FastAPI + LangChain
8.1 LangChain + FastAPI 통합
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from langchain_openai import ChatOpenAI
from langchain.prompts import ChatPromptTemplate
from langchain.schema import StrOutputParser
import asyncio
app = FastAPI()
llm = ChatOpenAI(model="gpt-4o", temperature=0.7, streaming=True)
prompt = ChatPromptTemplate.from_messages([
("system", "You are a helpful assistant. Answer in Korean."),
("human", "{question}"),
])
chain = prompt | llm | StrOutputParser()
@app.post("/chat")
async def chat(question: str):
response = await chain.ainvoke({"question": question})
return {"answer": response}
@app.post("/chat/stream")
async def chat_stream(question: str):
async def generate():
async for chunk in chain.astream({"question": question}):
yield f"data: {chunk}\n\n"
yield "data: [DONE]\n\n"
return StreamingResponse(generate(), media_type="text/event-stream")
8.2 RAG 엔드포인트
from langchain_community.vectorstores import Chroma
from langchain_openai import OpenAIEmbeddings
from langchain.text_splitter import RecursiveCharacterTextSplitter
# 벡터 스토어 초기화
embeddings = OpenAIEmbeddings()
vectorstore = Chroma(persist_directory="./chroma_db", embedding_function=embeddings)
retriever = vectorstore.as_retriever(search_kwargs={"k": 5})
rag_prompt = ChatPromptTemplate.from_messages([
("system", "Answer based on the following context:\n{context}"),
("human", "{question}"),
])
@app.post("/rag/query")
async def rag_query(question: str):
# 관련 문서 검색
docs = await retriever.ainvoke(question)
context = "\n".join([doc.page_content for doc in docs])
# LLM으로 답변 생성
response = await (rag_prompt | llm | StrOutputParser()).ainvoke(
{"context": context, "question": question}
)
return {
"answer": response,
"sources": [{"content": d.page_content, "metadata": d.metadata} for d in docs],
}
8.3 모델 서빙 패턴
from contextlib import asynccontextmanager
import torch
# 앱 시작 시 모델 로딩 (lifespan)
ml_models = {}
@asynccontextmanager
async def lifespan(app: FastAPI):
# 시작 시 모델 로딩
ml_models["sentiment"] = load_sentiment_model()
ml_models["ner"] = load_ner_model()
yield
# 종료 시 정리
ml_models.clear()
torch.cuda.empty_cache()
app = FastAPI(lifespan=lifespan)
@app.post("/predict/sentiment")
async def predict_sentiment(text: str):
model = ml_models["sentiment"]
# CPU 바운드 작업은 스레드 풀에서 실행
loop = asyncio.get_event_loop()
result = await loop.run_in_executor(None, model.predict, text)
return {"sentiment": result}
9. 테스팅
9.1 pytest + httpx (FastAPI)
import pytest
from httpx import AsyncClient, ASGITransport
from main import app
@pytest.fixture
async def client():
transport = ASGITransport(app=app)
async with AsyncClient(transport=transport, base_url="http://test") as ac:
yield ac
@pytest.mark.anyio
async def test_create_user(client: AsyncClient):
response = await client.post("/users", json={
"username": "testuser",
"email": "test@example.com",
"age": 25,
})
assert response.status_code == 201
data = response.json()
assert data["username"] == "testuser"
@pytest.mark.anyio
async def test_get_user_not_found(client: AsyncClient):
response = await client.get("/users/0")
assert response.status_code == 404
@pytest.mark.anyio
async def test_create_user_invalid_email(client: AsyncClient):
response = await client.post("/users", json={
"username": "testuser",
"email": "invalid-email",
"age": 25,
})
assert response.status_code == 422
9.2 factory_boy로 테스트 데이터 생성
import factory
from factory.alchemy import SQLAlchemyModelFactory
class UserFactory(SQLAlchemyModelFactory):
class Meta:
model = User
sqlalchemy_session_persistence = "commit"
username = factory.Sequence(lambda n: f"user_{n}")
email = factory.LazyAttribute(lambda o: f"{o.username}@example.com")
class PostFactory(SQLAlchemyModelFactory):
class Meta:
model = Post
sqlalchemy_session_persistence = "commit"
title = factory.Faker("sentence")
content = factory.Faker("paragraph")
author = factory.SubFactory(UserFactory)
# 사용
async def test_list_users(db_session):
users = [UserFactory(session=db_session) for _ in range(10)]
# 테스트 로직
9.3 커버리지 설정
# pyproject.toml
[tool.pytest.ini_options]
testpaths = ["tests"]
asyncio_mode = "auto"
[tool.coverage.run]
source = ["app"]
omit = ["tests/*", "alembic/*"]
[tool.coverage.report]
fail_under = 80
show_missing = true
# 실행
pytest --cov=app --cov-report=html tests/
10. 프로덕션 배포
10.1 Uvicorn + Gunicorn
# 개발
uvicorn main:app --reload --host 0.0.0.0 --port 8000
# 프로덕션 — Gunicorn + Uvicorn Workers
gunicorn main:app \
-w 4 \
-k uvicorn.workers.UvicornWorker \
--bind 0.0.0.0:8000 \
--timeout 120 \
--graceful-timeout 30 \
--access-logfile - \
--error-logfile -
10.2 Docker 멀티스테이지 빌드
# Stage 1: 빌드
FROM python:3.12-slim AS builder
WORKDIR /app
RUN pip install --no-cache-dir poetry
COPY pyproject.toml poetry.lock ./
RUN poetry export -f requirements.txt --output requirements.txt --without-hashes
RUN pip install --no-cache-dir --prefix=/install -r requirements.txt
# Stage 2: 실행
FROM python:3.12-slim
WORKDIR /app
COPY /install /usr/local
COPY . .
# 비 root 사용자
RUN adduser --disabled-password --gecos "" appuser
USER appuser
EXPOSE 8000
HEALTHCHECK \
CMD python -c "import httpx; httpx.get('http://localhost:8000/health')"
CMD ["gunicorn", "main:app", "-w", "4", "-k", "uvicorn.workers.UvicornWorker", "--bind", "0.0.0.0:8000"]
10.3 구조화된 로깅
import structlog
import logging
structlog.configure(
processors=[
structlog.contextvars.merge_contextvars,
structlog.processors.add_log_level,
structlog.processors.TimeStamper(fmt="iso"),
structlog.processors.StackInfoRenderer(),
structlog.processors.format_exc_info,
structlog.processors.JSONRenderer(),
],
wrapper_class=structlog.make_filtering_bound_logger(logging.INFO),
logger_factory=structlog.PrintLoggerFactory(),
)
logger = structlog.get_logger()
# 사용
async def create_order(order: OrderCreate):
logger.info("order_created", order_id=order.id, user_id=order.user_id, amount=order.total)
10.4 Health Check 엔드포인트
from fastapi import FastAPI
from datetime import datetime
@app.get("/health")
async def health_check():
return {
"status": "healthy",
"timestamp": datetime.utcnow().isoformat(),
"version": "1.0.0",
}
@app.get("/health/ready")
async def readiness_check(db: AsyncSession = Depends(get_db)):
try:
await db.execute(text("SELECT 1"))
db_status = "connected"
except Exception:
db_status = "disconnected"
raise HTTPException(status_code=503)
return {"status": "ready", "database": db_status}
11. 면접 질문 15선
Q1. FastAPI와 Flask의 가장 큰 차이는 무엇인가요?
FastAPI는 네이티브 async/await을 지원하며, Pydantic 기반의 자동 요청 검증과 OpenAPI 문서 자동 생성을 제공합니다. Flask는 동기 기반이며 이러한 기능을 위해 별도 라이브러리가 필요합니다. 또한 FastAPI는 타입 힌트를 활용한 의존성 주입 시스템을 내장하고 있습니다.
Q2. Python의 GIL이 웹 서버 성능에 미치는 영향은?
GIL(Global Interpreter Lock)은 한 번에 하나의 스레드만 Python 바이트코드를 실행하도록 제한합니다. 하지만 I/O 바운드 작업(네트워크, DB)에서는 GIL이 해제되므로 async/await이 효과적입니다. CPU 바운드 작업은 multiprocessing이나 Gunicorn의 다중 워커로 해결합니다.
Q3. Pydantic v1과 v2의 주요 차이는?
v2는 Rust 기반 pydantic-core로 재작성되어 5-50배 빠릅니다. 주요 변경: BaseModel.model_validate() (기존 parse_obj()), model_config dict (기존 Config 클래스), field_validator 데코레이터 (기존 validator), model_serializer 등.
Q4. Django ORM에서 N+1 문제를 해결하는 방법은?
select_related()로 ForeignKey/OneToOneField 조인 쿼리를 수행하고, prefetch_related()로 ManyToManyField나 역방향 관계를 미리 로딩합니다. django-debug-toolbar로 쿼리 수를 모니터링하면 좋습니다.
Q5. FastAPI의 Depends()가 내부적으로 어떻게 동작하나요?
FastAPI는 요청이 올 때마다 의존성 그래프를 해석합니다. 각 Depends()는 서브의존성을 재귀적으로 해결하며, generator 의존성(yield)은 응답 후 cleanup 코드를 실행합니다. 동일 요청 내에서 같은 의존성은 캐시됩니다.
Q6. ASGI와 WSGI의 차이는?
WSGI는 동기 인터페이스로 요청당 하나의 스레드를 사용합니다. ASGI는 비동기 인터페이스로 하나의 프로세스에서 수천 개의 동시 연결을 처리할 수 있으며, WebSocket, HTTP/2, 서버 전송 이벤트(SSE)를 지원합니다.
Q7. SQLAlchemy의 세션 관리 전략은?
요청별 세션 패턴이 일반적입니다. FastAPI에서는 Depends()로 세션을 주입하고, yield로 자동 종료합니다. connection pooling (pool_size, max_overflow), expire_on_commit=False 설정, 그리고 scoped_session 사용이 중요합니다.
Q8. Celery에서 작업 실패 시 처리 전략은?
autoretry_for로 특정 예외에 대해 자동 재시도, max_retries로 최대 횟수 제한, retry_backoff으로 지수 백오프, task_acks_late로 완료 후 ACK을 보냅니다. 실패 시 Dead Letter Queue에 보내거나 on_failure 콜백으로 알림을 보낼 수 있습니다.
Q9. FastAPI에서 파일 업로드를 효율적으로 처리하는 방법은?
UploadFile을 사용하면 메모리에 전부 로딩하지 않고 스풀링합니다. 대용량 파일은 chunked read로 처리하고, S3 등 오브젝트 스토리지에 직접 스트리밍합니다. 백그라운드 태스크나 Celery로 후처리를 분리합니다.
Q10. Django에서 async view 사용 시 주의사항은?
Django ORM은 아직 완전한 비동기를 지원하지 않아 sync_to_async()로 감싸야 합니다. 미들웨어도 비동기 호환이 필요하며, 혼합 사용 시 성능 오버헤드가 발생할 수 있습니다. ASGI 서버(Daphne, Uvicorn)를 사용해야 합니다.
Q11. Python에서 동시성과 병렬성의 차이는?
동시성(concurrency)은 여러 작업을 번갈아 실행하는 것으로 asyncio가 대표적입니다. 병렬성(parallelism)은 여러 작업을 동시에 실행하는 것으로 multiprocessing이 해당됩니다. I/O 바운드는 동시성, CPU 바운드는 병렬성이 적합합니다.
Q12. FastAPI의 미들웨어 실행 순서는?
미들웨어는 양파(onion) 모델을 따릅니다. 요청은 등록 역순으로 미들웨어를 통과하고, 응답은 등록 순서로 돌아옵니다. CORS, 인증, 로깅 순으로 등록하는 것이 일반적입니다.
Q13. Alembic으로 대규모 마이그레이션을 안전하게 수행하는 방법은?
데이터 마이그레이션과 스키마 마이그레이션을 분리하고, 온라인 DDL(pt-online-schema-change)을 사용합니다. 다운타임 없는 마이그레이션을 위해 expand-and-contract 패턴을 적용합니다. 항상 롤백 스크립트를 준비합니다.
Q14. Python 웹 앱의 메모리 누수를 디버깅하는 방법은?
tracemalloc으로 메모리 할당을 추적하고, objgraph로 객체 참조 그래프를 분석합니다. memray는 프로덕션 프로파일링에 적합합니다. 순환 참조, 글로벌 캐시 증가, 이벤트 핸들러 미제거가 주요 원인입니다.
Q15. FastAPI vs Litestar — 새 프로젝트에서 어떤 것을 선택해야 하나요?
FastAPI는 더 큰 커뮤니티와 생태계를 가지고 있습니다(80K+ stars). Litestar는 더 나은 성능과 내장 DI 컨테이너를 제공합니다. 팀의 경험, 생태계 의존도, 성능 요구사항을 고려해야 합니다. 대부분의 경우 FastAPI가 안전한 선택입니다.
12. 퀴즈
Q1. FastAPI에서 동기 함수(def)로 엔드포인트를 정의하면 어떻게 실행되나요?
FastAPI는 동기 함수를 별도의 스레드 풀에서 실행합니다. async def는 이벤트 루프에서 직접 실행되고, 일반 def는 run_in_executor()를 통해 스레드 풀에서 실행됩니다. 따라서 동기 함수도 다른 요청을 블로킹하지 않습니다.
Q2. Pydantic의 model_validate()와 model_construct()의 차이는?
model_validate()는 모든 필드를 검증하며, 유효하지 않은 데이터에 대해 ValidationError를 발생시킵니다. model_construct()는 검증을 건너뛰고 직접 모델을 생성합니다. 이미 검증된 데이터(DB에서 조회)에 사용하면 성능이 향상됩니다.
Q3. Django의 select_related()와 prefetch_related()의 SQL 차이는?
select_related()는 SQL JOIN을 사용하여 하나의 쿼리로 관련 객체를 가져옵니다 (ForeignKey, OneToOne). prefetch_related()는 별도의 쿼리를 실행하고 Python에서 조인합니다 (ManyToMany, 역참조). select_related()가 일반적으로 더 효율적입니다.
Q4. asyncio.gather()와 asyncio.TaskGroup()의 차이는?
asyncio.gather()는 하나가 실패해도 나머지가 계속 실행됩니다 (return_exceptions=True 시). Python 3.11의 TaskGroup은 하나가 실패하면 나머지를 모두 취소하는 구조화된 동시성을 제공합니다. TaskGroup이 더 안전하고 예측 가능합니다.
Q5. Celery에서 task_acks_late=True의 의미와 장단점은?
기본적으로 Celery는 작업 수신 시 즉시 ACK을 보냅니다. task_acks_late=True는 작업 완료 후 ACK을 보냅니다. 장점: 워커 크래시 시 작업이 재전달됩니다. 단점: 작업이 두 번 실행될 수 있으므로 멱등성(idempotency)이 필요합니다.