Skip to content
Published on

ディープラーニング学習手法完全ガイド: 最適化から分散学習まで

Authors

はじめに

過去10年間で、ディープラーニングはコンピュータビジョン、自然言語処理、音声認識、強化学習など、ほぼすべてのAI分野で革命的な成果を達成しました。しかし、単にニューラルネットワークアーキテクチャを設計するだけでは、高性能なモデルを構築するのに十分ではありません。どのように学習させるかが決定的な要因です。

このガイドでは、ディープラーニングモデルを効果的に学習するためのあらゆる技術を体系的に網羅します。勾配降下法の基礎から始まり、高度なオプティマイザー、学習率スケジューリング、正則化、転移学習、混合精度学習、大規模分散学習まで、すべて実践的なコード例と共に解説します。


1. 勾配降下法の基礎

1.1 損失関数の理解

ディープラーニングにおいて、損失関数はモデルの予測値と正解ラベルの間の乖離を定量化します。学習の目標は、この損失値を最小化するモデルパラメータ(重み)を見つけることです。

損失関数Lはモデルパラメータthetaとデータ(x, y)に依存します。数学的に表現すると:

L(theta) = (1/N) * sum_{i=1}^{N} l(f(x_i; theta), y_i)

ここでfはモデル関数、lはサンプルごとの損失、Nはデータセットサイズです。

1.2 勾配降下法の直感的理解

勾配降下法の分かりやすいアナロジーは目を閉じた状態で山を降りるハイカーです。各ステップで、ハイカーは最も急な下降方向(勾配の逆方向)に移動します。このプロセスを繰り返すと、最終的に谷底(最小値)に到達します。

数学的には、更新則は:

theta_{t+1} = theta_t - lr * grad_L(theta_t)

ここでlrは学習率、grad_Lは損失関数の勾配です。

1.3 バッチGD vs ミニバッチGD vs SGD

バッチ勾配降下法

  • データセット全体で勾配を計算
  • 安定しているがメモリ集約的で遅い
  • 大規模データセットには実用的でない

確率的勾配降下法(SGD)

  • 単一サンプルから勾配を計算
  • 高速だがノイジーで不安定
  • オンライン学習に適している

ミニバッチ勾配降下法

  • 通常1勾配計算につき32〜512サンプルを使用
  • バッチGDとSGDの両方の利点を組み合わせる
  • 実際に最も広く使われるアプローチ
import torch
import torch.nn as nn
import numpy as np

# 単純な線形回帰による勾配降下法の実装
class LinearRegression(nn.Module):
    def __init__(self, input_dim):
        super().__init__()
        self.linear = nn.Linear(input_dim, 1)

    def forward(self, x):
        return self.linear(x)

