Skip to content

필사 모드: ディープラーニングデバッグ完全ガイド:学習失敗の診断から性能最適化まで

日本語
0%
정확도 0%
💡 왼쪽 원문을 읽으면서 오른쪽에 따라 써보세요. Tab 키로 힌트를 받을 수 있습니다.
원문 렌더가 준비되기 전까지 텍스트 가이드로 표시합니다.

ディープラーニングモデルの学習では、予期しない失敗に頻繁に遭遇します。Lossが突然NaNになる、いくら待ってもモデルが収束しない、メモリ不足エラーが発生する——これらはすべてのディープラーニング開発者が経験する問題です。このガイドでは、ディープラーニング学習中に発生する主要な問題を体系的に診断・解決する方法を、実践的なコード例とともに解説します。

1. ディープラーニング学習の一般的な失敗パターン

ディープラーニングの学習失敗は大きく3種類に分類できます。

Lossが減少しない

学習は始まっているのにLossがほとんど減少しないか、初期値付近にとどまっている状態。最も一般的な原因は、学習率が低すぎる、モデル実装のバグ、データ前処理の問題です。

LossがNaNになる

LossがNaN(Not a Number)またはInf(無限大)に変化する。数値不安定性によって発生し、学習率が高すぎる場合やデータに外れ値が含まれている場合に典型的に起こります。

学習Lossは減少しているが検証Lossが増加する

これは過学習(overfitting)です。モデルが学習データを記憶しているが、新しいデータへの汎化に失敗しています。

チェックリストベースの診断フレームワーク

def diagnose_training(model, train_loader, val_loader, optimizer, loss_fn, device):

"""

学習開始前に実行するクイック診断関数

"""

print("=== ディープラーニング学習診断チェックリスト ===\n")

1. データの検証

print("[1] データを検証中...")

batch = next(iter(train_loader))

X, y = batch

print(f" 入力形状: {X.shape}")

print(f" ラベル形状: {y.shape}")

print(f" 入力範囲: [{X.min():.4f}, {X.max():.4f}]")

print(f" 入力にNaNあり: {torch.isnan(X).any()}")

print(f" 入力にInfあり: {torch.isinf(X).any()}")

2. モデルパラメータの検証

print("\n[2] モデルパラメータを検証中...")

total_params = sum(p.numel() for p in model.parameters())

trainable_params = sum(p.numel() for p in model.parameters() if p.requires_grad)

print(f" 総パラメータ数: {total_params:,}")

print(f" 学習可能パラメータ数: {trainable_params:,}")

3. 順伝播のテスト

print("\n[3] 順伝播をテスト中...")

model.eval()

with torch.no_grad():

try:

output = model(X.to(device))

print(f" 出力形状: {output.shape}")

print(f" 出力にNaNあり: {torch.isnan(output).any()}")

loss = loss_fn(output, y.to(device))

print(f" 初期Loss: {loss.item():.4f}")

except Exception as e:

print(f" 順伝播失敗: {e}")

4. 逆伝播のテスト

print("\n[4] 逆伝播をテスト中...")

model.train()

optimizer.zero_grad()

output = model(X.to(device))

loss = loss_fn(output, y.to(device))

loss.backward()

勾配の検証

grad_norms = []

for name, param in model.named_parameters():

if param.grad is not None:

grad_norms.append((name, param.grad.norm().item()))

if grad_norms:

print(" 層別勾配ノルム上位5:")

for name, norm in sorted(grad_norms, key=lambda x: x[1], reverse=True)[:5]:

print(f" {name}: {norm:.6f}")

print("\n診断完了!")

2. Loss問題の診断

NaN Lossの原因と解決策

NaN Lossはディープラーニングで最も苛立たしい問題の一つです。複数の原因があり、それぞれ異なるアプローチが必要です。

学習率が高すぎる

NaN Lossの最も一般的な原因です。学習率が高すぎると、パラメータ更新の大きさが過大になりLossが爆発します。

def find_learning_rate(model, train_loader, loss_fn, device,

start_lr=1e-7, end_lr=10, num_iter=100):

"""

LRレンジテストを使用して最適な学習率範囲を見つける。

"""

optimizer = optim.SGD(model.parameters(), lr=start_lr)

lr_multiplier = (end_lr / start_lr) ** (1 / num_iter)

lrs = []

losses = []

best_loss = float('inf')

model.train()

data_iter = iter(train_loader)

for i in range(num_iter):

try:

X, y = next(data_iter)

except StopIteration:

data_iter = iter(train_loader)

X, y = next(data_iter)

X, y = X.to(device), y.to(device)

optimizer.zero_grad()

output = model(X)

loss = loss_fn(output, y)

if torch.isnan(loss) or loss.item() > best_loss * 4:

print(f"lr={optimizer.param_groups[0]['lr']:.2e}でLoss爆発を検出")

break

if loss.item() < best_loss:

best_loss = loss.item()

lrs.append(optimizer.param_groups[0]['lr'])

