Skip to content
Published on

AIアプリフルスタック開発ガイド: FastAPI + Next.jsでLLMサービスを構築

Authors

概要

現代のAIアプリケーション開発は、単純なAPI呼び出しを超えて、ストリーミング、マルチモーダル入力、RAG(Retrieval-Augmented Generation)などの複雑な技術を統合する必要があります。このガイドでは、FastAPIバックエンドとNext.jsフロントエンドを基盤に、プロダクションレベルのLLMサービスを構築する全プロセスを解説します。


1. AIアプリアーキテクチャ設計

現代のAIアプリの3層構造

現代的なAIアプリケーションは、3つのコアレイヤーで構成されます。

  • フロントエンドレイヤー: Next.js App Router、Vercel AI SDK、ストリーミングUI
  • バックエンドレイヤー: FastAPI、LangChain、認証ミドルウェア、キャッシング
  • AI/データレイヤー: OpenAI/Claude API、ベクターデータベース、埋め込みモデル

この関心の分離により、各レイヤーを独立してスケール・テスト・保守できます。

ストリーミング vs バッチ処理

LLMレスポンスを処理する主要な2つのアプローチがあります。

ストリーミングはトークンが生成されるたびにクライアントへ送信する方式です。知覚されるレスポンス速度が大幅に改善され、会話型インターフェースに最適です。Server-Sent Events(SSE)またはWebSocketを使用します。

バッチ処理はレスポンス全体が完成してから一括返却する方式です。文書処理、データ分析パイプライン、バックグラウンドジョブに適しています。CeleryとRedisキューで管理します。

プロジェクトフォルダ構成

ai-app/
├── backend/
│   ├── app/
│   │   ├── main.py
│   │   ├── routers/
│   │   │   ├── chat.py
│   │   │   └── documents.py
│   │   ├── services/
│   │   │   ├── llm_service.py
│   │   │   └── vector_service.py
│   │   └── models/
│   │       └── schemas.py
│   ├── requirements.txt
│   └── Dockerfile
├── frontend/
│   ├── app/
│   │   ├── chat/
│   │   │   └── page.tsx
│   │   └── api/
│   │       └── chat/
│   │           └── route.ts
│   ├── components/
│   └── package.json
└── docker-compose.yml

2. FastAPIバックエンド構成

インストール

pip install fastapi uvicorn openai langchain langchain-openai python-dotenv

Pydanticモデルによるリクエスト/レスポンス検証

# app/models/schemas.py
from pydantic import BaseModel, Field
from typing import List, Optional
from enum import Enum

class Role(str, Enum):
    user = "user"
    assistant = "assistant"
    system = "system"

class Message(BaseModel):
    role: Role
    content: str

class ChatRequest(BaseModel):
    messages: List[Message]
    model: str = Field(default="gpt-4o-mini")
    temperature: float = Field(default=0.7, ge=0, le=2)
    max_tokens: Optional[int] = Field(default=None)

class ChatResponse(BaseModel):
    content: str
    usage: dict

非同期ストリーミングエンドポイント

FastAPIのStreamingResponseを使用すると、LLMトークンをリアルタイムでクライアントに送信できます。

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from fastapi.middleware.cors import CORSMiddleware
from openai import AsyncOpenAI
from app.models.schemas import ChatRequest

app = FastAPI(title="AI App Backend")

app.add_middleware(
    CORSMiddleware,
    allow_origins=["http://localhost:3000"],
    allow_methods=["*"],
    allow_headers=["*"],
)

client = AsyncOpenAI()

@app.post("/api/chat/stream")
async def chat_stream(request: ChatRequest):
    async def generate():
        stream = await client.chat.completions.create(
            model=request.model,
            messages=[m.dict() for m in request.messages],
            stream=True,
            temperature=request.temperature,
        )
        async for chunk in stream:
            delta = chunk.choices[0].delta.content
            if delta:
                yield f"data: {delta}\n\n"
        yield "data: [DONE]\n\n"

    return StreamingResponse(generate(), media_type="text/event-stream")

依存性注入パターン

from fastapi import Depends, HTTPException, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
import jwt

security = HTTPBearer()

