Skip to content

필사 모드: AIアプリフルスタック開発ガイド: FastAPI + Next.jsでLLMサービスを構築

日本語
0%
정확도 0%
💡 왼쪽 원문을 읽으면서 오른쪽에 따라 써보세요. Tab 키로 힌트를 받을 수 있습니다.
원문 렌더가 준비되기 전까지 텍스트 가이드로 표시합니다.

概要

現代のAIアプリケーション開発は、単純なAPI呼び出しを超えて、ストリーミング、マルチモーダル入力、RAG(Retrieval-Augmented Generation)などの複雑な技術を統合する必要があります。このガイドでは、FastAPIバックエンドとNext.jsフロントエンドを基盤に、プロダクションレベルのLLMサービスを構築する全プロセスを解説します。

1. AIアプリアーキテクチャ設計

現代のAIアプリの3層構造

現代的なAIアプリケーションは、3つのコアレイヤーで構成されます。

- **フロントエンドレイヤー**: Next.js App Router、Vercel AI SDK、ストリーミングUI

- **バックエンドレイヤー**: FastAPI、LangChain、認証ミドルウェア、キャッシング

- **AI/データレイヤー**: OpenAI/Claude API、ベクターデータベース、埋め込みモデル

この関心の分離により、各レイヤーを独立してスケール・テスト・保守できます。

ストリーミング vs バッチ処理

LLMレスポンスを処理する主要な2つのアプローチがあります。

**ストリーミング**はトークンが生成されるたびにクライアントへ送信する方式です。知覚されるレスポンス速度が大幅に改善され、会話型インターフェースに最適です。Server-Sent Events(SSE)またはWebSocketを使用します。

**バッチ処理**はレスポンス全体が完成してから一括返却する方式です。文書処理、データ分析パイプライン、バックグラウンドジョブに適しています。CeleryとRedisキューで管理します。

プロジェクトフォルダ構成

ai-app/

├── backend/

│ ├── app/

│ │ ├── main.py

│ │ ├── routers/

│ │ │ ├── chat.py

│ │ │ └── documents.py

│ │ ├── services/

│ │ │ ├── llm_service.py

│ │ │ └── vector_service.py

│ │ └── models/

│ │ └── schemas.py

│ ├── requirements.txt

│ └── Dockerfile

├── frontend/

│ ├── app/

│ │ ├── chat/

│ │ │ └── page.tsx

│ │ └── api/

│ │ └── chat/

│ │ └── route.ts

│ ├── components/

│ └── package.json

└── docker-compose.yml

2. FastAPIバックエンド構成

インストール

pip install fastapi uvicorn openai langchain langchain-openai python-dotenv

Pydanticモデルによるリクエスト/レスポンス検証

app/models/schemas.py

from pydantic import BaseModel, Field

from typing import List, Optional

from enum import Enum

class Role(str, Enum):

user = "user"

assistant = "assistant"

system = "system"

class Message(BaseModel):

role: Role

content: str

class ChatRequest(BaseModel):

messages: List[Message]

model: str = Field(default="gpt-4o-mini")

temperature: float = Field(default=0.7, ge=0, le=2)

max_tokens: Optional[int] = Field(default=None)

class ChatResponse(BaseModel):

content: str

usage: dict

非同期ストリーミングエンドポイント

FastAPIの`StreamingResponse`を使用すると、LLMトークンをリアルタイムでクライアントに送信できます。

from fastapi import FastAPI

from fastapi.responses import StreamingResponse

from fastapi.middleware.cors import CORSMiddleware

from openai import AsyncOpenAI

from app.models.schemas import ChatRequest

app = FastAPI(title="AI App Backend")

app.add_middleware(

CORSMiddleware,

allow_origins=["http://localhost:3000"],

allow_methods=["*"],

allow_headers=["*"],

)

client = AsyncOpenAI()

@app.post("/api/chat/stream")

async def chat_stream(request: ChatRequest):

async def generate():

stream = await client.chat.completions.create(

model=request.model,

messages=[m.dict() for m in request.messages],

stream=True,

temperature=request.temperature,

)

async for chunk in stream:

