Skip to content

Split View: 딥러닝 디버깅 완전 가이드: 학습 실패 진단부터 성능 최적화까지

|

딥러닝 디버깅 완전 가이드: 학습 실패 진단부터 성능 최적화까지

딥러닝 모델을 학습시키다 보면 예상치 못한 실패를 자주 경험하게 됩니다. Loss가 갑자기 NaN이 되거나, 아무리 기다려도 모델이 수렴하지 않거나, GPU 메모리가 부족하다는 오류가 뜨는 상황은 딥러닝 개발자라면 누구나 겪어본 경험입니다. 이 가이드는 딥러닝 학습 과정에서 발생할 수 있는 모든 주요 문제를 체계적으로 진단하고 해결하는 방법을 실전 코드와 함께 제공합니다.

1. 딥러닝 학습의 흔한 실패 패턴

딥러닝 학습 실패는 크게 세 가지 범주로 나눌 수 있습니다.

Loss가 감소하지 않는 경우

훈련을 시작했는데 Loss가 전혀 줄어들지 않거나, 초기값에서 거의 변화가 없는 경우입니다. 가장 흔한 원인은 학습률이 너무 낮거나, 모델 구현에 버그가 있거나, 데이터 전처리에 문제가 있는 경우입니다.

Loss가 NaN이 되는 경우

Loss가 갑자기 NaN(Not a Number) 또는 Inf(Infinity)로 바뀌는 경우입니다. 이는 수치 불안정성(Numerical Instability) 때문에 발생하며, 보통 학습률이 너무 높거나 데이터에 이상값이 포함되어 있을 때 나타납니다.

훈련 Loss는 줄지만 검증 Loss는 증가하는 경우

과적합(Overfitting) 현상입니다. 모델이 훈련 데이터를 암기하고 있지만, 새로운 데이터에 일반화하지 못하는 상태입니다.

체크리스트 기반 진단 프레임워크

문제를 빠르게 진단하기 위한 체크리스트입니다:

def diagnose_training(model, train_loader, val_loader, optimizer, loss_fn, device):
    """
    Run a quick diagnostic pass before training starts.

    Checks one batch of data (shapes, value range, NaN/Inf), counts model
    parameters, and exercises a single forward and backward pass, printing
    the findings for each stage.

    NOTE(review): ``val_loader`` is accepted but not used by any visible
    check — confirm whether a validation-side check was intended.
    """
    print("=== 딥러닝 학습 진단 체크리스트 ===\n")

    # 1. Data validation: shapes, value range, NaN/Inf presence
    print("[1] 데이터 검증 중...")
    batch = next(iter(train_loader))
    X, y = batch
    print(f"  입력 형태: {X.shape}")
    print(f"  레이블 형태: {y.shape}")
    print(f"  입력 범위: [{X.min():.4f}, {X.max():.4f}]")
    print(f"  입력에 NaN 존재: {torch.isnan(X).any()}")
    print(f"  입력에 Inf 존재: {torch.isinf(X).any()}")

    # 2. Model parameter counts (total vs. trainable)
    print("\n[2] 모델 파라미터 검증 중...")
    total_params = sum(p.numel() for p in model.parameters())
    trainable_params = sum(p.numel() for p in model.parameters() if p.requires_grad)
    print(f"  전체 파라미터: {total_params:,}")
    print(f"  학습 가능 파라미터: {trainable_params:,}")

    # 3. Forward-pass smoke test (eval mode, no gradients)
    print("\n[3] 순전파 테스트 중...")
    model.eval()
    with torch.no_grad():
        try:
            output = model(X.to(device))
            print(f"  출력 형태: {output.shape}")
            print(f"  출력에 NaN 존재: {torch.isnan(output).any()}")
            loss = loss_fn(output, y.to(device))
            print(f"  초기 Loss: {loss.item():.4f}")
        except Exception as e:
            print(f"  순전파 실패: {e}")

    # 4. Backward-pass smoke test (train mode, real gradients)
    print("\n[4] 역전파 테스트 중...")
    model.train()
    optimizer.zero_grad()
    output = model(X.to(device))
    loss = loss_fn(output, y.to(device))
    loss.backward()

    # Collect per-parameter gradient norms to spot vanishing/exploding grads
    grad_norms = []
    for name, param in model.named_parameters():
        if param.grad is not None:
            grad_norms.append((name, param.grad.norm().item()))

    if grad_norms:
        print("  레이어별 그래디언트 노름 (상위 5개):")
        for name, norm in sorted(grad_norms, key=lambda x: x[1], reverse=True)[:5]:
            print(f"    {name}: {norm:.6f}")

    print("\n진단 완료!")

2. 손실(Loss) 문제 진단

NaN Loss 원인과 해결책

NaN Loss는 딥러닝에서 가장 좌절스러운 문제 중 하나입니다. 여러 원인이 있으며, 각각 다른 접근 방식으로 해결해야 합니다.

너무 높은 학습률

가장 흔한 NaN Loss 원인입니다. 학습률이 너무 높으면 파라미터 업데이트 크기가 지나치게 커져서 Loss가 폭발합니다.

import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
import matplotlib.pyplot as plt


def find_learning_rate(model, train_loader, loss_fn, device,
                        start_lr=1e-7, end_lr=10, num_iter=100):
    """
    LR Range Test: sweep the learning rate geometrically from ``start_lr``
    to ``end_lr`` over ``num_iter`` steps, recording the loss at each step.

    Stops early when the loss becomes NaN or exceeds 4x the best loss seen.
    Saves a log-scale plot to ``lr_range_test.png`` and returns
    ``(lrs, losses)``.

    NOTE(review): the sweep updates the model's weights in place — pass a
    throwaway copy if the original weights are needed afterwards.
    """
    optimizer = optim.SGD(model.parameters(), lr=start_lr)

    # Constant multiplicative factor so the LR grows geometrically.
    lr_multiplier = (end_lr / start_lr) ** (1 / num_iter)

    lrs = []
    losses = []
    best_loss = float('inf')

    model.train()
    data_iter = iter(train_loader)

    for i in range(num_iter):
        try:
            X, y = next(data_iter)
        except StopIteration:
            # Restart the loader when it is exhausted mid-sweep.
            data_iter = iter(train_loader)
            X, y = next(data_iter)

        X, y = X.to(device), y.to(device)

        optimizer.zero_grad()
        output = model(X)
        loss = loss_fn(output, y)

        # Divergence guard: NaN or a 4x blow-up over the best loss ends the sweep.
        if torch.isnan(loss) or loss.item() > best_loss * 4:
            print(f"Loss 폭발 감지 at lr={optimizer.param_groups[0]['lr']:.2e}")
            break

        if loss.item() < best_loss:
            best_loss = loss.item()

        lrs.append(optimizer.param_groups[0]['lr'])
        losses.append(loss.item())

        loss.backward()
        optimizer.step()

        # Raise the learning rate for the next iteration.
        for pg in optimizer.param_groups:
            pg['lr'] *= lr_multiplier

    plt.figure(figsize=(10, 4))
    plt.plot(lrs, losses)
    plt.xscale('log')
    plt.xlabel('Learning Rate')
    plt.ylabel('Loss')
    plt.title('LR Range Test')
    plt.grid(True)
    plt.savefig('lr_range_test.png')
    plt.show()

    return lrs, losses


def safe_training_step(model, X, y, optimizer, loss_fn, scaler=None):
    """
    Perform one training step, skipping the update on NaN/Inf.

    With ``scaler`` provided, runs under CUDA AMP (FP16 autocast + loss
    scaling); otherwise runs a plain FP32 step. Gradients are clipped to
    norm 1.0 in both paths. Returns the loss value, or ``None`` when the
    step was skipped.

    NOTE(review): ``torch.cuda.amp.autocast()`` is the legacy spelling;
    newer PyTorch prefers ``torch.amp.autocast('cuda')`` — confirm against
    the installed version.
    """
    optimizer.zero_grad()

    # Refuse to train on corrupted inputs.
    if torch.isnan(X).any() or torch.isinf(X).any():
        print("경고: 입력에 NaN/Inf 존재, 스텝 스킵")
        return None

    if scaler is not None:
        with torch.cuda.amp.autocast():
            output = model(X)
            loss = loss_fn(output, y)

        if torch.isnan(loss) or torch.isinf(loss):
            print(f"경고: Loss가 {loss.item()}, 스텝 스킵")
            return None

        scaler.scale(loss).backward()
        # Unscale before clipping so max_norm applies in true gradient units.
        scaler.unscale_(optimizer)
        torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
        scaler.step(optimizer)
        scaler.update()
    else:
        output = model(X)
        loss = loss_fn(output, y)

        if torch.isnan(loss) or torch.isinf(loss):
            print(f"경고: Loss가 {loss.item()}, 스텝 스킵")
            return None

        loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
        optimizer.step()

    return loss.item()

로그(0) 계산 방지

크로스 엔트로피 Loss나 로그 기반 손실 함수에서 log(0)은 -Inf를 반환하여 NaN을 유발합니다.

# Wrong approach: log(0) is possible and yields -Inf, which poisons the loss
def bad_cross_entropy(pred, target):
    # Deliberately unsafe demonstration: any pred value of 0 makes torch.log
    # return -inf, which propagates to NaN in the backward pass.
    return -torch.sum(target * torch.log(pred))

# Correct approach: clamp predictions away from 0 and 1 for numerical stability
def safe_cross_entropy(pred, target, eps=1e-8):
    """Cross-entropy that clamps ``pred`` into [eps, 1-eps] so log(0) never occurs."""
    clipped = torch.clamp(pred, min=eps, max=1 - eps)
    return -(target * torch.log(clipped)).sum()

# Even better: use PyTorch built-ins (they apply the log-sum-exp trick internally)
loss_fn = nn.CrossEntropyLoss()  # numerically stable
log_softmax = nn.LogSoftmax(dim=1)  # fused log + softmax

# Custom log loss (numerically stable)
def numerically_stable_log_loss(logits, targets):
    """
    Numerically stable cross-entropy via the log-sum-exp trick.

    Simply delegates to ``F.cross_entropy``, which applies the trick
    internally, so raw logits can be passed without a softmax first.
    """
    import torch.nn.functional as F

    return F.cross_entropy(logits, targets)

torch.autograd.set_detect_anomaly 활용

import torch

# Enable anomaly-detection mode (use during development/debugging only)
# It slows execution noticeably, so keep it disabled in production.
with torch.autograd.detect_anomaly():
    output = model(X)
    loss = loss_fn(output, y)
    loss.backward()  # on NaN/Inf here, the traceback of the offending op is printed

# Or enable it globally
torch.autograd.set_detect_anomaly(True)

# Example: applying anomaly detection inside a training loop
def train_with_anomaly_detection(model, loader, optimizer, loss_fn, device, epochs=5):
    """Train while autograd anomaly detection pinpoints NaN/Inf-producing ops."""
    model.train()
    for epoch in range(epochs):
        for batch_idx, (X, y) in enumerate(loader):
            X, y = X.to(device), y.to(device)

            with torch.autograd.detect_anomaly():
                optimizer.zero_grad()
                output = model(X)
                loss = loss_fn(output, y)

                if torch.isnan(loss):
                    print(f"NaN Loss at epoch {epoch}, batch {batch_idx}")
                    # Dump input/output statistics for post-mortem analysis
                    print(f"Input stats: mean={X.mean():.4f}, std={X.std():.4f}")
                    print(f"Output stats: mean={output.mean():.4f}, std={output.std():.4f}")
                    break

                loss.backward()
                optimizer.step()

3. 그래디언트 문제 해결

그래디언트 소실(Vanishing Gradient) 진단

그래디언트 소실은 깊은 네트워크에서 역전파 시 그래디언트가 초기 레이어로 갈수록 극도로 작아지는 현상입니다.

import torch
import torch.nn as nn
import matplotlib.pyplot as plt


def check_gradient_flow(model):
    """
    Plot per-layer gradient magnitudes to diagnose vanishing/exploding grads.

    Must be called after ``loss.backward()`` so ``param.grad`` is populated.
    Saves a log-scale bar chart to ``gradient_flow.png`` and returns
    ``(layer names, mean |grad| per layer, max |grad| per layer)``.
    """
    ave_grads = []
    max_grads = []
    layers = []

    for name, param in model.named_parameters():
        if param.requires_grad and param.grad is not None:
            layers.append(name)
            ave_grads.append(param.grad.abs().mean().item())
            max_grads.append(param.grad.abs().max().item())

    plt.figure(figsize=(12, 6))
    plt.bar(range(len(ave_grads)), ave_grads, alpha=0.5, label='평균 그래디언트')
    plt.bar(range(len(max_grads)), max_grads, alpha=0.5, label='최대 그래디언트')
    plt.xticks(range(len(layers)), layers, rotation=90)
    plt.xlabel("레이어")
    plt.ylabel("그래디언트 크기")
    plt.title("레이어별 그래디언트 흐름")
    plt.legend()
    plt.yscale('log')
    plt.tight_layout()
    plt.savefig('gradient_flow.png')

    # Warn about layers whose mean gradient is effectively zero
    for name, avg_grad in zip(layers, ave_grads):
        if avg_grad < 1e-6:
            print(f"경고: {name} 레이어의 그래디언트 소실 가능 (avg={avg_grad:.2e})")

    return layers, ave_grads, max_grads


def register_gradient_hooks(model):
    """
    Attach per-parameter gradient hooks for live monitoring.

    Returns a ``(stats, handles)`` pair: ``stats`` is a dict filled during
    backward with per-parameter gradient statistics (mean/max/std of |grad|
    plus NaN/Inf flags), and ``handles`` holds the hook handles so callers
    can later ``.remove()`` them.
    """
    stats = {}

    def _build_hook(param_name):
        def _on_grad(grad):
            abs_grad = grad.abs()
            stats[param_name] = {
                'mean': abs_grad.mean().item(),
                'max': abs_grad.max().item(),
                'std': grad.std().item(),
                'has_nan': torch.isnan(grad).any().item(),
                'has_inf': torch.isinf(grad).any().item()
            }
            if torch.isnan(grad).any():
                print(f"NaN 그래디언트 감지: {param_name}")
            return grad
        return _on_grad

    handles = [
        param.register_hook(_build_hook(name))
        for name, param in model.named_parameters()
        if param.requires_grad
    ]

    return stats, handles