async def verify_token(credentials: HTTPAuthorizationCredentials = Depends(security)):
    token = credentials.credentials
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=["HS256"])
        return payload
    except jwt.ExpiredSignatureError:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="トークンが期限切れです。"
        )

@app.post("/api/chat/secure")
async def secure_chat(request: ChatRequest, user=Depends(verify_token)):
    # 認証されたユーザーのみアクセス可能
    pass

3. LangChain統合

会話チェーンとメモリ管理

LangChainを使うと、メモリ管理、チェーン構成、ツール統合を簡単に実装できます。

from langchain_openai import ChatOpenAI
from langchain.memory import ConversationBufferWindowMemory
from langchain.chains import ConversationChain
from langchain.prompts import PromptTemplate

llm = ChatOpenAI(model="gpt-4o-mini", streaming=True)
memory = ConversationBufferWindowMemory(k=10)

template = """あなたは親切なAIアシスタントです。

現在の会話:
{history}
Human: {input}
AI:"""

prompt = PromptTemplate(
    input_variables=["history", "input"],
    template=template
)

chain = ConversationChain(llm=llm, memory=memory, prompt=prompt)
response = chain.predict(input="こんにちは、私はPython開発者です。")

RAGパイプライン構築

RAG(検索拡張生成)は外部文書を検索してLLMのレスポンス品質を高めるパターンです。

from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langchain_community.vectorstores import Chroma
from langchain.chains import RetrievalQA
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.document_loaders import PyPDFLoader

# 文書のロードとチャンク分割
loader = PyPDFLoader("document.pdf")
documents = loader.load()

splitter = RecursiveCharacterTextSplitter(
    chunk_size=1000,
    chunk_overlap=200
)
chunks = splitter.split_documents(documents)

# ベクターストアの作成
embeddings = OpenAIEmbeddings()
vectorstore = Chroma.from_documents(chunks, embeddings)

# RAGチェーンの構成
llm = ChatOpenAI(model="gpt-4o")
qa_chain = RetrievalQA.from_chain_type(
    llm=llm,
    chain_type="stuff",
    retriever=vectorstore.as_retriever(search_kwargs={"k": 5})
)

answer = qa_chain.invoke({"query": "文書の主要な内容は何ですか?"})

カスタムツールの作成

from langchain.tools import tool
from langchain.agents import initialize_agent, AgentType

@tool
def search_database(query: str) -> str:
    """データベースから情報を検索します。queryは検索するキーワードです。"""
    results = db.search(query)
    return str(results)

@tool
def get_weather(city: str) -> str:
    """特定の都市の現在の天気を取得します。"""
    response = requests.get(f"https://api.weather.com/v1/{city}")
    return response.json()["description"]

llm = ChatOpenAI(model="gpt-4o", temperature=0)
tools = [search_database, get_weather]
agent = initialize_agent(tools, llm, agent=AgentType.OPENAI_FUNCTIONS)

4. Next.jsフロントエンド

Vercel AI SDKでストリーミングチャット

Vercel AI SDKはNext.jsでAIストリーミングを実装する公式ライブラリです。

npm install ai @ai-sdk/openai react-markdown
// app/api/chat/route.ts
import { openai } from '@ai-sdk/openai'
import { streamText } from 'ai'

export async function POST(req: Request) {
  const { messages } = await req.json()

  const result = await streamText({
    model: openai('gpt-4o-mini'),
    messages,
    system: 'あなたは親切で役立つAIアシスタントです。',
  })

  return result.toDataStreamResponse()
}

チャットUIコンポーネント

// app/chat/page.tsx
'use client'
import { useChat } from 'ai/react'
import ReactMarkdown from 'react-markdown'