delta = chunk.choices[0].delta.content

if delta:

yield f"data: {delta}\n\n"

yield "data: [DONE]\n\n"

return StreamingResponse(generate(), media_type="text/event-stream")

依存性注入パターン

from fastapi import Depends, HTTPException, status

from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials

security = HTTPBearer()

async def verify_token(credentials: HTTPAuthorizationCredentials = Depends(security)):

token = credentials.credentials

try:

payload = jwt.decode(token, SECRET_KEY, algorithms=["HS256"])

return payload

except jwt.ExpiredSignatureError:

raise HTTPException(

status_code=status.HTTP_401_UNAUTHORIZED,

detail="トークンが期限切れです。"

)

@app.post("/api/chat/secure")

async def secure_chat(request: ChatRequest, user=Depends(verify_token)):

認証されたユーザーのみアクセス可能

pass

3. LangChain統合

会話チェーンとメモリ管理

LangChainを使うと、メモリ管理、チェーン構成、ツール統合を簡単に実装できます。

from langchain_openai import ChatOpenAI

from langchain.memory import ConversationBufferWindowMemory

from langchain.chains import ConversationChain

from langchain.prompts import PromptTemplate

llm = ChatOpenAI(model="gpt-4o-mini", streaming=True)

memory = ConversationBufferWindowMemory(k=10)

template = """あなたは親切なAIアシスタントです。

現在の会話:

{history}

Human: {input}

AI:"""

prompt = PromptTemplate(

input_variables=["history", "input"],

template=template

)

chain = ConversationChain(llm=llm, memory=memory, prompt=prompt)

response = chain.predict(input="こんにちは、私はPython開発者です。")

RAGパイプライン構築

RAG(検索拡張生成)は外部文書を検索してLLMのレスポンス品質を高めるパターンです。

from langchain_openai import ChatOpenAI, OpenAIEmbeddings

from langchain_community.vectorstores import Chroma

from langchain.chains import RetrievalQA

from langchain.text_splitter import RecursiveCharacterTextSplitter

from langchain_community.document_loaders import PyPDFLoader

文書のロードとチャンク分割

loader = PyPDFLoader("document.pdf")

documents = loader.load()

splitter = RecursiveCharacterTextSplitter(

chunk_size=1000,

chunk_overlap=200

)

chunks = splitter.split_documents(documents)

ベクターストアの作成

embeddings = OpenAIEmbeddings()

vectorstore = Chroma.from_documents(chunks, embeddings)

RAGチェーンの構成

llm = ChatOpenAI(model="gpt-4o")

qa_chain = RetrievalQA.from_chain_type(

llm=llm,

chain_type="stuff",

retriever=vectorstore.as_retriever(search_kwargs={"k": 5})

)

answer = qa_chain.invoke({"query": "文書の主要な内容は何ですか?"})

カスタムツールの作成

from langchain.tools import tool

from langchain.agents import initialize_agent, AgentType

@tool

def search_database(query: str) -> str:

"""データベースから情報を検索します。queryは検索するキーワードです。"""

results = db.search(query)

return str(results)

@tool

def get_weather(city: str) -> str:

"""特定の都市の現在の天気を取得します。"""

response = requests.get(f"https://api.weather.com/v1/{city}")

return response.json()["description"]

llm = ChatOpenAI(model="gpt-4o", temperature=0)

tools = [search_database, get_weather]

agent = initialize_agent(tools, llm, agent=AgentType.OPENAI_FUNCTIONS)

4. Next.jsフロントエンド

Vercel AI SDKでストリーミングチャット

Vercel AI SDKはNext.jsでAIストリーミングを実装する公式ライブラリです。

npm install ai @ai-sdk/openai react-markdown

// app/api/chat/route.ts

export async function POST(req: Request) {

const { messages } = await req.json()

const result = await streamText({

model: openai('gpt-4o-mini'),

messages,

system: 'あなたは親切で役立つAIアシスタントです。',

})

return result.toDataStreamResponse()

}

チャットUIコンポーネント

// app/chat/page.tsx

'use client'

