Full-Stack AI App Development: Build LLM Services with FastAPI + Next.js

Overview

Building modern AI applications goes far beyond simple API calls. You need to integrate streaming, multimodal inputs, RAG (Retrieval-Augmented Generation), and robust authentication into a cohesive product. This guide walks through building a production-grade LLM service using FastAPI on the backend and Next.js on the frontend.


1. AI App Architecture Design

The Three-Layer Architecture

A modern AI application is built around three distinct layers:

  • Frontend Layer: Next.js App Router, Vercel AI SDK, streaming UI components
  • Backend Layer: FastAPI, LangChain, auth middleware, caching
  • AI/Data Layer: OpenAI/Claude APIs, vector databases, embedding models

This separation of concerns makes the system easier to scale, test, and maintain independently.

Streaming vs Batch Processing

Two primary approaches exist for handling LLM responses.

Streaming sends tokens to the client as they are generated. This lowers perceived latency and suits conversational interfaces. It is usually implemented with Server-Sent Events (SSE) or WebSockets.

Batch processing waits for the full response before returning it. This suits document processing, data analysis pipelines, and background jobs, and is typically orchestrated with a task queue such as Celery backed by Redis.
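
The difference can be sketched with a plain async generator standing in for the model. The `fake_llm` function and its token list are placeholders for illustration, not a real API. The streaming consumer can act on the first token before the last one exists, while the batch consumer sees nothing until the whole response is ready.

```python
import asyncio
from typing import AsyncIterator

TOKENS = ["Fast", "API ", "streams ", "tokens."]  # placeholder model output


async def fake_llm() -> AsyncIterator[str]:
    """Stand-in for a model that produces one token at a time."""
    for token in TOKENS:
        await asyncio.sleep(0)  # hand control back to the loop, as a network read would
        yield token


async def consume_streaming() -> list[str]:
    """Handle each token as soon as it arrives."""
    seen = []
    async for token in fake_llm():
        seen.append(token)  # a real UI would render the token here
    return seen


async def consume_batch() -> str:
    """Wait for the complete response, then return it in one piece."""
    return "".join([token async for token in fake_llm()])


streamed = asyncio.run(consume_streaming())
batched = asyncio.run(consume_batch())
```

Both consumers end up with the same text; only the moment at which the caller can start using it differs.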

Project Folder Structure

ai-app/
├── backend/
│   ├── app/
│   │   ├── main.py
│   │   ├── routers/
│   │   │   ├── chat.py
│   │   │   └── documents.py
│   │   ├── services/
│   │   │   ├── llm_service.py
│   │   │   └── vector_service.py
│   │   └── models/
│   │       └── schemas.py
│   ├── requirements.txt
│   └── Dockerfile
├── frontend/
│   ├── app/
│   │   ├── chat/
│   │   │   └── page.tsx
│   │   └── api/
│   │       └── chat/
│   │           └── route.ts
│   ├── components/
│   └── package.json
└── docker-compose.yml

2. FastAPI Backend

Installation

pip install fastapi uvicorn openai langchain langchain-openai python-dotenv

Pydantic Models for Request/Response Validation

# app/models/schemas.py
from pydantic import BaseModel, Field
from typing import List, Optional
from enum import Enum

class Role(str, Enum):
    user = "user"
    assistant = "assistant"
    system = "system"

class Message(BaseModel):
    role: Role
    content: str

class ChatRequest(BaseModel):
    messages: List[Message]
    model: str = Field(default="gpt-4o-mini")
    temperature: float = Field(default=0.7, ge=0, le=2)
    max_tokens: Optional[int] = Field(default=None)

class ChatResponse(BaseModel):
    content: str
    usage: dict

Async Streaming Endpoint

FastAPI's StreamingResponse pushes LLM tokens to the client in real time as they are produced.

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from fastapi.middleware.cors import CORSMiddleware
from openai import AsyncOpenAI
from app.models.schemas import ChatRequest

app = FastAPI(title="AI App Backend")

app.add_middleware(
    CORSMiddleware,
    allow_origins=["http://localhost:3000"],
    allow_methods=["*"],
    allow_headers=["*"],
)

client = AsyncOpenAI()

@app.post("/api/chat/stream")
async def chat_stream(request: ChatRequest):
    async def generate():
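        stream = await client.chat.completions.create(
            model=request.model,
            # model_dump(mode="json") is the Pydantic v2 API; it turns the Role enum into a plain string
            messages=[m.model_dump(mode="json") for m in request.messages],
            stream=True,
            temperature=request.temperature,
            max_tokens=request.max_tokens,
        )
        async for chunk in stream:
            # Some chunks may carry no choices, so guard before indexing
            if not chunk.choices:
                continue
            delta = chunk.choices[0].delta.content
            if delta:
                # A raw newline inside a token would end the SSE event early, so emit
                # one "data:" line per line of text; SSE clients rejoin them with "\n"
                for line in delta.split("\n"):
                    yield f"data: {line}\n"
                yield "\n"
        yield "data: [DONE]\n\n"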

    return StreamingResponse(generate(), media_type="text/event-stream")
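
On the receiving side, the event stream has to be split back into messages. The following is a minimal sketch of that parsing step, assuming events are separated by a blank line and the stream ends with the `[DONE]` sentinel used by the endpoint. The `parse_sse` helper is a hypothetical name, and a production client would parse incrementally as bytes arrive rather than from a complete string.

```python
def parse_sse(raw: str) -> list[str]:
    """Split an SSE text stream into message payloads, stopping at [DONE]."""
    messages = []
    for event in raw.split("\n\n"):
        data_lines = []
        for line in event.split("\n"):
            if line.startswith("data:"):
                value = line[len("data:"):]
                # The SSE format drops one optional space after the colon
                if value.startswith(" "):
                    value = value[1:]
                data_lines.append(value)
        if not data_lines:
            continue
        # Several data lines in one event are joined with newlines
        payload = "\n".join(data_lines)
        if payload == "[DONE]":
            break
        messages.append(payload)
    return messages


raw_stream = "data: Hello\n\ndata:  world\n\ndata: [DONE]\n\n"
text = "".join(parse_sse(raw_stream))
```

Only one space after `data:` is stripped, which is why a token with a leading space survives the round trip.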

Dependency Injection for Auth

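import os

import jwt  # PyJWT
from fastapi import Depends, HTTPException, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials

# Read the signing key from the environment rather than hard-coding it
# (the variable name JWT_SECRET_KEY is an assumption; use your own)
SECRET_KEY = os.environ["JWT_SECRET_KEY"]

security = HTTPBearer()

async def verify_token(credentials: HTTPAuthorizationCredentials = Depends(security)):
    token = credentials.credentials
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=["HS256"])
        return payload
    except jwt.ExpiredSignatureError:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Token has expired."
        )
    except jwt.InvalidTokenError:
        # Malformed tokens and bad signatures would otherwise surface as a 500
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid token."
        )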

@app.post("/api/chat/secure")
async def secure_chat(request: ChatRequest, user=Depends(verify_token)):
    # Only accessible to authenticated users
    pass

3. LangChain Integration

Conversation Chain with Memory

LangChain simplifies memory management, chain composition, and tool integration.

from langchain_openai import ChatOpenAI
from langchain.memory import ConversationBufferWindowMemory
from langchain.chains import ConversationChain
from langchain.prompts import PromptTemplate

llm = ChatOpenAI(model="gpt-4o-mini", streaming=True)
memory = ConversationBufferWindowMemory(k=10)

template = """You are a helpful AI assistant.

Current conversation:
{history}
Human: {input}
AI:"""

prompt = PromptTemplate(
    input_variables=["history", "input"],
    template=template
)

chain = ConversationChain(llm=llm, memory=memory, prompt=prompt)
response = chain.predict(input="Hello, I am a Python developer.")

Building a RAG Pipeline

RAG (Retrieval-Augmented Generation) augments LLM responses by searching external documents and injecting relevant context into the prompt.
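
The retrieve-then-inject idea can be sketched without any external service. This toy version uses word counts as a stand-in for embeddings, and the `embed`, `retrieve`, and `build_prompt` helpers are hypothetical names for illustration. A real pipeline would call an embedding model and a vector store, as in the LangChain code that follows.

```python
import math
from collections import Counter


def embed(text: str) -> Counter:
    """Toy embedding: lowercase word counts instead of a learned vector."""
    return Counter(text.lower().split())


def cosine(a: Counter, b: Counter) -> float:
    """Cosine similarity between two sparse word-count vectors."""
    dot = sum(a[word] * b[word] for word in a)
    norm_a = math.sqrt(sum(v * v for v in a.values()))
    norm_b = math.sqrt(sum(v * v for v in b.values()))
    return dot / (norm_a * norm_b) if norm_a and norm_b else 0.0


def retrieve(query: str, docs: list[str], k: int = 2) -> list[str]:
    """Return the k documents most similar to the query."""
    q = embed(query)
    return sorted(docs, key=lambda d: cosine(q, embed(d)), reverse=True)[:k]


def build_prompt(query: str, docs: list[str], k: int = 2) -> str:
    """Inject the retrieved documents into the prompt as context."""
    context = "\n".join(f"- {d}" for d in retrieve(query, docs, k))
    return f"Answer using only this context:\n{context}\n\nQuestion: {query}"


DOCS = [
    "FastAPI is a Python web framework",
    "Next.js is a React framework",
    "pgvector adds vector search to PostgreSQL",
]
top = retrieve("which framework is python", DOCS, k=1)
prompt = build_prompt("which framework is python", DOCS, k=2)
```

The model never sees the third document here, because only the top-ranked matches are placed in the prompt.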

from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langchain_community.vectorstores import Chroma
from langchain.chains import RetrievalQA
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.document_loaders import PyPDFLoader

# Load and chunk documents
loader = PyPDFLoader("document.pdf")
documents = loader.load()

splitter = RecursiveCharacterTextSplitter(
    chunk_size=1000,
    chunk_overlap=200
)
chunks = splitter.split_documents(documents)

# Create vector store
embeddings = OpenAIEmbeddings()
vectorstore = Chroma.from_documents(chunks, embeddings)

# Build RAG chain
llm = ChatOpenAI(model="gpt-4o")
qa_chain = RetrievalQA.from_chain_type(
    llm=llm,
    chain_type="stuff",
    retriever=vectorstore.as_retriever(search_kwargs={"k": 5})
)

answer = qa_chain.invoke({"query": "What are the key points in the document?"})

Creating Custom Tools

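import requests
from langchain.tools import tool
from langchain.agents import initialize_agent, AgentType
from langchain_openai import ChatOpenAI

@tool
def search_database(query: str) -> str:
    """Search the database for information. The query parameter is the keyword to search."""
    # `db` stands for your application's own database client (not defined here)
    results = db.search(query)
    return str(results)

@tool
def get_weather(city: str) -> str:
    """Get the current weather for a specific city."""
    # Placeholder URL; substitute the endpoint of the weather API you actually use
    response = requests.get(f"https://api.weather.com/v1/{city}", timeout=10)
    response.raise_for_status()
    return response.json()["description"]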

llm = ChatOpenAI(model="gpt-4o", temperature=0)
tools = [search_database, get_weather]
agent = initialize_agent(tools, llm, agent=AgentType.OPENAI_FUNCTIONS)

4. Next.js Frontend

Streaming Chat with Vercel AI SDK

The Vercel AI SDK provides streaming helpers and React hooks that integrate directly with Next.js route handlers.

npm install ai @ai-sdk/openai react-markdown

// app/api/chat/route.ts
import { openai } from '@ai-sdk/openai'
import { streamText } from 'ai'

export async function POST(req: Request) {
  const { messages } = await req.json()

  const result = await streamText({
    model: openai('gpt-4o-mini'),
    messages,
    system: 'You are a helpful and friendly AI assistant.',
  })

  return result.toDataStreamResponse()
}

Chat UI Component

// app/chat/page.tsx
'use client'
import { useChat } from 'ai/react'
import ReactMarkdown from 'react-markdown'

export default function ChatPage() {
  const { messages, input, handleInputChange, handleSubmit, isLoading } = useChat({
    api: '/api/chat',
  })

  return (
    <div className="flex flex-col h-screen max-w-2xl mx-auto">
      <header className="p-4 border-b font-semibold text-lg">
        AI Assistant
      </header>
      <div className="flex-1 overflow-y-auto p-4 space-y-4">
        {messages.map(m => (
          <div
            key={m.id}
            className={`flex ${m.role === 'user' ? 'justify-end' : 'justify-start'}`}
          >
            <div
              className={`max-w-xs rounded-lg p-3 ${
                m.role === 'user'
                  ? 'bg-blue-500 text-white'
                  : 'bg-gray-100 text-gray-800'
              }`}
            >
              <ReactMarkdown>{m.content}</ReactMarkdown>
            </div>
          </div>
        ))}
        {isLoading && (
          <div className="flex justify-start">
            <div className="bg-gray-100 rounded-lg p-3 text-gray-500">
              Generating response...
            </div>
          </div>
        )}
      </div>
      <form onSubmit={handleSubmit} className="p-4 border-t flex gap-2">
        <input
          value={input}
          onChange={handleInputChange}
          className="flex-1 border rounded-lg px-3 py-2 focus:outline-none focus:ring-2 focus:ring-blue-500"
          placeholder="Type your message..."
          disabled={isLoading}
        />
        <button
          type="submit"
          disabled={isLoading}
          className="bg-blue-500 text-white px-4 py-2 rounded-lg disabled:opacity-50"
        >
          Send
        </button>
      </form>
    </div>
  )
}

File Upload Handling

// app/upload/page.tsx
'use client'
import { useState } from 'react'

export default function UploadPage() {
  const [status, setStatus] = useState('')

  async function handleUpload(e: React.FormEvent<HTMLFormElement>) {
    e.preventDefault()
    const formData = new FormData(e.currentTarget)
    setStatus('Uploading...')

    const response = await fetch('/api/upload', {
      method: 'POST',
      body: formData,
    })

    if (response.ok) {
      const data = await response.json()
      setStatus(`Done: ${data.message}`)
    } else {
      setStatus('Upload failed')
    }
  }

  return (
    <form onSubmit={handleUpload} className="p-4">
      <input type="file" name="file" accept=".pdf,.txt,.md" />
      <button type="submit" className="mt-2 bg-green-500 text-white px-4 py-2 rounded">
        Upload
      </button>
      {status && <p className="mt-2 text-sm">{status}</p>}
    </form>
  )
}

5. Vector Database Integration

pgvector (PostgreSQL Extension)

Using the pgvector extension allows vector search within your existing PostgreSQL database.

-- Enable the pgvector extension
CREATE EXTENSION IF NOT EXISTS vector;

-- Create a table with an embedding column
CREATE TABLE documents (
    id SERIAL PRIMARY KEY,
    content TEXT,
    embedding vector(1536),
    metadata JSONB,
    created_at TIMESTAMP DEFAULT NOW()
);

-- Create HNSW index for fast approximate nearest neighbor search
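CREATE INDEX ON documents USING hnsw (embedding vector_cosine_ops);

import os
import asyncpg
import numpy as np
from pgvector.asyncpg import register_vector  # pip install pgvector

# asyncpg expects a plain postgresql:// URL (without the "+asyncpg" suffix)
DATABASE_URL = os.environ["DATABASE_URL"]

async def get_connection():
    conn = await asyncpg.connect(DATABASE_URL)
    # Teach asyncpg how to encode and decode the vector type
    await register_vector(conn)
    return conn

async def store_embedding(content: str, embedding: list):
    conn = await get_connection()
    try:
        await conn.execute(
            "INSERT INTO documents (content, embedding) VALUES ($1, $2)",
            content, np.array(embedding)
        )
    finally:
        await conn.close()

async def search_similar(query_embedding: list, k: int = 5):
    conn = await get_connection()
    try:
        # <=> is cosine distance, so 1 - distance is cosine similarity
        return await conn.fetch(
            """SELECT content, 1 - (embedding <=> $1) as similarity
               FROM documents
               ORDER BY embedding <=> $1
               LIMIT $2""",
            np.array(query_embedding), k
        )
    finally:
        await conn.close()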

Chroma DB (Local Development)

from langchain_community.vectorstores import Chroma
from langchain_openai import OpenAIEmbeddings

embeddings = OpenAIEmbeddings()
vectorstore = Chroma(
    collection_name="my_documents",
    embedding_function=embeddings,
    persist_directory="./chroma_db"
)

# Add documents
vectorstore.add_texts(
    texts=["Python is widely used for AI development.", "FastAPI is a high-performance API framework."],
    metadatas=[{"source": "intro.txt"}, {"source": "framework.txt"}]
)

# Similarity search
results = vectorstore.similarity_search("API development", k=3)

6. Authentication and Security

JWT Token Authentication

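import os
from datetime import datetime, timedelta, timezone

from fastapi import HTTPException
from jose import JWTError, jwt
from passlib.context import CryptContext

# Load the signing key from the environment; never commit a real key
# (the variable name JWT_SECRET_KEY is an assumption; use your own)
SECRET_KEY = os.environ["JWT_SECRET_KEY"]
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30

pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

def create_access_token(data: dict):
    to_encode = data.copy()
    # datetime.utcnow() is deprecated since Python 3.12; use an aware UTC timestamp
    expire = datetime.now(timezone.utc) + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    to_encode.update({"exp": expire})
    return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)

def verify_token(token: str):
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
    except JWTError:
        # Covers expired tokens, bad signatures, and malformed input
        raise HTTPException(status_code=401, detail="Invalid token")
    username = payload.get("sub")
    if username is None:
        raise HTTPException(status_code=401, detail="Invalid token")
    return username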
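
To make the token format less opaque, here is a sketch of what HS256 signing amounts to, using only the standard library. The `sign_hs256` and `verify_hs256` helpers are hypothetical names for illustration, and they deliberately skip claim checks such as `exp`. Use a maintained JWT library in real code.

```python
import base64
import hashlib
import hmac
import json


def b64url(data: bytes) -> str:
    """Base64url-encode without padding, as JWT segments require."""
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode("ascii")


def sign_hs256(payload: dict, secret: str) -> str:
    """Build header.payload.signature with an HMAC-SHA256 signature."""
    header = {"alg": "HS256", "typ": "JWT"}
    signing_input = ".".join(
        b64url(json.dumps(part, separators=(",", ":")).encode("utf-8"))
        for part in (header, payload)
    )
    signature = hmac.new(secret.encode(), signing_input.encode(), hashlib.sha256).digest()
    return f"{signing_input}.{b64url(signature)}"


def verify_hs256(token: str, secret: str) -> bool:
    """Recompute the signature and compare it in constant time."""
    try:
        signing_input, signature = token.rsplit(".", 1)
    except ValueError:
        return False
    expected = hmac.new(secret.encode(), signing_input.encode(), hashlib.sha256).digest()
    return hmac.compare_digest(b64url(expected).encode(), signature.encode())


token = sign_hs256({"sub": "alice"}, "demo-secret")
```

The payload is only encoded, not encrypted, so anyone holding the token can read the claims. The secret protects integrity, not confidentiality.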

Prompt Injection Defense

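import re

from fastapi import HTTPException

# Keyword matching is only a first-line heuristic: it is easy to bypass and can
# flag harmless text, so combine it with other safeguards.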

INJECTION_PATTERNS = [
    r"ignore previous instructions",
    r"disregard all prior",
    r"you are now",
    r"act as",
    r"pretend you are",
]

def sanitize_input(user_input: str) -> str:
    lower_input = user_input.lower()
    for pattern in INJECTION_PATTERNS:
        if re.search(pattern, lower_input):
            raise HTTPException(
                status_code=400,
                detail="Potentially harmful input detected."
            )
    if len(user_input) > 4000:
        raise HTTPException(status_code=400, detail="Input is too long.")
    return user_input.strip()

Rate Limiting

from fastapi import Request
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.util import get_remote_address
from slowapi.errors import RateLimitExceeded

limiter = Limiter(key_func=get_remote_address)
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)

@app.post("/api/chat")
@limiter.limit("10/minute")
async def chat(request: Request, chat_request: ChatRequest):
    # Limited to 10 requests per minute per IP
    pass
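
What a limit such as "10/minute" per client means can be sketched as a small sliding-window counter. This is an illustration of the concept only, not how slowapi is implemented internally; the `SlidingWindowLimiter` class is a hypothetical name, and timestamps are passed in explicitly to keep the example deterministic.

```python
from collections import defaultdict, deque


class SlidingWindowLimiter:
    """Allow at most `limit` requests per key within the last `window_seconds`."""

    def __init__(self, limit: int, window_seconds: float):
        self.limit = limit
        self.window = window_seconds
        self.hits = defaultdict(deque)  # key -> timestamps of accepted requests

    def allow(self, key: str, now: float) -> bool:
        timestamps = self.hits[key]
        # Forget requests that have fallen out of the window
        while timestamps and now - timestamps[0] >= self.window:
            timestamps.popleft()
        if len(timestamps) >= self.limit:
            return False
        timestamps.append(now)
        return True


demo_limiter = SlidingWindowLimiter(limit=10, window_seconds=60)
# Eleven requests from one address, one second apart
results = [demo_limiter.allow("203.0.113.7", now=float(i)) for i in range(11)]
```

This in-memory version only works within a single process. With several backend instances, the counters need to live in shared storage such as Redis.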

7. Multimodal Input Processing

Image Analysis with GPT-4o Vision

import base64
from pathlib import Path

async def analyze_image(image_path: str, question: str) -> str:
    with open(image_path, "rb") as f:
        image_data = base64.b64encode(f.read()).decode("utf-8")

    ext = Path(image_path).suffix.lower()
    mime_map = {".jpg": "image/jpeg", ".jpeg": "image/jpeg", ".png": "image/png", ".gif": "image/gif", ".webp": "image/webp"}
    media_type = mime_map.get(ext, "image/jpeg")

    response = await client.chat.completions.create(
        model="gpt-4o",
        messages=[
            {
                "role": "user",
                "content": [
                    {
                        "type": "image_url",
                        "image_url": {
                            "url": f"data:{media_type};base64,{image_data}"
                        },
                    },
                    {"type": "text", "text": question}
                ],
            }
        ],
    )
    return response.choices[0].message.content
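
The data URL is the one piece of this request that can be checked without calling the API. A small sketch, assuming the media type is inferred from the file name with the standard `mimetypes` module; the `to_data_url` helper is a hypothetical name for illustration.

```python
import base64
import mimetypes


def to_data_url(image_bytes: bytes, filename: str) -> str:
    """Encode raw image bytes as a base64 data URL."""
    media_type, _ = mimetypes.guess_type(filename)
    if media_type is None or not media_type.startswith("image/"):
        # Reject unknown types instead of silently labeling them as JPEG
        raise ValueError(f"Unsupported image type: {filename}")
    encoded = base64.b64encode(image_bytes).decode("ascii")
    return f"data:{media_type};base64,{encoded}"


url = to_data_url(b"\x89PNG", "chart.png")
```

Raising on unknown extensions avoids sending a mislabeled payload that the API may reject with a less obvious error.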

Audio Transcription with Whisper

async def transcribe_audio(audio_file_path: str) -> str:
    with open(audio_file_path, "rb") as audio_file:
        transcript = await client.audio.transcriptions.create(
            model="whisper-1",
            file=audio_file,
            language="en"
        )
    return transcript.text

8. Performance Optimization

Redis Response Caching

import hashlib
import json

import redis.asyncio as redis  # async client, so cache calls do not block the event loop

redis_client = redis.Redis(host="localhost", port=6379, decode_responses=True)

def get_cache_key(messages: list) -> str:
    content = json.dumps(messages, sort_keys=True)
    return hashlib.md5(content.encode()).hexdigest()

async def cached_chat(messages: list) -> str:
    cache_key = get_cache_key(messages)
    cached = await redis_client.get(cache_key)

    if cached is not None:
        return json.loads(cached)

    response = await client.chat.completions.create(
        model="gpt-4o-mini",
        messages=messages
    )
    result = response.choices[0].message.content

    # Cache with 1-hour TTL
    await redis_client.setex(cache_key, 3600, json.dumps(result))
    return result
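
A cache key built from the messages alone will return the same entry for different models or sampling settings. One possible refinement, sketched here under the assumption that the model name and temperature are the only other inputs that affect the response, is to hash them together with the messages. The `make_cache_key` helper and the `chat:` prefix are hypothetical names.

```python
import hashlib
import json


def make_cache_key(messages: list[dict], model: str, temperature: float) -> str:
    """Derive a stable key from every input that changes the response."""
    payload = json.dumps(
        {"model": model, "temperature": temperature, "messages": messages},
        sort_keys=True,  # key order inside each dict must not change the hash
    )
    return "chat:" + hashlib.sha256(payload.encode("utf-8")).hexdigest()


key_a = make_cache_key([{"role": "user", "content": "hi"}], "gpt-4o-mini", 0.0)
key_b = make_cache_key([{"content": "hi", "role": "user"}], "gpt-4o-mini", 0.0)
key_c = make_cache_key([{"role": "user", "content": "hi"}], "gpt-4o", 0.0)
```

Caching is most useful at temperature 0, where repeated requests are expected to give closely matching answers. At higher temperatures a cached reply hides the variation the caller asked for.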

Database Connection Pooling

from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker

engine = create_async_engine(
    DATABASE_URL,
    pool_size=10,
    max_overflow=20,
    pool_pre_ping=True,
    echo=False,
)

AsyncSessionLocal = sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)

async def get_db():
    async with AsyncSessionLocal() as session:
        try:
            yield session
        finally:
            await session.close()

9. Docker Compose Deployment

docker-compose.yml

version: '3.8'
services:
  backend:
    build: ./backend
    ports:
      - '8000:8000'
    environment:
      # Taken from the host environment or a .env file; do not commit the real key
      - OPENAI_API_KEY=${OPENAI_API_KEY}
      - DATABASE_URL=postgresql+asyncpg://user:pass@db/aiapp
      - REDIS_URL=redis://redis:6379
    depends_on:
      - db
      - redis
    restart: unless-stopped

  frontend:
    build: ./frontend
    ports:
      - '3000:3000'
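    environment:
      # NEXT_PUBLIC_* values are used by the browser, which cannot resolve the
      # Compose service name "backend", so point at a host-reachable URL.
      # They are inlined at build time, so the value must also be set when building.
      - NEXT_PUBLIC_API_URL=http://localhost:8000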
    depends_on:
      - backend
    restart: unless-stopped

  db:
    image: pgvector/pgvector:pg16
    environment:
      - POSTGRES_DB=aiapp
      - POSTGRES_USER=user
      - POSTGRES_PASSWORD=pass
    volumes:
      - postgres_data:/var/lib/postgresql/data
    restart: unless-stopped

  redis:
    image: redis:7-alpine
    volumes:
      - redis_data:/data
    restart: unless-stopped

volumes:
  postgres_data:
  redis_data:

Backend Dockerfile

FROM python:3.11-slim

WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY . .

CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000"]

Frontend Dockerfile

FROM node:20-alpine AS builder

WORKDIR /app
COPY package*.json ./
RUN npm ci

COPY . .
RUN npm run build

FROM node:20-alpine AS runner
WORKDIR /app
# Requires output: 'standalone' in next.config.js
COPY --from=builder /app/.next/standalone ./
COPY --from=builder /app/.next/static ./.next/static

EXPOSE 3000
CMD ["node", "server.js"]

Deployment Commands

# Build and start
docker-compose up --build -d

# View logs
docker-compose logs -f backend

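# Scale out (3 backend instances). The fixed '8000:8000' host port mapping must be
# removed first, with traffic routed through a reverse proxy or load balancer,
# because only one container can bind host port 8000.
docker-compose up --scale backend=3 -d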

# Stop
docker-compose down

10. Quiz: Check Your Understanding

Q1. Why is SSE (Server-Sent Events) preferred for AI streaming over WebSockets?

Answer: SSE is simpler to implement and is purpose-built for unidirectional server-to-client data push over standard HTTP, which perfectly matches the LLM token streaming pattern.

Explanation: WebSockets support full-duplex communication but add complexity (an upgrade handshake and connection management). Streaming LLM output needs data flow in one direction only. SSE runs over ordinary HTTP, tends to pass through proxies more reliably, and the browser's EventSource API reconnects automatically. Note that EventSource only issues GET requests, so a POST endpoint such as /api/chat/stream is read on the client with fetch and a stream reader, parsing the same event format.

Q2. What is the main advantage of RAG over fine-tuning an LLM?

Answer: RAG does not require retraining the model. It retrieves up-to-date or domain-specific documents at inference time and injects them as context, which is far cheaper and faster to update.

Explanation: Fine-tuning bakes knowledge into model weights and requires expensive GPU compute. RAG keeps knowledge outside the model in a vector database. You can update your knowledge base just by adding or removing documents, without touching the model. It also reduces hallucination by grounding answers in retrieved evidence.

Q3. What does the chunk_overlap parameter do in RecursiveCharacterTextSplitter?

Answer: It specifies how many characters from the end of one chunk are repeated at the start of the next chunk, preserving context across chunk boundaries.

Explanation: When a document is split into chunks, a sentence or concept can be cut in half at the boundary. Overlapping ensures that the surrounding context is present in adjacent chunks. For example, with chunk_size=1000 and chunk_overlap=200, each chunk shares up to 200 characters with its neighbor, reducing the chance of losing relevant context during retrieval.
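
The effect of overlap is easiest to see on a tiny string. This is a simplified fixed-width splitter written for illustration (the `chunk_text` helper is a hypothetical name). RecursiveCharacterTextSplitter itself first tries to split on separators such as paragraphs and sentences, so its overlaps are not always exact.

```python
def chunk_text(text: str, chunk_size: int, chunk_overlap: int) -> list[str]:
    """Split text into fixed-size chunks that share chunk_overlap characters."""
    if chunk_overlap >= chunk_size:
        raise ValueError("chunk_overlap must be smaller than chunk_size")
    step = chunk_size - chunk_overlap  # how far each new chunk advances
    chunks = []
    start = 0
    while start < len(text):
        chunks.append(text[start:start + chunk_size])
        if start + chunk_size >= len(text):
            break  # the last chunk already reaches the end of the text
        start += step
    return chunks


chunks = chunk_text("abcdefghij", chunk_size=4, chunk_overlap=2)
# chunks == ["abcd", "cdef", "efgh", "ghij"]
```

Each chunk starts with the last two characters of the previous one, so a phrase cut at one boundary still appears whole in a neighboring chunk.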

Q4. Why is the HNSW index critical for production vector databases?

Answer: HNSW enables approximate nearest neighbor (ANN) search that scales to millions of vectors while remaining fast, unlike brute-force exact search which becomes prohibitively slow at scale.

Explanation: An exact cosine similarity scan compares the query against every stored vector, so query latency grows linearly with collection size; over one million 1536-dimensional vectors (the size of common OpenAI embeddings) this becomes too slow for interactive use. HNSW builds a hierarchical graph structure that dramatically narrows the search space. The tradeoff is a small accuracy loss (approximate rather than exact), which is acceptable for most retrieval tasks. pgvector, Chroma, Pinecone, and Weaviate all support HNSW or similar ANN algorithms.

Q5. What is the purpose of the k parameter in ConversationBufferWindowMemory?

Answer: It sets the number of recent conversation turns to retain in the context window passed to the LLM.

Explanation: LLMs have a finite token context limit. Keeping the entire conversation history would eventually exceed this limit and increase cost. Setting k=10 retains only the 10 most recent user-assistant exchanges. Earlier turns are dropped. This balances coherent multi-turn dialogue against token budget. For longer-term memory, consider summarization memory or a dedicated memory store.
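
The windowing behavior can be sketched with a bounded deque. This illustrates the idea only and is not LangChain's implementation; the `WindowMemory` class is a hypothetical name.

```python
from collections import deque


class WindowMemory:
    """Keep only the k most recent user-assistant exchanges."""

    def __init__(self, k: int):
        self.turns = deque(maxlen=k)  # the oldest exchange is dropped automatically

    def add(self, user: str, assistant: str) -> None:
        self.turns.append((user, assistant))

    def history(self) -> str:
        """Render the retained turns in the Human/AI prompt format."""
        return "\n".join(f"Human: {u}\nAI: {a}" for u, a in self.turns)


memory = WindowMemory(k=2)
for i in range(1, 4):
    memory.add(f"question {i}", f"answer {i}")
history = memory.history()
```

After three exchanges with k=2, the first one is gone from the prompt, so the model can no longer refer back to it.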