export default function ChatPage() {
  const { messages, input, handleInputChange, handleSubmit, isLoading } = useChat({
    api: '/api/chat',
  })

  return (
    <div className="flex flex-col h-screen max-w-2xl mx-auto">
      <header className="p-4 border-b font-semibold text-lg">
        AIアシスタント
      </header>
      <div className="flex-1 overflow-y-auto p-4 space-y-4">
        {messages.map(m => (
          <div
            key={m.id}
            className={`flex ${m.role === 'user' ? 'justify-end' : 'justify-start'}`}
          >
            <div
              className={`max-w-xs rounded-lg p-3 ${
                m.role === 'user'
                  ? 'bg-blue-500 text-white'
                  : 'bg-gray-100 text-gray-800'
              }`}
            >
              <ReactMarkdown>{m.content}</ReactMarkdown>
            </div>
          </div>
        ))}
        {isLoading && (
          <div className="flex justify-start">
            <div className="bg-gray-100 rounded-lg p-3 text-gray-500">
              回答を生成中...
            </div>
          </div>
        )}
      </div>
      <form onSubmit={handleSubmit} className="p-4 border-t flex gap-2">
        <input
          value={input}
          onChange={handleInputChange}
          className="flex-1 border rounded-lg px-3 py-2 focus:outline-none focus:ring-2 focus:ring-blue-500"
          placeholder="メッセージを入力してください..."
          disabled={isLoading}
        />
        <button
          type="submit"
          disabled={isLoading}
          className="bg-blue-500 text-white px-4 py-2 rounded-lg disabled:opacity-50"
        >
          送信
        </button>
      </form>
    </div>
  )
}

ファイルアップロード処理

// app/upload/page.tsx
'use client'
import { useState } from 'react'

export default function UploadPage() {
  const [status, setStatus] = useState('')

  async function handleUpload(e: React.FormEvent<HTMLFormElement>) {
    e.preventDefault()
    const formData = new FormData(e.currentTarget)
    setStatus('アップロード中...')

    const response = await fetch('/api/upload', {
      method: 'POST',
      body: formData,
    })

    if (response.ok) {
      const data = await response.json()
      setStatus(`完了: ${data.message}`)
    } else {
      setStatus('アップロードに失敗しました')
    }
  }

  return (
    <form onSubmit={handleUpload} className="p-4">
      <input type="file" name="file" accept=".pdf,.txt,.md" />
      <button type="submit" className="mt-2 bg-green-500 text-white px-4 py-2 rounded">
        アップロード
      </button>
      {status && <p className="mt-2 text-sm">{status}</p>}
    </form>
  )
}

5. ベクターデータベース連携

pgvector(PostgreSQL拡張)

pgvector拡張を使用すると、既存のPostgreSQLデータベースでベクター検索が可能になります。

-- pgvector拡張を有効化
CREATE EXTENSION vector;

-- 埋め込みカラムを含むテーブル作成
CREATE TABLE documents (
    id SERIAL PRIMARY KEY,
    content TEXT,
    embedding vector(1536),
    metadata JSONB,
    created_at TIMESTAMP DEFAULT NOW()
);

-- 高速近似最近傍探索のHNSWインデックス作成
CREATE INDEX ON documents USING hnsw (embedding vector_cosine_ops);
import asyncpg
import numpy as np

async def store_embedding(content: str, embedding: list):
    conn = await asyncpg.connect(DATABASE_URL)
    await conn.execute(
        "INSERT INTO documents (content, embedding) VALUES ($1, $2)",
        content, embedding
    )

async def search_similar(query_embedding: list, k: int = 5):
    conn = await asyncpg.connect(DATABASE_URL)
    results = await conn.fetch(
        """SELECT content, 1 - (embedding <=> $1) as similarity
           FROM documents
           ORDER BY embedding <=> $1
           LIMIT $2""",
        query_embedding, k
    )
    return results

Chroma DB(ローカル開発用)

from langchain_community.vectorstores import Chroma
from langchain_openai import OpenAIEmbeddings

embeddings = OpenAIEmbeddings()
vectorstore = Chroma(
    collection_name="my_documents",
    embedding_function=embeddings,
    persist_directory="./chroma_db"
)

# 文書の追加
vectorstore.add_texts(
    texts=["PythonはAI開発に広く使われています。", "FastAPIは高性能なAPIフレームワークです。"],
    metadatas=[{"source": "intro.txt"}, {"source": "framework.txt"}]
)

# 類似度検索
results = vectorstore.similarity_search("API開発", k=3)

6. 認証とセキュリティ

JWTトークン認証

from datetime import datetime, timedelta
from jose import JWTError, jwt
from passlib.context import CryptContext

SECRET_KEY = "your-secret-key"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30

pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

