- Authors

- Name
- Youngju Kim
- @fjvbn20031
ディープラーニングモデルの学習では、予期しない失敗に頻繁に遭遇します。Lossが突然NaNになる、いくら待ってもモデルが収束しない、メモリ不足エラーが発生する——これらはすべてのディープラーニング開発者が経験する問題です。このガイドでは、ディープラーニング学習中に発生する主要な問題を体系的に診断・解決する方法を、実践的なコード例とともに解説します。
1. ディープラーニング学習の一般的な失敗パターン
ディープラーニングの学習失敗は大きく3種類に分類できます。
Lossが減少しない
学習は始まっているのにLossがほとんど減少しないか、初期値付近にとどまっている状態。最も一般的な原因は、学習率が低すぎる、モデル実装のバグ、データ前処理の問題です。
LossがNaNになる
LossがNaN(Not a Number)またはInf(無限大)に変化する。数値不安定性によって発生し、学習率が高すぎる場合やデータに外れ値が含まれている場合に典型的に起こります。
学習Lossは減少しているが検証Lossが増加する
これは過学習(overfitting)です。モデルが学習データを記憶しているが、新しいデータへの汎化に失敗しています。
チェックリストベースの診断フレームワーク
def diagnose_training(model, train_loader, val_loader, optimizer, loss_fn, device):
"""
学習開始前に実行するクイック診断関数
"""
print("=== ディープラーニング学習診断チェックリスト ===\n")
# 1. データの検証
print("[1] データを検証中...")
batch = next(iter(train_loader))
X, y = batch
print(f" 入力形状: {X.shape}")
print(f" ラベル形状: {y.shape}")
print(f" 入力範囲: [{X.min():.4f}, {X.max():.4f}]")
print(f" 入力にNaNあり: {torch.isnan(X).any()}")
print(f" 入力にInfあり: {torch.isinf(X).any()}")
# 2. モデルパラメータの検証
print("\n[2] モデルパラメータを検証中...")
total_params = sum(p.numel() for p in model.parameters())
trainable_params = sum(p.numel() for p in model.parameters() if p.requires_grad)
print(f" 総パラメータ数: {total_params:,}")
print(f" 学習可能パラメータ数: {trainable_params:,}")
# 3. 順伝播のテスト
print("\n[3] 順伝播をテスト中...")
model.eval()
with torch.no_grad():
try:
output = model(X.to(device))
print(f" 出力形状: {output.shape}")
print(f" 出力にNaNあり: {torch.isnan(output).any()}")
loss = loss_fn(output, y.to(device))
print(f" 初期Loss: {loss.item():.4f}")
except Exception as e:
print(f" 順伝播失敗: {e}")
# 4. 逆伝播のテスト
print("\n[4] 逆伝播をテスト中...")
model.train()
optimizer.zero_grad()
output = model(X.to(device))
loss = loss_fn(output, y.to(device))
loss.backward()
# 勾配の検証
grad_norms = []
for name, param in model.named_parameters():
if param.grad is not None:
grad_norms.append((name, param.grad.norm().item()))
if grad_norms:
print(" 層別勾配ノルム上位5:")
for name, norm in sorted(grad_norms, key=lambda x: x[1], reverse=True)[:5]:
print(f" {name}: {norm:.6f}")
print("\n診断完了!")
2. Loss問題の診断
NaN Lossの原因と解決策
NaN Lossはディープラーニングで最も苛立たしい問題の一つです。複数の原因があり、それぞれ異なるアプローチが必要です。
学習率が高すぎる
NaN Lossの最も一般的な原因です。学習率が高すぎると、パラメータ更新の大きさが過大になりLossが爆発します。
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
import matplotlib.pyplot as plt
def find_learning_rate(model, train_loader, loss_fn, device,
start_lr=1e-7, end_lr=10, num_iter=100):
"""
LRレンジテストを使用して最適な学習率範囲を見つける。
"""
optimizer = optim.SGD(model.parameters(), lr=start_lr)
lr_multiplier = (end_lr / start_lr) ** (1 / num_iter)
lrs = []
losses = []
best_loss = float('inf')
model.train()
data_iter = iter(train_loader)
for i in range(num_iter):
try:
X, y = next(data_iter)
except StopIteration:
data_iter = iter(train_loader)
X, y = next(data_iter)
X, y = X.to(device), y.to(device)
optimizer.zero_grad()
output = model(X)
loss = loss_fn(output, y)
if torch.isnan(loss) or loss.item() > best_loss * 4:
print(f"lr={optimizer.param_groups[0]['lr']:.2e}でLoss爆発を検出")
break
if loss.item() < best_loss:
best_loss = loss.item()
lrs.append(optimizer.param_groups[0]['lr'])
losses.append(loss.item())
loss.backward()
optimizer.step()
for pg in optimizer.param_groups:
pg['lr'] *= lr_multiplier
plt.figure(figsize=(10, 4))
plt.plot(lrs, losses)
plt.xscale('log')
plt.xlabel('Learning Rate')
plt.ylabel('Loss')
plt.title('LR Range Test')
plt.grid(True)
plt.savefig('lr_range_test.png')
plt.show()
return lrs, losses
def safe_training_step(model, X, y, optimizer, loss_fn, scaler=None):
"""
NaN Lossを検出してスキップする安全な学習ステップ
"""
optimizer.zero_grad()
if torch.isnan(X).any() or torch.isinf(X).any():
print("警告: 入力にNaN/Infあり、ステップをスキップ")
return None
if scaler is not None:
with torch.cuda.amp.autocast():
output = model(X)
loss = loss_fn(output, y)
if torch.isnan(loss) or torch.isinf(loss):
print(f"警告: Lossが{loss.item()}です、ステップをスキップ")
return None
scaler.scale(loss).backward()
scaler.unscale_(optimizer)
torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
scaler.step(optimizer)
scaler.update()
else:
output = model(X)
loss = loss_fn(output, y)
if torch.isnan(loss) or torch.isinf(loss):
print(f"警告: Lossが{loss.item()}です、ステップをスキップ")
return None
loss.backward()
torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
optimizer.step()
return loss.item()
log(0)計算の防止
クロスエントロピー損失やlogベースの損失関数では、log(0)が-Infを返してNaNを引き起こします。
# 間違い: log(0)が発生する可能性
def bad_cross_entropy(pred, target):
return -torch.sum(target * torch.log(pred))
# 正しい: 数値安定性のためepsを使用
def safe_cross_entropy(pred, target, eps=1e-8):
pred = torch.clamp(pred, min=eps, max=1-eps)
return -torch.sum(target * torch.log(pred))
# ベスト: PyTorchの組み込み関数を使用(内部でlog-sum-expトリックを適用)
loss_fn = nn.CrossEntropyLoss() # 数値安定
log_softmax = nn.LogSoftmax(dim=1) # log + softmaxを組み合わせ
def numerically_stable_log_loss(logits, targets):
import torch.nn.functional as F
return F.cross_entropy(logits, targets)
torch.autograd.set_detect_anomalyの使用
import torch
# 異常検出モードを有効化(開発/デバッグ時のみ)
# 性能が低下するため、本番環境では無効化すること
with torch.autograd.detect_anomaly():
output = model(X)
loss = loss_fn(output, y)
loss.backward() # NaN/Infが発生した場合に正確な場所を出力
torch.autograd.set_detect_anomaly(True)
def train_with_anomaly_detection(model, loader, optimizer, loss_fn, device, epochs=5):
model.train()
for epoch in range(epochs):
for batch_idx, (X, y) in enumerate(loader):
X, y = X.to(device), y.to(device)
with torch.autograd.detect_anomaly():
optimizer.zero_grad()
output = model(X)
loss = loss_fn(output, y)
if torch.isnan(loss):
print(f"エポック{epoch}、バッチ{batch_idx}でNaN Loss")
print(f"入力統計: mean={X.mean():.4f}, std={X.std():.4f}")
print(f"出力統計: mean={output.mean():.4f}, std={output.std():.4f}")
break
loss.backward()
optimizer.step()
3. 勾配問題
勾配消失の診断
勾配消失は、逆伝播が早期の層に向かって勾配が極めて小さくなる深いネットワークで発生します。
import torch
import torch.nn as nn
import matplotlib.pyplot as plt
def check_gradient_flow(model):
"""
層毎の勾配の大きさを可視化して勾配消失/爆発を診断する
"""
ave_grads = []
max_grads = []
layers = []
for name, param in model.named_parameters():
if param.requires_grad and param.grad is not None:
layers.append(name)
ave_grads.append(param.grad.abs().mean().item())
max_grads.append(param.grad.abs().max().item())
plt.figure(figsize=(12, 6))
plt.bar(range(len(ave_grads)), ave_grads, alpha=0.5, label='平均勾配')
plt.bar(range(len(max_grads)), max_grads, alpha=0.5, label='最大勾配')
plt.xticks(range(len(layers)), layers, rotation=90)
plt.xlabel("層")
plt.ylabel("勾配の大きさ")
plt.title("層別勾配フロー")
plt.legend()
plt.yscale('log')
plt.tight_layout()
plt.savefig('gradient_flow.png')
for name, avg_grad in zip(layers, ave_grads):
if avg_grad < 1e-6:
print(f"警告: {name}で勾配消失の可能性あり (avg={avg_grad:.2e})")
return layers, ave_grads, max_grads
def register_gradient_hooks(model):
"""
リアルタイム監視のための勾配フックを登録する
"""
gradient_stats = {}
def make_hook(name):
def hook(grad):
gradient_stats[name] = {
'mean': grad.abs().mean().item(),
'max': grad.abs().max().item(),
'std': grad.std().item(),
'has_nan': torch.isnan(grad).any().item(),
'has_inf': torch.isinf(grad).any().item()
}
if torch.isnan(grad).any():
print(f"NaN勾配を検出: {name}")
return grad
return hook
hooks = []
for name, param in model.named_parameters():
if param.requires_grad:
hook = param.register_hook(make_hook(name))
hooks.append(hook)
return gradient_stats, hooks
# 勾配消失の修正: Heの初期化 + BatchNorm + 残差接続
class ResidualBlock(nn.Module):
def __init__(self, dim):
super().__init__()
self.block = nn.Sequential(
nn.Linear(dim, dim),
nn.BatchNorm1d(dim),
nn.ReLU(),
nn.Linear(dim, dim),
nn.BatchNorm1d(dim)
)
self.relu = nn.ReLU()
def forward(self, x):
return self.relu(self.block(x) + x) # 残差接続
def init_weights(m):
if isinstance(m, nn.Linear):
nn.init.kaiming_normal_(m.weight, mode='fan_out', nonlinearity='relu')
if m.bias is not None:
nn.init.zeros_(m.bias)
elif isinstance(m, nn.Conv2d):
nn.init.kaiming_normal_(m.weight, mode='fan_out', nonlinearity='relu')
if m.bias is not None:
nn.init.zeros_(m.bias)
elif isinstance(m, nn.BatchNorm2d):
nn.init.ones_(m.weight)
nn.init.zeros_(m.bias)
勾配クリッピングによる勾配爆発の解決
import torch
import torch.nn as nn
def train_with_gradient_clipping(model, loader, optimizer, loss_fn, device,
max_norm=1.0, epochs=10):
"""
勾配クリッピング付きの安全な学習ループ
"""
model.train()
history = {'train_loss': [], 'grad_norm': []}
for epoch in range(epochs):
epoch_loss = 0
epoch_grad_norms = []
for X, y in loader:
X, y = X.to(device), y.to(device)
optimizer.zero_grad()
output = model(X)
loss = loss_fn(output, y)
loss.backward()
# クリッピング前の勾配ノルムを計算
total_norm = 0
for p in model.parameters():
if p.grad is not None:
param_norm = p.grad.data.norm(2)
total_norm += param_norm.item() ** 2
total_norm = total_norm ** 0.5
epoch_grad_norms.append(total_norm)
# 勾配クリッピングを適用
torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=max_norm)
optimizer.step()
epoch_loss += loss.item()
avg_loss = epoch_loss / len(loader)
avg_grad_norm = sum(epoch_grad_norms) / len(epoch_grad_norms)
history['train_loss'].append(avg_loss)
history['grad_norm'].append(avg_grad_norm)
print(f"エポック{epoch+1}: Loss={avg_loss:.4f}, 勾配ノルム={avg_grad_norm:.4f}")
if avg_grad_norm > max_norm * 10:
print(f"警告: 勾配ノルムが非常に大きい ({avg_grad_norm:.4f})。学習率を下げることを検討してください。")
return history
4. 過学習の解決
過学習の診断
import matplotlib.pyplot as plt
import numpy as np
def plot_learning_curves(train_losses, val_losses, train_accs=None, val_accs=None):
"""
学習/検証のLossと精度曲線を使用して過学習を診断する
"""
fig, axes = plt.subplots(1, 2 if train_accs else 1, figsize=(14, 5))
if not isinstance(axes, np.ndarray):
axes = [axes]
axes[0].plot(train_losses, label='学習Loss', color='blue')
axes[0].plot(val_losses, label='検証Loss', color='red', linestyle='--')
axes[0].set_xlabel('エポック')
axes[0].set_ylabel('Loss')
axes[0].set_title('学習/検証Loss')
axes[0].legend()
axes[0].grid(True)
min_val_idx = np.argmin(val_losses)
axes[0].axvline(x=min_val_idx, color='green', linestyle=':', label=f'最良エポック: {min_val_idx}')
axes[0].legend()
if train_accs and val_accs:
axes[1].plot(train_accs, label='学習精度', color='blue')
axes[1].plot(val_accs, label='検証精度', color='red', linestyle='--')
axes[1].set_xlabel('エポック')
axes[1].set_ylabel('精度')
axes[1].set_title('学習/検証精度')
axes[1].legend()
axes[1].grid(True)
final_gap = val_losses[-1] - train_losses[-1]
print(f"最終過学習ギャップ(検証-学習Loss): {final_gap:.4f}")
if final_gap > 0.1:
print("警告: 深刻な過学習を検出!")
plt.tight_layout()
plt.savefig('learning_curves.png')
plt.show()
早期停止の実装
class EarlyStopping:
"""
検証Lossを監視して過学習時に学習を早期停止する
"""
def __init__(self, patience=10, min_delta=0.001, restore_best=True, verbose=True):
self.patience = patience
self.min_delta = min_delta
self.restore_best = restore_best
self.verbose = verbose
self.best_loss = float('inf')
self.best_epoch = 0
self.counter = 0
self.best_weights = None
self.stopped_epoch = 0
def __call__(self, val_loss, model, epoch):
if val_loss < self.best_loss - self.min_delta:
self.best_loss = val_loss
self.best_epoch = epoch
self.counter = 0
if self.restore_best:
import copy
self.best_weights = copy.deepcopy(model.state_dict())
if self.verbose:
print(f"検証Loss改善: {val_loss:.6f} (エポック{epoch})")
return False
else:
self.counter += 1
if self.verbose:
print(f"早期停止カウンター: {self.counter}/{self.patience}")
if self.counter >= self.patience:
self.stopped_epoch = epoch
if self.restore_best and self.best_weights:
model.load_state_dict(self.best_weights)
print(f"最良の重みを復元しました (エポック{self.best_epoch})")
return True
return False
def train_with_regularization(model, train_loader, val_loader,
optimizer, loss_fn, device, epochs=100):
"""
各種正則化技術を使用した学習ループ
"""
early_stopping = EarlyStopping(patience=15, min_delta=0.001)
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(
optimizer, mode='min', factor=0.5, patience=5, verbose=True
)
train_losses = []
val_losses = []
for epoch in range(epochs):
model.train()
train_loss = 0
for X, y in train_loader:
X, y = X.to(device), y.to(device)
optimizer.zero_grad()
output = model(X)
loss = loss_fn(output, y)
loss.backward()
torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
optimizer.step()
train_loss += loss.item()
model.eval()
val_loss = 0
with torch.no_grad():
for X, y in val_loader:
X, y = X.to(device), y.to(device)
output = model(X)
val_loss += loss_fn(output, y).item()
train_loss /= len(train_loader)
val_loss /= len(val_loader)
train_losses.append(train_loss)
val_losses.append(val_loss)
scheduler.step(val_loss)
print(f"エポック{epoch+1}: 学習={train_loss:.4f}, 検証={val_loss:.4f}")
if early_stopping(val_loss, model, epoch):
print(f"エポック{epoch+1}で早期停止")
break
return train_losses, val_losses
# Dropout + L2正則化の例
class RegularizedModel(nn.Module):
def __init__(self, input_dim, hidden_dim, output_dim, dropout_rate=0.3):
super().__init__()
self.network = nn.Sequential(
nn.Linear(input_dim, hidden_dim),
nn.BatchNorm1d(hidden_dim),
nn.ReLU(),
nn.Dropout(p=dropout_rate),
nn.Linear(hidden_dim, hidden_dim),
nn.BatchNorm1d(hidden_dim),
nn.ReLU(),
nn.Dropout(p=dropout_rate),
nn.Linear(hidden_dim, output_dim)
)
def forward(self, x):
return self.network(x)
# オプティマイザーのweight_decayによるL2正則化
optimizer = torch.optim.AdamW(
model.parameters(),
lr=1e-3,
weight_decay=1e-4
)
データ拡張戦略
import torchvision.transforms as transforms
train_transform = transforms.Compose([
transforms.RandomHorizontalFlip(p=0.5),
transforms.RandomRotation(degrees=15),
transforms.RandomResizedCrop(224, scale=(0.8, 1.0)),
transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2, hue=0.1),
transforms.RandomGrayscale(p=0.1),
transforms.ToTensor(),
transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])
val_transform = transforms.Compose([
transforms.Resize(256),
transforms.CenterCrop(224),
transforms.ToTensor(),
transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])
def mixup_data(x, y, alpha=0.2, device='cuda'):
"""
Mixup拡張: 2つのサンプルを線形補間して新しいサンプルを生成する
"""
if alpha > 0:
lam = np.random.beta(alpha, alpha)
else:
lam = 1
batch_size = x.size(0)
index = torch.randperm(batch_size).to(device)
mixed_x = lam * x + (1 - lam) * x[index, :]
y_a, y_b = y, y[index]
return mixed_x, y_a, y_b, lam
def mixup_criterion(criterion, pred, y_a, y_b, lam):
return lam * criterion(pred, y_a) + (1 - lam) * criterion(pred, y_b)
5. 学習速度の問題
データローディングのボトルネックを解決する
import torch
from torch.utils.data import DataLoader
import time
def profile_dataloader(dataset, batch_size=32, num_workers_list=[0, 2, 4, 8]):
"""
異なるnum_workers設定でデータローディング速度を比較する
"""
results = {}
for num_workers in num_workers_list:
loader = DataLoader(
dataset,
batch_size=batch_size,
num_workers=num_workers,
pin_memory=True,
prefetch_factor=2 if num_workers > 0 else None,
persistent_workers=True if num_workers > 0 else False
)
start = time.time()
for i, batch in enumerate(loader):
if i >= 10:
break
elapsed = time.time() - start
results[num_workers] = elapsed
print(f"num_workers={num_workers}: {elapsed:.3f}秒(10バッチ)")
best_workers = min(results, key=results.get)
print(f"\n最適なnum_workers: {best_workers}")
return results
def create_optimized_dataloader(dataset, batch_size, is_train=True):
return DataLoader(
dataset,
batch_size=batch_size,
shuffle=is_train,
num_workers=4,
pin_memory=True,
prefetch_factor=2,
persistent_workers=True,
drop_last=is_train
)
混合精度学習
import torch
from torch.cuda.amp import autocast, GradScaler
def train_mixed_precision(model, loader, optimizer, loss_fn, device, epochs=10):
"""
FP16混合精度学習で2〜3倍の速度向上
"""
scaler = GradScaler()
model.train()
for epoch in range(epochs):
for X, y in loader:
X, y = X.to(device), y.to(device)
optimizer.zero_grad()
with autocast(device_type='cuda', dtype=torch.float16):
output = model(X)
loss = loss_fn(output, y)
scaler.scale(loss).backward()
scaler.unscale_(optimizer)
torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
scaler.step(optimizer)
scaler.update()
print(f"エポック{epoch+1}完了, scalerスケール: {scaler.get_scale()}")
torch.compileの適用
import torch
model = MyModel().to(device)
# デフォルトコンパイル
compiled_model = torch.compile(model)
# 最大性能モード(コンパイル時間が長くなる)
compiled_model = torch.compile(model, mode='max-autotune')
# 入力サイズが頻繁に変化する場合
compiled_model = torch.compile(model, dynamic=True)
def benchmark_model(model, inputs, n_iters=100):
# ウォームアップ
for _ in range(10):
_ = model(inputs)
torch.cuda.synchronize()
start = time.time()
for _ in range(n_iters):
_ = model(inputs)
torch.cuda.synchronize()
elapsed = time.time() - start
return elapsed / n_iters
6. メモリ不足(OOM)の解決策
GPUメモリ分析
import torch
import gc
def print_gpu_memory_summary(device=0):
"""
GPUメモリ使用状況の詳細を出力する
"""
if not torch.cuda.is_available():
print("CUDAが利用できません。")
return
print(f"=== GPU {device} メモリサマリー ===")
print(f"総メモリ: {torch.cuda.get_device_properties(device).total_memory / 1e9:.2f} GB")
print(f"予約済みメモリ: {torch.cuda.memory_reserved(device) / 1e9:.2f} GB")
print(f"割り当て済みメモリ: {torch.cuda.memory_allocated(device) / 1e9:.2f} GB")
print(f"キャッシュメモリ: {(torch.cuda.memory_reserved(device) - torch.cuda.memory_allocated(device)) / 1e9:.2f} GB")
print()
print(torch.cuda.memory_summary(device=device, abbreviated=False))
def clear_gpu_memory():
"""
GPUメモリキャッシュをクリアする
"""
gc.collect()
torch.cuda.empty_cache()
torch.cuda.synchronize()
print(f"クリーンアップ後のGPUメモリ: {torch.cuda.memory_allocated() / 1e9:.2f} GB")
勾配チェックポイントの実装
import torch
import torch.nn as nn
from torch.utils.checkpoint import checkpoint, checkpoint_sequential
class MemoryEfficientModel(nn.Module):
"""
勾配チェックポイントを使用したメモリ効率的なモデル
再計算によりメモリを節約するが計算時間が増加する
"""
def __init__(self, layers):
super().__init__()
self.layers = nn.ModuleList(layers)
def forward(self, x):
# checkpoint_sequential: 逐次的な層に自動的にチェックポイントを適用
# segments: チャンクの数(多いほどメモリ節約、遅くなる)
return checkpoint_sequential(self.layers, segments=4, input=x)
def forward_with_manual_checkpoints(self, x):
x = self.layers[0](x)
for layer in self.layers[1:-1]:
x = checkpoint(layer, x)
x = self.layers[-1](x)
return x
# TransformersでGradient Checkpointingを有効化
from transformers import AutoModelForCausalLM
model = AutoModelForCausalLM.from_pretrained("gpt2")
model.gradient_checkpointing_enable() # Hugging Faceモデルへの簡単な適用
自動バッチサイズ探索
def find_optimal_batch_size(model, loss_fn, device,
start_batch=8, max_batch=512):
"""
OOMなしで使用できる最大バッチサイズを見つける
"""
batch_size = start_batch
optimal_batch_size = start_batch
while batch_size <= max_batch:
try:
dummy_input = torch.randn(batch_size, 3, 224, 224).to(device)
dummy_target = torch.randint(0, 1000, (batch_size,)).to(device)
output = model(dummy_input)
loss = loss_fn(output, dummy_target)
loss.backward()
optimal_batch_size = batch_size
print(f"バッチサイズ{batch_size}: 成功")
batch_size *= 2
del dummy_input, dummy_target, output, loss
torch.cuda.empty_cache()
except RuntimeError as e:
if "out of memory" in str(e):
print(f"バッチサイズ{batch_size}: OOM")
torch.cuda.empty_cache()
break
else:
raise e
print(f"\n推奨バッチサイズ: {optimal_batch_size} (安全マージン: {optimal_batch_size // 2})")
return optimal_batch_size
7. データパイプラインのデバッグ
データサンプルの可視化
import matplotlib.pyplot as plt
import numpy as np
import torch
from collections import Counter
def visualize_batch(loader, num_samples=16, class_names=None):
"""
データバッチサンプルを可視化して前処理結果を確認する
"""
X, y = next(iter(loader))
fig, axes = plt.subplots(4, 4, figsize=(12, 12))
axes = axes.flatten()
for i in range(min(num_samples, len(X))):
img = X[i].numpy()
# ImageNet正規化を逆適用
mean = np.array([0.485, 0.456, 0.406])
std = np.array([0.229, 0.224, 0.225])
img = std[:, None, None] * img + mean[:, None, None]
img = np.clip(img, 0, 1)
axes[i].imshow(img.transpose(1, 2, 0))
label = y[i].item()
title = class_names[label] if class_names else f"ラベル: {label}"
axes[i].set_title(title)
axes[i].axis('off')
plt.tight_layout()
plt.savefig('data_samples.png')
plt.show()
def check_label_distribution(dataset):
"""
ラベル分布を確認してクラス不均衡を検出する
"""
labels = [dataset[i][1] for i in range(len(dataset))]
counter = Counter(labels)
classes = sorted(counter.keys())
counts = [counter[c] for c in classes]
total = sum(counts)
print("ラベル分布:")
for cls, count in zip(classes, counts):
pct = count / total * 100
bar = '#' * int(pct / 2)
print(f" クラス{cls}: {count:5d} ({pct:.1f}%) {bar}")
max_count = max(counts)
min_count = min(counts)
imbalance_ratio = max_count / min_count
if imbalance_ratio > 10:
print(f"\n警告: 深刻なクラス不均衡! (比率: {imbalance_ratio:.1f}:1)")
print("解決策: 加重サンプリングまたはクラス加重損失関数を検討してください。")
return counter
def create_weighted_sampler(dataset):
"""
クラス不均衡に対処するための加重サンプラーを作成する
"""
labels = [dataset[i][1] for i in range(len(dataset))]
class_counts = Counter(labels)
weights = [1.0 / class_counts[label] for label in labels]
weights = torch.DoubleTensor(weights)
sampler = torch.utils.data.WeightedRandomSampler(
weights=weights,
num_samples=len(weights),
replacement=True
)
return sampler
8. モデルアーキテクチャのデバッグ
torchinfoによるモデル構造分析
from torchinfo import summary
import torch
import torch.nn as nn
def analyze_model(model, input_size):
"""
モデル構造を分析してボトルネック層を特定する
"""
model_stats = summary(
model,
input_size=input_size,
col_names=["input_size", "output_size", "num_params", "kernel_size",
"mult_adds"],
verbose=1
)
print("\n層別パラメータ分布:")
for name, module in model.named_modules():
num_params = sum(p.numel() for p in module.parameters(recurse=False))
if num_params > 0:
print(f" {name}: {num_params:,}パラメータ")
return model_stats
def monitor_activations(model, X):
"""
中間活性化値を監視して死んだニューロンを検出する
"""
activations = {}
def make_activation_hook(name):
def hook(module, input, output):
activations[name] = output.detach()
return hook
hooks = []
for name, module in model.named_modules():
if isinstance(module, (nn.ReLU, nn.GELU, nn.Tanh, nn.Sigmoid)):
hook = module.register_forward_hook(make_activation_hook(name))
hooks.append(hook)
with torch.no_grad():
model(X)
print("\n活性化統計:")
for name, act in activations.items():
dead_neurons = (act == 0).float().mean().item()
print(f" {name}:")
print(f" mean: {act.mean():.4f}, std: {act.std():.4f}")
print(f" 死んだニューロン率: {dead_neurons:.2%}")
if dead_neurons > 0.5:
print(f" 警告: ニューロンの{dead_neurons:.0%}が非活性!")
for hook in hooks:
hook.remove()
return activations
9. 学習モニタリング
TensorBoardの使用
from torch.utils.tensorboard import SummaryWriter
import torch
import numpy as np
class TensorBoardLogger:
def __init__(self, log_dir='runs/experiment'):
self.writer = SummaryWriter(log_dir)
def log_scalars(self, metrics: dict, epoch: int):
for name, value in metrics.items():
self.writer.add_scalar(name, value, epoch)
def log_model_gradients(self, model, epoch: int):
for name, param in model.named_parameters():
if param.grad is not None:
self.writer.add_histogram(f'gradients/{name}', param.grad, epoch)
self.writer.add_histogram(f'weights/{name}', param.data, epoch)
def log_learning_rate(self, optimizer, epoch: int):
for i, pg in enumerate(optimizer.param_groups):
self.writer.add_scalar(f'lr/group_{i}', pg['lr'], epoch)
def close(self):
self.writer.close()
def train_with_tensorboard(model, train_loader, val_loader,
optimizer, loss_fn, device, epochs=50):
logger = TensorBoardLogger(log_dir='runs/debug_session')
for epoch in range(epochs):
model.train()
train_loss, train_correct = 0, 0
for X, y in train_loader:
X, y = X.to(device), y.to(device)
optimizer.zero_grad()
output = model(X)
loss = loss_fn(output, y)
loss.backward()
optimizer.step()
train_loss += loss.item()
train_correct += (output.argmax(1) == y).sum().item()
train_loss /= len(train_loader)
train_acc = train_correct / len(train_loader.dataset)
model.eval()
val_loss, val_correct = 0, 0
with torch.no_grad():
for X, y in val_loader:
X, y = X.to(device), y.to(device)
output = model(X)
val_loss += loss_fn(output, y).item()
val_correct += (output.argmax(1) == y).sum().item()
val_loss /= len(val_loader)
val_acc = val_correct / len(val_loader.dataset)
logger.log_scalars({
'Loss/Train': train_loss,
'Loss/Val': val_loss,
'Accuracy/Train': train_acc,
'Accuracy/Val': val_acc,
}, epoch)
logger.log_model_gradients(model, epoch)
logger.log_learning_rate(optimizer, epoch)
logger.close()
print("TensorBoard: 'tensorboard --logdir=runs'で起動")
10. 再現性の確保
import random
import numpy as np
import torch
import os
def set_seed(seed: int = 42):
"""
完全な再現性のためすべての乱数ジェネレーターを固定する
"""
random.seed(seed)
np.random.seed(seed)
torch.manual_seed(seed)
torch.cuda.manual_seed(seed)
torch.cuda.manual_seed_all(seed) # マルチGPU
torch.backends.cudnn.deterministic = True
torch.backends.cudnn.benchmark = False
torch.use_deterministic_algorithms(True)
os.environ['PYTHONHASHSEED'] = str(seed)
os.environ['CUBLAS_WORKSPACE_CONFIG'] = ':4096:8'
print(f"すべての乱数ジェネレーターをシード{seed}で初期化しました")
def save_experiment_config(config: dict, save_path: str = 'experiment_config.json'):
"""
完全な実験環境を記録する
"""
import json
import subprocess
full_config = config.copy()
full_config['environment'] = {
'python': subprocess.getoutput('python --version'),
'torch': torch.__version__,
'cuda': torch.version.cuda,
'cudnn': str(torch.backends.cudnn.version()),
'gpu': torch.cuda.get_device_name(0) if torch.cuda.is_available() else 'CPU'
}
try:
full_config['git_hash'] = subprocess.getoutput('git rev-parse HEAD')
except Exception:
full_config['git_hash'] = 'unknown'
with open(save_path, 'w') as f:
json.dump(full_config, f, indent=2)
print(f"実験設定を保存しました: {save_path}")
return full_config
def test_reproducibility(model_fn, train_fn, seed=42, n_runs=3):
"""
同じシードで複数回実行して再現性を確認する
"""
results = []
for run in range(n_runs):
set_seed(seed)
model = model_fn()
loss = train_fn(model)
results.append(loss)
print(f"実行{run+1}: 最終Loss = {loss:.6f}")
max_diff = max(results) - min(results)
print(f"\n最大差異: {max_diff:.8f}")
if max_diff < 1e-5:
print("再現性チェック合格!")
else:
print("警告: 再現性の問題を検出。")
return results
11. 分散学習のデバッグ
import torch
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel as DDP
import os
def setup_distributed(rank, world_size, backend='nccl'):
"""
分散学習環境を初期化する
"""
os.environ['MASTER_ADDR'] = 'localhost'
os.environ['MASTER_PORT'] = '12355'
dist.init_process_group(
backend=backend,
rank=rank,
world_size=world_size
)
torch.cuda.set_device(rank)
print(f"プロセス{rank}/{world_size}初期化完了")
def cleanup_distributed():
dist.destroy_process_group()
def debug_ddp_training(rank, world_size, model, dataset):
"""
DDP学習デバッグの例
"""
setup_distributed(rank, world_size)
device = torch.device(f'cuda:{rank}')
model = model.to(device)
model = DDP(model, device_ids=[rank], find_unused_parameters=True)
sampler = torch.utils.data.distributed.DistributedSampler(
dataset,
num_replicas=world_size,
rank=rank,
shuffle=True
)
loader = torch.utils.data.DataLoader(
dataset,
batch_size=32,
sampler=sampler,
num_workers=4
)
optimizer = torch.optim.AdamW(model.parameters(), lr=1e-3)
loss_fn = torch.nn.CrossEntropyLoss()
for epoch in range(10):
# 重要: エポック毎にサンプラーのシードを更新
sampler.set_epoch(epoch)
for X, y in loader:
X, y = X.to(device), y.to(device)
optimizer.zero_grad()
output = model(X)
loss = loss_fn(output, y)
loss.backward()
optimizer.step()
# ランク0からのみログを出力
if rank == 0:
print(f"エポック{epoch+1}完了")
torch.save(model.module.state_dict(), f'checkpoint_epoch{epoch}.pt')
# すべてのランクを同期
dist.barrier()
cleanup_distributed()
12. MLflowと実験管理
import mlflow
import mlflow.pytorch
import torch
import optuna
def train_with_mlflow(model, train_loader, val_loader, optimizer,
loss_fn, device, params: dict):
"""
MLflowによる実験追跡とモデルバージョン管理
"""
mlflow.set_tracking_uri("http://localhost:5000")
mlflow.set_experiment("deep-learning-debug")
with mlflow.start_run():
mlflow.log_params(params)
best_val_loss = float('inf')
for epoch in range(params['epochs']):
model.train()
train_loss = 0
for X, y in train_loader:
X, y = X.to(device), y.to(device)
optimizer.zero_grad()
output = model(X)
loss = loss_fn(output, y)
loss.backward()
optimizer.step()
train_loss += loss.item()
train_loss /= len(train_loader)
model.eval()
val_loss = 0
with torch.no_grad():
for X, y in val_loader:
X, y = X.to(device), y.to(device)
output = model(X)
val_loss += loss_fn(output, y).item()
val_loss /= len(val_loader)
mlflow.log_metrics({
'train_loss': train_loss,
'val_loss': val_loss
}, step=epoch)
if val_loss < best_val_loss:
best_val_loss = val_loss
mlflow.pytorch.log_model(model, "best_model")
mlflow.log_metric("best_val_loss", best_val_loss)
return best_val_loss
def hyperparameter_optimization_with_optuna(model_fn, train_loader,
val_loader, device, n_trials=50):
"""
Optunaによるハイパーパラメータ最適化
"""
def objective(trial):
lr = trial.suggest_float('lr', 1e-5, 1e-1, log=True)
weight_decay = trial.suggest_float('weight_decay', 1e-6, 1e-2, log=True)
dropout = trial.suggest_float('dropout', 0.0, 0.5)
batch_size = trial.suggest_categorical('batch_size', [16, 32, 64, 128])
model = model_fn(dropout=dropout).to(device)
optimizer = torch.optim.AdamW(model.parameters(),
lr=lr, weight_decay=weight_decay)
loss_fn = torch.nn.CrossEntropyLoss()
val_loss = train_with_mlflow(
model, train_loader, val_loader, optimizer, loss_fn, device,
params={'lr': lr, 'weight_decay': weight_decay,
'dropout': dropout, 'batch_size': batch_size, 'epochs': 10}
)
return val_loss
study = optuna.create_study(
direction='minimize',
sampler=optuna.samplers.TPESampler(seed=42),
pruner=optuna.pruners.MedianPruner()
)
study.optimize(objective, n_trials=n_trials)
print("\n最良のハイパーパラメータ:")
for key, value in study.best_params.items():
print(f" {key}: {value}")
print(f"最良の検証Loss: {study.best_value:.4f}")
return study.best_params
まとめ:ディープラーニングデバッグワークフロー
ディープラーニングのデバッグには体系的なアプローチが不可欠です。以下の順序で問題を診断してください:
-
データから始める: 問題の80%はデータが原因です。まずNaN、間違ったラベル、正規化エラーを確認してください。
-
小さく始める: フルバッチ学習の前に、モデルが単一バッチを過学習できるかテストしてください。
-
勾配を確認する: 損失関数の後、勾配が正しく流れているか確認してください。
-
モニタリングツールを使用する: TensorBoard、W&B、MLflowのいずれかを選択してすべての実験を追跡してください。
-
再現性を確保する: 固定されたシードなしでのデバッグは非常に困難です。常にシードを設定してください。
def minimum_debug_checklist(model, train_loader, device):
"""
学習開始前に確認する最小チェックリスト
"""
print("ディープラーニング学習前チェックリスト")
print("=" * 50)
# 1. 単一バッチ過学習テスト
print("[1] 単一バッチ過学習テスト中...")
X, y = next(iter(train_loader))
X, y = X.to(device), y.to(device)
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)
loss_fn = torch.nn.CrossEntropyLoss()
initial_loss = None
for step in range(100):
optimizer.zero_grad()
output = model(X)
loss = loss_fn(output, y)
if initial_loss is None:
initial_loss = loss.item()
loss.backward()
optimizer.step()
final_loss = loss.item()
overfit_ratio = initial_loss / final_loss if final_loss > 0 else float('inf')
if overfit_ratio > 10:
print(f" 合格: Lossが{initial_loss:.4f}から{final_loss:.4f}に減少(比率: {overfit_ratio:.1f}x)")
else:
print(f" 警告: 単一バッチを過学習できません(比率: {overfit_ratio:.1f}x)")
print(" -> モデル容量、学習率、データエラーを確認してください")
print("\nチェックリスト完了!")
このガイドで解説した技術を体系的に適用することで、ディープラーニング学習中に発生するほとんどの問題を素早く診断・解決することができます。デバッグは経験とともに速くなりますが、適切なツールと方法論を持つことが最も重要です。