losses.append(loss.item())

loss.backward()

optimizer.step()

for pg in optimizer.param_groups:

pg['lr'] *= lr_multiplier

plt.figure(figsize=(10, 4))

plt.plot(lrs, losses)

plt.xscale('log')

plt.xlabel('Learning Rate')

plt.ylabel('Loss')

plt.title('LR Range Test')

plt.grid(True)

plt.savefig('lr_range_test.png')

plt.show()

return lrs, losses

def safe_training_step(model, X, y, optimizer, loss_fn, scaler=None):

"""

NaN Lossを検出してスキップする安全な学習ステップ

"""

optimizer.zero_grad()

if torch.isnan(X).any() or torch.isinf(X).any():

print("警告: 入力にNaN/Infあり、ステップをスキップ")

return None

if scaler is not None:

with torch.cuda.amp.autocast():

output = model(X)

loss = loss_fn(output, y)

if torch.isnan(loss) or torch.isinf(loss):

print(f"警告: Lossが{loss.item()}です、ステップをスキップ")

return None

scaler.scale(loss).backward()

scaler.unscale_(optimizer)

torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)

scaler.step(optimizer)

scaler.update()

else:

output = model(X)

loss = loss_fn(output, y)

if torch.isnan(loss) or torch.isinf(loss):

print(f"警告: Lossが{loss.item()}です、ステップをスキップ")

return None

loss.backward()

torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)

optimizer.step()

return loss.item()

log(0)計算の防止

クロスエントロピー損失やlogベースの損失関数では、log(0)が-Infを返してNaNを引き起こします。

間違い: log(0)が発生する可能性

def bad_cross_entropy(pred, target):

return -torch.sum(target * torch.log(pred))

正しい: 数値安定性のためepsを使用

def safe_cross_entropy(pred, target, eps=1e-8):

pred = torch.clamp(pred, min=eps, max=1-eps)

return -torch.sum(target * torch.log(pred))

ベスト: PyTorchの組み込み関数を使用(内部でlog-sum-expトリックを適用)

loss_fn = nn.CrossEntropyLoss() # 数値安定

log_softmax = nn.LogSoftmax(dim=1) # log + softmaxを組み合わせ

def numerically_stable_log_loss(logits, targets):

return F.cross_entropy(logits, targets)

torch.autograd.set_detect_anomalyの使用

異常検出モードを有効化(開発/デバッグ時のみ)

性能が低下するため、本番環境では無効化すること

with torch.autograd.detect_anomaly():

output = model(X)

loss = loss_fn(output, y)

loss.backward() # NaN/Infが発生した場合に正確な場所を出力

torch.autograd.set_detect_anomaly(True)

def train_with_anomaly_detection(model, loader, optimizer, loss_fn, device, epochs=5):

model.train()

for epoch in range(epochs):

for batch_idx, (X, y) in enumerate(loader):

X, y = X.to(device), y.to(device)

with torch.autograd.detect_anomaly():

optimizer.zero_grad()

output = model(X)

loss = loss_fn(output, y)

if torch.isnan(loss):

print(f"エポック{epoch}、バッチ{batch_idx}でNaN Loss")

print(f"入力統計: mean={X.mean():.4f}, std={X.std():.4f}")

print(f"出力統計: mean={output.mean():.4f}, std={output.std():.4f}")

break

loss.backward()

optimizer.step()

3. 勾配問題

勾配消失の診断

勾配消失は、逆伝播が早期の層に向かって勾配が極めて小さくなる深いネットワークで発生します。

def check_gradient_flow(model):

"""

層毎の勾配の大きさを可視化して勾配消失/爆発を診断する

"""

ave_grads = []

max_grads = []

layers = []

for name, param in model.named_parameters():

if param.requires_grad and param.grad is not None:

layers.append(name)

ave_grads.append(param.grad.abs().mean().item())

max_grads.append(param.grad.abs().max().item())

plt.figure(figsize=(12, 6))

plt.bar(range(len(ave_grads)), ave_grads, alpha=0.5, label='平均勾配')

plt.bar(range(len(max_grads)), max_grads, alpha=0.5, label='最大勾配')

plt.xticks(range(len(layers)), layers, rotation=90)

plt.xlabel("層")

plt.ylabel("勾配の大きさ")

plt.title("層別勾配フロー")

plt.legend()

plt.yscale('log')

plt.tight_layout()

plt.savefig('gradient_flow.png')

for name, avg_grad in zip(layers, ave_grads):

if avg_grad < 1e-6:

print(f"警告: {name}で勾配消失の可能性あり (avg={avg_grad:.2e})")

return layers, ave_grads, max_grads

def register_gradient_hooks(model):

"""

リアルタイム監視のための勾配フックを登録する

"""

gradient_stats = {}

def make_hook(name):

def hook(grad):

gradient_stats[name] = {

'mean': grad.abs().mean().item(),

'max': grad.abs().max().item(),

'std': grad.std().item(),

'has_nan': torch.isnan(grad).any().item(),

'has_inf': torch.isinf(grad).any().item()

}