# ミニバッチ勾配降下法
def train_minibatch(model, X, y, batch_size=32, lr=0.01, epochs=100):
    optimizer = torch.optim.SGD(model.parameters(), lr=lr)
    criterion = nn.MSELoss()
    losses = []

    N = len(X)
    for epoch in range(epochs):
        perm = torch.randperm(N)
        X_shuffled = X[perm]
        y_shuffled = y[perm]

        epoch_loss = 0
        for i in range(0, N, batch_size):
            x_batch = X_shuffled[i:i+batch_size]
            y_batch = y_shuffled[i:i+batch_size]

            optimizer.zero_grad()
            pred = model(x_batch)
            loss = criterion(pred, y_batch)
            loss.backward()
            optimizer.step()

            epoch_loss += loss.item()

        losses.append(epoch_loss / (N // batch_size))
        if epoch % 10 == 0:
            print(f"Epoch {epoch}: Loss = {losses[-1]:.4f}")

    return losses

torch.manual_seed(42)
X = torch.randn(1000, 10)
true_w = torch.randn(10, 1)
y = X @ true_w + 0.1 * torch.randn(1000, 1)

model = LinearRegression(10)
losses = train_minibatch(model, X, y)

1.4 学習率の重要な役割

学習率はディープラーニングで最も重要なハイパーパラメータの一つです。

  • 大きすぎる: 損失が発散するか最小値の周りで振動する
  • 小さすぎる: 学習が非常に遅く、局所的最小値にスタックする可能性がある
  • 適切: 良好な最小値への高速収束

一般的な初期値は0.1、0.01、0.001ですが、最適な値はネットワークアーキテクチャとデータによって異なります。

1.5 数学的導出(偏微分、連鎖律)

ニューラルネットワークのバックプロパゲーションは連鎖律を使用して各層の勾配を計算します。

3層ネットワークの場合:

フォワード: x -> z1=W1*x -> a1=relu(z1) -> z2=W2*a1 -> output
損失: L = MSE(output, y)

バックワード(連鎖律):
dL/dW2 = dL/d_output * d_output/dz2 * dz2/dW2
dL/dW1 = dL/d_output * ... * da1/dz1 * dz1/dW1
import numpy as np

def sigmoid(x):
    return 1 / (1 + np.exp(-x))

def sigmoid_deriv(x):
    s = sigmoid(x)
    return s * (1 - s)

class SimpleNet:
    def __init__(self, input_dim, hidden_dim, output_dim):
        self.W1 = np.random.randn(input_dim, hidden_dim) * np.sqrt(2/input_dim)
        self.b1 = np.zeros(hidden_dim)
        self.W2 = np.random.randn(hidden_dim, output_dim) * np.sqrt(2/hidden_dim)
        self.b2 = np.zeros(output_dim)

    def forward(self, x):
        self.x = x
        self.z1 = x @ self.W1 + self.b1
        self.a1 = sigmoid(self.z1)
        self.z2 = self.a1 @ self.W2 + self.b2
        return self.z2

    def backward(self, y, lr=0.01):
        N = len(y)
        dL_dz2 = 2 * (self.z2 - y.reshape(-1, 1)) / N

        dL_dW2 = self.a1.T @ dL_dz2
        dL_db2 = dL_dz2.sum(axis=0)

        dL_da1 = dL_dz2 @ self.W2.T
        dL_dz1 = dL_da1 * sigmoid_deriv(self.z1)

        dL_dW1 = self.x.T @ dL_dz1
        dL_db1 = dL_dz1.sum(axis=0)

        self.W2 -= lr * dL_dW2
        self.b2 -= lr * dL_db2
        self.W1 -= lr * dL_dW1
        self.b1 -= lr * dL_db1

# テスト
net = SimpleNet(10, 32, 1)
X_np = np.random.randn(100, 10)
y_np = np.random.randn(100)

for i in range(100):
    pred = net.forward(X_np)
    loss = np.mean((pred.flatten() - y_np) ** 2)
    net.backward(y_np)
    if i % 20 == 0:
        print(f"Step {i}: MSE = {loss:.4f}")

2. 高度なオプティマイザー

2.1 モメンタムSGD

通常のSGDは直接勾配に従い、狭い谷型の損失地形でジグザグ運動を引き起こします。モメンタムは慣性の物理概念を導入し、オプティマイザーが以前の移動方向を記憶できるようにします。

v_t = beta * v_{t-1} + (1 - beta) * grad_t
theta_{t+1} = theta_t - lr * v_t

モメンタム係数(beta)は通常0.9に設定されます。

import torch
import torch.optim as optim

# モメンタムSGD
optimizer_momentum = optim.SGD(
    model.parameters(),
    lr=0.01,
    momentum=0.9,
    nesterov=False
)

# ネステロフ加速勾配(NAG)- 先読み勾配
optimizer_nag = optim.SGD(
    model.parameters(),
    lr=0.01,
    momentum=0.9,
    nesterov=True
)

2.2 Adagrad(適応学習率)

Adagradは各パラメータに個別の学習率を適用します。頻繁に更新されるパラメータは学習率が下げられ、稀にしか更新されないパラメータは学習率が維持されます。

G_t = G_{t-1} + grad_t^2
theta_{t+1} = theta_t - (lr / sqrt(G_t + epsilon)) * grad_t

スパースデータに効果的ですが、G_tが無限に累積され学習率がゼロに向かって縮小していきます。

optimizer_adagrad = optim.Adagrad(
    model.parameters(),
    lr=0.01,
    eps=1e-8,
    weight_decay=0
)

2.3 RMSprop

RMSpropは二乗勾配の指数移動平均を使用することでAdagradの学習率減衰問題を解決します。

E[g^2]_t = rho * E[g^2]_{t-1} + (1 - rho) * grad_t^2
theta_{t+1} = theta_t - (lr / sqrt(E[g^2]_t + epsilon)) * grad_t
optimizer_rmsprop = optim.RMSprop(
    model.parameters(),
    lr=0.001,
    alpha=0.99,
    eps=1e-8,
    momentum=0,
    centered=False
)

2.4 Adam(適応モーメント推定)

AdamはモメンタムとRMSpropを組み合わせ、1次モーメント(平均)と2次モーメント(分散)の両方を追跡します。現在最も広く使用されているオプティマイザーです。

アルゴリズム:

m_t = beta1 * m_{t-1} + (1 - beta1) * g_t       # 1次モーメント(バイアス補正前)
v_t = beta2 * v_{t-1} + (1 - beta2) * g_t^2     # 2次モーメント(バイアス補正前)

m_hat = m_t / (1 - beta1^t)                       # バイアス補正
v_hat = v_t / (1 - beta2^t)                       # バイアス補正

theta_{t+1} = theta_t - lr * m_hat / (sqrt(v_hat) + epsilon)

デフォルトハイパーパラメータ: lr=0.001, beta1=0.9, beta2=0.999, epsilon=1e-8

optimizer_adam = optim.Adam(
    model.parameters(),
    lr=1e-3,
    betas=(0.9, 0.999),
    eps=1e-8,
    weight_decay=0
)

2.5 AdamW(分離した重み減衰)

標準的なAdamでは、L2正則化は勾配と結合しているため適応学習率の影響を受けます。AdamWは勾配ベースの更新から切り離して、重み減衰をパラメータ更新に直接適用します。

theta_{t+1} = theta_t - lr * (m_hat / (sqrt(v_hat) + epsilon) + lambda * theta_t)

AdamWはTransformerモデル(BERT、GPTなど)の学習の標準になっています。

optimizer_adamw = optim.AdamW(
    model.parameters(),
    lr=1e-4,
    betas=(0.9, 0.999),
    eps=1e-8,
    weight_decay=0.01
)

2.6 LARSとLAMB(大バッチ学習)

非常に大きなバッチサイズ(数千)を使用する場合、標準的なAdamの性能が低下します。LARS(Layer-wise Adaptive Rate Scaling)LAMBは層ごとに学習率を調整します。

LARS: lr_l = lr * ||w_l|| / (||g_l|| + lambda * ||w_l||)
LAMB: Adamの更新に層ごとの信頼比を適用

2.7 Lionオプティマイザー(2023年)

Google Brainの**Lion(EvoLved Sign Momentum)**は勾配更新の符号のみを使用し、Adamよりも低いメモリ使用量で競争力のある性能を発揮します。

class Lion(torch.optim.Optimizer):
    def __init__(self, params, lr=1e-4, betas=(0.9, 0.99), weight_decay=0.0):
        defaults = dict(lr=lr, betas=betas, weight_decay=weight_decay)
        super().__init__(params, defaults)

    def step(self, closure=None):
        loss = None
        if closure is not None:
            with torch.enable_grad():
                loss = closure()

        for group in self.param_groups:
            for p in group['params']:
                if p.grad is None:
                    continue

                grad = p.grad
                lr = group['lr']
                beta1, beta2 = group['betas']
                wd = group['weight_decay']

                state = self.state[p]
                if len(state) == 0:
                    state['exp_avg'] = torch.zeros_like(p)

                exp_avg = state['exp_avg']

                # Lionの更新
                update = exp_avg * beta1 + grad * (1 - beta1)
                p.data.mul_(1 - lr * wd)
                p.data.add_(update.sign_(), alpha=-lr)

                # モメンタムの更新
                exp_avg.mul_(beta2).add_(grad, alpha=1 - beta2)

        return loss

2.8 オプティマイザー比較実験

import torch
import torch.nn as nn

class MLP(nn.Module):
    def __init__(self):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(2, 64),
            nn.ReLU(),
            nn.Linear(64, 64),
            nn.ReLU(),
            nn.Linear(64, 1)
        )

    def forward(self, x):
        return self.net(x)

def train_and_compare(optimizers_dict, X, y, epochs=200):
    results = {}

    for name, opt_fn in optimizers_dict.items():
        model = MLP()
        optimizer = opt_fn(model.parameters())
        criterion = nn.MSELoss()
        losses = []

        for epoch in range(epochs):
            optimizer.zero_grad()
            pred = model(X)
            loss = criterion(pred, y)
            loss.backward()
            optimizer.step()
            losses.append(loss.item())

        results[name] = losses
        print(f"{name}: Final Loss = {losses[-1]:.4f}")

    return results

X = torch.randn(500, 2)
y = (X[:, 0] * 2 + X[:, 1] * 3 + torch.randn(500) * 0.1).unsqueeze(1)

optimizers = {
    'SGD': lambda p: torch.optim.SGD(p, lr=0.01),
    'SGD+Momentum': lambda p: torch.optim.SGD(p, lr=0.01, momentum=0.9),
    'Adam': lambda p: torch.optim.Adam(p, lr=0.001),
    'AdamW': lambda p: torch.optim.AdamW(p, lr=0.001, weight_decay=0.01),
    'RMSprop': lambda p: torch.optim.RMSprop(p, lr=0.001),
}

results = train_and_compare(optimizers, X, y)

3. 学習率スケジューリング

固定学習率が最適であることはほとんどありません。学習率スケジューリングは学習中に動的に学習率を調整し、より速い収束と優れた最終性能を達成します。

3.1 ステップ減衰と指数減衰

import torch
import torch.optim as optim

model = MLP()
optimizer = optim.SGD(model.parameters(), lr=0.1)

# ステップ減衰: step_sizeエポックごとにgamma倍する
step_scheduler = optim.lr_scheduler.StepLR(
    optimizer,
    step_size=30,
    gamma=0.1
)

# マルチステップ減衰: 指定のマイルストーンで減少
multistep_scheduler = optim.lr_scheduler.MultiStepLR(
    optimizer,
    milestones=[30, 60, 80],
    gamma=0.1
)

# 指数減衰: エポックごとに指数的に減少
exp_scheduler = optim.lr_scheduler.ExponentialLR(
    optimizer,
    gamma=0.95
)

3.2 コサインアニーリング

コサインアニーリングはコサイン曲線に沿って学習率を滑らかに減少させます。ウォームリスタート付きコサインアニーリングは探索のために定期的に学習率をリセットします。

# コサインアニーリング
cosine_scheduler = optim.lr_scheduler.CosineAnnealingLR(
    optimizer,
    T_max=100,
    eta_min=1e-6
)

# ウォームリスタート付きコサインアニーリング(SGDR)
cosine_restart = optim.lr_scheduler.CosineAnnealingWarmRestarts(
    optimizer,
    T_0=10,
    T_mult=2,
    eta_min=1e-6
)

3.3 ウォームアップ + コサインスケジュール

Transformerモデルを学習するための標準的なスケジュール。ウォームアップ中は学習率が線形に増加し、その後コサイン曲線に沿って減少します。

import math
from torch.optim.lr_scheduler import LambdaLR

def get_cosine_schedule_with_warmup(optimizer, num_warmup_steps, num_training_steps, num_cycles=0.5):
    def lr_lambda(current_step):
        if current_step < num_warmup_steps:
            return float(current_step) / float(max(1, num_warmup_steps))
        progress = float(current_step - num_warmup_steps) / float(
            max(1, num_training_steps - num_warmup_steps)
        )
        return max(0.0, 0.5 * (1.0 + math.cos(math.pi * float(num_cycles) * 2.0 * progress)))

    return LambdaLR(optimizer, lr_lambda)

optimizer = optim.AdamW(model.parameters(), lr=5e-5)
scheduler = get_cosine_schedule_with_warmup(
    optimizer,
    num_warmup_steps=1000,
    num_training_steps=10000
)

3.4 OneCycleLR

OneCycleLRは学習率を積極的に上げてから下げ、高速収束を実現します。Leslie Smithが提案し、FastAIで普及しました。

optimizer = optim.SGD(model.parameters(), lr=0.01)
scheduler = optim.lr_scheduler.OneCycleLR(
    optimizer,
    max_lr=0.1,
    steps_per_epoch=len(train_loader),
    epochs=10,
    pct_start=0.3,
    anneal_strategy='cos',
    div_factor=25.0,
    final_div_factor=1e4
)

for epoch in range(10):
    for batch in train_loader:
        optimizer.zero_grad()
        loss = criterion(model(batch[0]), batch[1])
        loss.backward()
        optimizer.step()
        scheduler.step()  # OneCycleLRはバッチごとにstep

3.5 学習率ファインダー

学習前に適切な学習率範囲を自動的に特定します。

from torch_lr_finder import LRFinder

model = MLP()
optimizer = optim.SGD(model.parameters(), lr=1e-7, weight_decay=1e-2)
criterion = nn.MSELoss()

lr_finder = LRFinder(model, optimizer, criterion, device="cuda")
lr_finder.range_test(train_loader, end_lr=100, num_iter=100)
lr_finder.plot()
lr_finder.reset()

# 損失の最も急な下降点でのLRを選択
# 通常は最小値での値の1/10から1/3を使用

4. 損失関数

4.1 回帰損失関数

import torch
import torch.nn as nn
import torch.nn.functional as F

# MSE - 外れ値に敏感
mse_loss = nn.MSELoss()

# MAE - 外れ値に頑健
mae_loss = nn.L1Loss()

# Huber損失 - MSEとMAEの妥協点
# |y - y_hat| < delta: 0.5 * (y - y_hat)^2
# |y - y_hat| >= delta: delta * (|y - y_hat| - 0.5 * delta)
huber_loss = nn.HuberLoss(delta=1.0)

def huber_loss_manual(pred, target, delta=1.0):
    residual = torch.abs(pred - target)
    condition = residual < delta
    squared_loss = 0.5 * residual ** 2
    linear_loss = delta * residual - 0.5 * delta ** 2
    return torch.where(condition, squared_loss, linear_loss).mean()

4.2 分類損失関数

# クロスエントロピー損失(多クラス)
ce_loss = nn.CrossEntropyLoss()

# バイナリクロスエントロピー(二値分類)
bce_loss = nn.BCEWithLogitsLoss()

# ラベルスムージングクロスエントロピー(過信頼を減少させる)
ce_smooth = nn.CrossEntropyLoss(label_smoothing=0.1)

# フォーカル損失(クラス不均衡に対処)
class FocalLoss(nn.Module):
    def __init__(self, gamma=2.0, alpha=None, reduction='mean'):
        super().__init__()
        self.gamma = gamma
        self.alpha = alpha
        self.reduction = reduction

    def forward(self, inputs, targets):
        ce_loss = F.cross_entropy(inputs, targets, reduction='none')
        pt = torch.exp(-ce_loss)
        focal_loss = ((1 - pt) ** self.gamma) * ce_loss

        if self.alpha is not None:
            alpha_t = self.alpha[targets]
            focal_loss = alpha_t * focal_loss

        if self.reduction == 'mean':
            return focal_loss.mean()
        elif self.reduction == 'sum':
            return focal_loss.sum()
        return focal_loss

4.3 セグメンテーション損失関数

def bce_loss_fn(pred, target):
    return F.binary_cross_entropy_with_logits(pred, target)

# Dice損失(クラス不均衡に頑健)
def dice_loss(pred, target, smooth=1.0):
    pred = torch.sigmoid(pred)
    pred_flat = pred.view(-1)
    target_flat = target.view(-1)

    intersection = (pred_flat * target_flat).sum()
    dice = (2. * intersection + smooth) / (pred_flat.sum() + target_flat.sum() + smooth)
    return 1 - dice

# BCE + Diceの組み合わせ(セグメンテーションで一般的)
def bce_dice_loss(pred, target, bce_weight=0.5):
    bce = bce_loss_fn(pred, target)
    dice = dice_loss(pred, target)
    return bce_weight * bce + (1 - bce_weight) * dice

4.4 メトリック学習損失関数

# 対比損失(類似ペアを引き寄せ、非類似ペアを押し離す)
class ContrastiveLoss(nn.Module):
    def __init__(self, margin=1.0):
        super().__init__()
        self.margin = margin

    def forward(self, output1, output2, label):
        # label=1: 同クラス, label=0: 異クラス
        euclidean_dist = F.pairwise_distance(output1, output2)
        loss = (label * euclidean_dist.pow(2) +
                (1 - label) * F.relu(self.margin - euclidean_dist).pow(2))
        return loss.mean()

# トリプレット損失(アンカー、ポジティブ、ネガティブ)
class TripletLoss(nn.Module):
    def __init__(self, margin=0.3):
        super().__init__()
        self.margin = margin

    def forward(self, anchor, positive, negative):
        pos_dist = F.pairwise_distance(anchor, positive)
        neg_dist = F.pairwise_distance(anchor, negative)
        loss = F.relu(pos_dist - neg_dist + self.margin)
        return loss.mean()

5. 正則化技術

過学習を防ぎ、汎化性能を向上させる技術。

5.1 L1/L2正則化

# L2正則化(重み減衰)
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3, weight_decay=1e-4)