export default function ChatPage() {

const { messages, input, handleInputChange, handleSubmit, isLoading } = useChat({

api: '/api/chat',

})

return (

AIアシスタント

{messages.map(m => (

key={m.id}

className={`flex ${m.role === 'user' ? 'justify-end' : 'justify-start'}`}

>

className={`max-w-xs rounded-lg p-3 ${

m.role === 'user'

? 'bg-blue-500 text-white'

: 'bg-gray-100 text-gray-800'

}`}

>

))}

{isLoading && (

回答を生成中...

)}

value={input}

onChange={handleInputChange}

className="flex-1 border rounded-lg px-3 py-2 focus:outline-none focus:ring-2 focus:ring-blue-500"

placeholder="メッセージを入力してください..."

disabled={isLoading}

/>

type="submit"

disabled={isLoading}

className="bg-blue-500 text-white px-4 py-2 rounded-lg disabled:opacity-50"

>

送信

)

}

ファイルアップロード処理

// app/upload/page.tsx

'use client'

export default function UploadPage() {

const [status, setStatus] = useState('')

async function handleUpload(e: React.FormEvent<HTMLFormElement>) {

e.preventDefault()

const formData = new FormData(e.currentTarget)

setStatus('アップロード中...')

const response = await fetch('/api/upload', {

method: 'POST',

body: formData,

})

if (response.ok) {

const data = await response.json()

setStatus(`完了: ${data.message}`)

} else {

setStatus('アップロードに失敗しました')

}

}

return (

アップロード

{status && <p className="mt-2 text-sm">{status}</p>}

)

}

5. ベクターデータベース連携

pgvector(PostgreSQL拡張)

pgvector拡張を使用すると、既存のPostgreSQLデータベースでベクター検索が可能になります。

-- pgvector拡張を有効化

CREATE EXTENSION vector;

-- 埋め込みカラムを含むテーブル作成

CREATE TABLE documents (

id SERIAL PRIMARY KEY,

content TEXT,

embedding vector(1536),

metadata JSONB,

created_at TIMESTAMP DEFAULT NOW()

);

-- 高速近似最近傍探索のHNSWインデックス作成

CREATE INDEX ON documents USING hnsw (embedding vector_cosine_ops);

async def store_embedding(content: str, embedding: list):

conn = await asyncpg.connect(DATABASE_URL)

await conn.execute(

"INSERT INTO documents (content, embedding) VALUES ($1, $2)",

content, embedding

)

async def search_similar(query_embedding: list, k: int = 5):

conn = await asyncpg.connect(DATABASE_URL)

results = await conn.fetch(

"""SELECT content, 1 - (embedding <=> $1) as similarity

FROM documents

ORDER BY embedding <=> $1

LIMIT $2""",

query_embedding, k

)

return results

Chroma DB(ローカル開発用)

from langchain_community.vectorstores import Chroma

from langchain_openai import OpenAIEmbeddings

embeddings = OpenAIEmbeddings()

vectorstore = Chroma(

collection_name="my_documents",

embedding_function=embeddings,

persist_directory="./chroma_db"

)

文書の追加

vectorstore.add_texts(

texts=["PythonはAI開発に広く使われています。", "FastAPIは高性能なAPIフレームワークです。"],

metadatas=[{"source": "intro.txt"}, {"source": "framework.txt"}]

)

類似度検索

results = vectorstore.similarity_search("API開発", k=3)

6. 認証とセキュリティ

JWTトークン認証

from datetime import datetime, timedelta

from jose import JWTError, jwt

from passlib.context import CryptContext

SECRET_KEY = "your-secret-key"

ALGORITHM = "HS256"

ACCESS_TOKEN_EXPIRE_MINUTES = 30

pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

def create_access_token(data: dict):

to_encode = data.copy()

expire = datetime.utcnow() + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)

to_encode.update({"exp": expire})

return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)

def verify_token(token: str):

payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])

username: str = payload.get("sub")

if username is None:

raise HTTPException(status_code=401, detail="無効なトークン")

return username

プロンプトインジェクション対策

INJECTION_PATTERNS = [

r"ignore previous instructions",

r"disregard all prior",

r"you are now",

r"act as",

r"pretend you are",

]