# Mitigating vanishing gradients: He init + BatchNorm + residual connection
class ResidualBlock(nn.Module):
    """Two-layer MLP block with a skip connection: out = ReLU(f(x) + x)."""

    def __init__(self, dim):
        super().__init__()
        # Layers are created in the same order as the Sequential variant.
        self.fc1 = nn.Linear(dim, dim)
        self.norm1 = nn.BatchNorm1d(dim)
        self.act = nn.ReLU()
        self.fc2 = nn.Linear(dim, dim)
        self.norm2 = nn.BatchNorm1d(dim)

    def forward(self, x):
        residual = x
        h = self.act(self.norm1(self.fc1(x)))
        h = self.norm2(self.fc2(h))
        return self.act(h + residual)  # residual connection


# Mitigating vanishing gradients: Kaiming (He) initialization
def init_weights(m):
    """
    Initialize a single module; use via ``model.apply(init_weights)``.

    Linear/Conv2d weights get He-normal init (fan_out, ReLU gain) with
    zeroed biases; BatchNorm2d is reset to identity (weight=1, bias=0).
    """
    if isinstance(m, (nn.Linear, nn.Conv2d)):
        nn.init.kaiming_normal_(m.weight, mode='fan_out', nonlinearity='relu')
        if m.bias is not None:
            nn.init.zeros_(m.bias)
    elif isinstance(m, nn.BatchNorm2d):
        nn.init.ones_(m.weight)
        nn.init.zeros_(m.bias)

# Apply with: model.apply(init_weights)

그래디언트 폭발(Exploding Gradient) 해결

그래디언트 클리핑(Gradient Clipping)은 그래디언트 폭발을 방지하는 가장 효과적인 방법입니다.

import torch
import torch.nn as nn


def train_with_gradient_clipping(model, loader, optimizer, loss_fn, device,
                                   max_norm=1.0, epochs=10):
    """
    Training loop with gradient-norm clipping.

    Improvements vs. the naive version: the pre-clip gradient norm is taken
    from ``clip_grad_norm_``'s return value (it returns the total norm
    computed *before* clipping), removing a redundant second pass over all
    parameters, and an empty loader no longer raises ZeroDivisionError.

    Returns a history dict with per-epoch 'train_loss' and 'grad_norm'.
    """
    model.train()
    history = {'train_loss': [], 'grad_norm': []}

    for epoch in range(epochs):
        epoch_loss = 0.0
        epoch_grad_norms = []

        for X, y in loader:
            X, y = X.to(device), y.to(device)

            optimizer.zero_grad()
            output = model(X)
            loss = loss_fn(output, y)
            loss.backward()

            # clip_grad_norm_ returns the total (pre-clip) gradient norm,
            # so no separate manual norm computation is needed.
            total_norm = torch.nn.utils.clip_grad_norm_(
                model.parameters(), max_norm=max_norm
            )
            epoch_grad_norms.append(float(total_norm))

            # Value-based clipping alternative:
            # torch.nn.utils.clip_grad_value_(model.parameters(), clip_value=0.5)

            optimizer.step()
            epoch_loss += loss.item()

        # Guard against an empty loader (avoids ZeroDivisionError).
        avg_loss = epoch_loss / max(len(loader), 1)
        avg_grad_norm = sum(epoch_grad_norms) / max(len(epoch_grad_norms), 1)

        history['train_loss'].append(avg_loss)
        history['grad_norm'].append(avg_grad_norm)

        print(f"Epoch {epoch+1}: Loss={avg_loss:.4f}, Grad Norm={avg_grad_norm:.4f}")

        if avg_grad_norm > max_norm * 10:
            print(f"경고: 그래디언트 노름이 매우 큽니다 ({avg_grad_norm:.4f}). 학습률을 낮추세요.")

    return history

4. 과적합(Overfitting) 해결

과적합 진단

import matplotlib.pyplot as plt
import numpy as np


def plot_learning_curves(train_losses, val_losses, train_accs=None, val_accs=None):
    """
    Plot train/validation loss (and optionally accuracy) to diagnose overfitting.

    Marks the epoch with the lowest validation loss, prints the final
    val-train loss gap, and saves the figure to ``learning_curves.png``.

    NOTE(review): the subplot count is keyed on ``train_accs`` alone, but the
    accuracy panel is drawn only when BOTH accuracy lists are provided —
    passing ``train_accs`` without ``val_accs`` leaves an empty second panel.
    """
    fig, axes = plt.subplots(1, 2 if train_accs else 1, figsize=(14, 5))

    # Normalize to a sequence so axes[0]/axes[1] indexing works in both layouts.
    if not isinstance(axes, np.ndarray):
        axes = [axes]

    # Loss curves
    axes[0].plot(train_losses, label='Train Loss', color='blue')
    axes[0].plot(val_losses, label='Val Loss', color='red', linestyle='--')
    axes[0].set_xlabel('Epoch')
    axes[0].set_ylabel('Loss')
    axes[0].set_title('학습/검증 Loss')
    axes[0].legend()
    axes[0].grid(True)

    # Mark where validation loss bottomed out (overfitting begins after this)
    min_val_idx = np.argmin(val_losses)
    axes[0].axvline(x=min_val_idx, color='green', linestyle=':', label=f'최적 epoch: {min_val_idx}')
    axes[0].legend()

    # Accuracy curves (only when both lists are provided)
    if train_accs and val_accs:
        axes[1].plot(train_accs, label='Train Acc', color='blue')
        axes[1].plot(val_accs, label='Val Acc', color='red', linestyle='--')
        axes[1].set_xlabel('Epoch')
        axes[1].set_ylabel('Accuracy')
        axes[1].set_title('학습/검증 정확도')
        axes[1].legend()
        axes[1].grid(True)

    # Final-epoch generalization gap
    final_gap = val_losses[-1] - train_losses[-1]
    print(f"최종 과적합 갭 (Val-Train Loss): {final_gap:.4f}")
    if final_gap > 0.1:
        print("경고: 과적합이 심합니다!")

    plt.tight_layout()
    plt.savefig('learning_curves.png')
    plt.show()

조기 종료(Early Stopping) 구현

class EarlyStopping:
    """
    Monitor validation loss and stop training once it stops improving.

    An improvement is a drop of more than ``min_delta`` below the best loss
    seen so far. After ``patience`` consecutive non-improving epochs the
    monitor returns True; with ``restore_best`` the best weights are loaded
    back into the model at that point.

    Fix vs. the original: ``best_weights`` is now checked with ``is not
    None`` instead of truthiness, so an empty (falsy) state_dict is still
    restored correctly.
    """

    def __init__(self, patience=10, min_delta=0.001, restore_best=True, verbose=True):
        self.patience = patience          # non-improving epochs tolerated
        self.min_delta = min_delta        # minimum drop that counts as improvement
        self.restore_best = restore_best  # reload best weights on stop
        self.verbose = verbose

        self.best_loss = float('inf')
        self.best_epoch = 0
        self.counter = 0
        self.best_weights = None
        self.stopped_epoch = 0

    def __call__(self, val_loss, model, epoch):
        """Return True when training should stop, False otherwise."""
        if val_loss < self.best_loss - self.min_delta:
            self.best_loss = val_loss
            self.best_epoch = epoch
            self.counter = 0
            if self.restore_best:
                import copy
                # Deep-copy so later parameter updates don't mutate the snapshot.
                self.best_weights = copy.deepcopy(model.state_dict())
            if self.verbose:
                print(f"검증 Loss 개선: {val_loss:.6f} (epoch {epoch})")
            return False  # keep training
        else:
            self.counter += 1
            if self.verbose:
                print(f"EarlyStopping counter: {self.counter}/{self.patience}")
            if self.counter >= self.patience:
                self.stopped_epoch = epoch
                if self.restore_best and self.best_weights is not None:
                    model.load_state_dict(self.best_weights)
                    print(f"최적 가중치 복원 (epoch {self.best_epoch})")
                return True  # stop training
        return False


def train_with_regularization(model, train_loader, val_loader,
                               optimizer, loss_fn, device, epochs=100):
    """
    Training loop combining several regularization techniques:
    early stopping, ReduceLROnPlateau LR scheduling, and gradient clipping.

    Returns ``(train_losses, val_losses)`` accumulated per epoch.

    NOTE(review): ``verbose=True`` on ReduceLROnPlateau is deprecated in
    newer PyTorch releases — confirm against the installed version.
    """
    early_stopping = EarlyStopping(patience=15, min_delta=0.001)

    # Halve the LR after 5 epochs without validation improvement
    scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(
        optimizer, mode='min', factor=0.5, patience=5, verbose=True
    )

    train_losses = []
    val_losses = []

    for epoch in range(epochs):
        # Training phase
        model.train()
        train_loss = 0
        for X, y in train_loader:
            X, y = X.to(device), y.to(device)
            optimizer.zero_grad()
            output = model(X)
            loss = loss_fn(output, y)
            loss.backward()
            torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
            optimizer.step()
            train_loss += loss.item()

        # Validation phase (no gradients)
        model.eval()
        val_loss = 0
        with torch.no_grad():
            for X, y in val_loader:
                X, y = X.to(device), y.to(device)
                output = model(X)
                val_loss += loss_fn(output, y).item()

        train_loss /= len(train_loader)
        val_loss /= len(val_loader)

        train_losses.append(train_loss)
        val_losses.append(val_loss)

        scheduler.step(val_loss)

        print(f"Epoch {epoch+1}: Train={train_loss:.4f}, Val={val_loss:.4f}")

        if early_stopping(val_loss, model, epoch):
            print(f"조기 종료 at epoch {epoch+1}")
            break

    return train_losses, val_losses


# Example: dropout + batch-norm regularized MLP (L2 is set on the optimizer)
class RegularizedModel(nn.Module):
    """Three-layer MLP with BatchNorm, ReLU, and Dropout between layers."""

    def __init__(self, input_dim, hidden_dim, output_dim, dropout_rate=0.3):
        super().__init__()
        blocks = [
            nn.Linear(input_dim, hidden_dim),
            nn.BatchNorm1d(hidden_dim),
            nn.ReLU(),
            nn.Dropout(p=dropout_rate),  # randomly zero activations while training
            nn.Linear(hidden_dim, hidden_dim),
            nn.BatchNorm1d(hidden_dim),
            nn.ReLU(),
            nn.Dropout(p=dropout_rate),
            nn.Linear(hidden_dim, output_dim),
        ]
        self.network = nn.Sequential(*blocks)

    def forward(self, x):
        return self.network(x)

# L2 regularization (weight decay) is configured on the optimizer
optimizer = torch.optim.AdamW(
    model.parameters(),
    lr=1e-3,
    weight_decay=1e-4  # L2 regularization strength
)

데이터 증강 전략

import torchvision.transforms as transforms
from torch.utils.data import DataLoader, Dataset