# L1正則化(手動実装)
def l1_regularization(model, lambda_l1):
    l1_penalty = 0
    for param in model.parameters():
        l1_penalty += torch.abs(param).sum()
    return lambda_l1 * l1_penalty

# Elastic Net(L1 + L2)
def elastic_net_loss(model, criterion, outputs, targets, lambda_l1=1e-5, lambda_l2=1e-4):
    base_loss = criterion(outputs, targets)
    l1_penalty = sum(torch.abs(p).sum() for p in model.parameters())
    l2_penalty = sum((p ** 2).sum() for p in model.parameters())
    return base_loss + lambda_l1 * l1_penalty + lambda_l2 * l2_penalty

5.2 ドロップアウト

ドロップアウトは学習中にランダムにニューロンを無効化して共適応を防ぎます。逆ドロップアウトは学習中に保持確率で割ることで、推論時にスケーリングが不要になります。

class ModelWithDropout(nn.Module):
    def __init__(self, dropout_rate=0.5):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(784, 512),
            nn.ReLU(),
            nn.Dropout(p=dropout_rate),
            nn.Linear(512, 256),
            nn.ReLU(),
            nn.Dropout(p=dropout_rate),
            nn.Linear(256, 10)
        )

    def forward(self, x):
        return self.net(x)