def sanitize_input(user_input: str) -> str:

lower_input = user_input.lower()

for pattern in INJECTION_PATTERNS:

if re.search(pattern, lower_input):

raise HTTPException(

status_code=400,

detail="潜在的に有害な入力が検出されました。"

)

if len(user_input) > 4000:

raise HTTPException(status_code=400, detail="入力が長すぎます。")

return user_input.strip()

レート制限

from slowapi import Limiter, _rate_limit_exceeded_handler

from slowapi.util import get_remote_address

from slowapi.errors import RateLimitExceeded

limiter = Limiter(key_func=get_remote_address)

app.state.limiter = limiter

app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)

@app.post("/api/chat")

@limiter.limit("10/minute")

async def chat(request: Request, chat_request: ChatRequest):

IPあたり1分間に10リクエスト制限

pass

7. マルチモーダル入力処理

GPT-4oによる画像分析

from pathlib import Path

async def analyze_image(image_path: str, question: str) -> str:

with open(image_path, "rb") as f:

image_data = base64.b64encode(f.read()).decode("utf-8")

ext = Path(image_path).suffix.lower()

mime_map = {".jpg": "image/jpeg", ".png": "image/png", ".gif": "image/gif"}

media_type = mime_map.get(ext, "image/jpeg")

response = await client.chat.completions.create(

model="gpt-4o",

messages=[

{

"role": "user",

"content": [

{

"type": "image_url",

"image_url": {

"url": f"data:{media_type};base64,{image_data}"

},

},

{"type": "text", "text": question}

],

}

],

)

return response.choices[0].message.content

Whisper APIによる音声文字起こし

async def transcribe_audio(audio_file_path: str) -> str:

with open(audio_file_path, "rb") as audio_file:

transcript = await client.audio.transcriptions.create(

model="whisper-1",

file=audio_file,

language="ja"

)

return transcript.text

8. パフォーマンス最適化

Redisによるレスポンスキャッシング

redis_client = redis.Redis(host="localhost", port=6379, decode_responses=True)

def get_cache_key(messages: list) -> str:

content = json.dumps(messages, sort_keys=True)

return hashlib.md5(content.encode()).hexdigest()

async def cached_chat(messages: list) -> str:

cache_key = get_cache_key(messages)

cached = redis_client.get(cache_key)

if cached:

return json.loads(cached)

response = await client.chat.completions.create(

model="gpt-4o-mini",

messages=messages

)

result = response.choices[0].message.content

1時間のTTLでキャッシング

redis_client.setex(cache_key, 3600, json.dumps(result))

return result

コネクションプーリング

from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession

from sqlalchemy.orm import sessionmaker

engine = create_async_engine(

DATABASE_URL,

pool_size=10,

max_overflow=20,

pool_pre_ping=True,

echo=False,

)

AsyncSessionLocal = sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)

async def get_db():

async with AsyncSessionLocal() as session:

try:

yield session

finally:

await session.close()

9. Docker Composeデプロイ

docker-compose.yml

version: '3.8'

services:

backend:

build: ./backend

ports:

- '8000:8000'

environment:

- OPENAI_API_KEY=your_key

- DATABASE_URL=postgresql+asyncpg://user:pass@db/aiapp

- REDIS_URL=redis://redis:6379

depends_on:

- db

- redis

restart: unless-stopped

frontend:

build: ./frontend

ports:

- '3000:3000'

environment:

- NEXT_PUBLIC_API_URL=http://backend:8000

depends_on:

- backend

restart: unless-stopped

db:

image: pgvector/pgvector:pg16

environment:

- POSTGRES_DB=aiapp

- POSTGRES_USER=user

- POSTGRES_PASSWORD=pass

volumes:

- postgres_data:/var/lib/postgresql/data

restart: unless-stopped

redis:

image: redis:7-alpine

volumes:

- redis_data:/data

restart: unless-stopped

volumes:

postgres_data:

redis_data:

バックエンド Dockerfile

FROM python:3.11-slim

WORKDIR /app

COPY requirements.txt .

RUN pip install --no-cache-dir -r requirements.txt

COPY . .

CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000"]