if torch.isnan(grad).any():

print(f"NaN勾配を検出: {name}")

return grad

return hook

hooks = []

for name, param in model.named_parameters():

if param.requires_grad:

hook = param.register_hook(make_hook(name))

hooks.append(hook)

return gradient_stats, hooks

勾配消失の修正: Heの初期化 + BatchNorm + 残差接続

class ResidualBlock(nn.Module):

def __init__(self, dim):

super().__init__()

self.block = nn.Sequential(

nn.Linear(dim, dim),

nn.BatchNorm1d(dim),

nn.ReLU(),

nn.Linear(dim, dim),

nn.BatchNorm1d(dim)

)

self.relu = nn.ReLU()

def forward(self, x):

return self.relu(self.block(x) + x) # 残差接続

def init_weights(m):

if isinstance(m, nn.Linear):

nn.init.kaiming_normal_(m.weight, mode='fan_out', nonlinearity='relu')

if m.bias is not None:

nn.init.zeros_(m.bias)

elif isinstance(m, nn.Conv2d):

nn.init.kaiming_normal_(m.weight, mode='fan_out', nonlinearity='relu')

if m.bias is not None:

nn.init.zeros_(m.bias)

elif isinstance(m, nn.BatchNorm2d):

nn.init.ones_(m.weight)

nn.init.zeros_(m.bias)

勾配クリッピングによる勾配爆発の解決

def train_with_gradient_clipping(model, loader, optimizer, loss_fn, device,

max_norm=1.0, epochs=10):

"""

勾配クリッピング付きの安全な学習ループ

"""

model.train()

history = {'train_loss': [], 'grad_norm': []}

for epoch in range(epochs):

epoch_loss = 0

epoch_grad_norms = []

for X, y in loader:

X, y = X.to(device), y.to(device)

optimizer.zero_grad()

output = model(X)

loss = loss_fn(output, y)

loss.backward()

クリッピング前の勾配ノルムを計算

total_norm = 0

for p in model.parameters():

if p.grad is not None:

param_norm = p.grad.data.norm(2)

total_norm += param_norm.item() ** 2

total_norm = total_norm ** 0.5

epoch_grad_norms.append(total_norm)

勾配クリッピングを適用

torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=max_norm)

optimizer.step()

epoch_loss += loss.item()

avg_loss = epoch_loss / len(loader)

avg_grad_norm = sum(epoch_grad_norms) / len(epoch_grad_norms)

history['train_loss'].append(avg_loss)

history['grad_norm'].append(avg_grad_norm)

print(f"エポック{epoch+1}: Loss={avg_loss:.4f}, 勾配ノルム={avg_grad_norm:.4f}")

if avg_grad_norm > max_norm * 10:

print(f"警告: 勾配ノルムが非常に大きい ({avg_grad_norm:.4f})。学習率を下げることを検討してください。")

return history

4. 過学習の解決

過学習の診断

def plot_learning_curves(train_losses, val_losses, train_accs=None, val_accs=None):

"""

学習/検証のLossと精度曲線を使用して過学習を診断する

"""

fig, axes = plt.subplots(1, 2 if train_accs else 1, figsize=(14, 5))

if not isinstance(axes, np.ndarray):

axes = [axes]

axes[0].plot(train_losses, label='学習Loss', color='blue')

axes[0].plot(val_losses, label='検証Loss', color='red', linestyle='--')

axes[0].set_xlabel('エポック')

axes[0].set_ylabel('Loss')

axes[0].set_title('学習/検証Loss')

axes[0].legend()

axes[0].grid(True)

min_val_idx = np.argmin(val_losses)

axes[0].axvline(x=min_val_idx, color='green', linestyle=':', label=f'最良エポック: {min_val_idx}')

axes[0].legend()

if train_accs and val_accs:

axes[1].plot(train_accs, label='学習精度', color='blue')

axes[1].plot(val_accs, label='検証精度', color='red', linestyle='--')

axes[1].set_xlabel('エポック')

axes[1].set_ylabel('精度')

axes[1].set_title('学習/検証精度')

axes[1].legend()

axes[1].grid(True)

final_gap = val_losses[-1] - train_losses[-1]

print(f"最終過学習ギャップ(検証-学習Loss): {final_gap:.4f}")

if final_gap > 0.1:

print("警告: 深刻な過学習を検出!")

plt.tight_layout()

plt.savefig('learning_curves.png')

plt.show()

早期停止の実装

class EarlyStopping:

"""

検証Lossを監視して過学習時に学習を早期停止する

"""

def __init__(self, patience=10, min_delta=0.001, restore_best=True, verbose=True):

self.patience = patience

self.min_delta = min_delta

self.restore_best = restore_best

self.verbose = verbose

self.best_loss = float('inf')

self.best_epoch = 0

self.counter = 0

self.best_weights = None

self.stopped_epoch = 0

def __call__(self, val_loss, model, epoch):