def create_access_token(data: dict):
    to_encode = data.copy()
    expire = datetime.utcnow() + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    to_encode.update({"exp": expire})
    return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)

def verify_token(token: str):
    payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
    username: str = payload.get("sub")
    if username is None:
        raise HTTPException(status_code=401, detail="無効なトークン")
    return username

プロンプトインジェクション対策

import re

INJECTION_PATTERNS = [
    r"ignore previous instructions",
    r"disregard all prior",
    r"you are now",
    r"act as",
    r"pretend you are",
]

def sanitize_input(user_input: str) -> str:
    lower_input = user_input.lower()
    for pattern in INJECTION_PATTERNS:
        if re.search(pattern, lower_input):
            raise HTTPException(
                status_code=400,
                detail="潜在的に有害な入力が検出されました。"
            )
    if len(user_input) > 4000:
        raise HTTPException(status_code=400, detail="入力が長すぎます。")
    return user_input.strip()

レート制限

from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.util import get_remote_address
from slowapi.errors import RateLimitExceeded

limiter = Limiter(key_func=get_remote_address)
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)

@app.post("/api/chat")
@limiter.limit("10/minute")
async def chat(request: Request, chat_request: ChatRequest):
    # IPあたり1分間に10リクエスト制限
    pass

7. マルチモーダル入力処理

GPT-4oによる画像分析

import base64
from pathlib import Path

async def analyze_image(image_path: str, question: str) -> str:
    with open(image_path, "rb") as f:
        image_data = base64.b64encode(f.read()).decode("utf-8")

    ext = Path(image_path).suffix.lower()
    mime_map = {".jpg": "image/jpeg", ".png": "image/png", ".gif": "image/gif"}
    media_type = mime_map.get(ext, "image/jpeg")

    response = await client.chat.completions.create(
        model="gpt-4o",
        messages=[
            {
                "role": "user",
                "content": [
                    {
                        "type": "image_url",
                        "image_url": {
                            "url": f"data:{media_type};base64,{image_data}"
                        },
                    },
                    {"type": "text", "text": question}
                ],
            }
        ],
    )
    return response.choices[0].message.content

Whisper APIによる音声文字起こし

async def transcribe_audio(audio_file_path: str) -> str:
    with open(audio_file_path, "rb") as audio_file:
        transcript = await client.audio.transcriptions.create(
            model="whisper-1",
            file=audio_file,
            language="ja"
        )
    return transcript.text

8. パフォーマンス最適化

Redisによるレスポンスキャッシング

import redis
import json
import hashlib

redis_client = redis.Redis(host="localhost", port=6379, decode_responses=True)

def get_cache_key(messages: list) -> str:
    content = json.dumps(messages, sort_keys=True)
    return hashlib.md5(content.encode()).hexdigest()

async def cached_chat(messages: list) -> str:
    cache_key = get_cache_key(messages)
    cached = redis_client.get(cache_key)

    if cached:
        return json.loads(cached)

    response = await client.chat.completions.create(
        model="gpt-4o-mini",
        messages=messages
    )
    result = response.choices[0].message.content

    # 1時間のTTLでキャッシング
    redis_client.setex(cache_key, 3600, json.dumps(result))
    return result

コネクションプーリング

from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker

engine = create_async_engine(
    DATABASE_URL,
    pool_size=10,
    max_overflow=20,
    pool_pre_ping=True,
    echo=False,
)

AsyncSessionLocal = sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)

async def get_db():
    async with AsyncSessionLocal() as session:
        try:
            yield session
        finally:
            await session.close()

9. Docker Composeデプロイ

docker-compose.yml

version: '3.8'
services:
  backend:
    build: ./backend
    ports:
      - '8000:8000'
    environment:
      - OPENAI_API_KEY=your_key
      - DATABASE_URL=postgresql+asyncpg://user:pass@db/aiapp
      - REDIS_URL=redis://redis:6379
    depends_on:
      - db
      - redis
    restart: unless-stopped

  frontend:
    build: ./frontend
    ports:
      - '3000:3000'
    environment:
      - NEXT_PUBLIC_API_URL=http://backend:8000
    depends_on:
      - backend
    restart: unless-stopped

  db:
    image: pgvector/pgvector:pg16
    environment:
      - POSTGRES_DB=aiapp
      - POSTGRES_USER=user
      - POSTGRES_PASSWORD=pass
    volumes:
      - postgres_data:/var/lib/postgresql/data
    restart: unless-stopped

  redis:
    image: redis:7-alpine
    volumes:
      - redis_data:/data
    restart: unless-stopped

