- Author: Youngju Kim (@fjvbn20031)
Overview
Building modern AI applications goes far beyond simple API calls. You need to integrate streaming, multimodal inputs, RAG (Retrieval-Augmented Generation), and robust authentication into a cohesive product. This guide walks through building a production-grade LLM service using FastAPI on the backend and Next.js on the frontend.
1. AI App Architecture Design
The Three-Layer Architecture
A modern AI application is built around three distinct layers:
- Frontend Layer: Next.js App Router, Vercel AI SDK, streaming UI components
- Backend Layer: FastAPI, LangChain, auth middleware, caching
- AI/Data Layer: OpenAI/Claude APIs, vector databases, embedding models
This separation of concerns makes the system easier to scale, test, and maintain independently.
Streaming vs Batch Processing
Two primary approaches exist for handling LLM responses.
Streaming sends tokens to the client as they are generated. It greatly improves perceived responsiveness and is ideal for conversational interfaces; it is typically implemented via Server-Sent Events (SSE) or WebSockets.
Batch processing waits for the full response before returning it. This suits document processing, data analysis pipelines, and background jobs. Typically orchestrated with Celery and Redis queues.
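The contrast is easy to see in a plain-Python sketch, where `fake_llm` is a hypothetical stand-in for a real model call:

```python
from typing import Iterator

def fake_llm() -> Iterator[str]:
    # Stand-in for a model producing tokens one at a time
    for token in ["Hello", ", ", "world", "!"]:
        yield token

def batch_response() -> str:
    # Batch: wait for every token, return the full string at once
    return "".join(fake_llm())

def stream_response() -> Iterator[str]:
    # Streaming: hand each token to the caller as soon as it exists
    yield from fake_llm()

first_token = next(stream_response())  # available immediately: "Hello"
full_text = batch_response()           # available only once generation finishes
```

The user of a streaming endpoint sees the first token almost instantly; the batch caller waits for the whole completion.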
Project Folder Structure
ai-app/
├── backend/
│   ├── app/
│   │   ├── main.py
│   │   ├── routers/
│   │   │   ├── chat.py
│   │   │   └── documents.py
│   │   ├── services/
│   │   │   ├── llm_service.py
│   │   │   └── vector_service.py
│   │   └── models/
│   │       └── schemas.py
│   ├── requirements.txt
│   └── Dockerfile
├── frontend/
│   ├── app/
│   │   ├── chat/
│   │   │   └── page.tsx
│   │   └── api/
│   │       └── chat/
│   │           └── route.ts
│   ├── components/
│   └── package.json
└── docker-compose.yml
2. FastAPI Backend
Installation
pip install fastapi uvicorn openai langchain langchain-openai python-dotenv
Pydantic Models for Request/Response Validation
# app/models/schemas.py
from pydantic import BaseModel, Field
from typing import List, Optional
from enum import Enum

class Role(str, Enum):
    user = "user"
    assistant = "assistant"
    system = "system"

class Message(BaseModel):
    role: Role
    content: str

class ChatRequest(BaseModel):
    messages: List[Message]
    model: str = Field(default="gpt-4o-mini")
    temperature: float = Field(default=0.7, ge=0, le=2)
    max_tokens: Optional[int] = Field(default=None)

class ChatResponse(BaseModel):
    content: str
    usage: dict
Async Streaming Endpoint
FastAPI's StreamingResponse pushes LLM tokens to the client in real time as they are produced.
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from fastapi.middleware.cors import CORSMiddleware
from openai import AsyncOpenAI

from app.models.schemas import ChatRequest

app = FastAPI(title="AI App Backend")

app.add_middleware(
    CORSMiddleware,
    allow_origins=["http://localhost:3000"],
    allow_methods=["*"],
    allow_headers=["*"],
)

client = AsyncOpenAI()

@app.post("/api/chat/stream")
async def chat_stream(request: ChatRequest):
    async def generate():
        stream = await client.chat.completions.create(
            model=request.model,
            messages=[m.model_dump() for m in request.messages],  # Pydantic v2; use m.dict() on v1
            stream=True,
            temperature=request.temperature,
        )
        async for chunk in stream:
            if not chunk.choices:  # some chunks carry no choices (e.g. usage-only)
                continue
            delta = chunk.choices[0].delta.content
            if delta:
                yield f"data: {delta}\n\n"
        yield "data: [DONE]\n\n"

    return StreamingResponse(generate(), media_type="text/event-stream")
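On the receiving side, the `text/event-stream` body has to be split back into `data:` events. A minimal stdlib sketch of that parsing step (a real browser client would use `EventSource` or the Vercel AI SDK instead):

```python
def parse_sse(raw: str) -> list[str]:
    """Extract `data:` payloads from an SSE body, stopping at the [DONE] sentinel."""
    tokens = []
    for line in raw.splitlines():
        if not line.startswith("data: "):
            continue  # skip blank separators and other SSE fields (event:, id:, ...)
        payload = line[len("data: "):]
        if payload == "[DONE]":
            break
        tokens.append(payload)
    return tokens

body = "data: Hel\n\ndata: lo\n\ndata: [DONE]\n\n"
print(parse_sse(body))  # ['Hel', 'lo']
```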
Dependency Injection for Auth
from fastapi import Depends, HTTPException, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
import jwt  # PyJWT

security = HTTPBearer()

async def verify_token(credentials: HTTPAuthorizationCredentials = Depends(security)):
    token = credentials.credentials
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=["HS256"])
        return payload
    except jwt.ExpiredSignatureError:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Token has expired.",
        )
    except jwt.InvalidTokenError:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid token.",
        )

@app.post("/api/chat/secure")
async def secure_chat(request: ChatRequest, user=Depends(verify_token)):
    # Only accessible to authenticated users
    pass
3. LangChain Integration
Conversation Chain with Memory
LangChain simplifies memory management, chain composition, and tool integration.
from langchain_openai import ChatOpenAI
from langchain.memory import ConversationBufferWindowMemory
from langchain.chains import ConversationChain
from langchain.prompts import PromptTemplate

llm = ChatOpenAI(model="gpt-4o-mini", streaming=True)
memory = ConversationBufferWindowMemory(k=10)  # keep the 10 most recent turns

template = """You are a helpful AI assistant.

Current conversation:
{history}
Human: {input}
AI:"""

prompt = PromptTemplate(
    input_variables=["history", "input"],
    template=template,
)

chain = ConversationChain(llm=llm, memory=memory, prompt=prompt)
response = chain.predict(input="Hello, I am a Python developer.")
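Conceptually, `ConversationBufferWindowMemory(k=10)` just keeps the k most recent exchanges. A stdlib sketch of the same idea (not LangChain's actual implementation):

```python
from collections import deque

class WindowMemory:
    """Keep only the k most recent (human, ai) exchanges."""
    def __init__(self, k: int):
        self.turns = deque(maxlen=k)  # older turns fall off automatically

    def save(self, human: str, ai: str):
        self.turns.append((human, ai))

    def history(self) -> str:
        return "\n".join(f"Human: {h}\nAI: {a}" for h, a in self.turns)

memory = WindowMemory(k=2)
for i in range(4):
    memory.save(f"question {i}", f"answer {i}")

# Only the last 2 of the 4 turns survive in the rendered history
print(memory.history())
```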
Building a RAG Pipeline
RAG (Retrieval-Augmented Generation) augments LLM responses by searching external documents and injecting relevant context into the prompt.
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langchain_community.vectorstores import Chroma
from langchain.chains import RetrievalQA
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.document_loaders import PyPDFLoader

# Load and chunk documents
loader = PyPDFLoader("document.pdf")
documents = loader.load()
splitter = RecursiveCharacterTextSplitter(
    chunk_size=1000,
    chunk_overlap=200,
)
chunks = splitter.split_documents(documents)

# Create vector store
embeddings = OpenAIEmbeddings()
vectorstore = Chroma.from_documents(chunks, embeddings)

# Build RAG chain
llm = ChatOpenAI(model="gpt-4o")
qa_chain = RetrievalQA.from_chain_type(
    llm=llm,
    chain_type="stuff",
    retriever=vectorstore.as_retriever(search_kwargs={"k": 5}),
)
answer = qa_chain.invoke({"query": "What are the key points in the document?"})
Creating Custom Tools
import requests  # used by the weather tool below

from langchain.tools import tool
from langchain.agents import initialize_agent, AgentType

@tool
def search_database(query: str) -> str:
    """Search the database for information. The query parameter is the keyword to search."""
    results = db.search(query)  # `db` is an application-specific database client
    return str(results)

@tool
def get_weather(city: str) -> str:
    """Get the current weather for a specific city."""
    response = requests.get(f"https://api.weather.com/v1/{city}")
    return response.json()["description"]

llm = ChatOpenAI(model="gpt-4o", temperature=0)
tools = [search_database, get_weather]
agent = initialize_agent(tools, llm, agent=AgentType.OPENAI_FUNCTIONS)
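Under the hood, a function-calling agent maps the model's chosen tool name and arguments back onto the registered functions. A stripped-down stdlib sketch of that dispatch step, with hypothetical stand-in tools:

```python
def search_database(query: str) -> str:
    # Hypothetical stand-in for a real database lookup
    return f"results for '{query}'"

def get_weather(city: str) -> str:
    # Hypothetical stand-in for a real weather API call
    return f"sunny in {city}"

# Register tools by name, just as the agent framework does internally
TOOLS = {f.__name__: f for f in [search_database, get_weather]}

def dispatch(tool_call: dict) -> str:
    """Execute the tool the model selected, with the arguments it produced."""
    func = TOOLS[tool_call["name"]]
    return func(**tool_call["arguments"])

# The model emits a structured function call along these lines:
result = dispatch({"name": "get_weather", "arguments": {"city": "Seoul"}})
print(result)  # sunny in Seoul
```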
4. Next.js Frontend
Streaming Chat with Vercel AI SDK
The Vercel AI SDK is the official library for implementing AI streaming in Next.js.
npm install ai @ai-sdk/openai react-markdown
// app/api/chat/route.ts
import { openai } from '@ai-sdk/openai'
import { streamText } from 'ai'

export async function POST(req: Request) {
  const { messages } = await req.json()

  const result = await streamText({
    model: openai('gpt-4o-mini'),
    messages,
    system: 'You are a helpful and friendly AI assistant.',
  })

  return result.toDataStreamResponse()
}
Chat UI Component
// app/chat/page.tsx
'use client'

import { useChat } from 'ai/react'
import ReactMarkdown from 'react-markdown'

export default function ChatPage() {
  const { messages, input, handleInputChange, handleSubmit, isLoading } = useChat({
    api: '/api/chat',
  })

  return (
    <div className="flex flex-col h-screen max-w-2xl mx-auto">
      <header className="p-4 border-b font-semibold text-lg">
        AI Assistant
      </header>

      <div className="flex-1 overflow-y-auto p-4 space-y-4">
        {messages.map(m => (
          <div
            key={m.id}
            className={`flex ${m.role === 'user' ? 'justify-end' : 'justify-start'}`}
          >
            <div
              className={`max-w-xs rounded-lg p-3 ${
                m.role === 'user'
                  ? 'bg-blue-500 text-white'
                  : 'bg-gray-100 text-gray-800'
              }`}
            >
              <ReactMarkdown>{m.content}</ReactMarkdown>
            </div>
          </div>
        ))}
        {isLoading && (
          <div className="flex justify-start">
            <div className="bg-gray-100 rounded-lg p-3 text-gray-500">
              Generating response...
            </div>
          </div>
        )}
      </div>

      <form onSubmit={handleSubmit} className="p-4 border-t flex gap-2">
        <input
          value={input}
          onChange={handleInputChange}
          className="flex-1 border rounded-lg px-3 py-2 focus:outline-none focus:ring-2 focus:ring-blue-500"
          placeholder="Type your message..."
          disabled={isLoading}
        />
        <button
          type="submit"
          disabled={isLoading}
          className="bg-blue-500 text-white px-4 py-2 rounded-lg disabled:opacity-50"
        >
          Send
        </button>
      </form>
    </div>
  )
}
File Upload Handling
// app/upload/page.tsx
'use client'

import { useState } from 'react'

export default function UploadPage() {
  const [status, setStatus] = useState('')

  async function handleUpload(e: React.FormEvent<HTMLFormElement>) {
    e.preventDefault()
    const formData = new FormData(e.currentTarget)
    setStatus('Uploading...')

    const response = await fetch('/api/upload', {
      method: 'POST',
      body: formData,
    })

    if (response.ok) {
      const data = await response.json()
      setStatus(`Done: ${data.message}`)
    } else {
      setStatus('Upload failed')
    }
  }

  return (
    <form onSubmit={handleUpload} className="p-4">
      <input type="file" name="file" accept=".pdf,.txt,.md" />
      <button type="submit" className="mt-2 bg-green-500 text-white px-4 py-2 rounded">
        Upload
      </button>
      {status && <p className="mt-2 text-sm">{status}</p>}
    </form>
  )
}
5. Vector Database Integration
pgvector (PostgreSQL Extension)
Using the pgvector extension allows vector search within your existing PostgreSQL database.
-- Enable the pgvector extension
CREATE EXTENSION vector;

-- Create a table with an embedding column
CREATE TABLE documents (
    id SERIAL PRIMARY KEY,
    content TEXT,
    embedding vector(1536),
    metadata JSONB,
    created_at TIMESTAMP DEFAULT NOW()
);

-- Create an HNSW index for fast approximate nearest neighbor search
CREATE INDEX ON documents USING hnsw (embedding vector_cosine_ops);
import asyncpg
from pgvector.asyncpg import register_vector  # teaches asyncpg to encode/decode the vector type

async def store_embedding(content: str, embedding: list):
    conn = await asyncpg.connect(DATABASE_URL)
    try:
        await register_vector(conn)
        await conn.execute(
            "INSERT INTO documents (content, embedding) VALUES ($1, $2)",
            content, embedding,
        )
    finally:
        await conn.close()

async def search_similar(query_embedding: list, k: int = 5):
    conn = await asyncpg.connect(DATABASE_URL)
    try:
        await register_vector(conn)
        return await conn.fetch(
            """SELECT content, 1 - (embedding <=> $1) AS similarity
               FROM documents
               ORDER BY embedding <=> $1
               LIMIT $2""",
            query_embedding, k,
        )
    finally:
        await conn.close()
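pgvector's `<=>` operator computes cosine distance, which is why the query above uses `1 - (embedding <=> $1)` to recover a similarity score. The underlying math in a stdlib sketch:

```python
import math

def cosine_distance(a: list[float], b: list[float]) -> float:
    # pgvector's <=> operator: 1 - cos(theta) between the two vectors
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return 1 - dot / norm

v1 = [1.0, 0.0]
v2 = [0.0, 1.0]

print(cosine_distance(v1, v1))      # 0.0 (identical direction)
print(cosine_distance(v1, v2))      # 1.0 (orthogonal)
print(1 - cosine_distance(v1, v1))  # similarity: 1.0
```

Ordering by `embedding <=> $1` ascending therefore returns the most similar documents first.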
Chroma DB (Local Development)
from langchain_community.vectorstores import Chroma
from langchain_openai import OpenAIEmbeddings

embeddings = OpenAIEmbeddings()
vectorstore = Chroma(
    collection_name="my_documents",
    embedding_function=embeddings,
    persist_directory="./chroma_db",
)

# Add documents
vectorstore.add_texts(
    texts=["Python is widely used for AI development.", "FastAPI is a high-performance API framework."],
    metadatas=[{"source": "intro.txt"}, {"source": "framework.txt"}],
)

# Similarity search
results = vectorstore.similarity_search("API development", k=3)
6. Authentication and Security
JWT Token Authentication
from datetime import datetime, timedelta, timezone
from jose import JWTError, jwt
from passlib.context import CryptContext

SECRET_KEY = "your-secret-key"  # load from an environment variable in production
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30

pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

def create_access_token(data: dict):
    to_encode = data.copy()
    expire = datetime.now(timezone.utc) + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    to_encode.update({"exp": expire})
    return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)

def verify_token(token: str):
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
    except JWTError:
        raise HTTPException(status_code=401, detail="Invalid token")
    username: str = payload.get("sub")
    if username is None:
        raise HTTPException(status_code=401, detail="Invalid token")
    return username
Prompt Injection Defense
import re

INJECTION_PATTERNS = [
    r"ignore previous instructions",
    r"disregard all prior",
    r"you are now",
    r"act as",
    r"pretend you are",
]

def sanitize_input(user_input: str) -> str:
    lower_input = user_input.lower()
    for pattern in INJECTION_PATTERNS:
        if re.search(pattern, lower_input):
            raise HTTPException(
                status_code=400,
                detail="Potentially harmful input detected.",
            )
    if len(user_input) > 4000:
        raise HTTPException(status_code=400, detail="Input is too long.")
    return user_input.strip()
Rate Limiting
from fastapi import Request
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.util import get_remote_address
from slowapi.errors import RateLimitExceeded

limiter = Limiter(key_func=get_remote_address)
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)

@app.post("/api/chat")
@limiter.limit("10/minute")
async def chat(request: Request, chat_request: ChatRequest):
    # Limited to 10 requests per minute per IP
    pass
7. Multimodal Input Processing
Image Analysis with GPT-4o Vision
import base64
from pathlib import Path

async def analyze_image(image_path: str, question: str) -> str:
    with open(image_path, "rb") as f:
        image_data = base64.b64encode(f.read()).decode("utf-8")

    ext = Path(image_path).suffix.lower()
    mime_map = {".jpg": "image/jpeg", ".png": "image/png", ".gif": "image/gif"}
    media_type = mime_map.get(ext, "image/jpeg")

    response = await client.chat.completions.create(
        model="gpt-4o",
        messages=[
            {
                "role": "user",
                "content": [
                    {
                        "type": "image_url",
                        "image_url": {
                            "url": f"data:{media_type};base64,{image_data}"
                        },
                    },
                    {"type": "text", "text": question},
                ],
            }
        ],
    )
    return response.choices[0].message.content
Audio Transcription with Whisper
async def transcribe_audio(audio_file_path: str) -> str:
    with open(audio_file_path, "rb") as audio_file:
        transcript = await client.audio.transcriptions.create(
            model="whisper-1",
            file=audio_file,
            language="en",
        )
    return transcript.text
8. Performance Optimization
Redis Response Caching
import redis
import json
import hashlib

redis_client = redis.Redis(host="localhost", port=6379, decode_responses=True)

def get_cache_key(messages: list) -> str:
    content = json.dumps(messages, sort_keys=True)
    return hashlib.md5(content.encode()).hexdigest()

async def cached_chat(messages: list) -> str:
    cache_key = get_cache_key(messages)
    cached = redis_client.get(cache_key)
    if cached:
        return json.loads(cached)

    response = await client.chat.completions.create(
        model="gpt-4o-mini",
        messages=messages,
    )
    result = response.choices[0].message.content

    # Cache with 1-hour TTL
    redis_client.setex(cache_key, 3600, json.dumps(result))
    return result
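A detail worth highlighting: `sort_keys=True` makes the cache key independent of dict key order, so logically identical requests map to the same cache entry:

```python
import hashlib
import json

def get_cache_key(messages: list) -> str:
    # Canonical JSON (sorted keys) hashed to a fixed-length key
    content = json.dumps(messages, sort_keys=True)
    return hashlib.md5(content.encode()).hexdigest()

a = [{"role": "user", "content": "hi"}]
b = [{"content": "hi", "role": "user"}]  # same message, different key order

print(get_cache_key(a) == get_cache_key(b))  # True
```

MD5 is fine here because the hash is a cache key, not a security boundary.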
Database Connection Pooling
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker

engine = create_async_engine(
    DATABASE_URL,
    pool_size=10,
    max_overflow=20,
    pool_pre_ping=True,
    echo=False,
)

AsyncSessionLocal = sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)

async def get_db():
    # The async context manager closes the session (and returns the connection
    # to the pool) when the request finishes
    async with AsyncSessionLocal() as session:
        yield session
9. Docker Compose Deployment
docker-compose.yml
version: '3.8'

services:
  backend:
    build: ./backend
    ports:
      - '8000:8000'
    environment:
      - OPENAI_API_KEY=${OPENAI_API_KEY}  # keep secrets out of the file; set in .env
      - DATABASE_URL=postgresql+asyncpg://user:pass@db/aiapp
      - REDIS_URL=redis://redis:6379
    depends_on:
      - db
      - redis
    restart: unless-stopped

  frontend:
    build: ./frontend
    ports:
      - '3000:3000'
    environment:
      - NEXT_PUBLIC_API_URL=http://backend:8000
    depends_on:
      - backend
    restart: unless-stopped

  db:
    image: pgvector/pgvector:pg16
    environment:
      - POSTGRES_DB=aiapp
      - POSTGRES_USER=user
      - POSTGRES_PASSWORD=pass
    volumes:
      - postgres_data:/var/lib/postgresql/data
    restart: unless-stopped

  redis:
    image: redis:7-alpine
    volumes:
      - redis_data:/data
    restart: unless-stopped

volumes:
  postgres_data:
  redis_data:
Backend Dockerfile
FROM python:3.11-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000"]
Frontend Dockerfile
FROM node:20-alpine AS builder
WORKDIR /app
COPY package*.json ./
RUN npm ci
COPY . .
RUN npm run build
FROM node:20-alpine AS runner
WORKDIR /app
COPY --from=builder /app/.next/standalone ./
COPY --from=builder /app/.next/static ./.next/static
EXPOSE 3000
CMD ["node", "server.js"]
Deployment Commands
# Build and start
docker-compose up --build -d
# View logs
docker-compose logs -f backend
# Scale out (3 backend instances; requires removing the fixed host port mapping or adding a load balancer)
docker-compose up --scale backend=3 -d
# Stop
docker-compose down
10. Quiz: Check Your Understanding
Q1. Why is SSE (Server-Sent Events) preferred for AI streaming over WebSockets?
Answer: SSE is simpler to implement and is purpose-built for unidirectional server-to-client data push over standard HTTP, which perfectly matches the LLM token streaming pattern.
Explanation: WebSockets support full-duplex communication but introduce extra complexity (upgrade handshake, connection management). For streaming LLM output, you only need one direction of data flow. SSE reuses HTTP connections, works through proxies more reliably, and has automatic reconnection built in. FastAPI's StreamingResponse pairs naturally with the browser's EventSource API for this pattern.
Q2. What is the main advantage of RAG over fine-tuning an LLM?
Answer: RAG does not require retraining the model. It retrieves up-to-date or domain-specific documents at inference time and injects them as context, which is far cheaper and faster to update.
Explanation: Fine-tuning bakes knowledge into model weights and requires expensive GPU compute. RAG keeps knowledge outside the model in a vector database. You can update your knowledge base just by adding or removing documents, without touching the model. It also reduces hallucination by grounding answers in retrieved evidence.
Q3. What does the chunk_overlap parameter do in RecursiveCharacterTextSplitter?
Answer: It specifies how many characters from the end of one chunk are repeated at the start of the next chunk, preserving context across chunk boundaries.
Explanation: When a document is split into chunks, a sentence or concept can be cut in half at the boundary. Overlapping ensures that the surrounding context is present in adjacent chunks. For example, with chunk_size=1000 and chunk_overlap=200, each chunk shares 200 characters with its neighbor, reducing the chance of losing relevant context during retrieval.
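A character-level sketch makes the arithmetic concrete (the real `RecursiveCharacterTextSplitter` also prefers to break on separators such as paragraphs and sentences):

```python
def split_with_overlap(text: str, chunk_size: int, chunk_overlap: int) -> list[str]:
    # Each chunk starts (chunk_size - chunk_overlap) characters after the previous one,
    # so adjacent chunks share chunk_overlap characters.
    step = chunk_size - chunk_overlap
    return [text[i:i + chunk_size] for i in range(0, len(text), step)]

chunks = split_with_overlap("abcdefghij", chunk_size=4, chunk_overlap=2)
print(chunks)  # ['abcd', 'cdef', 'efgh', 'ghij', 'ij']
```

Note how the last two characters of each chunk reappear at the start of the next, so a sentence cut at a boundary still survives intact in one of the neighbors.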
Q4. Why is the HNSW index critical for production vector databases?
Answer: HNSW enables approximate nearest neighbor (ANN) search that scales to millions of vectors while remaining fast, unlike brute-force exact search which becomes prohibitively slow at scale.
Explanation: An exact cosine similarity scan over one million 1536-dimensional vectors (OpenAI embeddings) takes seconds per query. HNSW builds a hierarchical graph structure that dramatically narrows the search space. The tradeoff is a small accuracy loss (approximate rather than exact), which is acceptable for most retrieval tasks. pgvector, Chroma, Pinecone, and Weaviate all support HNSW or similar ANN algorithms.
Q5. What is the purpose of the k parameter in ConversationBufferWindowMemory?
Answer: It sets the number of recent conversation turns to retain in the context window passed to the LLM.
Explanation: LLMs have a finite token context limit. Keeping the entire conversation history would eventually exceed this limit and increase cost. Setting k=10 retains only the 10 most recent user-assistant exchanges. Earlier turns are dropped. This balances coherent multi-turn dialogue against token budget. For longer-term memory, consider summarization memory or a dedicated memory store.