if val_loss < self.best_loss - self.min_delta:

self.best_loss = val_loss

self.best_epoch = epoch

self.counter = 0

if self.restore_best:

self.best_weights = copy.deepcopy(model.state_dict())

if self.verbose:

print(f"検証Loss改善: {val_loss:.6f} (エポック{epoch})")

return False

else:

self.counter += 1

if self.verbose:

print(f"早期停止カウンター: {self.counter}/{self.patience}")

if self.counter >= self.patience:

self.stopped_epoch = epoch

if self.restore_best and self.best_weights:

model.load_state_dict(self.best_weights)

print(f"最良の重みを復元しました (エポック{self.best_epoch})")

return True

return False

def train_with_regularization(model, train_loader, val_loader,

optimizer, loss_fn, device, epochs=100):

"""

各種正則化技術を使用した学習ループ

"""

early_stopping = EarlyStopping(patience=15, min_delta=0.001)

scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(

optimizer, mode='min', factor=0.5, patience=5, verbose=True

)

train_losses = []

val_losses = []

for epoch in range(epochs):

model.train()

train_loss = 0

for X, y in train_loader:

X, y = X.to(device), y.to(device)

optimizer.zero_grad()

output = model(X)

loss = loss_fn(output, y)

loss.backward()

torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)

optimizer.step()

train_loss += loss.item()

model.eval()

val_loss = 0

with torch.no_grad():

for X, y in val_loader:

X, y = X.to(device), y.to(device)

output = model(X)

val_loss += loss_fn(output, y).item()

train_loss /= len(train_loader)

val_loss /= len(val_loader)

train_losses.append(train_loss)

val_losses.append(val_loss)

scheduler.step(val_loss)

print(f"エポック{epoch+1}: 学習={train_loss:.4f}, 検証={val_loss:.4f}")

if early_stopping(val_loss, model, epoch):

print(f"エポック{epoch+1}で早期停止")

break

return train_losses, val_losses

Dropout + L2正則化の例

class RegularizedModel(nn.Module):

def __init__(self, input_dim, hidden_dim, output_dim, dropout_rate=0.3):

super().__init__()

self.network = nn.Sequential(

nn.Linear(input_dim, hidden_dim),

nn.BatchNorm1d(hidden_dim),

nn.ReLU(),

nn.Dropout(p=dropout_rate),

nn.Linear(hidden_dim, hidden_dim),

nn.BatchNorm1d(hidden_dim),

nn.ReLU(),

nn.Dropout(p=dropout_rate),

nn.Linear(hidden_dim, output_dim)

)

def forward(self, x):

return self.network(x)

オプティマイザーのweight_decayによるL2正則化

optimizer = torch.optim.AdamW(

model.parameters(),

lr=1e-3,

weight_decay=1e-4

)

データ拡張戦略

train_transform = transforms.Compose([

transforms.RandomHorizontalFlip(p=0.5),

transforms.RandomRotation(degrees=15),

transforms.RandomResizedCrop(224, scale=(0.8, 1.0)),

transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2, hue=0.1),

transforms.RandomGrayscale(p=0.1),

transforms.ToTensor(),

transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])

])

val_transform = transforms.Compose([

transforms.Resize(256),

transforms.CenterCrop(224),

transforms.ToTensor(),

transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])

])

def mixup_data(x, y, alpha=0.2, device='cuda'):

"""

Mixup拡張: 2つのサンプルを線形補間して新しいサンプルを生成する

"""

if alpha > 0:

lam = np.random.beta(alpha, alpha)

else:

lam = 1

batch_size = x.size(0)

index = torch.randperm(batch_size).to(device)

mixed_x = lam * x + (1 - lam) * x[index, :]

y_a, y_b = y, y[index]

return mixed_x, y_a, y_b, lam

def mixup_criterion(criterion, pred, y_a, y_b, lam):

return lam * criterion(pred, y_a) + (1 - lam) * criterion(pred, y_b)

5. 学習速度の問題

データローディングのボトルネックを解決する

from torch.utils.data import DataLoader

def profile_dataloader(dataset, batch_size=32, num_workers_list=[0, 2, 4, 8]):

"""

異なるnum_workers設定でデータローディング速度を比較する

"""

results = {}

for num_workers in num_workers_list:

loader = DataLoader(

dataset,

batch_size=batch_size,

num_workers=num_workers,

pin_memory=True,

prefetch_factor=2 if num_workers > 0 else None,

persistent_workers=True if num_workers > 0 else False

)

start = time.time()

for i, batch in enumerate(loader):

if i >= 10:

break

elapsed = time.time() - start

results[num_workers] = elapsed

print(f"num_workers={num_workers}: {elapsed:.3f}秒(10バッチ)")

best_workers = min(results, key=results.get)

print(f"\n最適なnum_workers: {best_workers}")

return results

def create_optimized_dataloader(dataset, batch_size, is_train=True):