volumes:
  postgres_data:
  redis_data:

バックエンド Dockerfile

FROM python:3.11-slim

WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY . .

CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000"]

フロントエンド Dockerfile

FROM node:20-alpine AS builder

WORKDIR /app
COPY package*.json ./
RUN npm ci

COPY . .
RUN npm run build

FROM node:20-alpine AS runner
WORKDIR /app
COPY --from=builder /app/.next/standalone ./
COPY --from=builder /app/.next/static ./.next/static

EXPOSE 3000
CMD ["node", "server.js"]

デプロイコマンド

# ビルドと起動
docker-compose up --build -d

# ログ確認
docker-compose logs -f backend

# スケールアウト(バックエンド3インスタンス)
docker-compose up --scale backend=3 -d

# 停止
docker-compose down

10. クイズ: 理解度チェック

Q1. AIストリーミングにSSE(Server-Sent Events)を使う理由は?

正解: LLMがトークンを生成するたびにクライアントへ即座に送信することで、ユーザーがレスポンスを待つ間もテキストがリアルタイムで表示されるようにするためです。

解説: WebSocketより実装が単純で、HTTP経由のサーバーからクライアントへの単方向ストリーミングに最適です。FastAPIのStreamingResponseとブラウザのEventSource APIがこのパターンをサポートしています。自動再接続機能が組み込まれており、プロキシ経由でも安定して動作します。

Q2. ファインチューニングに対するRAGの主な利点は何ですか?

正解: モデルを再トレーニングする必要がなく、推論時に最新またはドメイン固有の文書を検索してコンテキストとして注入できるため、はるかに安価かつ迅速に更新できます。

解説: ファインチューニングはモデルの重みに知識を埋め込み、高コストのGPUコンピューティングが必要です。RAGはベクターデータベースにモデルの外部に知識を保持します。文書を追加・削除するだけでナレッジベースを更新でき、モデルに触れる必要がありません。また、根拠となる証拠に基づいて回答を生成するため、ハルシネーションも軽減されます。

Q3. RecursiveCharacterTextSplitterのchunk_overlapパラメータの役割は?

正解: あるチャンクの末尾から次のチャンクの先頭に繰り返す文字数を指定し、チャンク境界をまたいでコンテキストを保持します。

解説: 文書をチャンクに分割すると、境界でセンテンスや概念が切れる場合があります。オーバーラップにより、隣接するチャンクに周囲のコンテキストが含まれます。例えばchunk_size=1000、chunk_overlap=200の場合、各チャンクは隣のチャンクと200文字を共有し、検索時に関連コンテキストを失うリスクを減らします。

Q4. プロダクションのベクターデータベースでHNSWインデックスが重要な理由は?

正解: HNSWは近似最近傍(ANN)探索を可能にし、ブルートフォース完全一致探索では対応しきれないスケールでも高速に動作します。

解説: 100万件の1536次元ベクター(OpenAI埋め込み)をコサイン類似度でスキャンすると1クエリあたり数秒かかります。HNSWは階層的グラフ構造を構築して探索空間を大幅に絞り込みます。トレードオフは若干の精度低下(近似)ですが、ほとんどの検索タスクでは許容範囲です。pgvector、Chroma、Pinecone、WeaviateはいずれもHNSWまたは類似のANNアルゴリズムをサポートしています。

Q5. ConversationBufferWindowMemoryのkパラメータの目的は?

正解: LLMに渡すコンテキストウィンドウに保持する直近の会話ターン数を設定します。

解説: LLMには有限のトークンコンテキスト制限があります。会話履歴全体を保持すると最終的にこの制限を超え、コストも増大します。k=10に設定すると直近10回のユーザーとAIのやりとりのみを保持し、それ以前のターンは削除されます。長期記憶が必要な場合は、要約メモリや専用メモリストアの使用を検討してください。


参考資料