フロントエンド Dockerfile

FROM node:20-alpine AS builder

WORKDIR /app

COPY package*.json ./

RUN npm ci

COPY . .

RUN npm run build

FROM node:20-alpine AS runner

WORKDIR /app

COPY --from=builder /app/.next/standalone ./

COPY --from=builder /app/.next/static ./.next/static

EXPOSE 3000

CMD ["node", "server.js"]

デプロイコマンド

ビルドと起動

docker-compose up --build -d

ログ確認

docker-compose logs -f backend

スケールアウト(バックエンド3インスタンス)

docker-compose up --scale backend=3 -d

停止

docker-compose down

10. クイズ: 理解度チェック

**正解**: LLMがトークンを生成するたびにクライアントへ即座に送信することで、ユーザーがレスポンスを待つ間もテキストがリアルタイムで表示されるようにするためです。

**解説**: WebSocketより実装が単純で、HTTP経由のサーバーからクライアントへの単方向ストリーミングに最適です。FastAPIの`StreamingResponse`とブラウザの`EventSource` APIがこのパターンをサポートしています。自動再接続機能が組み込まれており、プロキシ経由でも安定して動作します。

**正解**: モデルを再トレーニングする必要がなく、推論時に最新またはドメイン固有の文書を検索してコンテキストとして注入できるため、はるかに安価かつ迅速に更新できます。

**解説**: ファインチューニングはモデルの重みに知識を埋め込み、高コストのGPUコンピューティングが必要です。RAGはベクターデータベースにモデルの外部に知識を保持します。文書を追加・削除するだけでナレッジベースを更新でき、モデルに触れる必要がありません。また、根拠となる証拠に基づいて回答を生成するため、ハルシネーションも軽減されます。

**正解**: あるチャンクの末尾から次のチャンクの先頭に繰り返す文字数を指定し、チャンク境界をまたいでコンテキストを保持します。

**解説**: 文書をチャンクに分割すると、境界でセンテンスや概念が切れる場合があります。オーバーラップにより、隣接するチャンクに周囲のコンテキストが含まれます。例えばchunk_size=1000、chunk_overlap=200の場合、各チャンクは隣のチャンクと200文字を共有し、検索時に関連コンテキストを失うリスクを減らします。

**正解**: HNSWは近似最近傍(ANN)探索を可能にし、ブルートフォース完全一致探索では対応しきれないスケールでも高速に動作します。

**解説**: 100万件の1536次元ベクター(OpenAI埋め込み)をコサイン類似度でスキャンすると1クエリあたり数秒かかります。HNSWは階層的グラフ構造を構築して探索空間を大幅に絞り込みます。トレードオフは若干の精度低下(近似)ですが、ほとんどの検索タスクでは許容範囲です。pgvector、Chroma、Pinecone、WeaviateはいずれもHNSWまたは類似のANNアルゴリズムをサポートしています。

**正解**: LLMに渡すコンテキストウィンドウに保持する直近の会話ターン数を設定します。

**解説**: LLMには有限のトークンコンテキスト制限があります。会話履歴全体を保持すると最終的にこの制限を超え、コストも増大します。k=10に設定すると直近10回のユーザーとAIのやりとりのみを保持し、それ以前のターンは削除されます。長期記憶が必要な場合は、要約メモリや専用メモリストアの使用を検討してください。

参考資料

- [FastAPI公式ドキュメント](https://fastapi.tiangolo.com/)

- [Vercel AI SDKドキュメント](https://sdk.vercel.ai/docs)

- [LangChainドキュメント](https://python.langchain.com/docs/get_started/introduction)

- [Next.js App Routerドキュメント](https://nextjs.org/docs/app)

- [pgvector GitHub](https://github.com/pgvector/pgvector)

- [OpenAI APIリファレンス](https://platform.openai.com/docs/api-reference)

현재 단락 (1/451)

現代のAIアプリケーション開発は、単純なAPI呼び出しを超えて、ストリーミング、マルチモーダル入力、RAG(Retrieval-Augmented Generation)などの複雑な技術を統合する必要...

작성 글자: 0원문 글자: 14,234작성 단락: 0/451