# 学習モード: ドロップアウトが有効
model.train()

# 推論モード: ドロップアウトが無効
model.eval()

5.3 データ拡張

from torchvision import transforms

train_transform = transforms.Compose([
    transforms.RandomHorizontalFlip(p=0.5),
    transforms.RandomCrop(32, padding=4),
    transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2, hue=0.1),
    transforms.RandomRotation(degrees=15),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

# Mixup拡張
def mixup_data(x, y, alpha=1.0):
    if alpha > 0:
        lam = np.random.beta(alpha, alpha)
    else:
        lam = 1

    batch_size = x.size()[0]
    index = torch.randperm(batch_size)

    mixed_x = lam * x + (1 - lam) * x[index]
    y_a, y_b = y, y[index]
    return mixed_x, y_a, y_b, lam

def mixup_criterion(criterion, pred, y_a, y_b, lam):
    return lam * criterion(pred, y_a) + (1 - lam) * criterion(pred, y_b)

# CutMix拡張
def cutmix_data(x, y, alpha=1.0):
    lam = np.random.beta(alpha, alpha)
    batch_size, C, H, W = x.size()
    index = torch.randperm(batch_size)

    cut_ratio = np.sqrt(1. - lam)
    cut_w = int(W * cut_ratio)
    cut_h = int(H * cut_ratio)

    cx = np.random.randint(W)
    cy = np.random.randint(H)

    bbx1 = np.clip(cx - cut_w // 2, 0, W)
    bby1 = np.clip(cy - cut_h // 2, 0, H)
    bbx2 = np.clip(cx + cut_w // 2, 0, W)
    bby2 = np.clip(cy + cut_h // 2, 0, H)

    mixed_x = x.clone()
    mixed_x[:, :, bby1:bby2, bbx1:bbx2] = x[index, :, bby1:bby2, bbx1:bbx2]
    lam = 1 - ((bbx2 - bbx1) * (bby2 - bby1) / (W * H))

    return mixed_x, y, y[index], lam

5.4 早期停止

class EarlyStopping:
    def __init__(self, patience=10, min_delta=0.001, restore_best_weights=True):
        self.patience = patience
        self.min_delta = min_delta
        self.restore_best_weights = restore_best_weights
        self.counter = 0
        self.best_loss = None
        self.best_weights = None
        self.early_stop = False

    def __call__(self, val_loss, model):
        if self.best_loss is None:
            self.best_loss = val_loss
            self.best_weights = {k: v.clone() for k, v in model.state_dict().items()}
        elif val_loss > self.best_loss - self.min_delta:
            self.counter += 1
            print(f"EarlyStopping counter: {self.counter}/{self.patience}")
            if self.counter >= self.patience:
                self.early_stop = True
        else:
            self.best_loss = val_loss
            self.best_weights = {k: v.clone() for k, v in model.state_dict().items()}
            self.counter = 0

    def restore(self, model):
        if self.restore_best_weights and self.best_weights:
            model.load_state_dict(self.best_weights)
            print("最良のモデル重みを復元しました")

6. 正規化層

6.1 バッチ正規化

2015年にSergey IoffeとChristian Szegedyが提案したバッチ正規化は、内部共変量シフト問題に対処するために各ミニバッチ内で特徴量を正規化します。

プロセス:

1. ミニバッチ平均: mu_B = (1/m) * sum(x_i)
2. ミニバッチ分散: sigma_B^2 = (1/m) * sum((x_i - mu_B)^2)
3. 正規化: x_hat_i = (x_i - mu_B) / sqrt(sigma_B^2 + epsilon)
4. スケールとシフト: y_i = gamma * x_hat_i + beta

gamma(スケール)とbeta(シフト)は学習可能なパラメータです。

import torch
import torch.nn as nn

class BatchNormNet(nn.Module):
    def __init__(self):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(784, 512),
            nn.BatchNorm1d(512),
            nn.ReLU(),
            nn.Linear(512, 256),
            nn.BatchNorm1d(256),
            nn.ReLU(),
            nn.Linear(256, 10)
        )

    def forward(self, x):
        return self.net(x)

# バッチ正規化の手動実装
class BatchNorm(nn.Module):
    def __init__(self, num_features, eps=1e-5, momentum=0.1):
        super().__init__()
        self.gamma = nn.Parameter(torch.ones(num_features))
        self.beta = nn.Parameter(torch.zeros(num_features))
        self.eps = eps
        self.momentum = momentum

        self.register_buffer('running_mean', torch.zeros(num_features))
        self.register_buffer('running_var', torch.ones(num_features))

    def forward(self, x):
        if self.training:
            mean = x.mean(dim=0)
            var = x.var(dim=0, unbiased=False)
            self.running_mean = (1 - self.momentum) * self.running_mean + self.momentum * mean
            self.running_var = (1 - self.momentum) * self.running_var + self.momentum * var
        else:
            mean = self.running_mean
            var = self.running_var

        x_norm = (x - mean) / torch.sqrt(var + self.eps)
        return self.gamma * x_norm + self.beta

6.2 レイヤー正規化(Transformer標準)

レイヤー正規化はバッチ次元ではなく特徴次元で正規化します。バッチサイズに依存しないため、RNNとTransformerに適しています。

class LayerNorm(nn.Module):
    def __init__(self, normalized_shape, eps=1e-5):
        super().__init__()
        if isinstance(normalized_shape, int):
            normalized_shape = (normalized_shape,)
        self.normalized_shape = normalized_shape
        self.gamma = nn.Parameter(torch.ones(normalized_shape))
        self.beta = nn.Parameter(torch.zeros(normalized_shape))
        self.eps = eps

    def forward(self, x):
        mean = x.mean(dim=-1, keepdim=True)
        var = x.var(dim=-1, keepdim=True, unbiased=False)
        x_norm = (x - mean) / torch.sqrt(var + self.eps)
        return self.gamma * x_norm + self.beta

# Pre-LayerNorm付きTransformerブロック(現代のGPTスタイル)
class TransformerBlock(nn.Module):
    def __init__(self, d_model, nhead, dim_feedforward):
        super().__init__()
        self.attention = nn.MultiheadAttention(d_model, nhead)
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.ffn = nn.Sequential(
            nn.Linear(d_model, dim_feedforward),
            nn.GELU(),
            nn.Linear(dim_feedforward, d_model)
        )

    def forward(self, x):
        attn_out, _ = self.attention(self.norm1(x), self.norm1(x), self.norm1(x))
        x = x + attn_out
        x = x + self.ffn(self.norm2(x))
        return x

6.3 インスタンス、グループ、RMS正規化

# インスタンス正規化(サンプルごと、チャンネルごと)
# スタイル転送に効果的
instance_norm = nn.InstanceNorm2d(64)

# グループ正規化(チャンネルグループ内で正規化)
# バッチサイズが小さい場合のBNの代替
group_norm = nn.GroupNorm(num_groups=8, num_channels=64)

# RMS正規化(LLaMA、T5で使用)
# 速度のためLayerNormから平均中心化を除去
class RMSNorm(nn.Module):
    def __init__(self, dim, eps=1e-6):
        super().__init__()
        self.eps = eps
        self.weight = nn.Parameter(torch.ones(dim))

    def _norm(self, x):
        return x * torch.rsqrt(x.pow(2).mean(-1, keepdim=True) + self.eps)

    def forward(self, x):
        return self.weight * self._norm(x.float()).type_as(x)

# 各正規化をいつ使うかのまとめ:
# BatchNorm: CNN、バッチ依存、バッチサイズ >= 16が最良
# LayerNorm: Transformer/RNN、バッチ非依存
# InstanceNorm: スタイル転送、サンプルごとチャンネルごと
# GroupNorm: 小バッチ、検出/セグメンテーション
# RMSNorm: LLM、軽量LayerNormの代替

7. 重みの初期化

7.1 Xavier/He初期化

重みの初期化はトレーニングの開始点を設定します。不適切な初期化は勾配の消失または爆発を引き起こす可能性があります。

import torch
import torch.nn as nn

class WeightInitDemo(nn.Module):
    def __init__(self, init_method='xavier'):
        super().__init__()
        self.layers = nn.ModuleList([
            nn.Linear(256, 256) for _ in range(5)
        ])
        self.apply_init(init_method)

    def apply_init(self, method):
        for layer in self.layers:
            if method == 'zeros':
                nn.init.zeros_(layer.weight)        # 悪い: 対称性問題
            elif method == 'random_small':
                nn.init.normal_(layer.weight, std=0.01)
            elif method == 'xavier_uniform':
                nn.init.xavier_uniform_(layer.weight)  # sigmoid/tanh用
            elif method == 'xavier_normal':
                nn.init.xavier_normal_(layer.weight)
            elif method == 'kaiming_uniform':
                nn.init.kaiming_uniform_(layer.weight, mode='fan_in', nonlinearity='relu')
            elif method == 'kaiming_normal':
                nn.init.kaiming_normal_(layer.weight, mode='fan_out', nonlinearity='relu')  # ReLU用
            nn.init.zeros_(layer.bias)

    def forward(self, x):
        for layer in self.layers:
            x = torch.relu(layer(x))
        return x

# 初期化の比較
x = torch.randn(100, 256)
for method in ['zeros', 'random_small', 'xavier_uniform', 'kaiming_normal']:
    model = WeightInitDemo(method)
    with torch.no_grad():
        out = model(x)
    print(f"{method}: output mean={out.mean():.4f}, std={out.std():.4f}")

Xavier/Glorot初期化はsigmoid/tanh活性化用に設計されています:

  • 一様: Uniform(-limit, +limit)から重みを抽出(limitはsqrt(6 / (fan_in + fan_out)))

He/Kaiming初期化はReLU活性化用に設計されています:

  • 正規: Normal(0, sqrt(2 / fan_in))から重みを抽出

8. 勾配問題の解決策

8.1 勾配の消失と爆発

勾配の消失: 勾配が層を逆伝播する際にゼロに向かって縮小し、初期層の学習を妨げます。深いネットワークのsigmoidとtanh活性化でよく見られます。

勾配の爆発: 勾配が指数的に成長し、NaNやInf値を引き起こします。長いシーケンスのRNNでよく見られます。

import torch.nn.utils as utils

# 方法1: 勾配ノルムクリッピング
max_norm = 1.0
total_norm = utils.clip_grad_norm_(model.parameters(), max_norm)
print(f"Gradient norm: {total_norm:.4f}")

# 方法2: 勾配値クリッピング
utils.clip_grad_value_(model.parameters(), clip_value=0.5)

# トレーニングループでの使用
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)

for batch in train_loader:
    optimizer.zero_grad()
    loss = criterion(model(batch[0]), batch[1])
    loss.backward()

    # backward後、optimizer.step前にクリップ
    torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)

    optimizer.step()

8.2 残差接続(スキップ接続)

class ResidualBlock(nn.Module):
    def __init__(self, in_channels, out_channels, stride=1):
        super().__init__()
        self.conv1 = nn.Conv2d(in_channels, out_channels, 3, stride=stride, padding=1, bias=False)
        self.bn1 = nn.BatchNorm2d(out_channels)
        self.relu = nn.ReLU(inplace=True)
        self.conv2 = nn.Conv2d(out_channels, out_channels, 3, padding=1, bias=False)
        self.bn2 = nn.BatchNorm2d(out_channels)

        self.shortcut = nn.Sequential()
        if stride != 1 or in_channels != out_channels:
            self.shortcut = nn.Sequential(
                nn.Conv2d(in_channels, out_channels, 1, stride=stride, bias=False),
                nn.BatchNorm2d(out_channels)
            )

    def forward(self, x):
        out = self.relu(self.bn1(self.conv1(x)))
        out = self.bn2(self.conv2(out))
        out += self.shortcut(x)  # スキップ接続
        out = self.relu(out)
        return out

8.3 勾配チェックポインティング

非常に深いモデルの場合、計算とメモリをトレードオフします: 中間活性化を破棄し、バックワードパス中に再計算します。

from torch.utils.checkpoint import checkpoint, checkpoint_sequential

class DeepModel(nn.Module):
    def __init__(self):
        super().__init__()
        self.layers = nn.Sequential(*[
            nn.Sequential(nn.Linear(512, 512), nn.ReLU())
            for _ in range(20)
        ])

    def forward(self, x):
        # 標準: すべての活性化を保存 O(N)メモリ
        # return self.layers(x)

        # 勾配チェックポインティング: O(sqrt(N))メモリ
        return checkpoint_sequential(self.layers, segments=5, input=x)

9. 転移学習とファインチューニング

9.1 特徴抽出とファインチューニング

import torchvision.models as models

# 特徴抽出: 事前学習済み重みを凍結
def feature_extraction(num_classes):
    model = models.resnet50(pretrained=True)
    for param in model.parameters():
        param.requires_grad = False

    # 分類ヘッドのみ置き換え
    model.fc = nn.Linear(model.fc.in_features, num_classes)
    return model

# ファインチューニング: 選択的に層を解凍
def fine_tuning(num_classes, unfreeze_layers=None):
    model = models.resnet50(pretrained=True)
    for param in model.parameters():
        param.requires_grad = False

    model.fc = nn.Linear(model.fc.in_features, num_classes)

    if unfreeze_layers:
        for name, param in model.named_parameters():
            for layer in unfreeze_layers:
                if layer in name:
                    param.requires_grad = True

    return model

9.2 段階的解凍と識別的学習率

def discriminative_lr_optimizer(model, base_lr=1e-4, lr_multiplier=10):
    # 初期層に低い学習率、後期層に高い学習率を割り当て
    param_groups = [
        {'params': model.layer1.parameters(), 'lr': base_lr / (lr_multiplier**3)},
        {'params': model.layer2.parameters(), 'lr': base_lr / (lr_multiplier**2)},
        {'params': model.layer3.parameters(), 'lr': base_lr / lr_multiplier},
        {'params': model.layer4.parameters(), 'lr': base_lr},
        {'params': model.fc.parameters(), 'lr': base_lr * lr_multiplier},
    ]
    return torch.optim.Adam(param_groups)

9.3 LoRA(低ランク適応)

LoRAは大規模言語モデルのパラメータ効率的なファインチューニング技術です。元の重み行列を凍結し、低ランク分解を学習します。

元の重み行列Wの形状がd x kのとき、LoRAはW' = W + BAを学習します。Bはd x r、Aはr x kの形状を持ち、ランクrはdとkの両方より十分に小さく設定されます。

class LoRALayer(nn.Module):
    def __init__(self, in_features, out_features, rank=4, alpha=1.0):
        super().__init__()
        self.rank = rank
        self.alpha = alpha
        self.scaling = alpha / rank

        # 凍結された元の重み
        self.weight = nn.Parameter(
            torch.randn(out_features, in_features),
            requires_grad=False
        )

        # LoRA行列A(ランダム初期化)
        self.lora_A = nn.Parameter(torch.randn(rank, in_features) * 0.01)
        # LoRA行列B(ゼロ初期化 -> 開始時は元と同一)
        self.lora_B = nn.Parameter(torch.zeros(out_features, rank))
        self.bias = nn.Parameter(torch.zeros(out_features))

    def forward(self, x):
        base_output = nn.functional.linear(x, self.weight, self.bias)
        lora_output = (x @ self.lora_A.T @ self.lora_B.T) * self.scaling
        return base_output + lora_output

# HuggingFace PEFTライブラリの使用
from peft import get_peft_model, LoraConfig, TaskType

lora_config = LoraConfig(
    task_type=TaskType.CAUSAL_LM,
    r=8,
    lora_alpha=32,
    target_modules=["q_proj", "v_proj"],
    lora_dropout=0.05,
    bias="none"
)

# peft_model = get_peft_model(base_model, lora_config)
# peft_model.print_trainable_parameters()
# trainable params: 4,194,304 || all params: 6,742,609,920 || trainable%: 0.062

10. ハイパーパラメータチューニング

10.1 Optunaによるベイズ最適化

import optuna
import torch
import torch.nn as nn

def objective(trial):
    lr = trial.suggest_float('lr', 1e-5, 1e-1, log=True)
    n_layers = trial.suggest_int('n_layers', 1, 5)
    n_units = trial.suggest_categorical('n_units', [64, 128, 256, 512])
    dropout_rate = trial.suggest_float('dropout', 0.0, 0.5)
    optimizer_name = trial.suggest_categorical('optimizer', ['Adam', 'AdamW', 'SGD'])

    layers = []
    in_dim = 784
    for _ in range(n_layers):
        layers.extend([
            nn.Linear(in_dim, n_units),
            nn.ReLU(),
            nn.Dropout(dropout_rate)
        ])
        in_dim = n_units
    layers.append(nn.Linear(in_dim, 10))
    model = nn.Sequential(*layers)

    if optimizer_name == 'Adam':
        optimizer = torch.optim.Adam(model.parameters(), lr=lr)
    elif optimizer_name == 'AdamW':
        optimizer = torch.optim.AdamW(model.parameters(), lr=lr, weight_decay=0.01)
    else:
        optimizer = torch.optim.SGD(model.parameters(), lr=lr, momentum=0.9)

    val_accuracy = 0.95  # 実際のトレーニングに置き換える
    return val_accuracy

study = optuna.create_study(
    direction='maximize',
    sampler=optuna.samplers.TPESampler(),
    pruner=optuna.pruners.MedianPruner()
)

study.optimize(objective, n_trials=100, timeout=3600)

print(f"Best trial: {study.best_trial.value:.4f}")
print(f"Best params: {study.best_trial.params}")

11. 混合精度学習

11.1 FP32 vs FP16 vs BF16

フォーマット指数ビット仮数ビット範囲主な用途
FP32823+-3.4e38デフォルト学習
FP16510+-65504推論 / 学習(オーバーフローリスク)
BF1687+-3.4e38LLM学習(A100、TPU)

11.2 PyTorch AMP(自動混合精度)

import torch
from torch.cuda.amp import autocast, GradScaler

scaler = GradScaler()

model = MyModel().cuda()
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)
criterion = nn.CrossEntropyLoss()

for epoch in range(num_epochs):
    for inputs, labels in train_loader:
        inputs, labels = inputs.cuda(), labels.cuda()
        optimizer.zero_grad()

        # autocnstコンテキスト内でのFP16計算
        with autocast(dtype=torch.float16):
            outputs = model(inputs)
            loss = criterion(outputs, labels)

        # 損失をスケールしてバックプロップ
        scaler.scale(loss).backward()

        # 勾配クリッピングのためにアンスケール
        scaler.unscale_(optimizer)
        torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)

        # ステップ(NaN/Inf勾配が検出された場合はスキップ)
        scaler.step(optimizer)
        scaler.update()

