- Authors

- Name
- Youngju Kim
- @fjvbn20031
概要
現代のAIアプリケーション開発は、単純なAPI呼び出しを超えて、ストリーミング、マルチモーダル入力、RAG(Retrieval-Augmented Generation)などの複雑な技術を統合する必要があります。このガイドでは、FastAPIバックエンドとNext.jsフロントエンドを基盤に、プロダクションレベルのLLMサービスを構築する全プロセスを解説します。
1. AIアプリアーキテクチャ設計
現代のAIアプリの3層構造
現代的なAIアプリケーションは、3つのコアレイヤーで構成されます。
- フロントエンドレイヤー: Next.js App Router、Vercel AI SDK、ストリーミングUI
- バックエンドレイヤー: FastAPI、LangChain、認証ミドルウェア、キャッシング
- AI/データレイヤー: OpenAI/Claude API、ベクターデータベース、埋め込みモデル
この関心の分離により、各レイヤーを独立してスケール・テスト・保守できます。
ストリーミング vs バッチ処理
LLMレスポンスを処理する主要な2つのアプローチがあります。
ストリーミングはトークンが生成されるたびにクライアントへ送信する方式です。知覚されるレスポンス速度が大幅に改善され、会話型インターフェースに最適です。Server-Sent Events(SSE)またはWebSocketを使用します。
バッチ処理はレスポンス全体が完成してから一括返却する方式です。文書処理、データ分析パイプライン、バックグラウンドジョブに適しています。CeleryとRedisキューで管理します。
プロジェクトフォルダ構成
ai-app/
├── backend/
│ ├── app/
│ │ ├── main.py
│ │ ├── routers/
│ │ │ ├── chat.py
│ │ │ └── documents.py
│ │ ├── services/
│ │ │ ├── llm_service.py
│ │ │ └── vector_service.py
│ │ └── models/
│ │ └── schemas.py
│ ├── requirements.txt
│ └── Dockerfile
├── frontend/
│ ├── app/
│ │ ├── chat/
│ │ │ └── page.tsx
│ │ └── api/
│ │ └── chat/
│ │ └── route.ts
│ ├── components/
│ └── package.json
└── docker-compose.yml
2. FastAPIバックエンド構成
インストール
pip install fastapi uvicorn openai langchain langchain-openai python-dotenv
Pydanticモデルによるリクエスト/レスポンス検証
# app/models/schemas.py
from pydantic import BaseModel, Field
from typing import List, Optional
from enum import Enum
class Role(str, Enum):
user = "user"
assistant = "assistant"
system = "system"
class Message(BaseModel):
role: Role
content: str
class ChatRequest(BaseModel):
messages: List[Message]
model: str = Field(default="gpt-4o-mini")
temperature: float = Field(default=0.7, ge=0, le=2)
max_tokens: Optional[int] = Field(default=None)
class ChatResponse(BaseModel):
content: str
usage: dict
非同期ストリーミングエンドポイント
FastAPIのStreamingResponseを使用すると、LLMトークンをリアルタイムでクライアントに送信できます。
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from fastapi.middleware.cors import CORSMiddleware
from openai import AsyncOpenAI
from app.models.schemas import ChatRequest
app = FastAPI(title="AI App Backend")
app.add_middleware(
CORSMiddleware,
allow_origins=["http://localhost:3000"],
allow_methods=["*"],
allow_headers=["*"],
)
client = AsyncOpenAI()
@app.post("/api/chat/stream")
async def chat_stream(request: ChatRequest):
async def generate():
stream = await client.chat.completions.create(
model=request.model,
messages=[m.dict() for m in request.messages],
stream=True,
temperature=request.temperature,
)
async for chunk in stream:
delta = chunk.choices[0].delta.content
if delta:
yield f"data: {delta}\n\n"
yield "data: [DONE]\n\n"
return StreamingResponse(generate(), media_type="text/event-stream")
依存性注入パターン
from fastapi import Depends, HTTPException, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
import jwt
security = HTTPBearer()
async def verify_token(credentials: HTTPAuthorizationCredentials = Depends(security)):
token = credentials.credentials
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=["HS256"])
return payload
except jwt.ExpiredSignatureError:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="トークンが期限切れです。"
)
@app.post("/api/chat/secure")
async def secure_chat(request: ChatRequest, user=Depends(verify_token)):
# 認証されたユーザーのみアクセス可能
pass
3. LangChain統合
会話チェーンとメモリ管理
LangChainを使うと、メモリ管理、チェーン構成、ツール統合を簡単に実装できます。
from langchain_openai import ChatOpenAI
from langchain.memory import ConversationBufferWindowMemory
from langchain.chains import ConversationChain
from langchain.prompts import PromptTemplate
llm = ChatOpenAI(model="gpt-4o-mini", streaming=True)
memory = ConversationBufferWindowMemory(k=10)
template = """あなたは親切なAIアシスタントです。
現在の会話:
{history}
Human: {input}
AI:"""
prompt = PromptTemplate(
input_variables=["history", "input"],
template=template
)
chain = ConversationChain(llm=llm, memory=memory, prompt=prompt)
response = chain.predict(input="こんにちは、私はPython開発者です。")
RAGパイプライン構築
RAG(検索拡張生成)は外部文書を検索してLLMのレスポンス品質を高めるパターンです。
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langchain_community.vectorstores import Chroma
from langchain.chains import RetrievalQA
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.document_loaders import PyPDFLoader
# 文書のロードとチャンク分割
loader = PyPDFLoader("document.pdf")
documents = loader.load()
splitter = RecursiveCharacterTextSplitter(
chunk_size=1000,
chunk_overlap=200
)
chunks = splitter.split_documents(documents)
# ベクターストアの作成
embeddings = OpenAIEmbeddings()
vectorstore = Chroma.from_documents(chunks, embeddings)
# RAGチェーンの構成
llm = ChatOpenAI(model="gpt-4o")
qa_chain = RetrievalQA.from_chain_type(
llm=llm,
chain_type="stuff",
retriever=vectorstore.as_retriever(search_kwargs={"k": 5})
)
answer = qa_chain.invoke({"query": "文書の主要な内容は何ですか?"})
カスタムツールの作成
from langchain.tools import tool
from langchain.agents import initialize_agent, AgentType
@tool
def search_database(query: str) -> str:
"""データベースから情報を検索します。queryは検索するキーワードです。"""
results = db.search(query)
return str(results)
@tool
def get_weather(city: str) -> str:
"""特定の都市の現在の天気を取得します。"""
response = requests.get(f"https://api.weather.com/v1/{city}")
return response.json()["description"]
llm = ChatOpenAI(model="gpt-4o", temperature=0)
tools = [search_database, get_weather]
agent = initialize_agent(tools, llm, agent=AgentType.OPENAI_FUNCTIONS)
4. Next.jsフロントエンド
Vercel AI SDKでストリーミングチャット
Vercel AI SDKはNext.jsでAIストリーミングを実装する公式ライブラリです。
npm install ai @ai-sdk/openai react-markdown
// app/api/chat/route.ts
import { openai } from '@ai-sdk/openai'
import { streamText } from 'ai'
export async function POST(req: Request) {
const { messages } = await req.json()
const result = await streamText({
model: openai('gpt-4o-mini'),
messages,
system: 'あなたは親切で役立つAIアシスタントです。',
})
return result.toDataStreamResponse()
}
チャットUIコンポーネント
// app/chat/page.tsx
'use client'
import { useChat } from 'ai/react'
import ReactMarkdown from 'react-markdown'
export default function ChatPage() {
const { messages, input, handleInputChange, handleSubmit, isLoading } = useChat({
api: '/api/chat',
})
return (
<div className="flex flex-col h-screen max-w-2xl mx-auto">
<header className="p-4 border-b font-semibold text-lg">
AIアシスタント
</header>
<div className="flex-1 overflow-y-auto p-4 space-y-4">
{messages.map(m => (
<div
key={m.id}
className={`flex ${m.role === 'user' ? 'justify-end' : 'justify-start'}`}
>
<div
className={`max-w-xs rounded-lg p-3 ${
m.role === 'user'
? 'bg-blue-500 text-white'
: 'bg-gray-100 text-gray-800'
}`}
>
<ReactMarkdown>{m.content}</ReactMarkdown>
</div>
</div>
))}
{isLoading && (
<div className="flex justify-start">
<div className="bg-gray-100 rounded-lg p-3 text-gray-500">
回答を生成中...
</div>
</div>
)}
</div>
<form onSubmit={handleSubmit} className="p-4 border-t flex gap-2">
<input
value={input}
onChange={handleInputChange}
className="flex-1 border rounded-lg px-3 py-2 focus:outline-none focus:ring-2 focus:ring-blue-500"
placeholder="メッセージを入力してください..."
disabled={isLoading}
/>
<button
type="submit"
disabled={isLoading}
className="bg-blue-500 text-white px-4 py-2 rounded-lg disabled:opacity-50"
>
送信
</button>
</form>
</div>
)
}
ファイルアップロード処理
// app/upload/page.tsx
'use client'
import { useState } from 'react'
export default function UploadPage() {
const [status, setStatus] = useState('')
async function handleUpload(e: React.FormEvent<HTMLFormElement>) {
e.preventDefault()
const formData = new FormData(e.currentTarget)
setStatus('アップロード中...')
const response = await fetch('/api/upload', {
method: 'POST',
body: formData,
})
if (response.ok) {
const data = await response.json()
setStatus(`完了: ${data.message}`)
} else {
setStatus('アップロードに失敗しました')
}
}
return (
<form onSubmit={handleUpload} className="p-4">
<input type="file" name="file" accept=".pdf,.txt,.md" />
<button type="submit" className="mt-2 bg-green-500 text-white px-4 py-2 rounded">
アップロード
</button>
{status && <p className="mt-2 text-sm">{status}</p>}
</form>
)
}
5. ベクターデータベース連携
pgvector(PostgreSQL拡張)
pgvector拡張を使用すると、既存のPostgreSQLデータベースでベクター検索が可能になります。
-- pgvector拡張を有効化
CREATE EXTENSION vector;
-- 埋め込みカラムを含むテーブル作成
CREATE TABLE documents (
id SERIAL PRIMARY KEY,
content TEXT,
embedding vector(1536),
metadata JSONB,
created_at TIMESTAMP DEFAULT NOW()
);
-- 高速近似最近傍探索のHNSWインデックス作成
CREATE INDEX ON documents USING hnsw (embedding vector_cosine_ops);
import asyncpg
import numpy as np
async def store_embedding(content: str, embedding: list):
conn = await asyncpg.connect(DATABASE_URL)
await conn.execute(
"INSERT INTO documents (content, embedding) VALUES ($1, $2)",
content, embedding
)
async def search_similar(query_embedding: list, k: int = 5):
conn = await asyncpg.connect(DATABASE_URL)
results = await conn.fetch(
"""SELECT content, 1 - (embedding <=> $1) as similarity
FROM documents
ORDER BY embedding <=> $1
LIMIT $2""",
query_embedding, k
)
return results
Chroma DB(ローカル開発用)
from langchain_community.vectorstores import Chroma
from langchain_openai import OpenAIEmbeddings
embeddings = OpenAIEmbeddings()
vectorstore = Chroma(
collection_name="my_documents",
embedding_function=embeddings,
persist_directory="./chroma_db"
)
# 文書の追加
vectorstore.add_texts(
texts=["PythonはAI開発に広く使われています。", "FastAPIは高性能なAPIフレームワークです。"],
metadatas=[{"source": "intro.txt"}, {"source": "framework.txt"}]
)
# 類似度検索
results = vectorstore.similarity_search("API開発", k=3)
6. 認証とセキュリティ
JWTトークン認証
from datetime import datetime, timedelta
from jose import JWTError, jwt
from passlib.context import CryptContext
SECRET_KEY = "your-secret-key"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
def create_access_token(data: dict):
to_encode = data.copy()
expire = datetime.utcnow() + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
to_encode.update({"exp": expire})
return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
def verify_token(token: str):
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
username: str = payload.get("sub")
if username is None:
raise HTTPException(status_code=401, detail="無効なトークン")
return username
プロンプトインジェクション対策
import re
INJECTION_PATTERNS = [
r"ignore previous instructions",
r"disregard all prior",
r"you are now",
r"act as",
r"pretend you are",
]
def sanitize_input(user_input: str) -> str:
lower_input = user_input.lower()
for pattern in INJECTION_PATTERNS:
if re.search(pattern, lower_input):
raise HTTPException(
status_code=400,
detail="潜在的に有害な入力が検出されました。"
)
if len(user_input) > 4000:
raise HTTPException(status_code=400, detail="入力が長すぎます。")
return user_input.strip()
レート制限
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.util import get_remote_address
from slowapi.errors import RateLimitExceeded
limiter = Limiter(key_func=get_remote_address)
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)
@app.post("/api/chat")
@limiter.limit("10/minute")
async def chat(request: Request, chat_request: ChatRequest):
# IPあたり1分間に10リクエスト制限
pass
7. マルチモーダル入力処理
GPT-4oによる画像分析
import base64
from pathlib import Path
async def analyze_image(image_path: str, question: str) -> str:
with open(image_path, "rb") as f:
image_data = base64.b64encode(f.read()).decode("utf-8")
ext = Path(image_path).suffix.lower()
mime_map = {".jpg": "image/jpeg", ".png": "image/png", ".gif": "image/gif"}
media_type = mime_map.get(ext, "image/jpeg")
response = await client.chat.completions.create(
model="gpt-4o",
messages=[
{
"role": "user",
"content": [
{
"type": "image_url",
"image_url": {
"url": f"data:{media_type};base64,{image_data}"
},
},
{"type": "text", "text": question}
],
}
],
)
return response.choices[0].message.content
Whisper APIによる音声文字起こし
async def transcribe_audio(audio_file_path: str) -> str:
with open(audio_file_path, "rb") as audio_file:
transcript = await client.audio.transcriptions.create(
model="whisper-1",
file=audio_file,
language="ja"
)
return transcript.text
8. パフォーマンス最適化
Redisによるレスポンスキャッシング
import redis
import json
import hashlib
redis_client = redis.Redis(host="localhost", port=6379, decode_responses=True)
def get_cache_key(messages: list) -> str:
content = json.dumps(messages, sort_keys=True)
return hashlib.md5(content.encode()).hexdigest()
async def cached_chat(messages: list) -> str:
cache_key = get_cache_key(messages)
cached = redis_client.get(cache_key)
if cached:
return json.loads(cached)
response = await client.chat.completions.create(
model="gpt-4o-mini",
messages=messages
)
result = response.choices[0].message.content
# 1時間のTTLでキャッシング
redis_client.setex(cache_key, 3600, json.dumps(result))
return result
コネクションプーリング
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker
engine = create_async_engine(
DATABASE_URL,
pool_size=10,
max_overflow=20,
pool_pre_ping=True,
echo=False,
)
AsyncSessionLocal = sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)
async def get_db():
async with AsyncSessionLocal() as session:
try:
yield session
finally:
await session.close()
9. Docker Composeデプロイ
docker-compose.yml
version: '3.8'
services:
backend:
build: ./backend
ports:
- '8000:8000'
environment:
- OPENAI_API_KEY=your_key
- DATABASE_URL=postgresql+asyncpg://user:pass@db/aiapp
- REDIS_URL=redis://redis:6379
depends_on:
- db
- redis
restart: unless-stopped
frontend:
build: ./frontend
ports:
- '3000:3000'
environment:
- NEXT_PUBLIC_API_URL=http://backend:8000
depends_on:
- backend
restart: unless-stopped
db:
image: pgvector/pgvector:pg16
environment:
- POSTGRES_DB=aiapp
- POSTGRES_USER=user
- POSTGRES_PASSWORD=pass
volumes:
- postgres_data:/var/lib/postgresql/data
restart: unless-stopped
redis:
image: redis:7-alpine
volumes:
- redis_data:/data
restart: unless-stopped
volumes:
postgres_data:
redis_data:
バックエンド Dockerfile
FROM python:3.11-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000"]
フロントエンド Dockerfile
FROM node:20-alpine AS builder
WORKDIR /app
COPY package*.json ./
RUN npm ci
COPY . .
RUN npm run build
FROM node:20-alpine AS runner
WORKDIR /app
COPY /app/.next/standalone ./
COPY /app/.next/static ./.next/static
EXPOSE 3000
CMD ["node", "server.js"]
デプロイコマンド
# ビルドと起動
docker-compose up --build -d
# ログ確認
docker-compose logs -f backend
# スケールアウト(バックエンド3インスタンス)
docker-compose up --scale backend=3 -d
# 停止
docker-compose down
10. クイズ: 理解度チェック
Q1. AIストリーミングにSSE(Server-Sent Events)を使う理由は?
正解: LLMがトークンを生成するたびにクライアントへ即座に送信することで、ユーザーがレスポンスを待つ間もテキストがリアルタイムで表示されるようにするためです。
解説: WebSocketより実装が単純で、HTTP経由のサーバーからクライアントへの単方向ストリーミングに最適です。FastAPIのStreamingResponseとブラウザのEventSource APIがこのパターンをサポートしています。自動再接続機能が組み込まれており、プロキシ経由でも安定して動作します。
Q2. ファインチューニングに対するRAGの主な利点は何ですか?
正解: モデルを再トレーニングする必要がなく、推論時に最新またはドメイン固有の文書を検索してコンテキストとして注入できるため、はるかに安価かつ迅速に更新できます。
解説: ファインチューニングはモデルの重みに知識を埋め込み、高コストのGPUコンピューティングが必要です。RAGはベクターデータベースにモデルの外部に知識を保持します。文書を追加・削除するだけでナレッジベースを更新でき、モデルに触れる必要がありません。また、根拠となる証拠に基づいて回答を生成するため、ハルシネーションも軽減されます。
Q3. RecursiveCharacterTextSplitterのchunk_overlapパラメータの役割は?
正解: あるチャンクの末尾から次のチャンクの先頭に繰り返す文字数を指定し、チャンク境界をまたいでコンテキストを保持します。
解説: 文書をチャンクに分割すると、境界でセンテンスや概念が切れる場合があります。オーバーラップにより、隣接するチャンクに周囲のコンテキストが含まれます。例えばchunk_size=1000、chunk_overlap=200の場合、各チャンクは隣のチャンクと200文字を共有し、検索時に関連コンテキストを失うリスクを減らします。
Q4. プロダクションのベクターデータベースでHNSWインデックスが重要な理由は?
正解: HNSWは近似最近傍(ANN)探索を可能にし、ブルートフォース完全一致探索では対応しきれないスケールでも高速に動作します。
解説: 100万件の1536次元ベクター(OpenAI埋め込み)をコサイン類似度でスキャンすると1クエリあたり数秒かかります。HNSWは階層的グラフ構造を構築して探索空間を大幅に絞り込みます。トレードオフは若干の精度低下(近似)ですが、ほとんどの検索タスクでは許容範囲です。pgvector、Chroma、Pinecone、WeaviateはいずれもHNSWまたは類似のANNアルゴリズムをサポートしています。
Q5. ConversationBufferWindowMemoryのkパラメータの目的は?
正解: LLMに渡すコンテキストウィンドウに保持する直近の会話ターン数を設定します。
解説: LLMには有限のトークンコンテキスト制限があります。会話履歴全体を保持すると最終的にこの制限を超え、コストも増大します。k=10に設定すると直近10回のユーザーとAIのやりとりのみを保持し、それ以前のターンは削除されます。長期記憶が必要な場合は、要約メモリや専用メモリストアの使用を検討してください。