return DataLoader(

dataset,

batch_size=batch_size,

shuffle=is_train,

num_workers=4,

pin_memory=True,

prefetch_factor=2,

persistent_workers=True,

drop_last=is_train

)

混合精度学習

from torch.cuda.amp import autocast, GradScaler

def train_mixed_precision(model, loader, optimizer, loss_fn, device, epochs=10):

"""

FP16混合精度学習で2〜3倍の速度向上

"""

scaler = GradScaler()

model.train()

for epoch in range(epochs):

for X, y in loader:

X, y = X.to(device), y.to(device)

optimizer.zero_grad()

with autocast(device_type='cuda', dtype=torch.float16):

output = model(X)

loss = loss_fn(output, y)

scaler.scale(loss).backward()

scaler.unscale_(optimizer)

torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)

scaler.step(optimizer)

scaler.update()

print(f"エポック{epoch+1}完了, scalerスケール: {scaler.get_scale()}")

torch.compileの適用

model = MyModel().to(device)

デフォルトコンパイル

compiled_model = torch.compile(model)

最大性能モード(コンパイル時間が長くなる)

compiled_model = torch.compile(model, mode='max-autotune')

入力サイズが頻繁に変化する場合

compiled_model = torch.compile(model, dynamic=True)

def benchmark_model(model, inputs, n_iters=100):

ウォームアップ

for _ in range(10):

_ = model(inputs)

torch.cuda.synchronize()

start = time.time()

for _ in range(n_iters):

_ = model(inputs)

torch.cuda.synchronize()

elapsed = time.time() - start

return elapsed / n_iters

6. メモリ不足(OOM)の解決策

GPUメモリ分析

def print_gpu_memory_summary(device=0):

"""

GPUメモリ使用状況の詳細を出力する

"""

if not torch.cuda.is_available():

print("CUDAが利用できません。")

return

print(f"=== GPU {device} メモリサマリー ===")

print(f"総メモリ: {torch.cuda.get_device_properties(device).total_memory / 1e9:.2f} GB")

print(f"予約済みメモリ: {torch.cuda.memory_reserved(device) / 1e9:.2f} GB")

print(f"割り当て済みメモリ: {torch.cuda.memory_allocated(device) / 1e9:.2f} GB")

print(f"キャッシュメモリ: {(torch.cuda.memory_reserved(device) - torch.cuda.memory_allocated(device)) / 1e9:.2f} GB")

print()

print(torch.cuda.memory_summary(device=device, abbreviated=False))

def clear_gpu_memory():

"""

GPUメモリキャッシュをクリアする

"""

gc.collect()

torch.cuda.empty_cache()

torch.cuda.synchronize()

print(f"クリーンアップ後のGPUメモリ: {torch.cuda.memory_allocated() / 1e9:.2f} GB")

勾配チェックポイントの実装

from torch.utils.checkpoint import checkpoint, checkpoint_sequential

class MemoryEfficientModel(nn.Module):

"""

勾配チェックポイントを使用したメモリ効率的なモデル

再計算によりメモリを節約するが計算時間が増加する

"""

def __init__(self, layers):

super().__init__()

self.layers = nn.ModuleList(layers)

def forward(self, x):

checkpoint_sequential: 逐次的な層に自動的にチェックポイントを適用

segments: チャンクの数(多いほどメモリ節約、遅くなる)

return checkpoint_sequential(self.layers, segments=4, input=x)

def forward_with_manual_checkpoints(self, x):

x = self.layers[0](x)

for layer in self.layers[1:-1]:

x = checkpoint(layer, x)

x = self.layers[-1](x)

return x

TransformersでGradient Checkpointingを有効化

from transformers import AutoModelForCausalLM

model = AutoModelForCausalLM.from_pretrained("gpt2")

model.gradient_checkpointing_enable() # Hugging Faceモデルへの簡単な適用

自動バッチサイズ探索

def find_optimal_batch_size(model, loss_fn, device,

start_batch=8, max_batch=512):

"""

OOMなしで使用できる最大バッチサイズを見つける

"""

batch_size = start_batch

optimal_batch_size = start_batch

while batch_size <= max_batch:

try:

dummy_input = torch.randn(batch_size, 3, 224, 224).to(device)

dummy_target = torch.randint(0, 1000, (batch_size,)).to(device)

output = model(dummy_input)

loss = loss_fn(output, dummy_target)

loss.backward()

optimal_batch_size = batch_size

print(f"バッチサイズ{batch_size}: 成功")

batch_size *= 2

del dummy_input, dummy_target, output, loss

torch.cuda.empty_cache()

except RuntimeError as e:

if "out of memory" in str(e):

print(f"バッチサイズ{batch_size}: OOM")

torch.cuda.empty_cache()

break

else:

raise e

print(f"\n推奨バッチサイズ: {optimal_batch_size} (安全マージン: {optimal_batch_size // 2})")

return optimal_batch_size

7. データパイプラインのデバッグ

データサンプルの可視化

from collections import Counter

def visualize_batch(loader, num_samples=16, class_names=None):