# BF16(より安定、Ampere以降のGPUが必要)
with autocast(dtype=torch.bfloat16):
    outputs = model(inputs)
    loss = criterion(outputs, labels)

AMPの主なメリット:

  • メモリ削減: 約2倍(FP16パラメータ)
  • 速度向上: Tensor Core GPUで1.5倍〜3倍
  • ほとんどのタスクでFP32とほぼ同じ精度

12. 分散学習

12.1 DDPによるデータ並列性

データを複数のGPUに分散します。各GPUが独立してフォワードとバックワードパスを計算し、その後勾配が集約されます。

import torch
import torch.nn as nn
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.utils.data.distributed import DistributedSampler

def setup(rank, world_size):
    import os
    os.environ['MASTER_ADDR'] = 'localhost'
    os.environ['MASTER_PORT'] = '12355'
    dist.init_process_group('nccl', rank=rank, world_size=world_size)

def cleanup():
    dist.destroy_process_group()

def train_ddp(rank, world_size, model_class, dataset):
    setup(rank, world_size)

    device = torch.device(f'cuda:{rank}')
    model = model_class().to(device)
    model = DDP(model, device_ids=[rank])

    sampler = DistributedSampler(
        dataset,
        num_replicas=world_size,
        rank=rank,
        shuffle=True
    )

    loader = torch.utils.data.DataLoader(
        dataset,
        batch_size=32,
        sampler=sampler,
        num_workers=4,
        pin_memory=True
    )

    optimizer = torch.optim.Adam(model.parameters(), lr=1e-3 * world_size)
    criterion = nn.CrossEntropyLoss()
    scaler = GradScaler()

    for epoch in range(num_epochs):
        sampler.set_epoch(epoch)  # エポックごとに異なるシャッフル

        for inputs, labels in loader:
            inputs, labels = inputs.to(device), labels.to(device)
            optimizer.zero_grad()

            with autocast():
                outputs = model(inputs)
                loss = criterion(outputs, labels)

            scaler.scale(loss).backward()
            scaler.step(optimizer)
            scaler.update()

        if rank == 0:
            print(f"Epoch {epoch}: Loss = {loss.item():.4f}")

    cleanup()

