Full-Stack AI App Development: Build LLM Services with FastAPI + Next.js
Overview
Building modern AI applications goes far beyond simple API calls. You need to integrate streaming, multimodal inputs, RAG (Retrieval-Augmented Generation), and robust authentication into a cohesive product. This guide walks through building a production-grade LLM service using FastAPI on the backend and Next.js on the frontend.
1. AI App Architecture Design
The Three-Layer Architecture
A modern AI application is built around three distinct layers:
- Frontend Layer: Next.js App Router, Vercel AI SDK, streaming UI components
- Backend Layer: FastAPI, LangChain, auth middleware, caching
- AI/Data Layer: OpenAI/Claude APIs, vector databases, embedding models
This separation of concerns makes the system easier to scale, test, and maintain independently.
Streaming vs Batch Processing
Two primary approaches exist for handling LLM responses.
Streaming sends tokens to the client as they are generated. It markedly improves perceived responsiveness and is ideal for conversational interfaces; it is typically implemented with Server-Sent Events (SSE) or WebSockets.
Batch processing waits for the full response before returning it. This suits document processing, data analysis pipelines, and background jobs, and is typically orchestrated with Celery and Redis queues, as sketched below.
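A minimal sketch of the batch path, assuming a Redis broker on localhost; the app name and task are illustrative rather than part of this guide's codebase:
# Worker-side task definition
from celery import Celery
from openai import OpenAI

celery_app = Celery(
    "ai_app",
    broker="redis://localhost:6379/0",
    backend="redis://localhost:6379/1",
)
client = OpenAI()

@celery_app.task
def summarize_text(text: str) -> str:
    # A blocking call is fine here: Celery workers run outside the request cycle.
    response = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": f"Summarize the following:\n\n{text}"}],
    )
    return response.choices[0].message.content

# Caller side: enqueue, then poll for the result.
# task = summarize_text.delay(long_document)
# summary = task.get(timeout=120)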
Project Folder Structure
ai-app/
├── backend/
│ ├── app/
│ │ ├── main.py
│ │ ├── routers/
│ │ │ ├── chat.py
│ │ │ └── documents.py
│ │ ├── services/
│ │ │ ├── llm_service.py
│ │ │ └── vector_service.py
│ │ └── models/
│ │ └── schemas.py
│ ├── requirements.txt
│ └── Dockerfile
├── frontend/
│ ├── app/
│ │ ├── chat/
│ │ │ └── page.tsx
│ │ └── api/
│ │ └── chat/
│ │ └── route.ts
│ ├── components/
│ └── package.json
└── docker-compose.yml
2. FastAPI Backend
Installation
pip install fastapi uvicorn openai langchain langchain-openai python-dotenv
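python-dotenv is installed above but easy to forget to wire up. A minimal way to load secrets before any client is created, assuming a .env file at the backend root:
# e.g. at the top of app/main.py, before AsyncOpenAI() is instantiated
import os
from dotenv import load_dotenv

load_dotenv()  # reads OPENAI_API_KEY and friends into the process environment
assert os.getenv("OPENAI_API_KEY"), "Set OPENAI_API_KEY in .env or the shell"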
Pydantic Models for Request/Response Validation
# app/models/schemas.py
from pydantic import BaseModel, Field
from typing import List, Optional
from enum import Enum
class Role(str, Enum):
user = "user"
assistant = "assistant"
system = "system"
class Message(BaseModel):
role: Role
content: str
class ChatRequest(BaseModel):
messages: List[Message]
model: str = Field(default="gpt-4o-mini")
temperature: float = Field(default=0.7, ge=0, le=2)
max_tokens: Optional[int] = Field(default=None)
class ChatResponse(BaseModel):
content: str
usage: dict
Async Streaming Endpoint
FastAPI's StreamingResponse pushes LLM tokens to the client in real time as they are produced.
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from fastapi.middleware.cors import CORSMiddleware
from openai import AsyncOpenAI
from app.models.schemas import ChatRequest
app = FastAPI(title="AI App Backend")
app.add_middleware(
CORSMiddleware,
allow_origins=["http://localhost:3000"],
allow_methods=["*"],
allow_headers=["*"],
)
client = AsyncOpenAI()
@app.post("/api/chat/stream")
async def chat_stream(request: ChatRequest):
    async def generate():
        stream = await client.chat.completions.create(
            model=request.model,
            messages=[m.model_dump() for m in request.messages],  # use .dict() on Pydantic v1
            stream=True,
            temperature=request.temperature,
        )
        async for chunk in stream:
            delta = chunk.choices[0].delta.content
            if delta:
                yield f"data: {delta}\n\n"
        yield "data: [DONE]\n\n"
    return StreamingResponse(generate(), media_type="text/event-stream")
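A quick way to smoke-test the stream outside the browser, assuming httpx is installed and the server is running on localhost:8000 (this throwaway client is not part of the app itself):
import asyncio
import httpx

async def main():
    payload = {"messages": [{"role": "user", "content": "Hello"}]}
    async with httpx.AsyncClient(timeout=None) as http:
        async with http.stream(
            "POST", "http://localhost:8000/api/chat/stream", json=payload
        ) as resp:
            async for line in resp.aiter_lines():
                if line.startswith("data: ") and line != "data: [DONE]":
                    print(line[len("data: "):], end="", flush=True)

asyncio.run(main())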
Dependency Injection for Auth
from fastapi import Depends, HTTPException, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
import jwt  # PyJWT

SECRET_KEY = "change-me"  # load from an environment variable in production
security = HTTPBearer()

async def verify_token(credentials: HTTPAuthorizationCredentials = Depends(security)):
    token = credentials.credentials
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=["HS256"])
        return payload
    except jwt.ExpiredSignatureError:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Token has expired.",
        )
    except jwt.InvalidTokenError:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid token.",
        )
@app.post("/api/chat/secure")
async def secure_chat(request: ChatRequest, user=Depends(verify_token)):
# Only accessible to authenticated users
pass
3. LangChain Integration
Conversation Chain with Memory
LangChain simplifies memory management, chain composition, and tool integration.
from langchain_openai import ChatOpenAI
from langchain.memory import ConversationBufferWindowMemory
from langchain.chains import ConversationChain
from langchain.prompts import PromptTemplate
llm = ChatOpenAI(model="gpt-4o-mini", streaming=True)
memory = ConversationBufferWindowMemory(k=10)
template = """You are a helpful AI assistant.
Current conversation:
{history}
Human: {input}
AI:"""
prompt = PromptTemplate(
input_variables=["history", "input"],
template=template
)
chain = ConversationChain(llm=llm, memory=memory, prompt=prompt)
response = chain.predict(input="Hello, I am a Python developer.")
Building a RAG Pipeline
RAG (Retrieval-Augmented Generation) augments LLM responses by searching external documents and injecting relevant context into the prompt.
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langchain_community.vectorstores import Chroma
from langchain.chains import RetrievalQA
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.document_loaders import PyPDFLoader
# Load and chunk documents
loader = PyPDFLoader("document.pdf")
documents = loader.load()
splitter = RecursiveCharacterTextSplitter(
chunk_size=1000,
chunk_overlap=200
)
chunks = splitter.split_documents(documents)
# Create vector store
embeddings = OpenAIEmbeddings()
vectorstore = Chroma.from_documents(chunks, embeddings)
# Build RAG chain
llm = ChatOpenAI(model="gpt-4o")
qa_chain = RetrievalQA.from_chain_type(
llm=llm,
chain_type="stuff",
retriever=vectorstore.as_retriever(search_kwargs={"k": 5})
)
answer = qa_chain.invoke({"query": "What are the key points in the document?"})
Creating Custom Tools
import requests
from langchain.tools import tool
from langchain.agents import initialize_agent, AgentType

@tool
def search_database(query: str) -> str:
    """Search the database for information. The query parameter is the keyword to search."""
    results = db.search(query)  # placeholder: your actual DB query logic goes here
    return str(results)

@tool
def get_weather(city: str) -> str:
    """Get the current weather for a specific city."""
    response = requests.get(f"https://api.weather.com/v1/{city}")
    return response.json()["description"]

llm = ChatOpenAI(model="gpt-4o", temperature=0)
tools = [search_database, get_weather]
agent = initialize_agent(tools, llm, agent=AgentType.OPENAI_FUNCTIONS)
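Invoking the agent is then a single call; the model reads each tool's docstring to decide which tool to use:
# With the legacy initialize_agent API, run() executes the tool-calling loop.
result = agent.run("What's the weather in Seoul right now?")
print(result)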
4. Next.js Frontend
Streaming Chat with Vercel AI SDK
The Vercel AI SDK is the official library for implementing AI streaming in Next.js.
npm install ai @ai-sdk/openai react-markdown
// app/api/chat/route.ts
import { openai } from '@ai-sdk/openai'
import { streamText } from 'ai'
export async function POST(req: Request) {
const { messages } = await req.json()
const result = await streamText({
model: openai('gpt-4o-mini'),
messages,
system: 'You are a helpful and friendly AI assistant.',
})
return result.toDataStreamResponse()
}
Chat UI Component
// app/chat/page.tsx
'use client'
import { useChat } from 'ai/react'
import ReactMarkdown from 'react-markdown'
export default function ChatPage() {
const { messages, input, handleInputChange, handleSubmit, isLoading } = useChat({
api: '/api/chat',
})
return (
<div className="flex flex-col h-screen max-w-2xl mx-auto">
<header className="p-4 border-b font-semibold text-lg">
AI Assistant
</header>
<div className="flex-1 overflow-y-auto p-4 space-y-4">
{messages.map(m => (
<div
key={m.id}
className={`flex ${m.role === 'user' ? 'justify-end' : 'justify-start'}`}
>
<div
className={`max-w-xs rounded-lg p-3 ${
m.role === 'user'
? 'bg-blue-500 text-white'
: 'bg-gray-100 text-gray-800'
}`}
>
<ReactMarkdown>{m.content}</ReactMarkdown>
</div>
</div>
))}
{isLoading && (
<div className="flex justify-start">
<div className="bg-gray-100 rounded-lg p-3 text-gray-500">
Generating response...
</div>
</div>
)}
</div>
<form onSubmit={handleSubmit} className="p-4 border-t flex gap-2">
<input
value={input}
onChange={handleInputChange}
className="flex-1 border rounded-lg px-3 py-2 focus:outline-none focus:ring-2 focus:ring-blue-500"
placeholder="Type your message..."
disabled={isLoading}
/>
<button
type="submit"
disabled={isLoading}
className="bg-blue-500 text-white px-4 py-2 rounded-lg disabled:opacity-50"
>
Send
</button>
</form>
</div>
)
}
File Upload Handling
// app/upload/page.tsx
'use client'
import { useState } from 'react'
export default function UploadPage() {
const [status, setStatus] = useState('')
async function handleUpload(e: React.FormEvent<HTMLFormElement>) {
e.preventDefault()
const formData = new FormData(e.currentTarget)
setStatus('Uploading...')
const response = await fetch('/api/upload', {
method: 'POST',
body: formData,
})
if (response.ok) {
const data = await response.json()
setStatus(`Done: ${data.message}`)
} else {
setStatus('Upload failed')
}
}
return (
<form onSubmit={handleUpload} className="p-4">
<input type="file" name="file" accept=".pdf,.txt,.md" />
<button type="submit" className="mt-2 bg-green-500 text-white px-4 py-2 rounded">
Upload
</button>
{status && <p className="mt-2 text-sm">{status}</p>}
</form>
)
}
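The form above posts to a Next.js route, which would typically proxy to the backend. A hypothetical FastAPI counterpart (say, in app/routers/documents.py) that receives the file and feeds the indexing pipeline from section 5 could look like this:
from fastapi import APIRouter, UploadFile

router = APIRouter()

@router.post("/api/upload")
async def upload_document(file: UploadFile):
    contents = await file.read()
    # ...chunk, embed, and store in the vector DB (see section 5)...
    return {"message": f"{file.filename} received ({len(contents)} bytes)"}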
5. Vector Database Integration
pgvector (PostgreSQL Extension)
Using the pgvector extension allows vector search within your existing PostgreSQL database.
-- Enable the pgvector extension
CREATE EXTENSION vector;
-- Create a table with an embedding column
CREATE TABLE documents (
id SERIAL PRIMARY KEY,
content TEXT,
embedding vector(1536),
metadata JSONB,
created_at TIMESTAMP DEFAULT NOW()
);
-- Create HNSW index for fast approximate nearest neighbor search
CREATE INDEX ON documents USING hnsw (embedding vector_cosine_ops);
Using these tables from Python with asyncpg (the pgvector package supplies the codec):
import asyncpg
import numpy as np
from pgvector.asyncpg import register_vector  # pip install pgvector

async def store_embedding(content: str, embedding: list):
    conn = await asyncpg.connect(DATABASE_URL)
    await register_vector(conn)  # teach asyncpg to encode/decode vector columns
    await conn.execute(
        "INSERT INTO documents (content, embedding) VALUES ($1, $2)",
        content, np.array(embedding),
    )
    await conn.close()

async def search_similar(query_embedding: list, k: int = 5):
    conn = await asyncpg.connect(DATABASE_URL)
    await register_vector(conn)
    results = await conn.fetch(
        """SELECT content, 1 - (embedding <=> $1) AS similarity
           FROM documents
           ORDER BY embedding <=> $1
           LIMIT $2""",
        np.array(query_embedding), k,
    )
    await conn.close()
    return results
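To tie the pieces together, embed the query first; text-embedding-3-small returns 1536-dimensional vectors, matching the vector(1536) column above. A sketch, assuming OPENAI_API_KEY is set:
from openai import AsyncOpenAI

oai = AsyncOpenAI()

async def ask(query: str, k: int = 5):
    # Embed with the same dimensionality used when the documents were stored.
    emb = await oai.embeddings.create(model="text-embedding-3-small", input=query)
    return await search_similar(emb.data[0].embedding, k=k)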
Chroma DB (Local Development)
from langchain_community.vectorstores import Chroma
from langchain_openai import OpenAIEmbeddings
embeddings = OpenAIEmbeddings()
vectorstore = Chroma(
collection_name="my_documents",
embedding_function=embeddings,
persist_directory="./chroma_db"
)
# Add documents
vectorstore.add_texts(
texts=["Python is widely used for AI development.", "FastAPI is a high-performance API framework."],
metadatas=[{"source": "intro.txt"}, {"source": "framework.txt"}]
)
# Similarity search
results = vectorstore.similarity_search("API development", k=3)
6. Authentication and Security
JWT Token Authentication
from datetime import datetime, timedelta, timezone
from jose import JWTError, jwt
from passlib.context import CryptContext

SECRET_KEY = "your-secret-key"  # load from an environment variable in production
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

def create_access_token(data: dict):
    to_encode = data.copy()
    # datetime.utcnow() is deprecated; use an aware UTC timestamp instead
    expire = datetime.now(timezone.utc) + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    to_encode.update({"exp": expire})
    return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
def verify_token(token: str):
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
    except JWTError:
        raise HTTPException(status_code=401, detail="Invalid token")
    username: str = payload.get("sub")
    if username is None:
        raise HTTPException(status_code=401, detail="Invalid token")
    return username
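For completeness, a sketch of the login route that issues the token; the in-memory user store here is purely illustrative:
# Illustrative only -- swap fake_users for a real user table.
fake_users = {"alice": {"hashed_password": pwd_context.hash("wonderland")}}

@app.post("/api/login")
async def login(username: str, password: str):
    user = fake_users.get(username)
    if not user or not pwd_context.verify(password, user["hashed_password"]):
        raise HTTPException(status_code=401, detail="Bad credentials")
    return {"access_token": create_access_token({"sub": username}), "token_type": "bearer"}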
Prompt Injection Defense
import re
INJECTION_PATTERNS = [
r"ignore previous instructions",
r"disregard all prior",
r"you are now",
r"act as",
r"pretend you are",
]
def sanitize_input(user_input: str) -> str:
lower_input = user_input.lower()
for pattern in INJECTION_PATTERNS:
if re.search(pattern, lower_input):
raise HTTPException(
status_code=400,
detail="Potentially harmful input detected."
)
if len(user_input) > 4000:
raise HTTPException(status_code=400, detail="Input is too long.")
return user_input.strip()
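One way to wire the filter into the chat flow, reusing the ChatRequest schema from section 2 (the route name is illustrative):
from app.models.schemas import ChatRequest, Role

@app.post("/api/chat/safe")
async def safe_chat(request: ChatRequest):
    # Screen user turns before they reach the model.
    for m in request.messages:
        if m.role == Role.user:
            m.content = sanitize_input(m.content)
    # ...then forward request.messages to the LLM as in /api/chat/stream
    return {"ok": True}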
Rate Limiting
from fastapi import Request
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.util import get_remote_address
from slowapi.errors import RateLimitExceeded
limiter = Limiter(key_func=get_remote_address)
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)
@app.post("/api/chat")
@limiter.limit("10/minute")
async def chat(request: Request, chat_request: ChatRequest):
# Limited to 10 requests per minute per IP
pass
7. Multimodal Input Processing
Image Analysis with GPT-4o Vision
import base64
from pathlib import Path
async def analyze_image(image_path: str, question: str) -> str:
with open(image_path, "rb") as f:
image_data = base64.b64encode(f.read()).decode("utf-8")
ext = Path(image_path).suffix.lower()
mime_map = {".jpg": "image/jpeg", ".png": "image/png", ".gif": "image/gif"}
media_type = mime_map.get(ext, "image/jpeg")
response = await client.chat.completions.create(
model="gpt-4o",
messages=[
{
"role": "user",
"content": [
{
"type": "image_url",
"image_url": {
"url": f"data:{media_type};base64,{image_data}"
},
},
{"type": "text", "text": question}
],
}
],
)
return response.choices[0].message.content
Audio Transcription with Whisper
async def transcribe_audio(audio_file_path: str) -> str:
with open(audio_file_path, "rb") as audio_file:
transcript = await client.audio.transcriptions.create(
model="whisper-1",
file=audio_file,
language="en"
)
return transcript.text
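A hypothetical route that accepts an uploaded recording; the temporary file exists only because the helper above takes a path:
import shutil
import tempfile
from fastapi import UploadFile

@app.post("/api/transcribe")
async def transcribe(file: UploadFile):
    # Assumes an mp3 upload; derive the suffix from file.filename in practice.
    with tempfile.NamedTemporaryFile(suffix=".mp3", delete=False) as tmp:
        shutil.copyfileobj(file.file, tmp)
        path = tmp.name
    return {"text": await transcribe_audio(path)}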
8. Performance Optimization
Redis Response Caching
import redis
import json
import hashlib
redis_client = redis.Redis(host="localhost", port=6379, decode_responses=True)  # sync client; prefer redis.asyncio inside async endpoints
def get_cache_key(messages: list) -> str:
content = json.dumps(messages, sort_keys=True)
return hashlib.md5(content.encode()).hexdigest()
async def cached_chat(messages: list) -> str:
cache_key = get_cache_key(messages)
cached = redis_client.get(cache_key)
if cached:
return json.loads(cached)
response = await client.chat.completions.create(
model="gpt-4o-mini",
messages=messages
)
result = response.choices[0].message.content
# Cache with 1-hour TTL
redis_client.setex(cache_key, 3600, json.dumps(result))
return result
Database Connection Pooling
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker
engine = create_async_engine(
DATABASE_URL,
pool_size=10,
max_overflow=20,
pool_pre_ping=True,
echo=False,
)
AsyncSessionLocal = sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)
async def get_db():
async with AsyncSessionLocal() as session:
try:
yield session
finally:
await session.close()
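Routes then borrow a pooled session through FastAPI's dependency system:
from fastapi import Depends
from sqlalchemy import text

@app.get("/api/health/db")
async def db_health(db: AsyncSession = Depends(get_db)):
    await db.execute(text("SELECT 1"))  # one round-trip on a pooled connection
    return {"db": "ok"}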
9. Docker Compose Deployment
docker-compose.yml
version: '3.8'
services:
backend:
build: ./backend
ports:
- '8000:8000'
environment:
- OPENAI_API_KEY=your_key
- DATABASE_URL=postgresql+asyncpg://user:pass@db/aiapp
- REDIS_URL=redis://redis:6379
depends_on:
- db
- redis
restart: unless-stopped
frontend:
build: ./frontend
ports:
- '3000:3000'
environment:
- NEXT_PUBLIC_API_URL=http://backend:8000
depends_on:
- backend
restart: unless-stopped
db:
image: pgvector/pgvector:pg16
environment:
- POSTGRES_DB=aiapp
- POSTGRES_USER=user
- POSTGRES_PASSWORD=pass
volumes:
- postgres_data:/var/lib/postgresql/data
restart: unless-stopped
redis:
image: redis:7-alpine
volumes:
- redis_data:/data
restart: unless-stopped
volumes:
postgres_data:
redis_data:
Backend Dockerfile
FROM python:3.11-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000"]
Frontend Dockerfile
FROM node:20-alpine AS builder
WORKDIR /app
COPY package*.json ./
RUN npm ci
COPY . .
RUN npm run build
FROM node:20-alpine AS runner
WORKDIR /app
# Requires output: 'standalone' in next.config.js
COPY --from=builder /app/.next/standalone ./
COPY --from=builder /app/.next/static ./.next/static
EXPOSE 3000
CMD ["node", "server.js"]
Deployment Commands
# Build and start
docker-compose up --build -d
# View logs
docker-compose logs -f backend
# Scale out (3 backend instances). Note: with the fixed host port mapping
# ('8000:8000') the published ports will collide; remove the mapping or put
# a load balancer in front before scaling.
docker-compose up --scale backend=3 -d
# Stop
docker-compose down
10. Quiz: Check Your Understanding
Q1. Why is SSE (Server-Sent Events) preferred for AI streaming over WebSockets?
Answer: SSE is simpler to implement and is purpose-built for unidirectional server-to-client data push over standard HTTP, which perfectly matches the LLM token streaming pattern.
Explanation: WebSockets support full-duplex communication but introduce extra complexity (upgrade handshake, connection management). For streaming LLM output, you only need one direction of data flow. SSE reuses HTTP connections, works through proxies more reliably, and has automatic reconnection built in. FastAPI's StreamingResponse pairs naturally with the browser's EventSource API for this pattern.
Q2. What is the main advantage of RAG over fine-tuning an LLM?
Answer: RAG does not require retraining the model. It retrieves up-to-date or domain-specific documents at inference time and injects them as context, which is far cheaper and faster to update.
Explanation: Fine-tuning bakes knowledge into model weights and requires expensive GPU compute. RAG keeps knowledge outside the model in a vector database. You can update your knowledge base just by adding or removing documents, without touching the model. It also reduces hallucination by grounding answers in retrieved evidence.
Q3. What does the chunk_overlap parameter do in RecursiveCharacterTextSplitter?
Answer: It specifies how many characters from the end of one chunk are repeated at the start of the next chunk, preserving context across chunk boundaries.
Explanation: When a document is split into chunks, a sentence or concept can be cut in half at the boundary. Overlapping ensures that the surrounding context is present in adjacent chunks. For example, with chunk_size=1000 and chunk_overlap=200, each chunk shares 200 characters with its neighbor, reducing the chance of losing relevant context during retrieval.
Q4. Why is the HNSW index critical for production vector databases?
Answer: HNSW enables approximate nearest neighbor (ANN) search that scales to millions of vectors while remaining fast, unlike brute-force exact search which becomes prohibitively slow at scale.
Explanation: An exact cosine similarity scan over one million 1536-dimensional vectors (OpenAI embeddings) takes seconds per query. HNSW builds a hierarchical graph structure that dramatically narrows the search space. The tradeoff is a small accuracy loss (approximate rather than exact), which is acceptable for most retrieval tasks. pgvector, Chroma, Pinecone, and Weaviate all support HNSW or similar ANN algorithms.
Q5. What is the purpose of the k parameter in ConversationBufferWindowMemory?
Answer: It sets the number of recent conversation turns to retain in the context window passed to the LLM.
Explanation: LLMs have a finite token context limit. Keeping the entire conversation history would eventually exceed this limit and increase cost. Setting k=10 retains only the 10 most recent user-assistant exchanges. Earlier turns are dropped. This balances coherent multi-turn dialogue against token budget. For longer-term memory, consider summarization memory or a dedicated memory store.
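Q6. Why does FastAPI pair every endpoint with Pydantic models?
Answer: For automatic validation and serialization of request and response data, plus auto-generated API documentation.
Explanation: Pydantic validates data at runtime based on Python type hints. FastAPI leverages those models to generate OpenAPI (Swagger) documentation automatically and to return clear error messages for malformed input.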