"""

データバッチサンプルを可視化して前処理結果を確認する

"""

X, y = next(iter(loader))

fig, axes = plt.subplots(4, 4, figsize=(12, 12))

axes = axes.flatten()

for i in range(min(num_samples, len(X))):

img = X[i].numpy()

ImageNet正規化を逆適用

mean = np.array([0.485, 0.456, 0.406])

std = np.array([0.229, 0.224, 0.225])

img = std[:, None, None] * img + mean[:, None, None]

img = np.clip(img, 0, 1)

axes[i].imshow(img.transpose(1, 2, 0))

label = y[i].item()

title = class_names[label] if class_names else f"ラベル: {label}"

axes[i].set_title(title)

axes[i].axis('off')

plt.tight_layout()

plt.savefig('data_samples.png')

plt.show()

def check_label_distribution(dataset):

"""

ラベル分布を確認してクラス不均衡を検出する

"""

labels = [dataset[i][1] for i in range(len(dataset))]

counter = Counter(labels)

classes = sorted(counter.keys())

counts = [counter[c] for c in classes]

total = sum(counts)

print("ラベル分布:")

for cls, count in zip(classes, counts):

pct = count / total * 100

bar = '#' * int(pct / 2)

print(f" クラス{cls}: {count:5d} ({pct:.1f}%) {bar}")

max_count = max(counts)

min_count = min(counts)

imbalance_ratio = max_count / min_count

if imbalance_ratio > 10:

print(f"\n警告: 深刻なクラス不均衡! (比率: {imbalance_ratio:.1f}:1)")

print("解決策: 加重サンプリングまたはクラス加重損失関数を検討してください。")

return counter

def create_weighted_sampler(dataset):

"""

クラス不均衡に対処するための加重サンプラーを作成する

"""

labels = [dataset[i][1] for i in range(len(dataset))]

class_counts = Counter(labels)

weights = [1.0 / class_counts[label] for label in labels]

weights = torch.DoubleTensor(weights)

sampler = torch.utils.data.WeightedRandomSampler(

weights=weights,

num_samples=len(weights),

replacement=True

)

return sampler

8. モデルアーキテクチャのデバッグ

torchinfoによるモデル構造分析

from torchinfo import summary

def analyze_model(model, input_size):

"""

モデル構造を分析してボトルネック層を特定する

"""

model_stats = summary(

model,

input_size=input_size,

col_names=["input_size", "output_size", "num_params", "kernel_size",

"mult_adds"],

verbose=1

)

print("\n層別パラメータ分布:")

for name, module in model.named_modules():

num_params = sum(p.numel() for p in module.parameters(recurse=False))

if num_params > 0:

print(f" {name}: {num_params:,}パラメータ")

return model_stats

def monitor_activations(model, X):

"""

中間活性化値を監視して死んだニューロンを検出する

"""

activations = {}

def make_activation_hook(name):

def hook(module, input, output):

activations[name] = output.detach()

return hook

hooks = []

for name, module in model.named_modules():

if isinstance(module, (nn.ReLU, nn.GELU, nn.Tanh, nn.Sigmoid)):

hook = module.register_forward_hook(make_activation_hook(name))

hooks.append(hook)

with torch.no_grad():

model(X)

print("\n活性化統計:")

for name, act in activations.items():

dead_neurons = (act == 0).float().mean().item()

print(f" {name}:")

print(f" mean: {act.mean():.4f}, std: {act.std():.4f}")

print(f" 死んだニューロン率: {dead_neurons:.2%}")

if dead_neurons > 0.5:

print(f" 警告: ニューロンの{dead_neurons:.0%}が非活性!")

for hook in hooks:

hook.remove()

return activations

9. 学習モニタリング

TensorBoardの使用

from torch.utils.tensorboard import SummaryWriter

class TensorBoardLogger:

def __init__(self, log_dir='runs/experiment'):

self.writer = SummaryWriter(log_dir)

def log_scalars(self, metrics: dict, epoch: int):

for name, value in metrics.items():

self.writer.add_scalar(name, value, epoch)

def log_model_gradients(self, model, epoch: int):

for name, param in model.named_parameters():

if param.grad is not None:

self.writer.add_histogram(f'gradients/{name}', param.grad, epoch)

self.writer.add_histogram(f'weights/{name}', param.data, epoch)

def log_learning_rate(self, optimizer, epoch: int):

for i, pg in enumerate(optimizer.param_groups):

self.writer.add_scalar(f'lr/group_{i}', pg['lr'], epoch)

def close(self):

self.writer.close()

def train_with_tensorboard(model, train_loader, val_loader,

optimizer, loss_fn, device, epochs=50):

logger = TensorBoardLogger(log_dir='runs/debug_session')

for epoch in range(epochs):

model.train()

train_loss, train_correct = 0, 0

for X, y in train_loader:

X, y = X.to(device), y.to(device)

optimizer.zero_grad()

output = model(X)

loss = loss_fn(output, y)

loss.backward()