import torch.multiprocessing as mp

if __name__ == '__main__':
    world_size = torch.cuda.device_count()
    mp.spawn(train_ddp, args=(world_size, MyModel, dataset), nprocs=world_size, join=True)

12.2 FSDP(完全シャーディングデータ並列)

FSDPはモデルパラメータ、勾配、オプティマイザー状態をすべてのGPUにシャーディングします。シングルGPUのメモリを超える数十億パラメータのモデルの学習に不可欠です。

from torch.distributed.fsdp import FullyShardedDataParallel as FSDP
from torch.distributed.fsdp import ShardingStrategy, MixedPrecision
from torch.distributed.fsdp.wrap import transformer_auto_wrap_policy
import functools

bf16_policy = MixedPrecision(
    param_dtype=torch.bfloat16,
    reduce_dtype=torch.bfloat16,
    buffer_dtype=torch.bfloat16
)

auto_wrap_policy = functools.partial(
    transformer_auto_wrap_policy,
    transformer_layer_cls={TransformerBlock}
)

model = FSDP(
    model,
    sharding_strategy=ShardingStrategy.FULL_SHARD,
    mixed_precision=bf16_policy,
    auto_wrap_policy=auto_wrap_policy,
    device_id=rank
)

FSDPシャーディング戦略:

  • FULL_SHARD: パラメータ、勾配、オプティマイザー状態をシャーディング(最大メモリ節約)
  • SHARD_GRAD_OP: 勾配とオプティマイザー状態のみシャーディング
  • NO_SHARD: DDPと同等