# Data augmentation for image classification (training split)
train_transform = transforms.Compose([
    transforms.RandomHorizontalFlip(p=0.5),
    transforms.RandomRotation(degrees=15),
    transforms.RandomResizedCrop(224, scale=(0.8, 1.0)),
    transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2, hue=0.1),
    transforms.RandomGrayscale(p=0.1),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

# Validation/test split: deterministic resize + normalization only (no augmentation)
val_transform = transforms.Compose([
    transforms.Resize(256),
    transforms.CenterCrop(224),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

# Mixup data augmentation
def mixup_data(x, y, alpha=0.2, device='cuda'):
    """
    Mixup: blend each sample with a randomly chosen partner sample.

    Returns ``(mixed inputs, original labels, partner labels, mix weight)``.
    With ``alpha <= 0`` the mix weight is 1, i.e. inputs pass through unchanged.
    """
    lam = np.random.beta(alpha, alpha) if alpha > 0 else 1

    num_samples = x.size(0)
    perm = torch.randperm(num_samples).to(device)

    blended = lam * x + (1 - lam) * x[perm, :]
    return blended, y, y[perm], lam

def mixup_criterion(criterion, pred, y_a, y_b, lam):
    """Loss for a mixup batch: the lam-weighted blend of both targets' losses."""
    loss_a = criterion(pred, y_a)
    loss_b = criterion(pred, y_b)
    return lam * loss_a + (1 - lam) * loss_b

5. 학습 속도 문제

데이터 로딩 병목 해결

import torch
from torch.utils.data import DataLoader
import time


def profile_dataloader(dataset, batch_size=32, num_workers_list=[0, 2, 4, 8]):
    """
    Time data loading under several ``num_workers`` settings and report the best.

    Measures the wall time of the first 10 batches per configuration and
    returns ``{num_workers: elapsed_seconds}``.

    NOTE(review): the mutable default list is shared across calls — harmless
    here since it is never mutated, but worth knowing.
    """
    results = {}

    for num_workers in num_workers_list:
        loader = DataLoader(
            dataset,
            batch_size=batch_size,
            num_workers=num_workers,
            pin_memory=True,  # pinned host memory speeds up GPU copies
            prefetch_factor=2 if num_workers > 0 else None,
            persistent_workers=True if num_workers > 0 else False
        )

        start = time.time()
        for i, batch in enumerate(loader):
            if i >= 10:  # time only 10 batches
                break
        elapsed = time.time() - start

        results[num_workers] = elapsed
        print(f"num_workers={num_workers}: {elapsed:.3f}초 (10 배치)")

    best_workers = min(results, key=results.get)
    print(f"\n최적 num_workers: {best_workers}")
    return results


# Optimized DataLoader settings
def create_optimized_dataloader(dataset, batch_size, is_train=True):
    """Build a DataLoader tuned for throughput (workers, pinning, prefetch)."""
    loader_kwargs = dict(
        batch_size=batch_size,
        shuffle=is_train,            # shuffle only the training split
        num_workers=4,               # ~half the CPU core count is a good default
        pin_memory=True,             # pinned host memory accelerates GPU copies
        prefetch_factor=2,           # batches each worker loads ahead
        persistent_workers=True,     # keep workers alive (avoids respawn overhead)
        drop_last=is_train,          # drop the ragged final batch when training
    )
    return DataLoader(dataset, **loader_kwargs)

혼합 정밀도(Mixed Precision) 학습

import torch
from torch.cuda.amp import autocast, GradScaler


def train_mixed_precision(model, loader, optimizer, loss_fn, device, epochs=10):
    """
    Mixed-precision (FP16) training loop, typically 2-3x faster on GPU.

    Bug fix: the original invoked the imported ``torch.cuda.amp.autocast``
    with a ``device_type`` keyword, which that context manager does not
    accept (it raises TypeError) — ``device_type`` belongs to
    ``torch.amp.autocast``, which is used here instead.
    """
    scaler = GradScaler()
    model.train()

    for epoch in range(epochs):
        for X, y in loader:
            X, y = X.to(device), y.to(device)
            optimizer.zero_grad()

            # Forward pass in FP16 under autocast; torch.amp.autocast is the
            # API that takes a device_type argument.
            with torch.amp.autocast(device_type='cuda', dtype=torch.float16):
                output = model(X)
                loss = loss_fn(output, y)

            # Scale the loss so small FP16 gradients do not underflow.
            scaler.scale(loss).backward()

            # Unscale before clipping so the clip threshold is in true units.
            scaler.unscale_(optimizer)
            torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)

            # Optimizer step + dynamic scale-factor update.
            scaler.step(optimizer)
            scaler.update()

        print(f"Epoch {epoch+1} 완료, scaler scale: {scaler.get_scale()}")

torch.compile 적용

import torch

# PyTorch 2.0+: torch.compile speeds up execution.
# The first run pays a compilation cost; subsequent runs are faster.

model = MyModel().to(device)

# Default compilation
compiled_model = torch.compile(model)

# Maximum-performance mode (longer compile time)
compiled_model = torch.compile(model, mode='max-autotune')

# Suitable for inputs whose shapes change frequently
compiled_model = torch.compile(model, dynamic=True)

# 학습 시간 비교
import time

def benchmark_model(model, inputs, n_iters=100):
    """
    Measure average forward-pass latency (seconds per iteration).

    Runs 10 warmup iterations first and synchronizes CUDA around the timed
    region so asynchronous GPU work is fully counted. Requires a CUDA device.
    """
    # Warmup: trigger lazy initialization / compilation before timing
    for _ in range(10):
        _ = model(inputs)

    torch.cuda.synchronize()
    start = time.time()
    for _ in range(n_iters):
        _ = model(inputs)
    torch.cuda.synchronize()
    elapsed = time.time() - start

    return elapsed / n_iters

6. 메모리 부족(OOM) 해결

GPU 메모리 분석

import torch
import gc


def print_gpu_memory_summary(device=0):
    """
    Print a detailed GPU memory report for the given device index.

    Shows total/reserved/allocated/cached sizes (in GB) followed by
    PyTorch's full allocator summary. Prints a message and returns early
    when CUDA is unavailable.
    """
    if not torch.cuda.is_available():
        print("CUDA를 사용할 수 없습니다.")
        return

    print(f"=== GPU {device} 메모리 요약 ===")
    print(f"총 메모리: {torch.cuda.get_device_properties(device).total_memory / 1e9:.2f} GB")
    print(f"예약된 메모리: {torch.cuda.memory_reserved(device) / 1e9:.2f} GB")
    print(f"사용 중인 메모리: {torch.cuda.memory_allocated(device) / 1e9:.2f} GB")
    print(f"캐시된 메모리: {(torch.cuda.memory_reserved(device) - torch.cuda.memory_allocated(device)) / 1e9:.2f} GB")
    print()
    print(torch.cuda.memory_summary(device=device, abbreviated=False))


def find_memory_leaks(model, loader, device):
    """
    Watch GPU memory growth across batches to spot leaks.

    Prints the allocated-memory delta every 10 batches for up to 50 batches,
    then releases references and empties the CUDA cache.

    Fixes vs. the original: the unused ``import tracemalloc`` is removed,
    and the final cleanup no longer raises NameError when the loader is
    empty (the loop variables are pre-initialized).
    """
    initial_memory = torch.cuda.memory_allocated(device)

    X = y = output = None  # pre-bind so the final `del` is always valid
    for i, (X, y) in enumerate(loader):
        X, y = X.to(device), y.to(device)
        output = model(X)

        current_memory = torch.cuda.memory_allocated(device)
        diff = (current_memory - initial_memory) / 1e6

        if i % 10 == 0:
            print(f"Batch {i}: GPU Memory Delta = {diff:.2f} MB")

        if i >= 50:
            break

    # Drop references before collecting so the allocator can actually free.
    del X, y, output
    gc.collect()
    torch.cuda.empty_cache()


def clear_gpu_memory():
    """
    Release cached GPU memory back to the driver and report what remains.

    Collects Python garbage first so tensors with dropped references can
    actually be freed, then empties PyTorch's CUDA caching allocator.
    """
    gc.collect()
    torch.cuda.empty_cache()
    torch.cuda.synchronize()
    print(f"정리 후 GPU 메모리: {torch.cuda.memory_allocated() / 1e9:.2f} GB")

Gradient Checkpointing 구현

import torch
import torch.nn as nn
from torch.utils.checkpoint import checkpoint, checkpoint_sequential


class MemoryEfficientModel(nn.Module):
    """
    Model using gradient checkpointing to trade compute for memory.

    Activations of checkpointed segments are not stored during forward;
    they are recomputed during backward, cutting peak memory at the cost
    of some extra compute.
    """
    def __init__(self, layers):
        super().__init__()
        self.layers = nn.ModuleList(layers)

    def forward(self, x):
        # checkpoint_sequential: applies checkpointing across consecutive layers.
        # segments: number of chunks (more chunks = less memory, slower).
        return checkpoint_sequential(self.layers, segments=4, input=x)

    def forward_with_manual_checkpoints(self, x):
        # Selectively checkpoint only the middle layers.
        x = self.layers[0](x)  # run the first layer normally
        for layer in self.layers[1:-1]:
            x = checkpoint(layer, x)  # checkpoint each middle layer
        x = self.layers[-1](x)  # run the last layer normally
        return x


# Enabling gradient checkpointing on a Transformer
from transformers import AutoModelForCausalLM

model = AutoModelForCausalLM.from_pretrained("gpt2")
model.gradient_checkpointing_enable()  # one-liner for Hugging Face models

배치 크기 자동 탐색

def find_optimal_batch_size(model, loss_fn, device,
                              start_batch=8, max_batch=512,
                              input_shape=(3, 224, 224), num_classes=1000):
    """
    Find the largest batch size that runs without OOM by doubling upward.

    Improvements vs. the original: the dummy-input shape and class count are
    parameters (defaults preserve the old ImageNet-style probe), and
    gradients accumulated by each probe are cleared afterwards so the search
    neither inflates memory use nor pollutes the model's ``.grad`` buffers.

    Returns the largest batch size that succeeded.
    """
    batch_size = start_batch
    optimal_batch_size = start_batch

    while batch_size <= max_batch:
        try:
            # Probe with dummy data of the requested shape.
            dummy_input = torch.randn(batch_size, *input_shape).to(device)
            dummy_target = torch.randint(0, num_classes, (batch_size,)).to(device)

            output = model(dummy_input)
            loss = loss_fn(output, dummy_target)
            loss.backward()

            optimal_batch_size = batch_size
            print(f"배치 크기 {batch_size}: 성공")

            batch_size *= 2

            # Free probe tensors and stale gradients before the next attempt.
            model.zero_grad(set_to_none=True)
            del dummy_input, dummy_target, output, loss
            torch.cuda.empty_cache()

        except RuntimeError as e:
            if "out of memory" in str(e):
                print(f"배치 크기 {batch_size}: OOM")
                torch.cuda.empty_cache()
                break
            else:
                raise e

    print(f"\n권장 배치 크기: {optimal_batch_size} (안전 마진 포함: {optimal_batch_size // 2})")
    return optimal_batch_size

7. 데이터 파이프라인 디버깅

데이터 샘플 시각화

import matplotlib.pyplot as plt
import numpy as np
import torch
from collections import Counter


def visualize_batch(loader, num_samples=16, class_names=None):
    """
    Show a 4x4 grid of samples from the loader to verify preprocessing.

    Undoes normalization before display and saves the grid to
    ``data_samples.png``.

    NOTE(review): assumes CHW image tensors normalized with ImageNet
    statistics — confirm for non-ImageNet pipelines.
    """
    X, y = next(iter(loader))

    fig, axes = plt.subplots(4, 4, figsize=(12, 12))
    axes = axes.flatten()

    for i in range(min(num_samples, len(X))):
        img = X[i].numpy()

        # Undo normalization (ImageNet statistics)
        mean = np.array([0.485, 0.456, 0.406])
        std = np.array([0.229, 0.224, 0.225])
        img = std[:, None, None] * img + mean[:, None, None]
        img = np.clip(img, 0, 1)

        axes[i].imshow(img.transpose(1, 2, 0))  # CHW -> HWC for matplotlib
        label = y[i].item()
        title = class_names[label] if class_names else f"Label: {label}"
        axes[i].set_title(title)
        axes[i].axis('off')

    plt.tight_layout()
    plt.savefig('data_samples.png')
    plt.show()


def check_label_distribution(dataset):
    """
    Inspect the label distribution of a dataset and warn on class imbalance.

    Prints a per-class histogram and, when the majority/minority ratio
    exceeds 10:1, a warning with suggested remedies. Returns the ``Counter``
    of labels.
    """
    counter = Counter(dataset[i][1] for i in range(len(dataset)))

    classes = sorted(counter)
    counts = [counter[c] for c in classes]
    total = sum(counts)

    print("레이블 분포:")
    for cls, count in zip(classes, counts):
        pct = count / total * 100
        bar = '#' * int(pct / 2)
        print(f"  클래스 {cls}: {count:5d} ({pct:.1f}%) {bar}")

    # Ratio of the most frequent class to the least frequent one.
    imbalance_ratio = max(counts) / min(counts)

    if imbalance_ratio > 10:
        print(f"\n경고: 클래스 불균형이 심합니다! (비율: {imbalance_ratio:.1f}:1)")
        print("해결책: 가중 샘플링 또는 클래스 가중 손실 함수 사용을 권장합니다.")

    return counter


def create_weighted_sampler(dataset):
    """
    Build a WeightedRandomSampler that counteracts class imbalance.

    Each sample is weighted by the inverse frequency of its class, so rare
    classes are drawn as often as common ones (sampling with replacement).
    """
    labels = [dataset[i][1] for i in range(len(dataset))]
    frequency = Counter(labels)

    # Per-sample weight: inverse of its class frequency.
    sample_weights = torch.DoubleTensor([1.0 / frequency[lbl] for lbl in labels])

    return torch.utils.data.WeightedRandomSampler(
        weights=sample_weights,
        num_samples=len(sample_weights),
        replacement=True
    )


def check_normalization(loader, expected_mean=None, expected_std=None):
    """
    Verify that loader output matches the expected normalization statistics.

    Samples up to 10 batches, prints the per-channel (for 4D NCHW data) or
    global mean/std, and warns when either deviates from the expectation by
    more than 0.1.

    Fix vs. the original: ``expected_std`` was accepted but never checked;
    it is now validated the same way as ``expected_mean``. Both checks also
    use ``is not None`` so an all-zero expectation is not skipped.
    """
    sampled = []
    for X, _ in loader:
        sampled.append(X)
        if len(sampled) >= 10:  # sample only the first 10 batches
            break

    all_data = torch.cat(sampled, dim=0)
    # Per-channel stats for image batches (N, C, H, W); scalar otherwise.
    actual_mean = all_data.mean(dim=[0, 2, 3]) if all_data.dim() == 4 else all_data.mean()
    actual_std = all_data.std(dim=[0, 2, 3]) if all_data.dim() == 4 else all_data.std()

    print(f"실제 평균: {actual_mean.tolist()}")
    print(f"실제 표준편차: {actual_std.tolist()}")

    if expected_mean is not None:
        mean_diff = abs(actual_mean - torch.tensor(expected_mean)).max().item()
        print(f"기대 평균과의 차이: {mean_diff:.4f}")
        if mean_diff > 0.1:
            print("경고: 정규화 값이 기대치와 다릅니다!")

    if expected_std is not None:
        std_diff = abs(actual_std - torch.tensor(expected_std)).max().item()
        print(f"기대 표준편차와의 차이: {std_diff:.4f}")
        if std_diff > 0.1:
            print("경고: 정규화 값이 기대치와 다릅니다!")

8. 모델 아키텍처 디버깅

torchinfo로 모델 구조 확인

from torchinfo import summary
import torch
import torch.nn as nn


def analyze_model(model, input_size):
    """
    Summarize the model with torchinfo and print per-layer parameter counts.

    ``input_size`` is forwarded to ``torchinfo.summary``
    (e.g. ``(1, 3, 224, 224)``). Returns the torchinfo statistics object.
    """
    # Basic summary: shapes, parameter counts, kernel sizes, multiply-adds
    model_stats = summary(
        model,
        input_size=input_size,
        col_names=["input_size", "output_size", "num_params", "kernel_size",
                   "mult_adds"],
        verbose=1
    )

    # Per-layer parameter distribution (direct parameters only, no recursion)
    print("\n레이어별 파라미터 분포:")
    for name, module in model.named_modules():
        num_params = sum(p.numel() for p in module.parameters(recurse=False))
        if num_params > 0:
            print(f"  {name}: {num_params:,} 파라미터")

    return model_stats


def monitor_activations(model, X):
    """
    Capture activations after each nonlinearity and report dead neurons.

    Registers forward hooks on ReLU/GELU/Tanh/Sigmoid modules, runs one
    gradient-free forward pass, prints per-activation statistics, then
    removes the hooks. Returns ``{module_name: detached activation tensor}``.
    """
    captured = {}

    def _capture(name):
        def _hook(module, inputs, output):
            captured[name] = output.detach()
        return _hook

    # Hook every nonlinearity so its output can be inspected.
    handles = [
        module.register_forward_hook(_capture(name))
        for name, module in model.named_modules()
        if isinstance(module, (nn.ReLU, nn.GELU, nn.Tanh, nn.Sigmoid))
    ]

    # Single inference-only forward pass to populate the captures.
    with torch.no_grad():
        model(X)

    # Report statistics, flagging layers dominated by zero activations.
    print("\n활성화 통계:")
    for name, act in captured.items():
        dead_neurons = (act == 0).float().mean().item()
        print(f"  {name}:")
        print(f"    평균: {act.mean():.4f}, 표준편차: {act.std():.4f}")
        print(f"    죽은 뉴런 비율: {dead_neurons:.2%}")
        if dead_neurons > 0.5:
            print(f"    경고: 뉴런의 {dead_neurons:.0%}가 비활성화됨!")

    for handle in handles:
        handle.remove()

    return captured


def visualize_weight_distribution(model):
    """
    Histogram the weights of up to six Linear/Conv2d layers.

    Helps spot bad initialization (e.g. collapsed or overly wide
    distributions). Saves the figure to ``weight_distribution.png``.
    """
    import matplotlib.pyplot as plt

    fig, axes = plt.subplots(2, 3, figsize=(15, 10))
    axes = axes.flatten()

    linear_layers = [(name, m) for name, m in model.named_modules()
                     if isinstance(m, (nn.Linear, nn.Conv2d))]

    for i, (name, layer) in enumerate(linear_layers[:6]):
        if i >= len(axes):
            break
        weight_data = layer.weight.data.cpu().numpy().flatten()
        axes[i].hist(weight_data, bins=50, color='blue', alpha=0.7)
        axes[i].set_title(f"{name}\n(mean={weight_data.mean():.4f}, std={weight_data.std():.4f})")
        axes[i].set_xlabel('가중치 값')
        axes[i].set_ylabel('빈도')
        axes[i].grid(True, alpha=0.3)

    plt.suptitle("레이어별 가중치 분포")
    plt.tight_layout()
    plt.savefig('weight_distribution.png')
    plt.show()

9. 학습 과정 모니터링

TensorBoard 활용

from torch.utils.tensorboard import SummaryWriter
import torch
import numpy as np


class TensorBoardLogger:
    """Thin wrapper around SummaryWriter for common debug-logging tasks."""

    def __init__(self, log_dir='runs/experiment'):
        self.writer = SummaryWriter(log_dir)
        self.step = 0  # reserved for batch-level step counting by callers

    def log_scalars(self, metrics: dict, epoch: int):
        """Write each name -> value pair as a scalar at the given epoch."""
        for tag, value in metrics.items():
            self.writer.add_scalar(tag, value, epoch)

    def log_model_gradients(self, model, epoch: int):
        """Histogram every parameter's gradient and weight distribution."""
        for param_name, param in model.named_parameters():
            if param.grad is None:
                continue
            self.writer.add_histogram(f'gradients/{param_name}', param.grad, epoch)
            self.writer.add_histogram(f'weights/{param_name}', param.data, epoch)

    def log_images(self, images: torch.Tensor, tag: str, epoch: int, n=8):
        """Log the first n images of a batch."""
        self.writer.add_images(tag, images[:n], epoch)

    def log_learning_rate(self, optimizer, epoch: int):
        """Log the current learning rate of every parameter group."""
        for group_idx, group in enumerate(optimizer.param_groups):
            self.writer.add_scalar(f'lr/group_{group_idx}', group['lr'], epoch)

    def log_confusion_matrix(self, cm, class_names, epoch: int):
        """Render the confusion matrix as a seaborn heatmap figure."""
        import matplotlib.pyplot as plt
        import seaborn as sns

        fig, ax = plt.subplots(figsize=(10, 8))
        sns.heatmap(cm, annot=True, fmt='d', cmap='Blues',
                    xticklabels=class_names, yticklabels=class_names)
        ax.set_xlabel('예측')
        ax.set_ylabel('실제')
        ax.set_title('혼동 행렬')

        self.writer.add_figure('confusion_matrix', fig, epoch)
        plt.close()

    def close(self):
        """Flush and close the underlying writer."""
        self.writer.close()


def train_with_tensorboard(model, train_loader, val_loader,
                             optimizer, loss_fn, device, epochs=50):
    """Standard train/validate loop that streams metrics to TensorBoard.

    Logs per-epoch loss/accuracy scalars, gradient/weight histograms and
    learning rates through a TensorBoardLogger, then closes the writer.
    """
    logger = TensorBoardLogger(log_dir='runs/debug_session')

    for epoch in range(epochs):
        # --- training phase ---
        model.train()
        running_loss, correct = 0, 0
        for X, y in train_loader:
            X, y = X.to(device), y.to(device)
            optimizer.zero_grad()
            output = model(X)
            loss = loss_fn(output, y)
            loss.backward()
            optimizer.step()

            running_loss += loss.item()
            correct += (output.argmax(1) == y).sum().item()

        train_loss = running_loss / len(train_loader)
        train_acc = correct / len(train_loader.dataset)

        # --- validation phase ---
        model.eval()
        running_val_loss, val_correct = 0, 0
        with torch.no_grad():
            for X, y in val_loader:
                X, y = X.to(device), y.to(device)
                output = model(X)
                running_val_loss += loss_fn(output, y).item()
                val_correct += (output.argmax(1) == y).sum().item()

        val_loss = running_val_loss / len(val_loader)
        val_acc = val_correct / len(val_loader.dataset)

        # --- epoch-level logging ---
        logger.log_scalars({
            'Loss/Train': train_loss,
            'Loss/Val': val_loss,
            'Accuracy/Train': train_acc,
            'Accuracy/Val': val_acc,
        }, epoch)
        logger.log_model_gradients(model, epoch)
        logger.log_learning_rate(optimizer, epoch)

    logger.close()
    print("TensorBoard: tensorboard --logdir=runs 명령으로 실행")

Weights & Biases (W&B) 실험 추적

import wandb
import torch


def train_with_wandb(model, train_loader, val_loader, optimizer, loss_fn,
                      device, config=None):
    """Training loop instrumented with Weights & Biases experiment tracking.

    Logs batch-level loss/gradient-norm every 10 batches and epoch-level
    aggregates; `wandb.watch` auto-logs gradients and parameters.
    """
    if config is None:
        # Sensible defaults when the caller supplies no config.
        config = {
            'learning_rate': 1e-3,
            'batch_size': 32,
            'epochs': 50,
            'optimizer': 'AdamW',
            'weight_decay': 1e-4,
            'model': model.__class__.__name__
        }

    run = wandb.init(
        project="deep-learning-debug",
        config=config,
        tags=["debugging", "experiment"]
    )

    # Auto-log gradients and parameters every 100 steps.
    wandb.watch(model, log='all', log_freq=100)

    for epoch in range(config['epochs']):
        model.train()
        epoch_loss = 0

        for batch_idx, (X, y) in enumerate(train_loader):
            X, y = X.to(device), y.to(device)
            optimizer.zero_grad()
            loss = loss_fn(model(X), y)
            loss.backward()

            # Clip and capture the pre-clip gradient norm for logging.
            grad_norm = torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
            optimizer.step()

            epoch_loss += loss.item()

            if batch_idx % 10 == 0:
                wandb.log({
                    'batch/loss': loss.item(),
                    'batch/grad_norm': grad_norm.item(),
                    'batch': epoch * len(train_loader) + batch_idx
                })

        wandb.log({
            'epoch/train_loss': epoch_loss / len(train_loader),
            'epoch/learning_rate': optimizer.param_groups[0]['lr'],
            'epoch': epoch
        })

    run.finish()

10. 재현성(Reproducibility) 확보

import random
import numpy as np
import torch
import os


def set_seed(seed: int = 42):
    """Seed every RNG (Python, NumPy, PyTorch CPU/CUDA) for reproducibility.

    Also switches cuDNN/PyTorch into deterministic mode — which can cost
    some performance — and sets the environment variables deterministic
    CUDA runs require.
    """
    # Same seed into every generator, including all CUDA devices.
    for seed_fn in (random.seed, np.random.seed, torch.manual_seed,
                    torch.cuda.manual_seed, torch.cuda.manual_seed_all):
        seed_fn(seed)

    # Deterministic cuDNN kernels (disables the autotuner; slightly slower).
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False

    # Raise on any op that lacks a deterministic implementation.
    torch.use_deterministic_algorithms(True)

    os.environ['PYTHONHASHSEED'] = str(seed)
    # Required by cuBLAS for deterministic matmul reductions.
    os.environ['CUBLAS_WORKSPACE_CONFIG'] = ':4096:8'

    print(f"시드 {seed}로 모든 난수 생성기 초기화 완료")


def save_experiment_config(config: dict, save_path: str = 'experiment_config.json'):
    """Persist the experiment config plus environment/git metadata as JSON.

    Adds Python/PyTorch/CUDA/cuDNN/GPU version info and the current git
    commit hash, writes everything to `save_path`, and returns the
    enriched dict.
    """
    import json
    import subprocess

    enriched = dict(config)

    # Record the software/hardware environment alongside the hyperparameters.
    enriched['environment'] = {
        'python': subprocess.getoutput('python --version'),
        'torch': torch.__version__,
        'cuda': torch.version.cuda,
        'cudnn': str(torch.backends.cudnn.version()),
        'gpu': torch.cuda.get_device_name(0) if torch.cuda.is_available() else 'CPU'
    }

    # Pin the exact code revision this run used.
    try:
        enriched['git_hash'] = subprocess.getoutput('git rev-parse HEAD')
    except Exception:
        enriched['git_hash'] = 'unknown'

    with open(save_path, 'w', encoding='utf-8') as f:
        json.dump(enriched, f, indent=2, ensure_ascii=False)

    print(f"실험 설정 저장: {save_path}")
    return enriched


# 재현성 테스트
def test_reproducibility(model_fn, train_fn, seed=42, n_runs=3):
    """Train n_runs times from the same seed and compare final losses.

    A spread above 1e-5 between runs indicates a nondeterminism leak
    somewhere in the pipeline. Returns the list of final losses.
    """
    final_losses = []

    for run_idx in range(n_runs):
        set_seed(seed)  # reset every RNG before each run
        final = train_fn(model_fn())
        final_losses.append(final)
        print(f"Run {run_idx+1}: Final Loss = {final:.6f}")

    max_diff = max(final_losses) - min(final_losses)
    print(f"\n최대 차이: {max_diff:.8f}")

    if max_diff < 1e-5:
        print("재현성 검증 통과!")
    else:
        print("경고: 재현성 문제가 있습니다.")

    return final_losses

11. 분산 학습(Distributed Training) 디버깅

import torch
import torch.distributed as dist
import torch.multiprocessing as mp
from torch.nn.parallel import DistributedDataParallel as DDP


def setup_distributed(rank, world_size, backend='nccl'):
    """Initialize torch.distributed for a single-node multi-process job.

    Sets the rendezvous address/port via environment variables, joins the
    process group, and pins this process to GPU `rank`.
    """
    os.environ['MASTER_ADDR'] = 'localhost'
    os.environ['MASTER_PORT'] = '12355'

    dist.init_process_group(backend=backend, rank=rank, world_size=world_size)

    # One process per GPU: bind this rank to its device.
    torch.cuda.set_device(rank)
    print(f"프로세스 {rank}/{world_size} 초기화 완료")


def cleanup_distributed():
    """Tear down the process group created by setup_distributed()."""
    dist.destroy_process_group()


def debug_ddp_training(rank, world_size, model, dataset):
    """Reference DDP training loop showing the common debugging patterns.

    Demonstrates sampler epoch reseeding, rank-0-only logging and
    checkpointing of the unwrapped module, and explicit barriers to keep
    ranks in lockstep.
    """
    setup_distributed(rank, world_size)

    device = torch.device(f'cuda:{rank}')
    model = model.to(device)

    # find_unused_parameters=True helps diagnose "unused parameter" hangs,
    # at the cost of extra per-step overhead.
    model = DDP(model, device_ids=[rank], find_unused_parameters=True)

    # Each rank sees a disjoint shard of the dataset.
    sampler = torch.utils.data.distributed.DistributedSampler(
        dataset,
        num_replicas=world_size,
        rank=rank,
        shuffle=True
    )

    loader = torch.utils.data.DataLoader(
        dataset,
        batch_size=32,
        sampler=sampler,
        num_workers=4
    )

    optimizer = torch.optim.AdamW(model.parameters(), lr=1e-3)
    loss_fn = torch.nn.CrossEntropyLoss()

    for epoch in range(10):
        # Without this, every epoch shuffles with the same permutation.
        sampler.set_epoch(epoch)

        for X, y in loader:
            X, y = X.to(device), y.to(device)
            optimizer.zero_grad()
            loss = loss_fn(model(X), y)
            loss.backward()
            optimizer.step()

        if rank == 0:
            print(f"Epoch {epoch+1} 완료")
            # Save the unwrapped module so checkpoints load without DDP.
            torch.save(model.module.state_dict(), f'checkpoint_epoch{epoch}.pt')

        # Keep all ranks synchronized (no rank races ahead of the save).
        dist.barrier()

    cleanup_distributed()


def check_gradient_sync(model):
    """Verify DDP gradient averaging across ranks.

    After backward() under DDP every rank should hold the same averaged
    gradients, so all-reducing each gradient and dividing by the world
    size must reproduce the local gradient; any mismatch is reported.
    """
    world_size = dist.get_world_size()
    for name, param in model.named_parameters():
        if param.grad is None:
            continue

        # Sum this gradient over all ranks.
        summed = param.grad.data.clone()
        dist.all_reduce(summed, op=dist.ReduceOp.SUM)

        # If ranks are synchronized, the mean equals the local gradient.
        mean_grad = summed / world_size
        if not torch.allclose(param.grad.data, mean_grad, atol=1e-5):
            print(f"경고: {name} 레이어 그래디언트 동기화 불일치!")

12. MLflow와 실험 관리

import mlflow
import mlflow.pytorch
import torch
import optuna


def train_with_mlflow(model, train_loader, val_loader, optimizer,
                       loss_fn, device, params: dict):
    """Train/validate while tracking params, metrics and the best model in MLflow.

    Logs per-epoch train/val losses, snapshots the model whenever the
    validation loss improves, and returns the best (lowest) validation loss.
    """
    mlflow.set_tracking_uri("http://localhost:5000")
    mlflow.set_experiment("deep-learning-debug")

    with mlflow.start_run():
        mlflow.log_params(params)

        best_val_loss = float('inf')

        for epoch in range(params['epochs']):
            # -- train --
            model.train()
            running = 0
            for X, y in train_loader:
                X, y = X.to(device), y.to(device)
                optimizer.zero_grad()
                loss = loss_fn(model(X), y)
                loss.backward()
                optimizer.step()
                running += loss.item()
            train_loss = running / len(train_loader)

            # -- validate --
            model.eval()
            running = 0
            with torch.no_grad():
                for X, y in val_loader:
                    X, y = X.to(device), y.to(device)
                    running += loss_fn(model(X), y).item()
            val_loss = running / len(val_loader)

            mlflow.log_metrics({
                'train_loss': train_loss,
                'val_loss': val_loss
            }, step=epoch)

            # Snapshot whenever validation improves.
            if val_loss < best_val_loss:
                best_val_loss = val_loss
                mlflow.pytorch.log_model(model, "best_model")

        mlflow.log_metric("best_val_loss", best_val_loss)

    return best_val_loss


def hyperparameter_optimization_with_optuna(model_fn, train_loader,
                                              val_loader, device, n_trials=50):
    """Search lr/weight-decay/dropout/batch-size with Optuna's TPE sampler.

    Each trial runs a short (10-epoch) MLflow-tracked training and returns
    its best validation loss; the study minimizes that value.

    NOTE(review): the sampled batch_size is only recorded in the params —
    the provided train_loader keeps its own batch size. Confirm intent.
    """
    def objective(trial):
        # Search space definitions.
        lr = trial.suggest_float('lr', 1e-5, 1e-1, log=True)
        weight_decay = trial.suggest_float('weight_decay', 1e-6, 1e-2, log=True)
        dropout = trial.suggest_float('dropout', 0.0, 0.5)
        batch_size = trial.suggest_categorical('batch_size', [16, 32, 64, 128])

        candidate = model_fn(dropout=dropout).to(device)
        optimizer = torch.optim.AdamW(candidate.parameters(),
                                       lr=lr, weight_decay=weight_decay)
        loss_fn = torch.nn.CrossEntropyLoss()

        # Short training run as a cheap fitness estimate.
        return train_with_mlflow(
            candidate, train_loader, val_loader, optimizer, loss_fn, device,
            params={'lr': lr, 'weight_decay': weight_decay,
                    'dropout': dropout, 'batch_size': batch_size, 'epochs': 10}
        )

    study = optuna.create_study(
        direction='minimize',
        sampler=optuna.samplers.TPESampler(seed=42),
        pruner=optuna.pruners.MedianPruner()
    )

    study.optimize(objective, n_trials=n_trials)

    print("\n최적 하이퍼파라미터:")
    for key, value in study.best_params.items():
        print(f"  {key}: {value}")
    print(f"최적 검증 Loss: {study.best_value:.4f}")

    return study.best_params

마무리: 딥러닝 디버깅 워크플로우

딥러닝 디버깅은 체계적인 접근이 중요합니다. 다음 순서로 문제를 진단하세요:

  1. 데이터 확인부터 시작: 모든 문제의 80%는 데이터에서 시작됩니다. NaN, 잘못된 레이블, 정규화 오류를 먼저 확인하세요.

  2. 작은 것부터 확인: 풀 배치 학습 전에 단일 배치로 오버피팅이 가능한지 먼저 테스트하세요.

  3. 그래디언트 확인: 손실 함수 이후 그래디언트가 정상적으로 흐르는지 체크하세요.

  4. 모니터링 도구 활용: TensorBoard, W&B, MLflow 중 하나를 선택하여 모든 실험을 추적하세요.

  5. 재현성 확보: 시드 고정 없이는 디버깅이 매우 어렵습니다. 항상 시드를 설정하세요.

# 딥러닝 디버깅의 최소 체크리스트
def minimum_debug_checklist(model, train_loader, device):
    """Minimal pre-training sanity check: can the model overfit one batch?

    Runs 100 Adam steps (lr=1e-3) on a single batch; a healthy model and
    pipeline should shrink the loss by more than 10x. Note: this mutates
    the model's weights in place.
    """
    print("딥러닝 학습 전 체크리스트")
    print("=" * 50)

    print("[1] 단일 배치 오버피팅 테스트...")
    X, y = next(iter(train_loader))
    X, y = X.to(device), y.to(device)

    optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)
    loss_fn = torch.nn.CrossEntropyLoss()

    initial_loss = None
    for step in range(100):
        optimizer.zero_grad()
        loss = loss_fn(model(X), y)
        if step == 0:
            initial_loss = loss.item()
        loss.backward()
        optimizer.step()

    final_loss = loss.item()
    # Guard against division by an exactly-zero final loss.
    overfit_ratio = initial_loss / final_loss if final_loss > 0 else float('inf')

    if overfit_ratio > 10:
        print(f"  통과: Loss가 {initial_loss:.4f}에서 {final_loss:.4f}로 감소 (비율: {overfit_ratio:.1f}x)")
    else:
        print(f"  경고: 단일 배치에서 오버피팅이 안됨 (비율: {overfit_ratio:.1f}x)")
        print("  → 모델 용량, 학습률, 데이터 오류를 확인하세요")

    print("\n체크리스트 완료!")

이 가이드에서 다룬 기법들을 체계적으로 적용하면, 딥러닝 학습 과정에서 발생하는 대부분의 문제를 신속하게 진단하고 해결할 수 있습니다. 디버깅은 경험이 쌓일수록 빨라지지만, 올바른 도구와 방법론을 갖추는 것이 무엇보다 중요합니다.

Deep Learning Debugging Complete Guide: From Diagnosing Training Failures to Performance Optimization

When training deep learning models, you will frequently encounter unexpected failures. Loss suddenly becoming NaN, models refusing to converge no matter how long you wait, or out-of-memory errors are experiences every deep learning developer has faced. This guide provides systematic methods for diagnosing and resolving all major issues that arise during deep learning training, complete with practical code examples.

1. Common Deep Learning Training Failure Patterns

Deep learning training failures can be broadly categorized into three types.

Loss Not Decreasing

Training starts but Loss barely decreases or stays close to its initial value. The most common causes are learning rate too low, bugs in model implementation, or issues in data preprocessing.

Loss Becoming NaN

Loss suddenly changes to NaN (Not a Number) or Inf (Infinity). This occurs due to numerical instability, typically when the learning rate is too high or the data contains outliers.

Training Loss Decreasing But Validation Loss Increasing

This is overfitting. The model is memorizing training data but failing to generalize to new data.

Checklist-Based Diagnostic Framework

def diagnose_training(model, train_loader, val_loader, optimizer, loss_fn, device):
    """Pre-flight diagnostic: data sanity, parameter counts, forward + backward.

    Prints a four-part checklist and leaves the model in train mode with
    freshly computed gradients (no optimizer step is taken).
    """
    print("=== Deep Learning Training Diagnostic Checklist ===\n")

    # [1] Inspect one batch for shape/range/NaN problems.
    print("[1] Validating data...")
    X, y = next(iter(train_loader))
    print(f"  Input shape: {X.shape}")
    print(f"  Label shape: {y.shape}")
    print(f"  Input range: [{X.min():.4f}, {X.max():.4f}]")
    print(f"  Input has NaN: {torch.isnan(X).any()}")
    print(f"  Input has Inf: {torch.isinf(X).any()}")

    # [2] Count total vs trainable parameters.
    print("\n[2] Validating model parameters...")
    total_params = sum(p.numel() for p in model.parameters())
    trainable_params = sum(p.numel() for p in model.parameters() if p.requires_grad)
    print(f"  Total parameters: {total_params:,}")
    print(f"  Trainable parameters: {trainable_params:,}")

    # [3] Forward pass without gradients.
    print("\n[3] Testing forward pass...")
    model.eval()
    with torch.no_grad():
        try:
            output = model(X.to(device))
            print(f"  Output shape: {output.shape}")
            print(f"  Output has NaN: {torch.isnan(output).any()}")
            loss = loss_fn(output, y.to(device))
            print(f"  Initial Loss: {loss.item():.4f}")
        except Exception as e:
            print(f"  Forward pass failed: {e}")

    # [4] Backward pass: verify gradients actually flow.
    print("\n[4] Testing backward pass...")
    model.train()
    optimizer.zero_grad()
    loss = loss_fn(model(X.to(device)), y.to(device))
    loss.backward()

    norms = [(name, p.grad.norm().item())
             for name, p in model.named_parameters() if p.grad is not None]

    if norms:
        print("  Top 5 gradient norms by layer:")
        for name, norm in sorted(norms, key=lambda item: item[1], reverse=True)[:5]:
            print(f"    {name}: {norm:.6f}")

    print("\nDiagnosis complete!")

2. Loss Problem Diagnosis

NaN Loss Causes and Solutions

NaN Loss is one of the most frustrating problems in deep learning. Multiple causes exist, each requiring a different approach.

Learning Rate Too High

The most common cause of NaN Loss. When the learning rate is too high, parameter update magnitudes become excessive and Loss explodes.

import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
import matplotlib.pyplot as plt


def find_learning_rate(model, train_loader, loss_fn, device,
                        start_lr=1e-7, end_lr=10, num_iter=100):
    """LR Range Test: sweep the learning rate geometrically and record loss.

    Trains with SGD while multiplying the learning rate each step, stops
    on NaN or a 4x blow-up over the best loss, then plots loss vs. lr on
    a log axis (saved to 'lr_range_test.png') and returns (lrs, losses).

    NOTE(review): the sweep updates the model's weights in place — pass a
    throwaway copy if the original model must stay intact.
    """
    optimizer = optim.SGD(model.parameters(), lr=start_lr)
    growth = (end_lr / start_lr) ** (1 / num_iter)

    lrs, losses = [], []
    best_loss = float('inf')

    model.train()
    batches = iter(train_loader)

    for _ in range(num_iter):
        # Cycle the loader if it runs out mid-sweep.
        try:
            X, y = next(batches)
        except StopIteration:
            batches = iter(train_loader)
            X, y = next(batches)

        X, y = X.to(device), y.to(device)
        optimizer.zero_grad()
        loss = loss_fn(model(X), y)

        # Abort once the loss diverges.
        if torch.isnan(loss) or loss.item() > best_loss * 4:
            print(f"Loss explosion detected at lr={optimizer.param_groups[0]['lr']:.2e}")
            break

        best_loss = min(best_loss, loss.item())

        lrs.append(optimizer.param_groups[0]['lr'])
        losses.append(loss.item())

        loss.backward()
        optimizer.step()

        # Geometric learning-rate ramp.
        for pg in optimizer.param_groups:
            pg['lr'] *= growth

    plt.figure(figsize=(10, 4))
    plt.plot(lrs, losses)
    plt.xscale('log')
    plt.xlabel('Learning Rate')
    plt.ylabel('Loss')
    plt.title('LR Range Test')
    plt.grid(True)
    plt.savefig('lr_range_test.png')
    plt.show()

    return lrs, losses


def safe_training_step(model, X, y, optimizer, loss_fn, scaler=None):
    """One guarded optimization step; returns the loss value or None if skipped.

    Skips the step when the input or the computed loss contains NaN/Inf.
    When `scaler` is given, runs under AMP autocast with gradient
    unscaling before clipping; otherwise a plain fp32 step. Gradients are
    clipped to a max norm of 1.0 in both paths.
    """
    optimizer.zero_grad()

    # Bail out before touching the model if the batch itself is corrupt.
    if torch.isnan(X).any() or torch.isinf(X).any():
        print("Warning: NaN/Inf in input, skipping step")
        return None

    use_amp = scaler is not None

    if use_amp:
        with torch.cuda.amp.autocast():
            output = model(X)
            loss = loss_fn(output, y)
    else:
        output = model(X)
        loss = loss_fn(output, y)

    # Shared divergence check for both paths.
    if torch.isnan(loss) or torch.isinf(loss):
        print(f"Warning: Loss is {loss.item()}, skipping step")
        return None

    if use_amp:
        scaler.scale(loss).backward()
        # Unscale so clipping operates on true gradient magnitudes.
        scaler.unscale_(optimizer)
        torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
        scaler.step(optimizer)
        scaler.update()
    else:
        loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
        optimizer.step()

    return loss.item()

Preventing log(0) Computation

In cross-entropy loss or log-based loss functions, log(0) returns -Inf, causing NaN.

# Wrong: log(0) possible
def bad_cross_entropy(pred, target):
    # BAD (intentionally): pred values of exactly 0 make log() return -inf.
    return -(target * torch.log(pred)).sum()

# Correct: use eps for numerical stability
def safe_cross_entropy(pred, target, eps=1e-8):
    # Clamp into [eps, 1-eps] so log() never sees an exact 0 or 1.
    clipped = pred.clamp(min=eps, max=1 - eps)
    return -(target * torch.log(clipped)).sum()

# Best: use PyTorch built-ins (internally applies log-sum-exp trick)
# Both operate on raw logits, so no manual softmax/clamp is needed.
loss_fn = nn.CrossEntropyLoss()  # numerically stable
log_softmax = nn.LogSoftmax(dim=1)  # combined log + softmax

def numerically_stable_log_loss(logits, targets):
    """Cross-entropy on raw logits via F.cross_entropy (log-sum-exp safe)."""
    from torch.nn import functional as F
    return F.cross_entropy(logits, targets)

Using torch.autograd.set_detect_anomaly

import torch

# Enable anomaly detection mode (development/debugging only)
# Slows performance, disable in production
# NOTE(review): `model`, `X`, `y` and `loss_fn` are illustrative
# placeholders assumed to be defined by the surrounding training script.
with torch.autograd.detect_anomaly():
    output = model(X)
    loss = loss_fn(output, y)
    loss.backward()  # Prints exact location when NaN/Inf occurs

# Globally enable anomaly detection for every subsequent backward pass.
torch.autograd.set_detect_anomaly(True)

def train_with_anomaly_detection(model, loader, optimizer, loss_fn, device, epochs=5):
    """Training loop wrapped in autograd anomaly detection.

    detect_anomaly() pinpoints the op that produced a NaN/Inf in backward;
    it is slow, so keep it to debugging sessions. On a NaN loss the
    current epoch's batch loop is aborted after printing input/output
    statistics.
    """
    model.train()
    for epoch in range(epochs):
        for batch_idx, (X, y) in enumerate(loader):
            X, y = X.to(device), y.to(device)

            with torch.autograd.detect_anomaly():
                optimizer.zero_grad()
                output = model(X)
                loss = loss_fn(output, y)

                if torch.isnan(loss):
                    # Dump enough context to localize the bad batch.
                    print(f"NaN Loss at epoch {epoch}, batch {batch_idx}")
                    print(f"Input stats: mean={X.mean():.4f}, std={X.std():.4f}")
                    print(f"Output stats: mean={output.mean():.4f}, std={output.std():.4f}")
                    break

                loss.backward()
                optimizer.step()

3. Gradient Problems

Diagnosing Vanishing Gradients

Vanishing gradients occur in deep networks when backpropagation causes gradients to become extremely small as they propagate to earlier layers.

import torch
import torch.nn as nn
import matplotlib.pyplot as plt


def check_gradient_flow(model):
    """Bar-plot mean/max gradient magnitude per layer (log scale).

    Call after backward(). Saves 'gradient_flow.png' and warns about
    layers whose average absolute gradient is below 1e-6 (a vanishing-
    gradient symptom). Returns (layer_names, mean_grads, max_grads).
    """
    layer_names, mean_grads, peak_grads = [], [], []

    for name, param in model.named_parameters():
        if param.requires_grad and param.grad is not None:
            layer_names.append(name)
            mean_grads.append(param.grad.abs().mean().item())
            peak_grads.append(param.grad.abs().max().item())

    plt.figure(figsize=(12, 6))
    plt.bar(range(len(mean_grads)), mean_grads, alpha=0.5, label='Mean Gradient')
    plt.bar(range(len(peak_grads)), peak_grads, alpha=0.5, label='Max Gradient')
    plt.xticks(range(len(layer_names)), layer_names, rotation=90)
    plt.xlabel("Layer")
    plt.ylabel("Gradient Magnitude")
    plt.title("Gradient Flow by Layer")
    plt.legend()
    plt.yscale('log')
    plt.tight_layout()
    plt.savefig('gradient_flow.png')

    for name, avg in zip(layer_names, mean_grads):
        if avg < 1e-6:
            print(f"Warning: Vanishing gradient possible in {name} (avg={avg:.2e})")

    return layer_names, mean_grads, peak_grads


def register_gradient_hooks(model):
    """Attach per-parameter gradient hooks that record live statistics.

    Returns (stats_dict, hook_handles). The dict is filled during each
    backward() and NaN gradients are reported immediately. Call .remove()
    on every handle when monitoring is done.
    """
    stats = {}

    def _make_hook(param_name):
        def _hook(grad):
            abs_grad = grad.abs()
            stats[param_name] = {
                'mean': abs_grad.mean().item(),
                'max': abs_grad.max().item(),
                'std': grad.std().item(),
                'has_nan': torch.isnan(grad).any().item(),
                'has_inf': torch.isinf(grad).any().item()
            }
            if torch.isnan(grad).any():
                print(f"NaN gradient detected: {param_name}")
            return grad
        return _hook

    handles = []
    for param_name, param in model.named_parameters():
        if param.requires_grad:
            handles.append(param.register_hook(_make_hook(param_name)))

    return stats, handles


# Fix vanishing gradients: He initialization + BatchNorm + Residual Connection
class ResidualBlock(nn.Module):
    """Two-layer MLP block with an identity skip connection.

    The shortcut (x + block(x)) keeps gradients flowing through deep
    stacks, mitigating vanishing gradients.
    """

    def __init__(self, dim):
        super().__init__()
        layers = [
            nn.Linear(dim, dim),
            nn.BatchNorm1d(dim),
            nn.ReLU(),
            nn.Linear(dim, dim),
            nn.BatchNorm1d(dim),
        ]
        self.block = nn.Sequential(*layers)
        self.relu = nn.ReLU()

    def forward(self, x):
        # Identity shortcut around the transformed path.
        residual = self.block(x)
        return self.relu(x + residual)


def init_weights(m):
    """He (Kaiming) initialization for ReLU networks; use with model.apply().

    Linear/Conv2d weights get kaiming-normal (fan_out, relu) with zero
    bias; BatchNorm2d gets the identity affine transform (weight=1, bias=0).
    """
    if isinstance(m, (nn.Linear, nn.Conv2d)):
        # Same scheme for both affine layer types.
        nn.init.kaiming_normal_(m.weight, mode='fan_out', nonlinearity='relu')
        if m.bias is not None:
            nn.init.zeros_(m.bias)
    elif isinstance(m, nn.BatchNorm2d):
        nn.init.ones_(m.weight)
        nn.init.zeros_(m.bias)

Solving Exploding Gradients with Gradient Clipping

import torch
import torch.nn as nn


def train_with_gradient_clipping(model, loader, optimizer, loss_fn, device,
                                   max_norm=1.0, epochs=10):
    """Safe training loop with gradient clipping and norm tracking.

    Clips the global gradient 2-norm to `max_norm` every step, records the
    per-epoch average loss and average pre-clip gradient norm, and warns
    when gradients are far beyond the clipping threshold.

    Returns a history dict with 'train_loss' and 'grad_norm' lists
    (one entry per epoch).
    """
    model.train()
    history = {'train_loss': [], 'grad_norm': []}

    for epoch in range(epochs):
        epoch_loss = 0.0
        epoch_grad_norms = []

        for X, y in loader:
            X, y = X.to(device), y.to(device)
            optimizer.zero_grad()
            loss = loss_fn(model(X), y)
            loss.backward()

            # FIX: clip_grad_norm_ already returns the total 2-norm measured
            # BEFORE clipping, so the original manual per-parameter norm loop
            # was redundant.
            total_norm = torch.nn.utils.clip_grad_norm_(
                model.parameters(), max_norm=max_norm
            ).item()
            epoch_grad_norms.append(total_norm)

            optimizer.step()
            epoch_loss += loss.item()

        avg_loss = epoch_loss / len(loader)
        avg_grad_norm = sum(epoch_grad_norms) / len(epoch_grad_norms)

        history['train_loss'].append(avg_loss)
        history['grad_norm'].append(avg_grad_norm)

        print(f"Epoch {epoch+1}: Loss={avg_loss:.4f}, Grad Norm={avg_grad_norm:.4f}")

        # Persistently huge norms suggest the learning rate is too high.
        if avg_grad_norm > max_norm * 10:
            print(f"Warning: Gradient norm very large ({avg_grad_norm:.4f}). Consider reducing learning rate.")

    return history

4. Solving Overfitting

Diagnosing Overfitting

import matplotlib.pyplot as plt
import numpy as np


def plot_learning_curves(train_losses, val_losses, train_accs=None, val_accs=None):
    """Plot train/val loss (and optionally accuracy) curves to flag overfitting.

    Marks the epoch with the lowest validation loss, prints the final
    val-train loss gap, saves 'learning_curves.png' and shows the figure.
    """
    n_panels = 2 if train_accs else 1
    fig, axes = plt.subplots(1, n_panels, figsize=(14, 5))

    # plt.subplots returns a bare Axes (not an array) for a single panel.
    if not isinstance(axes, np.ndarray):
        axes = [axes]

    loss_ax = axes[0]
    loss_ax.plot(train_losses, label='Train Loss', color='blue')
    loss_ax.plot(val_losses, label='Val Loss', color='red', linestyle='--')
    loss_ax.set_xlabel('Epoch')
    loss_ax.set_ylabel('Loss')
    loss_ax.set_title('Training/Validation Loss')
    loss_ax.legend()
    loss_ax.grid(True)

    best_epoch = np.argmin(val_losses)
    loss_ax.axvline(x=best_epoch, color='green', linestyle=':', label=f'Best epoch: {best_epoch}')
    loss_ax.legend()

    if train_accs and val_accs:
        acc_ax = axes[1]
        acc_ax.plot(train_accs, label='Train Acc', color='blue')
        acc_ax.plot(val_accs, label='Val Acc', color='red', linestyle='--')
        acc_ax.set_xlabel('Epoch')
        acc_ax.set_ylabel('Accuracy')
        acc_ax.set_title('Training/Validation Accuracy')
        acc_ax.legend()
        acc_ax.grid(True)

    final_gap = val_losses[-1] - train_losses[-1]
    print(f"Final overfitting gap (Val-Train Loss): {final_gap:.4f}")
    if final_gap > 0.1:
        print("Warning: Severe overfitting detected!")

    plt.tight_layout()
    plt.savefig('learning_curves.png')
    plt.show()

Implementing Early Stopping

class EarlyStopping:
    """Stop training when the validation loss stops improving.

    Tracks the best validation loss seen so far; after `patience`
    consecutive epochs without an improvement of at least `min_delta`,
    signals the caller to stop and (optionally) restores the best
    weights captured along the way.
    """

    def __init__(self, patience=10, min_delta=0.001, restore_best=True, verbose=True):
        self.patience = patience          # epochs to wait after the last improvement
        self.min_delta = min_delta        # minimum decrease that counts as progress
        self.restore_best = restore_best  # reload best weights when stopping
        self.verbose = verbose

        self.best_loss = float('inf')
        self.best_epoch = 0
        self.counter = 0
        self.best_weights = None          # deep-copied state_dict of the best epoch
        self.stopped_epoch = 0

    def __call__(self, val_loss, model, epoch):
        """Record this epoch's validation loss; return True when training should stop."""
        if val_loss < self.best_loss - self.min_delta:
            self.best_loss = val_loss
            self.best_epoch = epoch
            self.counter = 0
            if self.restore_best:
                import copy
                # Deep copy: state_dict tensors are views that later steps mutate.
                self.best_weights = copy.deepcopy(model.state_dict())
            if self.verbose:
                print(f"Validation Loss improved: {val_loss:.6f} (epoch {epoch})")
            return False

        self.counter += 1
        if self.verbose:
            print(f"EarlyStopping counter: {self.counter}/{self.patience}")
        if self.counter >= self.patience:
            self.stopped_epoch = epoch
            # FIX: `and self.best_weights` relied on dict truthiness, so a
            # parameter-less model's empty state dict was treated as "no
            # snapshot"; compare against None explicitly.
            if self.restore_best and self.best_weights is not None:
                model.load_state_dict(self.best_weights)
                print(f"Best weights restored (epoch {self.best_epoch})")
            return True
        return False


def train_with_regularization(model, train_loader, val_loader,
                               optimizer, loss_fn, device, epochs=100):
    """Training loop combining early stopping, LR scheduling and grad clipping.

    Returns (train_losses, val_losses) per epoch; stops early once the
    validation loss has plateaued for the EarlyStopping patience window.
    """
    early_stopping = EarlyStopping(patience=15, min_delta=0.001)

    # Halve the LR after 5 epochs without validation improvement.
    scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(
        optimizer, mode='min', factor=0.5, patience=5, verbose=True
    )

    train_losses, val_losses = [], []

    for epoch in range(epochs):
        # -- train --
        model.train()
        running_train = 0
        for X, y in train_loader:
            X, y = X.to(device), y.to(device)
            optimizer.zero_grad()
            loss = loss_fn(model(X), y)
            loss.backward()
            torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
            optimizer.step()
            running_train += loss.item()

        # -- validate --
        model.eval()
        running_val = 0
        with torch.no_grad():
            for X, y in val_loader:
                X, y = X.to(device), y.to(device)
                running_val += loss_fn(model(X), y).item()

        train_loss = running_train / len(train_loader)
        val_loss = running_val / len(val_loader)
        train_losses.append(train_loss)
        val_losses.append(val_loss)

        scheduler.step(val_loss)
        print(f"Epoch {epoch+1}: Train={train_loss:.4f}, Val={val_loss:.4f}")

        if early_stopping(val_loss, model, epoch):
            print(f"Early stopping at epoch {epoch+1}")
            break

    return train_losses, val_losses


# Dropout + L2 Regularization example
class RegularizedModel(nn.Module):
    """
    Two-hidden-layer MLP regularized with BatchNorm and Dropout.

    Args:
        input_dim: size of the input feature vector.
        hidden_dim: width of both hidden layers.
        output_dim: number of output units.
        dropout_rate: dropout probability after each hidden block.
    """
    def __init__(self, input_dim, hidden_dim, output_dim, dropout_rate=0.3):
        super().__init__()
        # Build the layer list explicitly, then wrap it in a Sequential.
        blocks = [
            nn.Linear(input_dim, hidden_dim),
            nn.BatchNorm1d(hidden_dim),
            nn.ReLU(),
            nn.Dropout(p=dropout_rate),
            nn.Linear(hidden_dim, hidden_dim),
            nn.BatchNorm1d(hidden_dim),
            nn.ReLU(),
            nn.Dropout(p=dropout_rate),
            nn.Linear(hidden_dim, output_dim),
        ]
        self.network = nn.Sequential(*blocks)

    def forward(self, x):
        """Run the full stack; expects a (batch, input_dim) tensor."""
        return self.network(x)

# L2 regularization via weight_decay in optimizer
# AdamW applies decoupled weight decay (a true L2-style penalty on the
# weights), unlike Adam where weight_decay is folded into the gradient.
# NOTE(review): `model` here is a placeholder from the surrounding article.
optimizer = torch.optim.AdamW(
    model.parameters(),
    lr=1e-3,
    weight_decay=1e-4
)

Data Augmentation Strategies

import torchvision.transforms as transforms


# Training-time augmentation: random geometric and photometric perturbations
# act as a regularizer; parameters below are common ImageNet-style defaults.
train_transform = transforms.Compose([
    transforms.RandomHorizontalFlip(p=0.5),
    transforms.RandomRotation(degrees=15),
    transforms.RandomResizedCrop(224, scale=(0.8, 1.0)),
    transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2, hue=0.1),
    transforms.RandomGrayscale(p=0.1),
    transforms.ToTensor(),
    # ImageNet per-channel statistics.
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

# Validation: deterministic resize + center crop only — no random
# augmentation, but the same normalization as training.
val_transform = transforms.Compose([
    transforms.Resize(256),
    transforms.CenterCrop(224),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])


def mixup_data(x, y, alpha=0.2, device='cuda'):
    """
    Mixup augmentation: create new samples by linearly interpolating two samples.

    Args:
        x: input batch tensor (first dimension is the batch).
        y: label tensor aligned with ``x`` along the batch dimension.
        alpha: Beta-distribution concentration; ``alpha <= 0`` disables mixing.
        device: kept for backward compatibility; the permutation is now
            created directly on ``x``'s device, so CPU tensors also work.

    Returns:
        (mixed_x, y_a, y_b, lam): interpolated inputs, the two label sets,
        and the mixing coefficient to feed into mixup_criterion.
    """
    # lam ~ Beta(alpha, alpha); degenerates to "no mixing" when alpha <= 0.
    lam = np.random.beta(alpha, alpha) if alpha > 0 else 1

    batch_size = x.size(0)
    # Fix: build the permutation on the same device as x instead of the
    # hard-coded `device` argument — avoids a device-mismatch error when
    # x lives on CPU (or any device other than `device`).
    index = torch.randperm(batch_size, device=x.device)

    mixed_x = lam * x + (1 - lam) * x[index, :]
    y_a, y_b = y, y[index]

    return mixed_x, y_a, y_b, lam

def mixup_criterion(criterion, pred, y_a, y_b, lam):
    """Mixup loss: convex combination of the loss against both label sets."""
    loss_a = criterion(pred, y_a)
    loss_b = criterion(pred, y_b)
    return lam * loss_a + (1 - lam) * loss_b

5. Training Speed Issues

Resolving Data Loading Bottlenecks

import torch
from torch.utils.data import DataLoader
import time


def profile_dataloader(dataset, batch_size=32, num_workers_list=None):
    """
    Compare data loading speed across different num_workers settings.

    Args:
        dataset: any map-style dataset.
        batch_size: batch size used for every trial.
        num_workers_list: worker counts to try; defaults to [0, 2, 4, 8].
            (Fix: the original used a mutable list as the default argument.)

    Returns:
        dict mapping num_workers -> elapsed seconds for 10 batches.
    """
    # Avoid the mutable-default-argument pitfall; fall back to the usual sweep.
    if num_workers_list is None:
        num_workers_list = [0, 2, 4, 8]

    results = {}

    for num_workers in num_workers_list:
        loader = DataLoader(
            dataset,
            batch_size=batch_size,
            num_workers=num_workers,
            pin_memory=True,
            # prefetch_factor / persistent_workers only apply with workers > 0.
            prefetch_factor=2 if num_workers > 0 else None,
            persistent_workers=num_workers > 0
        )

        start = time.time()
        for i, batch in enumerate(loader):
            if i >= 10:  # time only the first 10 batches
                break
        elapsed = time.time() - start

        results[num_workers] = elapsed
        print(f"num_workers={num_workers}: {elapsed:.3f}s (10 batches)")

    best_workers = min(results, key=results.get)
    print(f"\nOptimal num_workers: {best_workers}")
    return results


def create_optimized_dataloader(dataset, batch_size, is_train=True):
    """
    Build a DataLoader with throughput-oriented settings: pinned memory,
    prefetching, and persistent worker processes. Training loaders are
    shuffled and drop the last partial batch; validation loaders are not.
    """
    loader_kwargs = dict(
        batch_size=batch_size,
        shuffle=is_train,
        num_workers=4,
        pin_memory=True,
        prefetch_factor=2,
        persistent_workers=True,
        drop_last=is_train,
    )
    return DataLoader(dataset, **loader_kwargs)

Mixed Precision Training

import torch
from torch.cuda.amp import autocast, GradScaler


def train_mixed_precision(model, loader, optimizer, loss_fn, device, epochs=10):
    """
    FP16 mixed precision training for 2-3x speedup.

    Fix: the original called ``torch.cuda.amp.autocast(device_type=...)``,
    but that class takes no ``device_type`` argument (it is implicitly
    CUDA), so every call raised TypeError. ``torch.amp.autocast`` is the
    device-generic API that accepts it.

    Args:
        model: model to train (moved to ``device`` by the caller).
        loader: iterable of (X, y) batches.
        optimizer: optimizer stepping the model parameters.
        loss_fn: loss taking (output, target).
        device: device the batches are moved to.
        epochs: number of passes over ``loader``.
    """
    scaler = GradScaler()
    model.train()

    for epoch in range(epochs):
        for X, y in loader:
            X, y = X.to(device), y.to(device)
            optimizer.zero_grad()

            # torch.amp.autocast accepts device_type; torch.cuda.amp.autocast
            # does not — the original call raised TypeError at runtime.
            with torch.amp.autocast(device_type='cuda', dtype=torch.float16):
                output = model(X)
                loss = loss_fn(output, y)

            # Scale the loss so small FP16 gradients do not underflow.
            scaler.scale(loss).backward()
            # Unscale before clipping so max_norm applies to true gradients.
            scaler.unscale_(optimizer)
            torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
            scaler.step(optimizer)
            scaler.update()

        print(f"Epoch {epoch+1} complete, scaler scale: {scaler.get_scale()}")

Applying torch.compile

import torch

# NOTE(review): MyModel and device are placeholders defined elsewhere in
# the article; each torch.compile call below overwrites compiled_model and
# is shown only to illustrate the different compilation modes.
model = MyModel().to(device)

# Default compilation
compiled_model = torch.compile(model)

# Maximum performance mode (longer compile time)
compiled_model = torch.compile(model, mode='max-autotune')

# For frequently changing input sizes
compiled_model = torch.compile(model, dynamic=True)


def benchmark_model(model, inputs, n_iters=100):
    """
    Measure the mean forward-pass latency of ``model`` in seconds.

    Robustness fix: ``torch.cuda.synchronize()`` raises on CPU-only
    machines; it is now only called when CUDA is available, so the
    benchmark also works for CPU models.

    Args:
        model: callable taking ``inputs``.
        inputs: a single batch to feed repeatedly.
        n_iters: number of timed iterations.

    Returns:
        Mean seconds per forward pass.
    """
    # On GPU, kernels launch asynchronously — synchronize around the timer.
    sync = torch.cuda.synchronize if torch.cuda.is_available() else (lambda: None)

    # Warmup: let lazy initialization / compilation / cache effects settle.
    for _ in range(10):
        _ = model(inputs)

    sync()
    start = time.time()
    for _ in range(n_iters):
        _ = model(inputs)
    sync()
    elapsed = time.time() - start

    return elapsed / n_iters

6. Out-of-Memory (OOM) Solutions

GPU Memory Analysis

import torch
import gc


def print_gpu_memory_summary(device=0):
    """
    Print a detailed breakdown of GPU memory usage for one device.
    Falls back to a short message when CUDA is unavailable.
    """
    if not torch.cuda.is_available():
        print("CUDA is not available.")
        return

    gb = 1e9  # report everything in gigabytes
    total = torch.cuda.get_device_properties(device).total_memory
    reserved = torch.cuda.memory_reserved(device)
    allocated = torch.cuda.memory_allocated(device)

    print(f"=== GPU {device} Memory Summary ===")
    print(f"Total memory: {total / gb:.2f} GB")
    print(f"Reserved memory: {reserved / gb:.2f} GB")
    print(f"Allocated memory: {allocated / gb:.2f} GB")
    # Cached = reserved by the allocator but not currently handed out.
    print(f"Cached memory: {(reserved - allocated) / gb:.2f} GB")
    print()
    print(torch.cuda.memory_summary(device=device, abbreviated=False))


def clear_gpu_memory():
    """
    Release cached GPU memory back to the driver and report usage.

    Robustness fix: ``torch.cuda.synchronize()`` raises on machines
    without CUDA, so the CUDA-specific calls are now guarded; garbage
    collection still runs everywhere.
    """
    gc.collect()
    if torch.cuda.is_available():
        torch.cuda.empty_cache()
        torch.cuda.synchronize()
    # memory_allocated() safely reports 0 when CUDA was never initialized.
    print(f"GPU memory after cleanup: {torch.cuda.memory_allocated() / 1e9:.2f} GB")

Implementing Gradient Checkpointing

import torch
import torch.nn as nn
from torch.utils.checkpoint import checkpoint, checkpoint_sequential


class MemoryEfficientModel(nn.Module):
    """
    Memory-efficient model using gradient checkpointing.

    Instead of storing every intermediate activation for backward,
    checkpointed segments recompute them on the fly, trading extra
    compute time for a much smaller activation memory footprint.
    """
    def __init__(self, layers):
        super().__init__()
        self.layers = nn.ModuleList(layers)

    def forward(self, x):
        # checkpoint_sequential splits the layer list into `segments`
        # chunks; more segments -> more memory saved, slower backward.
        return checkpoint_sequential(self.layers, segments=4, input=x)

    def forward_with_manual_checkpoints(self, x):
        # Keep the first and last layers un-checkpointed and checkpoint
        # everything in between.
        x = self.layers[0](x)
        for block in self.layers[1:-1]:
            x = checkpoint(block, x)
        return self.layers[-1](x)


# Enable Gradient Checkpointing in Transformers
from transformers import AutoModelForCausalLM

# NOTE(review): requires the `transformers` package and downloads GPT-2
# weights on first call; shown as an example of enabling checkpointing.
model = AutoModelForCausalLM.from_pretrained("gpt2")
model.gradient_checkpointing_enable()  # Simple activation for Hugging Face models
def find_optimal_batch_size(model, loss_fn, device,
                              start_batch=8, max_batch=512,
                              input_shape=(3, 224, 224), num_classes=1000):
    """
    Finds the maximum batch size usable without OOM by doubling the batch
    size until a CUDA out-of-memory error occurs.

    Args:
        model: model to probe (runs a real forward + backward pass).
        loss_fn: loss taking (output, target).
        device: device to place the dummy batches on.
        start_batch: first batch size to try.
        max_batch: upper bound on the search.
        input_shape: per-sample input shape; default keeps the original
            hard-coded ImageNet-style (3, 224, 224) for compatibility.
        num_classes: range for the dummy integer targets (default 1000).

    Returns:
        The largest batch size that ran successfully.
    """
    batch_size = start_batch
    optimal_batch_size = start_batch

    while batch_size <= max_batch:
        try:
            dummy_input = torch.randn(batch_size, *input_shape).to(device)
            dummy_target = torch.randint(0, num_classes, (batch_size,)).to(device)

            output = model(dummy_input)
            loss = loss_fn(output, dummy_target)
            loss.backward()

            optimal_batch_size = batch_size
            print(f"Batch size {batch_size}: Success")
            batch_size *= 2

            del dummy_input, dummy_target, output, loss
            # Fix: drop the gradients accumulated by the probe pass so
            # repeated trials don't leak memory or pollute later training.
            model.zero_grad(set_to_none=True)
            torch.cuda.empty_cache()

        except RuntimeError as e:
            if "out of memory" in str(e):
                print(f"Batch size {batch_size}: OOM")
                torch.cuda.empty_cache()
                break
            else:
                raise e

    print(f"\nRecommended batch size: {optimal_batch_size} (with safety margin: {optimal_batch_size // 2})")
    return optimal_batch_size

7. Data Pipeline Debugging

Data Sample Visualization

import matplotlib.pyplot as plt
import numpy as np
import torch
from collections import Counter


def visualize_batch(loader, num_samples=16, class_names=None):
    """
    Plot a 4x4 grid of samples from one batch so preprocessing results
    (normalization, augmentation, label pairing) can be inspected by eye.
    Saves the figure to 'data_samples.png' and shows it.
    """
    images, labels = next(iter(loader))

    fig, axes = plt.subplots(4, 4, figsize=(12, 12))
    axes = axes.flatten()

    # Undo ImageNet normalization so the images render correctly.
    mean = np.array([0.485, 0.456, 0.406])
    std = np.array([0.229, 0.224, 0.225])

    for idx in range(min(num_samples, len(images))):
        img = images[idx].numpy()
        img = std[:, None, None] * img + mean[:, None, None]
        img = np.clip(img, 0, 1)

        ax = axes[idx]
        ax.imshow(img.transpose(1, 2, 0))  # CHW -> HWC for matplotlib
        label = labels[idx].item()
        ax.set_title(class_names[label] if class_names else f"Label: {label}")
        ax.axis('off')

    plt.tight_layout()
    plt.savefig('data_samples.png')
    plt.show()


def check_label_distribution(dataset):
    """
    Print per-class sample counts for a dataset and warn when the
    class imbalance ratio exceeds 10:1.

    Returns:
        Counter mapping label -> count.
    """
    counter = Counter(dataset[i][1] for i in range(len(dataset)))

    classes = sorted(counter)
    counts = [counter[c] for c in classes]
    total = sum(counts)

    print("Label Distribution:")
    for cls, count in zip(classes, counts):
        pct = count / total * 100
        bar = '#' * int(pct / 2)  # crude text histogram, 2% per '#'
        print(f"  Class {cls}: {count:5d} ({pct:.1f}%) {bar}")

    imbalance_ratio = max(counts) / min(counts)

    if imbalance_ratio > 10:
        print(f"\nWarning: Severe class imbalance! (ratio: {imbalance_ratio:.1f}:1)")
        print("Solution: Consider weighted sampling or class-weighted loss function.")

    return counter


def create_weighted_sampler(dataset):
    """
    Build a WeightedRandomSampler whose per-sample weight is the inverse
    of its class frequency, so rare classes are drawn as often as common
    ones during training.
    """
    labels = [dataset[i][1] for i in range(len(dataset))]
    frequency = Counter(labels)

    # Inverse-frequency weight per sample; DoubleTensor as the sampler expects.
    sample_weights = torch.DoubleTensor([1.0 / frequency[lbl] for lbl in labels])

    return torch.utils.data.WeightedRandomSampler(
        weights=sample_weights,
        num_samples=len(sample_weights),
        replacement=True
    )

8. Model Architecture Debugging

Model Structure Analysis with torchinfo

from torchinfo import summary
import torch
import torch.nn as nn


def analyze_model(model, input_size):
    """
    Print a torchinfo summary of the model plus a per-layer parameter
    count, to spot bottleneck layers and parameter-heavy modules.

    Returns the torchinfo ModelStatistics object.
    """
    model_stats = summary(
        model,
        input_size=input_size,
        col_names=["input_size", "output_size", "num_params", "kernel_size",
                   "mult_adds"],
        verbose=1
    )

    print("\nParameter distribution by layer:")
    for layer_name, module in model.named_modules():
        # recurse=False counts only the module's own parameters,
        # not those of its children.
        own_params = sum(p.numel() for p in module.parameters(recurse=False))
        if own_params > 0:
            print(f"  {layer_name}: {own_params:,} parameters")

    return model_stats


def monitor_activations(model, X):
    """
    Capture the outputs of every nonlinearity via forward hooks and
    report statistics, flagging layers dominated by dead neurons.

    Returns:
        dict mapping module name -> captured (detached) activation tensor.
    """
    activations = {}

    def capture(layer_name):
        # Forward hook that stores a detached copy of the layer output.
        def hook(module, inputs, output):
            activations[layer_name] = output.detach()
        return hook

    tracked_types = (nn.ReLU, nn.GELU, nn.Tanh, nn.Sigmoid)
    handles = [
        module.register_forward_hook(capture(layer_name))
        for layer_name, module in model.named_modules()
        if isinstance(module, tracked_types)
    ]

    with torch.no_grad():
        model(X)

    print("\nActivation statistics:")
    for layer_name, act in activations.items():
        dead_neurons = (act == 0).float().mean().item()
        print(f"  {layer_name}:")
        print(f"    mean: {act.mean():.4f}, std: {act.std():.4f}")
        print(f"    dead neuron ratio: {dead_neurons:.2%}")
        if dead_neurons > 0.5:
            print(f"    Warning: {dead_neurons:.0%} of neurons are inactive!")

    # Always detach the hooks so later forward passes are unaffected.
    for handle in handles:
        handle.remove()

    return activations


def visualize_weight_distribution(model):
    """
    Histogram the weights of up to six Linear/Conv2d layers to spot
    initialization problems (collapsed, shifted, or over-wide spreads).
    Saves the figure to 'weight_distribution.png' and shows it.
    """
    import matplotlib.pyplot as plt

    fig, axes = plt.subplots(2, 3, figsize=(15, 10))
    axes = axes.flatten()

    weighted_layers = [(layer_name, module)
                       for layer_name, module in model.named_modules()
                       if isinstance(module, (nn.Linear, nn.Conv2d))]

    for idx, (layer_name, layer) in enumerate(weighted_layers[:6]):
        if idx >= len(axes):
            break
        values = layer.weight.data.cpu().numpy().flatten()
        ax = axes[idx]
        ax.hist(values, bins=50, color='blue', alpha=0.7)
        ax.set_title(f"{layer_name}\n(mean={values.mean():.4f}, std={values.std():.4f})")
        ax.set_xlabel('Weight Value')
        ax.set_ylabel('Frequency')
        ax.grid(True, alpha=0.3)

    plt.suptitle("Weight Distribution by Layer")
    plt.tight_layout()
    plt.savefig('weight_distribution.png')
    plt.show()

9. Training Monitoring

TensorBoard Usage

from torch.utils.tensorboard import SummaryWriter
import torch
import numpy as np


class TensorBoardLogger:
    """Thin wrapper around SummaryWriter for scalars, histograms, and LRs."""

    def __init__(self, log_dir='runs/experiment'):
        self.writer = SummaryWriter(log_dir)

    def log_scalars(self, metrics: dict, epoch: int):
        """Write one scalar curve per metric name."""
        for metric_name, metric_value in metrics.items():
            self.writer.add_scalar(metric_name, metric_value, epoch)

    def log_model_gradients(self, model, epoch: int):
        """Histogram both gradients and weights for every parameter."""
        for param_name, param in model.named_parameters():
            if param.grad is not None:
                self.writer.add_histogram(f'gradients/{param_name}', param.grad, epoch)
                self.writer.add_histogram(f'weights/{param_name}', param.data, epoch)

    def log_learning_rate(self, optimizer, epoch: int):
        """Track the current LR of every parameter group."""
        for group_idx, group in enumerate(optimizer.param_groups):
            self.writer.add_scalar(f'lr/group_{group_idx}', group['lr'], epoch)

    def close(self):
        """Flush and close the underlying writer."""
        self.writer.close()


def train_with_tensorboard(model, train_loader, val_loader,
                             optimizer, loss_fn, device, epochs=50):
    """
    Train/validation loop that streams losses, accuracies, gradient
    histograms, and learning rates to TensorBoard every epoch.
    """
    logger = TensorBoardLogger(log_dir='runs/debug_session')

    # Dataset sizes are loop-invariant; compute them once.
    n_train = len(train_loader.dataset)
    n_val = len(val_loader.dataset)

    for epoch in range(epochs):
        # --- training phase ---
        model.train()
        loss_sum, correct = 0, 0
        for X, y in train_loader:
            X, y = X.to(device), y.to(device)
            optimizer.zero_grad()
            output = model(X)
            loss = loss_fn(output, y)
            loss.backward()
            optimizer.step()

            loss_sum += loss.item()
            correct += (output.argmax(1) == y).sum().item()

        train_loss = loss_sum / len(train_loader)
        train_acc = correct / n_train

        # --- validation phase ---
        model.eval()
        loss_sum, correct = 0, 0
        with torch.no_grad():
            for X, y in val_loader:
                X, y = X.to(device), y.to(device)
                output = model(X)
                loss_sum += loss_fn(output, y).item()
                correct += (output.argmax(1) == y).sum().item()

        val_loss = loss_sum / len(val_loader)
        val_acc = correct / n_val

        logger.log_scalars({
            'Loss/Train': train_loss,
            'Loss/Val': val_loss,
            'Accuracy/Train': train_acc,
            'Accuracy/Val': val_acc,
        }, epoch)
        logger.log_model_gradients(model, epoch)
        logger.log_learning_rate(optimizer, epoch)

    logger.close()
    print("TensorBoard: run with 'tensorboard --logdir=runs'")

10. Reproducibility

import random
import numpy as np
import torch
import os


def set_seed(seed: int = 42):
    """
    Seed every random number generator (Python, NumPy, PyTorch CPU and
    all GPUs) and force deterministic kernels for reproducible runs.
    """
    # Feed the same seed to each framework's generator.
    for seeder in (random.seed, np.random.seed, torch.manual_seed,
                   torch.cuda.manual_seed, torch.cuda.manual_seed_all):
        seeder(seed)

    # cuDNN: deterministic kernel selection, no benchmark autotuning.
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False

    # Raise on any op that lacks a deterministic implementation.
    torch.use_deterministic_algorithms(True)

    # PYTHONHASHSEED pins hash randomization for spawned subprocesses;
    # CUBLAS_WORKSPACE_CONFIG is required for deterministic cuBLAS matmuls.
    os.environ.update({
        'PYTHONHASHSEED': str(seed),
        'CUBLAS_WORKSPACE_CONFIG': ':4096:8',
    })

    print(f"All random generators initialized with seed {seed}")


def save_experiment_config(config: dict, save_path: str = 'experiment_config.json'):
    """
    Snapshot the experiment configuration together with the runtime
    environment (Python/torch/CUDA versions, GPU name, git commit) to JSON.

    Returns the full config dict that was written.
    """
    import json
    import subprocess

    full_config = config.copy()

    gpu_name = (torch.cuda.get_device_name(0)
                if torch.cuda.is_available() else 'CPU')
    full_config['environment'] = {
        'python': subprocess.getoutput('python --version'),
        'torch': torch.__version__,
        'cuda': torch.version.cuda,
        'cudnn': str(torch.backends.cudnn.version()),
        'gpu': gpu_name
    }

    # Record the exact code revision; best-effort outside a git repo.
    try:
        full_config['git_hash'] = subprocess.getoutput('git rev-parse HEAD')
    except Exception:
        full_config['git_hash'] = 'unknown'

    with open(save_path, 'w') as f:
        json.dump(full_config, f, indent=2)

    print(f"Experiment config saved: {save_path}")
    return full_config


def test_reproducibility(model_fn, train_fn, seed=42, n_runs=3):
    """
    Train several times with the same seed and compare final losses;
    a non-negligible spread indicates a reproducibility problem.

    Returns the list of final losses from each run.
    """
    results = []

    for run in range(n_runs):
        set_seed(seed)
        final_loss = train_fn(model_fn())
        results.append(final_loss)
        print(f"Run {run+1}: Final Loss = {final_loss:.6f}")

    max_diff = max(results) - min(results)
    print(f"\nMax difference: {max_diff:.8f}")

    if max_diff < 1e-5:
        print("Reproducibility check passed!")
    else:
        print("Warning: Reproducibility issues detected.")

    return results

11. Distributed Training Debugging

import torch
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel as DDP
import os


def setup_distributed(rank, world_size, backend='nccl'):
    """
    Join the distributed process group for this rank.

    Sets the rendezvous address/port via environment variables, then
    initializes the given backend and pins this process to its GPU.
    """
    # All ranks rendezvous at the same single-node address.
    os.environ.update({'MASTER_ADDR': 'localhost', 'MASTER_PORT': '12355'})

    dist.init_process_group(backend=backend, rank=rank, world_size=world_size)

    # Each rank drives exactly one GPU.
    torch.cuda.set_device(rank)
    print(f"Process {rank}/{world_size} initialized")


def cleanup_distributed():
    # Tear down the process group created by setup_distributed().
    dist.destroy_process_group()


def debug_ddp_training(rank, world_size, model, dataset):
    """
    Minimal DDP training loop illustrating the common pitfalls:
    per-epoch sampler reseeding, rank-0-only logging/checkpointing,
    and explicit barriers to keep ranks in lock-step.
    """
    setup_distributed(rank, world_size)

    device = torch.device(f'cuda:{rank}')
    model = model.to(device)
    # find_unused_parameters=True avoids hangs when some parameters get
    # no gradient in a step (at the cost of extra bookkeeping overhead).
    model = DDP(model, device_ids=[rank], find_unused_parameters=True)

    sampler = torch.utils.data.distributed.DistributedSampler(
        dataset, num_replicas=world_size, rank=rank, shuffle=True
    )
    loader = torch.utils.data.DataLoader(
        dataset, batch_size=32, sampler=sampler, num_workers=4
    )

    optimizer = torch.optim.AdamW(model.parameters(), lr=1e-3)
    loss_fn = torch.nn.CrossEntropyLoss()

    for epoch in range(10):
        # Critical: reseed the sampler so each epoch shuffles differently
        # (and identically across ranks).
        sampler.set_epoch(epoch)

        for inputs, targets in loader:
            inputs, targets = inputs.to(device), targets.to(device)
            optimizer.zero_grad()
            loss = loss_fn(model(inputs), targets)
            loss.backward()
            optimizer.step()

        if rank == 0:
            # Save the wrapped module so the checkpoint loads without DDP.
            print(f"Epoch {epoch+1} complete")
            torch.save(model.module.state_dict(), f'checkpoint_epoch{epoch}.pt')

        # Synchronize all ranks before starting the next epoch.
        dist.barrier()

    cleanup_distributed()

12. MLflow and Experiment Management

import mlflow
import mlflow.pytorch
import torch
import optuna


def train_with_mlflow(model, train_loader, val_loader, optimizer,
                       loss_fn, device, params: dict):
    """
    Train while logging hyperparameters, per-epoch metrics, and the
    best model snapshot to an MLflow tracking server.

    Returns:
        The best (lowest) validation loss observed.
    """
    mlflow.set_tracking_uri("http://localhost:5000")
    mlflow.set_experiment("deep-learning-debug")

    with mlflow.start_run():
        mlflow.log_params(params)
        best_val_loss = float('inf')

        for epoch in range(params['epochs']):
            # --- training phase ---
            model.train()
            running = 0.0
            for X, y in train_loader:
                X, y = X.to(device), y.to(device)
                optimizer.zero_grad()
                loss = loss_fn(model(X), y)
                loss.backward()
                optimizer.step()
                running += loss.item()
            train_loss = running / len(train_loader)

            # --- validation phase ---
            model.eval()
            running = 0.0
            with torch.no_grad():
                for X, y in val_loader:
                    X, y = X.to(device), y.to(device)
                    running += loss_fn(model(X), y).item()
            val_loss = running / len(val_loader)

            mlflow.log_metrics({
                'train_loss': train_loss,
                'val_loss': val_loss
            }, step=epoch)

            # Snapshot the model whenever validation improves.
            if val_loss < best_val_loss:
                best_val_loss = val_loss
                mlflow.pytorch.log_model(model, "best_model")

        mlflow.log_metric("best_val_loss", best_val_loss)

    return best_val_loss


def hyperparameter_optimization_with_optuna(model_fn, train_loader,
                                              val_loader, device, n_trials=50):
    """
    Search learning rate, weight decay, dropout, and batch size with
    Optuna's TPE sampler, training each candidate via train_with_mlflow.

    Returns:
        The best hyperparameter dict found by the study.
    """
    def objective(trial):
        # Sample one hyperparameter configuration per trial.
        hp = {
            'lr': trial.suggest_float('lr', 1e-5, 1e-1, log=True),
            'weight_decay': trial.suggest_float('weight_decay', 1e-6, 1e-2, log=True),
            'dropout': trial.suggest_float('dropout', 0.0, 0.5),
            'batch_size': trial.suggest_categorical('batch_size', [16, 32, 64, 128]),
            'epochs': 10,
        }

        candidate = model_fn(dropout=hp['dropout']).to(device)
        optimizer = torch.optim.AdamW(candidate.parameters(),
                                      lr=hp['lr'], weight_decay=hp['weight_decay'])
        loss_fn = torch.nn.CrossEntropyLoss()

        # Lower validation loss is better — Optuna minimizes this value.
        return train_with_mlflow(candidate, train_loader, val_loader,
                                 optimizer, loss_fn, device, params=hp)

    study = optuna.create_study(
        direction='minimize',
        sampler=optuna.samplers.TPESampler(seed=42),
        pruner=optuna.pruners.MedianPruner()
    )
    study.optimize(objective, n_trials=n_trials)

    print("\nBest hyperparameters:")
    for key, value in study.best_params.items():
        print(f"  {key}: {value}")
    print(f"Best validation Loss: {study.best_value:.4f}")

    return study.best_params

Conclusion: Deep Learning Debugging Workflow

A systematic approach is critical for deep learning debugging. Follow this order to diagnose problems:

  1. Start with data: 80% of problems originate in the data. Check for NaN values, wrong labels, and normalization errors first.

  2. Start small: Before full-batch training, test whether the model can overfit a single batch.

  3. Check gradients: Verify that gradients flow correctly after the loss function.

  4. Use monitoring tools: Choose one of TensorBoard, W&B, or MLflow and track all experiments.

  5. Ensure reproducibility: Debugging without fixed seeds is extremely difficult. Always set seeds.

def minimum_debug_checklist(model, train_loader, device):
    """
    Minimal pre-training sanity check: a healthy model should be able to
    (nearly) memorize a single batch within 100 optimizer steps. Failure
    points to capacity, learning-rate, or data problems.
    """
    print("Deep Learning Pre-Training Checklist")
    print("=" * 50)

    print("[1] Single-batch overfitting test...")
    X, y = next(iter(train_loader))
    X, y = X.to(device), y.to(device)

    optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)
    loss_fn = torch.nn.CrossEntropyLoss()

    initial_loss = None
    for _ in range(100):
        optimizer.zero_grad()
        loss = loss_fn(model(X), y)
        if initial_loss is None:
            initial_loss = loss.item()  # remember the pre-training loss
        loss.backward()
        optimizer.step()

    final_loss = loss.item()
    # A ratio above 10x means the model drove the loss way down on one batch.
    overfit_ratio = initial_loss / final_loss if final_loss > 0 else float('inf')

    if overfit_ratio > 10:
        print(f"  Pass: Loss decreased from {initial_loss:.4f} to {final_loss:.4f} (ratio: {overfit_ratio:.1f}x)")
    else:
        print(f"  Warning: Cannot overfit a single batch (ratio: {overfit_ratio:.1f}x)")
        print("  -> Check model capacity, learning rate, and data errors")

    print("\nChecklist complete!")

By systematically applying the techniques covered in this guide, you can quickly diagnose and resolve most issues that arise during deep learning training. Debugging becomes faster with experience, but having the right tools and methodology is paramount.