optimizer.step()

train_loss += loss.item()

train_correct += (output.argmax(1) == y).sum().item()

train_loss /= len(train_loader)

train_acc = train_correct / len(train_loader.dataset)

model.eval()

val_loss, val_correct = 0, 0

with torch.no_grad():

for X, y in val_loader:

X, y = X.to(device), y.to(device)

output = model(X)

val_loss += loss_fn(output, y).item()

val_correct += (output.argmax(1) == y).sum().item()

val_loss /= len(val_loader)

val_acc = val_correct / len(val_loader.dataset)

logger.log_scalars({

'Loss/Train': train_loss,

'Loss/Val': val_loss,

'Accuracy/Train': train_acc,

'Accuracy/Val': val_acc,

}, epoch)

logger.log_model_gradients(model, epoch)

logger.log_learning_rate(optimizer, epoch)

logger.close()

print("TensorBoard: 'tensorboard --logdir=runs'で起動")

10. 再現性の確保

def set_seed(seed: int = 42):

"""

完全な再現性のためすべての乱数ジェネレーターを固定する

"""

random.seed(seed)

np.random.seed(seed)

torch.manual_seed(seed)

torch.cuda.manual_seed(seed)

torch.cuda.manual_seed_all(seed) # マルチGPU

torch.backends.cudnn.deterministic = True

torch.backends.cudnn.benchmark = False

torch.use_deterministic_algorithms(True)

os.environ['PYTHONHASHSEED'] = str(seed)

os.environ['CUBLAS_WORKSPACE_CONFIG'] = ':4096:8'

print(f"すべての乱数ジェネレーターをシード{seed}で初期化しました")

def save_experiment_config(config: dict, save_path: str = 'experiment_config.json'):

"""

完全な実験環境を記録する

"""

full_config = config.copy()

full_config['environment'] = {

'python': subprocess.getoutput('python --version'),

'torch': torch.__version__,

'cuda': torch.version.cuda,

'cudnn': str(torch.backends.cudnn.version()),

'gpu': torch.cuda.get_device_name(0) if torch.cuda.is_available() else 'CPU'

}

try:

full_config['git_hash'] = subprocess.getoutput('git rev-parse HEAD')

except Exception:

full_config['git_hash'] = 'unknown'

with open(save_path, 'w') as f:

json.dump(full_config, f, indent=2)

print(f"実験設定を保存しました: {save_path}")

return full_config

def test_reproducibility(model_fn, train_fn, seed=42, n_runs=3):

"""

同じシードで複数回実行して再現性を確認する

"""

results = []

for run in range(n_runs):

set_seed(seed)

model = model_fn()

loss = train_fn(model)

results.append(loss)

print(f"実行{run+1}: 最終Loss = {loss:.6f}")

max_diff = max(results) - min(results)

print(f"\n最大差異: {max_diff:.8f}")

if max_diff < 1e-5:

print("再現性チェック合格!")

else:

print("警告: 再現性の問題を検出。")

return results

11. 分散学習のデバッグ

from torch.nn.parallel import DistributedDataParallel as DDP

def setup_distributed(rank, world_size, backend='nccl'):

"""

分散学習環境を初期化する

"""

os.environ['MASTER_ADDR'] = 'localhost'

os.environ['MASTER_PORT'] = '12355'

dist.init_process_group(

backend=backend,

rank=rank,

world_size=world_size

)

torch.cuda.set_device(rank)

print(f"プロセス{rank}/{world_size}初期化完了")

def cleanup_distributed():

dist.destroy_process_group()

def debug_ddp_training(rank, world_size, model, dataset):

"""

DDP学習デバッグの例

"""

setup_distributed(rank, world_size)

device = torch.device(f'cuda:{rank}')

model = model.to(device)

model = DDP(model, device_ids=[rank], find_unused_parameters=True)

sampler = torch.utils.data.distributed.DistributedSampler(

dataset,

num_replicas=world_size,

rank=rank,

shuffle=True

)

loader = torch.utils.data.DataLoader(

dataset,

batch_size=32,

sampler=sampler,

num_workers=4

)

optimizer = torch.optim.AdamW(model.parameters(), lr=1e-3)

loss_fn = torch.nn.CrossEntropyLoss()

for epoch in range(10):

重要: エポック毎にサンプラーのシードを更新

sampler.set_epoch(epoch)

for X, y in loader:

X, y = X.to(device), y.to(device)

optimizer.zero_grad()

output = model(X)

loss = loss_fn(output, y)

loss.backward()

optimizer.step()

ランク0からのみログを出力

if rank == 0:

print(f"エポック{epoch+1}完了")

torch.save(model.module.state_dict(), f'checkpoint_epoch{epoch}.pt')

すべてのランクを同期

dist.barrier()

cleanup_distributed()

12. MLflowと実験管理

def train_with_mlflow(model, train_loader, val_loader, optimizer,

loss_fn, device, params: dict):

"""

MLflowによる実験追跡とモデルバージョン管理

"""