12.3 勾配累積

限られたGPUメモリで大きなバッチサイズをシミュレートするために、複数のマイクロバッチにわたって勾配を累積します。

model = MyModel().cuda()
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)
criterion = nn.CrossEntropyLoss()
scaler = GradScaler()

micro_batch_size = 8
accumulation_steps = 8  # 実効バッチサイズ: 64

optimizer.zero_grad()
for step, (inputs, labels) in enumerate(train_loader):
    inputs, labels = inputs.cuda(), labels.cuda()

    with autocast():
        outputs = model(inputs)
        loss = criterion(outputs, labels)
        loss = loss / accumulation_steps  # 損失を正規化

    scaler.scale(loss).backward()

    if (step + 1) % accumulation_steps == 0:
        scaler.unscale_(optimizer)
        torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
        scaler.step(optimizer)
        scaler.update()
        optimizer.zero_grad()

13. 大規模言語モデル学習技術

13.1 指示チューニング

指示チューニングは自然言語の指示に従うようにモデルを学習させます。FLAN、InstructGPT、LLaMA-2の成功の中心でした。

# 指示チューニングデータの形式
instruction_data = [
    {
        "instruction": "以下のテキストの感情を分析してください。",
        "input": "今日の天気は素晴らしく、気分が最高です!",
        "output": "ポジティブな感情。テキストは天気への満足感と幸福感を表現しています。"
    },
    {
        "instruction": "以下の記事を要約してください。",
        "input": "...(長いテキスト)...",
        "output": "...(要約)..."
    }
]

# Alpacaスタイルのプロンプトテンプレート
def format_instruction(sample):
    if sample.get('input'):
        return f"""### Instruction:
{sample['instruction']}

### Input:
{sample['input']}

### Response:
{sample['output']}"""
    else:
        return f"""### Instruction:
{sample['instruction']}

### Response:
{sample['output']}"""

13.2 RLHF(人間のフィードバックからの強化学習)

RLHFは3つのステージで構成されます:

ステージ1 — SFT(教師ありファインチューニング): 高品質な人間のデモンストレーションでファインチューニング ステージ2 — 報酬モデル: 人間の好みを予測する報酬モデルを学習 ステージ3 — PPO: 報酬モデルを使用してRLでポリシーを最適化

# ステージ2: 報酬モデル(Bradley-Terry好みモデル)
class RewardModel(nn.Module):
    def __init__(self, base_model):
        super().__init__()
        self.base_model = base_model
        self.reward_head = nn.Linear(base_model.config.hidden_size, 1)

    def forward(self, input_ids, attention_mask):
        outputs = self.base_model(input_ids=input_ids, attention_mask=attention_mask)
        last_hidden = outputs.last_hidden_state[:, -1, :]
        reward = self.reward_head(last_hidden).squeeze(-1)
        return reward

# 好み損失(Bradley-Terryモデル)
def preference_loss(reward_chosen, reward_rejected):
    # p(chosen > rejected) = sigmoid(r_chosen - r_rejected)
    return -torch.log(torch.sigmoid(reward_chosen - reward_rejected)).mean()

13.3 DPO(直接好み最適化)

DPOはPPOを不要にすることでRLHFを簡略化し、閉形式の再パラメータ化を使用して好みデータでポリシーを直接最適化します。

import torch
import torch.nn.functional as F

def dpo_loss(
    policy_chosen_logps,
    policy_rejected_logps,
    reference_chosen_logps,
    reference_rejected_logps,
    beta=0.1
):
    # ポリシーと参照モデル間の対数比
    chosen_rewards = beta * (policy_chosen_logps - reference_chosen_logps)
    rejected_rewards = beta * (policy_rejected_logps - reference_rejected_logps)

    # DPO損失: -log(sigmoid(chosen_rewards - rejected_rewards))
    loss = -F.logsigmoid(chosen_rewards - rejected_rewards).mean()

    chosen_reward = chosen_rewards.detach().mean()
    rejected_reward = rejected_rewards.detach().mean()
    reward_accuracy = (chosen_rewards > rejected_rewards).float().mean()

    return loss, chosen_reward, rejected_reward, reward_accuracy

RLHFに対するDPOの利点:

  • 別個の報酬モデルを学習する必要がない
  • PPOハイパーパラメータのチューニングが不要
  • より安定した学習
  • 同等またはそれ以上のアライメント結果

14. 完全なトレーニングパイプライン

14.1 本番品質のトレーナー

import torch
import torch.nn as nn
from torch.cuda.amp import autocast, GradScaler

class Trainer:
    def __init__(
        self,
        model,
        train_loader,
        val_loader,
        optimizer,
        scheduler,
        criterion,
        device='cuda',
        use_amp=True,
        grad_clip=1.0,
        accumulation_steps=1
    ):
        self.model = model.to(device)
        self.train_loader = train_loader
        self.val_loader = val_loader
        self.optimizer = optimizer
        self.scheduler = scheduler
        self.criterion = criterion
        self.device = device
        self.use_amp = use_amp
        self.grad_clip = grad_clip
        self.accumulation_steps = accumulation_steps
        self.scaler = GradScaler() if use_amp else None

    def train_epoch(self):
        self.model.train()
        total_loss = 0
        self.optimizer.zero_grad()

        for step, (inputs, labels) in enumerate(self.train_loader):
            inputs, labels = inputs.to(self.device), labels.to(self.device)

            if self.use_amp:
                with autocast():
                    outputs = self.model(inputs)
                    loss = self.criterion(outputs, labels) / self.accumulation_steps
                self.scaler.scale(loss).backward()
            else:
                outputs = self.model(inputs)
                loss = self.criterion(outputs, labels) / self.accumulation_steps
                loss.backward()

            if (step + 1) % self.accumulation_steps == 0:
                if self.use_amp:
                    self.scaler.unscale_(self.optimizer)

                if self.grad_clip:
                    nn.utils.clip_grad_norm_(self.model.parameters(), self.grad_clip)

                if self.use_amp:
                    self.scaler.step(self.optimizer)
                    self.scaler.update()
                else:
                    self.optimizer.step()

                if self.scheduler:
                    self.scheduler.step()

                self.optimizer.zero_grad()

            total_loss += loss.item() * self.accumulation_steps

        return total_loss / len(self.train_loader)

    @torch.no_grad()
    def evaluate(self):
        self.model.eval()
        total_loss = 0
        correct = 0
        total = 0

        for inputs, labels in self.val_loader:
            inputs, labels = inputs.to(self.device), labels.to(self.device)

            with autocast() if self.use_amp else torch.no_grad():
                outputs = self.model(inputs)
                loss = self.criterion(outputs, labels)

            total_loss += loss.item()
            _, predicted = outputs.max(1)
            total += labels.size(0)
            correct += predicted.eq(labels).sum().item()

        return total_loss / len(self.val_loader), 100. * correct / total

    def fit(self, epochs, save_path=None):
        best_val_acc = 0
        early_stopping = EarlyStopping(patience=10)

        for epoch in range(epochs):
            train_loss = self.train_epoch()
            val_loss, val_acc = self.evaluate()

            print(f"Epoch {epoch+1}/{epochs}: "
                  f"Train Loss: {train_loss:.4f}, "
                  f"Val Loss: {val_loss:.4f}, "
                  f"Val Acc: {val_acc:.2f}%")

            if val_acc > best_val_acc:
                best_val_acc = val_acc
                if save_path:
                    torch.save(self.model.state_dict(), save_path)

            early_stopping(val_loss, self.model)
            if early_stopping.early_stop:
                print("早期停止がトリガーされました!")
                break

        return best_val_acc

おわりに:ベストプラクティス

効果的なディープラーニング学習のコアな原則まとめ:

オプティマイザーの選択

  • 一般的なタスク: AdamW(lr=1e-3〜1e-4、weight_decay=0.01)
  • Transformer: AdamW + ウォームアップ + コサインスケジュール
  • 大バッチ学習: LAMBまたはLARS
  • メモリ制約: Lion

正則化戦略

  • ドロップアウトは通常0.1〜0.5の間に設定
  • 小規模データセット: 強い正則化(大きなweight decay、高いドロップアウト)
  • 大規模データセット: 弱いまたは正則化なし

学習率スケジューリング

  • CNN: OneCycleLRまたはステップ減衰
  • Transformer: ウォームアップ + コサインまたは逆平方根

混合精度

  • 常にAMPを使用(1.5倍〜3倍の速度向上、2倍のメモリ節約)
  • A100/H100以降: BF16を優先
  • 古いGPU: FP16 + 損失スケーリングを使用

分散学習

  • マルチGPUシングルサーバー: DDP + NCCL
  • 数十億パラメータモデル: FSDP
  • 常に勾配累積を使用して実効バッチサイズを増加

参考文献