mlflow.set_tracking_uri("http://localhost:5000")

mlflow.set_experiment("deep-learning-debug")

with mlflow.start_run():

mlflow.log_params(params)

best_val_loss = float('inf')

for epoch in range(params['epochs']):

model.train()

train_loss = 0

for X, y in train_loader:

X, y = X.to(device), y.to(device)

optimizer.zero_grad()

output = model(X)

loss = loss_fn(output, y)

loss.backward()

optimizer.step()

train_loss += loss.item()

train_loss /= len(train_loader)

model.eval()

val_loss = 0

with torch.no_grad():

for X, y in val_loader:

X, y = X.to(device), y.to(device)

output = model(X)

val_loss += loss_fn(output, y).item()

val_loss /= len(val_loader)

mlflow.log_metrics({

'train_loss': train_loss,

'val_loss': val_loss

}, step=epoch)

if val_loss < best_val_loss:

best_val_loss = val_loss

mlflow.pytorch.log_model(model, "best_model")

mlflow.log_metric("best_val_loss", best_val_loss)

return best_val_loss

def hyperparameter_optimization_with_optuna(model_fn, train_loader,

val_loader, device, n_trials=50):

"""

Optunaによるハイパーパラメータ最適化

"""

def objective(trial):

lr = trial.suggest_float('lr', 1e-5, 1e-1, log=True)

weight_decay = trial.suggest_float('weight_decay', 1e-6, 1e-2, log=True)

dropout = trial.suggest_float('dropout', 0.0, 0.5)

batch_size = trial.suggest_categorical('batch_size', [16, 32, 64, 128])

model = model_fn(dropout=dropout).to(device)

optimizer = torch.optim.AdamW(model.parameters(),

lr=lr, weight_decay=weight_decay)

loss_fn = torch.nn.CrossEntropyLoss()

val_loss = train_with_mlflow(

model, train_loader, val_loader, optimizer, loss_fn, device,

params={'lr': lr, 'weight_decay': weight_decay,

'dropout': dropout, 'batch_size': batch_size, 'epochs': 10}

)

return val_loss

study = optuna.create_study(

direction='minimize',

sampler=optuna.samplers.TPESampler(seed=42),

pruner=optuna.pruners.MedianPruner()

)

study.optimize(objective, n_trials=n_trials)

print("\n最良のハイパーパラメータ:")

for key, value in study.best_params.items():

print(f" {key}: {value}")

print(f"最良の検証Loss: {study.best_value:.4f}")

return study.best_params

まとめ:ディープラーニングデバッグワークフロー

ディープラーニングのデバッグには体系的なアプローチが不可欠です。以下の順序で問題を診断してください:

1. **データから始める**: 問題の80%はデータが原因です。まずNaN、間違ったラベル、正規化エラーを確認してください。

2. **小さく始める**: フルバッチ学習の前に、モデルが単一バッチを過学習できるかテストしてください。

3. **勾配を確認する**: 損失関数の後、勾配が正しく流れているか確認してください。

4. **モニタリングツールを使用する**: TensorBoard、W&B、MLflowのいずれかを選択してすべての実験を追跡してください。

5. **再現性を確保する**: 固定されたシードなしでのデバッグは非常に困難です。常にシードを設定してください。

def minimum_debug_checklist(model, train_loader, device):

"""

学習開始前に確認する最小チェックリスト

"""

print("ディープラーニング学習前チェックリスト")

print("=" * 50)

1. 単一バッチ過学習テスト

print("[1] 単一バッチ過学習テスト中...")

X, y = next(iter(train_loader))

X, y = X.to(device), y.to(device)

optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)

loss_fn = torch.nn.CrossEntropyLoss()

initial_loss = None

for step in range(100):

optimizer.zero_grad()

output = model(X)

loss = loss_fn(output, y)

if initial_loss is None:

initial_loss = loss.item()

loss.backward()

optimizer.step()

final_loss = loss.item()

overfit_ratio = initial_loss / final_loss if final_loss > 0 else float('inf')

if overfit_ratio > 10:

print(f" 合格: Lossが{initial_loss:.4f}から{final_loss:.4f}に減少(比率: {overfit_ratio:.1f}x)")

else:

print(f" 警告: 単一バッチを過学習できません(比率: {overfit_ratio:.1f}x)")

print(" -> モデル容量、学習率、データエラーを確認してください")

print("\nチェックリスト完了!")

このガイドで解説した技術を体系的に適用することで、ディープラーニング学習中に発生するほとんどの問題を素早く診断・解決することができます。デバッグは経験とともに速くなりますが、適切なツールと方法論を持つことが最も重要です。

현재 단락 (1/930)

ディープラーニングモデルの学習では、予期しない失敗に頻繁に遭遇します。Lossが突然NaNになる、いくら待ってもモデルが収束しない、メモリ不足エラーが発生する——これらはすべてのディープラーニング開発...

작성 글자: 0원문 글자: 27,249작성 단락: 0/930