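Mastering Deep Learning Training Methodology: From Optimization to Distributed Training

Introduction

Over the past decade or so, deep learning has delivered breakthrough results in nearly every area of AI, including computer vision, natural language processing, speech recognition, and reinforcement learning. Designing a network architecture is not enough to build a good model, however. How the model is trained is a key factor in its final performance.

This article works through the main techniques for training deep learning models effectively. It starts with basic gradient descent and moves on to modern optimizers, learning rate scheduling, regularization, transfer learning, mixed-precision training, and large-scale distributed training, with practical code throughout.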


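1. Gradient Descent Basics

1.1 The Loss Function

In deep learning, a **loss function** expresses the error between the model's predictions and the ground-truth targets as a single number. The goal of training is to find the parameters (weights) that minimize this loss.

The loss L depends on the model parameters theta and the data (x, y):

L(theta) = (1/N) * sum_{i=1}^{N} l(f(x_i; theta), y_i)

Here f is the model function, l is the per-sample loss, and N is the number of samples.

1.2 An Intuitive View of Gradient Descent

To build intuition for gradient descent, picture a hiker walking down a mountain with their eyes closed. At each position the hiker takes one step in the steepest downhill direction, which is the opposite of the gradient. Repeating this eventually leads to a valley, that is, a minimum (though not necessarily the global one).

Mathematically, the update rule is:

theta_{t+1} = theta_t - lr * grad_L(theta_t)

Here lr is the learning rate and grad_L is the gradient of the loss function.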
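
As a minimal sketch of this update rule, the snippet below minimizes the one-dimensional loss L(theta) = (theta - 3)^2, whose gradient is 2 * (theta - 3). The loss, the starting point, and the helper name `gradient_descent` are illustrative assumptions rather than part of the original article.

```python
def gradient_descent(grad_fn, theta0, lr=0.1, steps=50):
    """Apply theta <- theta - lr * grad(theta) for a fixed number of steps."""
    theta = theta0
    for _ in range(steps):
        theta = theta - lr * grad_fn(theta)
    return theta

# Illustrative loss L(theta) = (theta - 3)^2 with gradient 2 * (theta - 3)
theta_final = gradient_descent(lambda t: 2.0 * (t - 3.0), theta0=0.0)
print(f"theta after 50 steps: {theta_final:.4f}")
```

With lr = 0.1 the distance to the minimum shrinks by a factor of 0.8 per step, so theta ends up very close to 3.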

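1.3 Batch GD vs Mini-batch GD vs SGD

Batch Gradient Descent (full batch)

  • Computes the gradient over the entire dataset
  • Stable, but slow and memory-hungry
  • Impractical for large datasets

Stochastic Gradient Descent (SGD)

  • Computes the gradient from a single sample
  • Fast per update, but noisy and unstable
  • Well suited to online learning

Mini-batch Gradient Descent

  • Computes the gradient from a small batch, typically 32 to 512 samples
  • Combines the strengths of batch GD and SGD
  • The most widely used approach in practice

import torch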
import torch.nn as nn
import numpy as np

# Gradient descent on a simple linear regression model
class LinearRegression(nn.Module):
    def __init__(self, input_dim):
        super().__init__()
        self.linear = nn.Linear(input_dim, 1)

    def forward(self, x):
        return self.linear(x)

# Generate synthetic data
torch.manual_seed(42)
X = torch.randn(1000, 10)
true_w = torch.randn(10, 1)
y = X @ true_w + 0.1 * torch.randn(1000, 1)

# Mini-batch gradient descent
def train_minibatch(model, X, y, batch_size=32, lr=0.01, epochs=100):
    optimizer = torch.optim.SGD(model.parameters(), lr=lr)
    criterion = nn.MSELoss()
    losses = []

    N = len(X)
    for epoch in range(epochs):
        # Shuffle the data every epoch
        perm = torch.randperm(N)
        X_shuffled = X[perm]
        y_shuffled = y[perm]

        epoch_loss = 0.0
        num_batches = 0
        for i in range(0, N, batch_size):
            x_batch = X_shuffled[i:i+batch_size]
            y_batch = y_shuffled[i:i+batch_size]

            optimizer.zero_grad()
            pred = model(x_batch)
            loss = criterion(pred, y_batch)
            loss.backward()
            optimizer.step()

            epoch_loss += loss.item()
            num_batches += 1

        # Average over the actual number of batches (the last one may be partial)
        losses.append(epoch_loss / num_batches)
        if epoch % 10 == 0:
            print(f"Epoch {epoch}: Loss = {losses[-1]:.4f}")

    return losses

model = LinearRegression(10)
losses = train_minibatch(model, X, y)

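1.4 Why the Learning Rate Matters

The learning rate is one of the most important hyperparameters in deep learning.

  • Too large: the loss diverges or oscillates around the minimum
  • Too small: training is very slow and more likely to stall in a poor local minimum
  • Well chosen: training converges quickly and reaches a good minimum

Common starting values are 0.1, 0.01, and 0.001. The best value depends on the network architecture and the data.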
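
The three regimes can be reproduced on a toy problem. The sketch below runs gradient descent on f(x) = x^2 (gradient 2x) with three learning rates. The function, the step count, and the specific rates are assumptions chosen only to make the behaviour visible.

```python
def run_gd(lr, steps=50, x0=1.0):
    """Minimize f(x) = x^2 (gradient 2x) and return the final |x|."""
    x = x0
    for _ in range(steps):
        x -= lr * 2.0 * x
    return abs(x)

# 1.1 overshoots and diverges, 0.001 barely moves, 0.1 converges quickly
for lr in (1.1, 0.001, 0.1):
    print(f"lr={lr}: |x| after 50 steps = {run_gd(lr):.3e}")
```

Each step multiplies x by (1 - 2 * lr). The iterate therefore grows when lr is above 1, shrinks slowly when lr is tiny, and shrinks quickly in between.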

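1.5 Mathematical Derivation (Partial Derivatives and the Chain Rule)

Backpropagation uses the chain rule to compute the gradient for each layer of a neural network.

Take a network with two weight layers as an example:

forward: x -> z1=W1*x -> a1=relu(z1) -> z2=W2*a1 -> output
loss: L = MSE(output, y)

backward (chain rule):
dL/dW2 = dL/d_output * d_output/dz2 * dz2/dW2
dL/dW1 = dL/d_output * ... * da1/dz1 * dz1/dW1

# Manual backpropagation in NumPy (this implementation uses a sigmoid hidden activation instead of ReLU)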
import numpy as np

def sigmoid(x):
    return 1 / (1 + np.exp(-x))

def sigmoid_deriv(x):
    s = sigmoid(x)
    return s * (1 - s)

class SimpleNet:
    def __init__(self, input_dim, hidden_dim, output_dim):
        # He-style initialization (std = sqrt(2 / fan_in)); Xavier is the usual choice for sigmoid
        self.W1 = np.random.randn(input_dim, hidden_dim) * np.sqrt(2/input_dim)
        self.b1 = np.zeros(hidden_dim)
        self.W2 = np.random.randn(hidden_dim, output_dim) * np.sqrt(2/hidden_dim)
        self.b2 = np.zeros(output_dim)

    def forward(self, x):
        self.x = x
        self.z1 = x @ self.W1 + self.b1
        self.a1 = sigmoid(self.z1)
        self.z2 = self.a1 @ self.W2 + self.b2
        return self.z2

    def backward(self, y, lr=0.01):
        N = len(y)
        # Output-layer gradient (MSE loss)
        dL_dz2 = 2 * (self.z2 - y.reshape(-1, 1)) / N

        # Gradients for W2, b2
        dL_dW2 = self.a1.T @ dL_dz2
        dL_db2 = dL_dz2.sum(axis=0)

        # Backpropagate into the hidden layer
        dL_da1 = dL_dz2 @ self.W2.T
        dL_dz1 = dL_da1 * sigmoid_deriv(self.z1)

        # Gradients for W1, b1
        dL_dW1 = self.x.T @ dL_dz1
        dL_db1 = dL_dz1.sum(axis=0)

        # Parameter update
        self.W2 -= lr * dL_dW2
        self.b2 -= lr * dL_db2
        self.W1 -= lr * dL_dW1
        self.b1 -= lr * dL_db1

# Test
net = SimpleNet(10, 32, 1)
X_np = np.random.randn(100, 10)
y_np = np.random.randn(100)

for i in range(100):
    pred = net.forward(X_np)
    loss = np.mean((pred.flatten() - y_np) ** 2)
    net.backward(y_np)
    if i % 20 == 0:
        print(f"Step {i}: MSE = {loss:.4f}")

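2. Advanced Optimizers

2.1 Momentum SGD

Plain SGD moves only along the current gradient, so it zigzags across narrow, ravine-shaped loss landscapes. Momentum borrows the idea of inertia from physics and remembers the previous direction of travel.

v_t = beta * v_{t-1} + (1 - beta) * grad_t
theta_{t+1} = theta_t - lr * v_t

The momentum coefficient beta is typically set to 0.9. Note that torch.optim.SGD uses the form v_t = momentum * v_{t-1} + grad_t, without the (1 - beta) factor, so for the same lr its steady-state step is about 1 / (1 - beta) times larger than in the formula above.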
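
The two common ways of writing the momentum update differ only in scale. The sketch below, which assumes a constant gradient of 1.0 purely for illustration, compares the exponential-moving-average form above with the accumulating form v_t = beta * v_{t-1} + grad_t. The helper name `momentum_velocity` is hypothetical.

```python
def momentum_velocity(beta, grads, ema=True):
    """Return the velocity after processing grads with either momentum form."""
    v = 0.0
    for g in grads:
        v = beta * v + ((1.0 - beta) * g if ema else g)
    return v

grads = [1.0] * 200  # constant gradient, purely for illustration
v_ema = momentum_velocity(0.9, grads, ema=True)    # approaches 1.0
v_sum = momentum_velocity(0.9, grads, ema=False)   # approaches 1 / (1 - 0.9) = 10.0
print(v_ema, v_sum)
```

Because the accumulating form settles at a velocity 1 / (1 - beta) times larger, a learning rate tuned for one form usually needs rescaling for the other.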

import torch
import torch.nn as nn

# Momentum SGD
optimizer_momentum = torch.optim.SGD(
    model.parameters(),
    lr=0.01,
    momentum=0.9,
    nesterov=False  # whether to use Nesterov momentum
)

# Nesterov momentum (NAG): evaluates the gradient at a look-ahead position
optimizer_nag = torch.optim.SGD(
    model.parameters(),
    lr=0.01,
    momentum=0.9,
    nesterov=True
)

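2.2 Adagrad (Adaptive Learning Rates)

Adagrad applies a separate learning rate to each parameter. Parameters that receive frequent, large updates get a smaller learning rate, while rarely updated parameters keep a relatively large one.

G_t = G_{t-1} + grad_t^2
theta_{t+1} = theta_t - (lr / sqrt(G_t + epsilon)) * grad_t

This works well for sparse data, but because G_t only ever grows, the effective learning rate decays toward zero.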
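
The decay of the effective learning rate is easy to see numerically. The following sketch tracks lr / sqrt(G_t + epsilon) for a single parameter under the assumption that every gradient equals 1.0. The helper name and the gradient sequence are illustrative only.

```python
import math

def adagrad_effective_lrs(grads, lr=0.01, eps=1e-8):
    """Return lr / sqrt(G_t + eps) after each step, with G_t the running sum of squared gradients."""
    G = 0.0
    out = []
    for g in grads:
        G += g * g
        out.append(lr / math.sqrt(G + eps))
    return out

lrs = adagrad_effective_lrs([1.0] * 10000)
print(f"step 1: {lrs[0]:.5f}, step 100: {lrs[99]:.5f}, step 10000: {lrs[-1]:.5f}")
```

With unit gradients G_t equals t, so the effective rate falls as lr / sqrt(t) and never recovers.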

optimizer_adagrad = torch.optim.Adagrad(
    model.parameters(),
    lr=0.01,
    eps=1e-8,
    weight_decay=0
)

2.3 RMSprop

RMSprop addresses Adagrad's vanishing learning rate by replacing the running sum with an **exponential moving average**.

E[g^2]_t = rho * E[g^2]_{t-1} + (1 - rho) * grad_t^2
theta_{t+1} = theta_t - (lr / sqrt(E[g^2]_t + epsilon)) * grad_t

optimizer_rmsprop = torch.optim.RMSprop(
    model.parameters(),
    lr=0.001,
    alpha=0.99,  # rho (decay factor)
    eps=1e-8,
    momentum=0,
    centered=False
)

2.4 Adam (Adaptive Moment Estimation)

Adam combines Momentum and RMSprop and is one of the most widely used optimizers today. It tracks both the first moment (the mean of the gradients) and the second moment (their uncentered variance).

The algorithm is:

m_t = beta1 * m_{t-1} + (1 - beta1) * g_t        # first moment (before bias correction)
v_t = beta2 * v_{t-1} + (1 - beta2) * g_t^2      # second moment (before bias correction)

m_hat = m_t / (1 - beta1^t)                        # bias correction
v_hat = v_t / (1 - beta2^t)                        # bias correction

theta_{t+1} = theta_t - lr * m_hat / (sqrt(v_hat) + epsilon)

Default hyperparameters: lr=0.001, beta1=0.9, beta2=0.999, epsilon=1e-8
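
To make the equations concrete, here is a scalar sketch of Adam in plain Python. It is a teaching aid rather than a replacement for torch.optim.Adam, and the function name `adam_minimize` and the test gradients are assumptions.

```python
import math

def adam_minimize(grad_fn, theta, lr=0.001, beta1=0.9, beta2=0.999, eps=1e-8, steps=1):
    """Scalar Adam following the update equations above."""
    m = v = 0.0
    for t in range(1, steps + 1):
        g = grad_fn(theta)
        m = beta1 * m + (1 - beta1) * g          # first moment
        v = beta2 * v + (1 - beta2) * g * g      # second moment
        m_hat = m / (1 - beta1 ** t)             # bias correction
        v_hat = v / (1 - beta2 ** t)
        theta -= lr * m_hat / (math.sqrt(v_hat) + eps)
    return theta

# The first step has magnitude close to lr regardless of the gradient scale
small = adam_minimize(lambda t: 2.0 * t, 5.0)      # gradient 10
large = adam_minimize(lambda t: 2000.0 * t, 5.0)   # gradient 10000
print(5.0 - small, 5.0 - large)
```

On the first step the bias-corrected moments reduce to g and g^2, so the update is lr * g / (|g| + epsilon). That is why both calls move theta by roughly lr.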

optimizer_adam = torch.optim.Adam(
    model.parameters(),
    lr=1e-3,
    betas=(0.9, 0.999),
    eps=1e-8,
    weight_decay=0
)

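2.5 AdamW (Decoupled Weight Decay)

In standard Adam, L2 regularization is added to the gradient, so its effect is rescaled by the adaptive learning rate. AdamW instead applies weight decay directly in the parameter update.

theta_{t+1} = theta_t - lr * (m_hat / (sqrt(v_hat) + epsilon) + lambda * theta_t)

AdamW has become the standard choice for training Transformer models such as BERT and GPT.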
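
The difference between the two decay styles shows up even in a single step. The sketch below takes one scalar Adam step from zero optimizer state with a zero loss gradient, so that only the decay term acts. The function name `adam_step` and all numeric values are illustrative assumptions.

```python
import math

def adam_step(theta, grad, lr, wd, decoupled, beta1=0.9, beta2=0.999, eps=1e-8):
    """One Adam step from zero state (t = 1) with L2-style or decoupled weight decay."""
    g = grad if decoupled else grad + wd * theta   # L2: decay enters the gradient
    m_hat = ((1 - beta1) * g) / (1 - beta1)        # bias-corrected first moment at t = 1
    v_hat = ((1 - beta2) * g * g) / (1 - beta2)    # bias-corrected second moment at t = 1
    update = m_hat / (math.sqrt(v_hat) + eps)
    if decoupled:
        update += wd * theta                       # AdamW: decay bypasses the adaptive scaling
    return theta - lr * update

theta_l2 = adam_step(1.0, grad=0.0, lr=0.1, wd=0.01, decoupled=False)
theta_adamw = adam_step(1.0, grad=0.0, lr=0.1, wd=0.01, decoupled=True)
print(theta_l2, theta_adamw)
```

With L2-style decay the tiny decay gradient is normalized to a full lr-sized step (theta drops to about 0.9). The decoupled form shrinks theta by only lr * wd * theta (to 0.999), which is the behaviour the AdamW formula describes.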

optimizer_adamw = torch.optim.AdamW(
    model.parameters(),
    lr=1e-4,
    betas=(0.9, 0.999),
    eps=1e-8,
    weight_decay=0.01  # decoupled weight decay coefficient (lambda), not an L2 penalty on the gradient
)

2.6 LARS and LAMB (Large-Batch Training)

With very large batches (thousands of samples), standard Adam can lose accuracy. **LARS (Layer-wise Adaptive Rate Scaling)** and LAMB adjust the learning rate per layer.

LARS: lr_l = lr * ||w_l|| / (||g_l|| + lambda * ||w_l||)
LAMB: applies a layer-wise trust ratio to the Adam update

# Neither LARS nor LAMB ships with torch.optim or with Hugging Face transformers.
# Third-party options include torch_optimizer.Lamb (pip install torch-optimizer)
# and FusedLAMB from NVIDIA Apex (apex.optimizers.FusedLAMB),
# or you can implement the update yourself.

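2.7 Lion Optimizer (2023)

**Lion (EvoLved Sign Momentum)**, published by Google Brain in 2023, uses less memory than Adam while remaining competitive in performance. Because it applies only the sign of the update, every coordinate moves by the same magnitude (lr) at each step.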

class Lion(torch.optim.Optimizer):
    def __init__(self, params, lr=1e-4, betas=(0.9, 0.99), weight_decay=0.0):
        defaults = dict(lr=lr, betas=betas, weight_decay=weight_decay)
        super().__init__(params, defaults)

    @torch.no_grad()
    def step(self, closure=None):
        loss = None
        if closure is not None:
            with torch.enable_grad():
                loss = closure()

        for group in self.param_groups:
            for p in group['params']:
                if p.grad is None:
                    continue

                grad = p.grad
                lr = group['lr']
                beta1, beta2 = group['betas']
                wd = group['weight_decay']

                state = self.state[p]
                if len(state) == 0:
                    state['exp_avg'] = torch.zeros_like(p)

                exp_avg = state['exp_avg']

                # Lion update: decoupled weight decay, then a sign step
                update = exp_avg * beta1 + grad * (1 - beta1)
                p.mul_(1 - lr * wd)
                p.add_(update.sign_(), alpha=-lr)

                # Momentum update (uses beta2, applied after the parameter step)
                exp_avg.mul_(beta2).add_(grad, alpha=1 - beta2)

        return loss

2.8 Optimizer Comparison Experiment

import torch
import torch.nn as nn
import matplotlib.pyplot as plt

# Compare optimizers on a small model
class MLP(nn.Module):
    def __init__(self):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(2, 64),
            nn.ReLU(),
            nn.Linear(64, 64),
            nn.ReLU(),
            nn.Linear(64, 1)
        )

    def forward(self, x):
        return self.net(x)

def train_and_compare(optimizers_dict, X, y, epochs=200):
    results = {}

    for name, opt_fn in optimizers_dict.items():
        model = MLP()
        optimizer = opt_fn(model.parameters())
        criterion = nn.MSELoss()
        losses = []

        for epoch in range(epochs):
            optimizer.zero_grad()
            pred = model(X)
            loss = criterion(pred, y)
            loss.backward()
            optimizer.step()
            losses.append(loss.item())

        results[name] = losses
        print(f"{name}: Final Loss = {losses[-1]:.4f}")

    return results

# Generate synthetic data
X = torch.randn(500, 2)
y = (X[:, 0] * 2 + X[:, 1] * 3 + torch.randn(500) * 0.1).unsqueeze(1)

optimizers = {
    'SGD': lambda p: torch.optim.SGD(p, lr=0.01),
    'SGD+Momentum': lambda p: torch.optim.SGD(p, lr=0.01, momentum=0.9),
    'Adam': lambda p: torch.optim.Adam(p, lr=0.001),
    'AdamW': lambda p: torch.optim.AdamW(p, lr=0.001, weight_decay=0.01),
    'RMSprop': lambda p: torch.optim.RMSprop(p, lr=0.001),
}

results = train_and_compare(optimizers, X, y)

3. Learning Rate Scheduling

A fixed learning rate is rarely optimal. Adjusting the learning rate dynamically over the course of training can give faster convergence and better final performance.

3.1 Step Decay and Exponential Decay

import torch
import torch.optim as optim

model = MLP()
optimizer = optim.SGD(model.parameters(), lr=0.1)

# Note: the schedulers below are alternatives; in practice attach only one to an optimizer.

# Step decay: multiply the learning rate by gamma every step_size epochs
step_scheduler = optim.lr_scheduler.StepLR(
    optimizer,
    step_size=30,   # every 30 epochs
    gamma=0.1       # reduce to 1/10
)

# Multi-step decay: reduce at the listed epochs
multistep_scheduler = optim.lr_scheduler.MultiStepLR(
    optimizer,
    milestones=[30, 60, 80],
    gamma=0.1
)

# Exponential decay: multiply by gamma every epoch
exp_scheduler = optim.lr_scheduler.ExponentialLR(
    optimizer,
    gamma=0.95  # 5% reduction per epoch
)

3.2 Cosine Annealing

Cosine annealing decreases the learning rate smoothly along a cosine curve. Cosine Annealing with Warm Restarts, which periodically resets the learning rate, is also widely used.

# Cosine Annealing
cosine_scheduler = optim.lr_scheduler.CosineAnnealingLR(
    optimizer,
    T_max=100,      # half-period in epochs (lr reaches eta_min at T_max)
    eta_min=1e-6    # minimum learning rate
)

# Cosine Annealing with Warm Restarts (SGDR)
cosine_restart = optim.lr_scheduler.CosineAnnealingWarmRestarts(
    optimizer,
    T_0=10,     # length of the first cycle
    T_mult=2,   # each cycle is T_mult times longer than the previous one
    eta_min=1e-6
)
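
For reference, the schedule without restarts can be written in closed form. The sketch below is a standalone helper (the name `cosine_annealing_lr` is hypothetical) that evaluates the standard cosine annealing formula, eta_min + 0.5 * (base_lr - eta_min) * (1 + cos(pi * step / t_max)).

```python
import math

def cosine_annealing_lr(step, base_lr, t_max, eta_min=0.0):
    """Closed-form cosine annealing: base_lr at step 0, eta_min at step t_max."""
    return eta_min + 0.5 * (base_lr - eta_min) * (1 + math.cos(math.pi * step / t_max))

for step in (0, 50, 100):
    print(step, cosine_annealing_lr(step, base_lr=0.1, t_max=100, eta_min=1e-6))
```

The rate starts at base_lr, passes through the midpoint between base_lr and eta_min halfway through, and lands on eta_min at t_max.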

3.3 Warmup + Cosine Schedule

This schedule has become standard for training Transformer models. The learning rate first increases linearly (warmup) and then decays along a cosine curve.

import math
from torch.optim.lr_scheduler import LambdaLR

def get_cosine_schedule_with_warmup(optimizer, num_warmup_steps, num_training_steps, num_cycles=0.5):
    def lr_lambda(current_step):
        # Warmup phase
        if current_step < num_warmup_steps:
            return float(current_step) / float(max(1, num_warmup_steps))

        # Cosine decay phase
        progress = float(current_step - num_warmup_steps) / float(
            max(1, num_training_steps - num_warmup_steps)
        )
        return max(0.0, 0.5 * (1.0 + math.cos(math.pi * float(num_cycles) * 2.0 * progress)))

    return LambdaLR(optimizer, lr_lambda)

# Usage example
optimizer = optim.AdamW(model.parameters(), lr=5e-5)
scheduler = get_cosine_schedule_with_warmup(
    optimizer,
    num_warmup_steps=1000,
    num_training_steps=10000
)

3.4 OneCycleLR

OneCycleLR is a schedule aimed at fast convergence: it ramps the learning rate up quickly and then brings it back down. It was introduced in Leslie Smith's papers and popularized by fastai.

optimizer = optim.SGD(model.parameters(), lr=0.01)
scheduler = optim.lr_scheduler.OneCycleLR(
    optimizer,
    max_lr=0.1,
    steps_per_epoch=len(train_loader),
    epochs=10,
    pct_start=0.3,          # fraction of steps spent increasing the lr
    anneal_strategy='cos',  # annealing shape
    div_factor=25.0,        # initial lr = max_lr / div_factor
    final_div_factor=1e4    # final lr = max_lr / (div_factor * final_div_factor)
)

# Training loop (train_loader and criterion are assumed to be defined elsewhere)
for epoch in range(10):
    for batch in train_loader:
        optimizer.zero_grad()
        loss = criterion(model(batch[0]), batch[1])
        loss.backward()
        optimizer.step()
        scheduler.step()  # OneCycleLR is stepped once per batch

3.5 Learning Rate Finder

The Learning Rate Finder is a technique for locating a suitable learning rate range automatically.

from torch_lr_finder import LRFinder

model = MLP()
optimizer = optim.SGD(model.parameters(), lr=1e-7, weight_decay=1e-2)
criterion = nn.MSELoss()

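# Run the LR finder
lr_finder = LRFinder(model, optimizer, criterion, device="cuda")
lr_finder.range_test(train_loader, end_lr=100, num_iter=100)
lr_finder.plot()  # plot loss against learning rate
lr_finder.reset()  # restore the model and optimizer to their initial state

# Pick a learning rate on the steepest downward part of the curve.
# A common rule of thumb is 1/10 to 1/3 of the learning rate at which the loss bottoms out.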

4. Loss Functions

4.1 Regression Losses

import torch
import torch.nn as nn
import torch.nn.functional as F

# MSE (Mean Squared Error): sensitive to outliers
mse_loss = nn.MSELoss()

# MAE (Mean Absolute Error): robust to outliers
mae_loss = nn.L1Loss()

# Huber loss: a compromise between MSE and MAE
# |y - y_hat| < delta: 0.5 * (y - y_hat)^2
# |y - y_hat| >= delta: delta * (|y - y_hat| - 0.5 * delta)
huber_loss = nn.HuberLoss(delta=1.0)

# Manual implementation
def huber_loss_manual(pred, target, delta=1.0):
    residual = torch.abs(pred - target)
    condition = residual < delta
    squared_loss = 0.5 * residual ** 2
    linear_loss = delta * residual - 0.5 * delta ** 2
    return torch.where(condition, squared_loss, linear_loss).mean()

4.2 Classification Losses

# Cross-entropy loss (multi-class classification)
ce_loss = nn.CrossEntropyLoss()

# Binary cross-entropy (binary classification, takes logits)
bce_loss = nn.BCEWithLogitsLoss()

# Label-smoothing cross-entropy (reduces overconfidence and overfitting)
ce_smooth = nn.CrossEntropyLoss(label_smoothing=0.1)

# Focal loss (addresses class imbalance)
class FocalLoss(nn.Module):
    def __init__(self, gamma=2.0, alpha=None, reduction='mean'):
        super().__init__()
        self.gamma = gamma
        self.alpha = alpha
        self.reduction = reduction

    def forward(self, inputs, targets):
        # inputs: (N, C) logits, targets: (N,) class indices
        ce_loss = F.cross_entropy(inputs, targets, reduction='none')
        pt = torch.exp(-ce_loss)  # p_t = probability the model assigns to the true class
        focal_loss = ((1 - pt) ** self.gamma) * ce_loss

        if self.alpha is not None:
            alpha_t = self.alpha[targets]
            focal_loss = alpha_t * focal_loss

        if self.reduction == 'mean':
            return focal_loss.mean()
        elif self.reduction == 'sum':
            return focal_loss.sum()
        return focal_loss

4.3 Segmentation Losses

# BCE loss for binary segmentation (named seg_bce_loss to avoid shadowing bce_loss above)
def seg_bce_loss(pred, target):
    return F.binary_cross_entropy_with_logits(pred, target)

# Dice loss (robust to class imbalance)
def dice_loss(pred, target, smooth=1.0):
    pred = torch.sigmoid(pred)
    pred_flat = pred.reshape(-1)      # reshape also handles non-contiguous tensors
    target_flat = target.reshape(-1)

    intersection = (pred_flat * target_flat).sum()
    dice = (2. * intersection + smooth) / (pred_flat.sum() + target_flat.sum() + smooth)
    return 1 - dice

# BCE + Dice combination (common in segmentation)
def bce_dice_loss(pred, target, bce_weight=0.5):
    bce = seg_bce_loss(pred, target)
    dice = dice_loss(pred, target)
    return bce_weight * bce + (1 - bce_weight) * dice

4.4 Metric Learning Losses

# Contrastive loss (pull similar samples together, push dissimilar samples apart)
class ContrastiveLoss(nn.Module):
    def __init__(self, margin=1.0):
        super().__init__()
        self.margin = margin

    def forward(self, output1, output2, label):
        # label=1: same class, label=0: different class
        euclidean_dist = F.pairwise_distance(output1, output2)
        loss = (label * euclidean_dist.pow(2) +
                (1 - label) * F.relu(self.margin - euclidean_dist).pow(2))
        return loss.mean()

# Triplet Loss (anchor, positive, negative)
class TripletLoss(nn.Module):
    def __init__(self, margin=0.3):
        super().__init__()
        self.margin = margin

    def forward(self, anchor, positive, negative):
        pos_dist = F.pairwise_distance(anchor, positive)
        neg_dist = F.pairwise_distance(anchor, negative)
        loss = F.relu(pos_dist - neg_dist + self.margin)
        return loss.mean()

5. Regularization

These techniques reduce overfitting and improve the model's ability to generalize.

5.1 L1/L2 Regularization

# L2 Regularization (Weight Decay)
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3, weight_decay=1e-4)

# L1 regularization (manual implementation)
def l1_regularization(model, lambda_l1):
    l1_penalty = 0
    for param in model.parameters():
        l1_penalty += torch.abs(param).sum()
    return lambda_l1 * l1_penalty

# L1 + L2 (Elastic Net)
def elastic_net_loss(model, criterion, outputs, targets, lambda_l1=1e-5, lambda_l2=1e-4):
    # Base loss
    base_loss = criterion(outputs, targets)

    # L1 penalty
    l1_penalty = sum(torch.abs(p).sum() for p in model.parameters())

    # L2 penalty
    l2_penalty = sum((p ** 2).sum() for p in model.parameters())

    return base_loss + lambda_l1 * l1_penalty + lambda_l2 * l2_penalty

5.2 Dropout

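Dropout randomly deactivates neurons during training to prevent co-adaptation. Inverted dropout scales the surviving activations by 1 / (1 - p) during training, where p is the drop probability, so no rescaling is needed at inference.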
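
A small numerical check shows why inverted dropout leaves the expected activation unchanged. This sketch uses plain Python lists and a seeded random generator. The helper name `inverted_dropout` and the input of all ones are assumptions made for illustration.

```python
import random

def inverted_dropout(values, p, rng):
    """Zero each value with probability p and scale survivors by 1 / (1 - p)."""
    keep = 1.0 - p
    return [v / keep if rng.random() < keep else 0.0 for v in values]

rng = random.Random(0)
x = [1.0] * 100_000
y = inverted_dropout(x, p=0.5, rng=rng)
print(f"mean before: {sum(x) / len(x):.3f}, mean after: {sum(y) / len(y):.3f}")
```

Roughly half of the values become 0 and the rest become 2, so the mean stays close to 1. That is why the layer can simply be switched off at inference time.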

class ModelWithDropout(nn.Module):
    def __init__(self, dropout_rate=0.5):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(784, 512),
            nn.ReLU(),
            nn.Dropout(p=dropout_rate),  # inverted dropout (the PyTorch default)
            nn.Linear(512, 256),
            nn.ReLU(),
            nn.Dropout(p=dropout_rate),
            nn.Linear(256, 10)
        )

    def forward(self, x):
        return self.net(x)

# Training mode: dropout active
model.train()

# Inference mode: dropout disabled
model.eval()

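# DropConnect (randomly zeroes individual weights rather than activations)
# Sketch: a Linear layer whose weight matrix is masked on every training forward pass.
# The 1 / (1 - p) rescaling mirrors inverted dropout and is a simplification.
class DropConnectLinear(nn.Linear):
    def __init__(self, in_features, out_features, p=0.5, bias=True):
        super().__init__(in_features, out_features, bias=bias)
        self.p = p

    def forward(self, x):
        if not self.training or self.p == 0:
            return nn.functional.linear(x, self.weight, self.bias)
        # Mask the weights, not the inputs
        mask = torch.bernoulli(torch.full_like(self.weight, 1 - self.p))
        return nn.functional.linear(x, self.weight * mask / (1 - self.p), self.bias)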

5.3 Data Augmentation

from torchvision import transforms
import torchvision.transforms.functional as TF

# Basic image augmentation
train_transform = transforms.Compose([
    transforms.RandomHorizontalFlip(p=0.5),
    transforms.RandomCrop(32, padding=4),
    transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2, hue=0.1),
    transforms.RandomRotation(degrees=15),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

# Mixup Augmentation
def mixup_data(x, y, alpha=1.0):
    if alpha > 0:
        lam = np.random.beta(alpha, alpha)
    else:
        lam = 1

    batch_size = x.size()[0]
    index = torch.randperm(batch_size)

    mixed_x = lam * x + (1 - lam) * x[index]
    y_a, y_b = y, y[index]
    return mixed_x, y_a, y_b, lam

def mixup_criterion(criterion, pred, y_a, y_b, lam):
    return lam * criterion(pred, y_a) + (1 - lam) * criterion(pred, y_b)

# CutMix Augmentation
def cutmix_data(x, y, alpha=1.0):
    lam = np.random.beta(alpha, alpha)
    batch_size, C, H, W = x.size()
    index = torch.randperm(batch_size)

    # Compute random box coordinates
    cut_ratio = np.sqrt(1. - lam)
    cut_w = int(W * cut_ratio)
    cut_h = int(H * cut_ratio)

    cx = np.random.randint(W)
    cy = np.random.randint(H)

    bbx1 = np.clip(cx - cut_w // 2, 0, W)
    bby1 = np.clip(cy - cut_h // 2, 0, H)
    bbx2 = np.clip(cx + cut_w // 2, 0, W)
    bby2 = np.clip(cy + cut_h // 2, 0, H)

    mixed_x = x.clone()
    mixed_x[:, :, bby1:bby2, bbx1:bbx2] = x[index, :, bby1:bby2, bbx1:bbx2]

    # Recompute lambda from the actual (clipped) box area
    lam = 1 - ((bbx2 - bbx1) * (bby2 - bby1) / (W * H))

    return mixed_x, y, y[index], lam

5.4 Early Stopping

class EarlyStopping:
    def __init__(self, patience=10, min_delta=0.001, restore_best_weights=True):
        self.patience = patience
        self.min_delta = min_delta
        self.restore_best_weights = restore_best_weights
        self.counter = 0
        self.best_loss = None
        self.best_weights = None
        self.early_stop = False

    def __call__(self, val_loss, model):
        if self.best_loss is None:
            self.best_loss = val_loss
            self.best_weights = {k: v.clone() for k, v in model.state_dict().items()}
        elif val_loss > self.best_loss - self.min_delta:
            self.counter += 1
            print(f"EarlyStopping counter: {self.counter}/{self.patience}")
            if self.counter >= self.patience:
                self.early_stop = True
        else:
            self.best_loss = val_loss
            self.best_weights = {k: v.clone() for k, v in model.state_dict().items()}
            self.counter = 0

    def restore(self, model):
        if self.restore_best_weights and self.best_weights:
            model.load_state_dict(self.best_weights)
            print("Restored best model weights")

# Usage example (max_epochs, train_one_epoch, and evaluate are assumed to be defined elsewhere)
early_stopping = EarlyStopping(patience=10)

for epoch in range(max_epochs):
    train_loss = train_one_epoch(model, train_loader, optimizer)
    val_loss = evaluate(model, val_loader)

    early_stopping(val_loss, model)
    if early_stopping.early_stop:
        print("Early stopping triggered!")
        early_stopping.restore(model)
        break

6. Normalization Layers

6.1 Batch Normalization

**Batch Normalization** was proposed in 2015 by Sergey Ioffe and Christian Szegedy. It normalizes each feature within a mini-batch and was introduced to reduce internal covariate shift.

Batch normalization proceeds as follows:

1. Mini-batch mean: mu_B = (1/m) * sum(x_i)
2. Mini-batch variance: sigma_B^2 = (1/m) * sum((x_i - mu_B)^2)
3. Normalize: x_hat_i = (x_i - mu_B) / sqrt(sigma_B^2 + epsilon)
4. Scale and shift: y_i = gamma * x_hat_i + beta

Here gamma (scale) and beta (shift) are learnable parameters.
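
The four steps above can be traced by hand on a tiny batch. The sketch below normalizes a single feature across four samples in plain Python. The helper name `batch_norm_1d` and the input values are illustrative assumptions.

```python
import math

def batch_norm_1d(xs, gamma=1.0, beta=0.0, eps=1e-5):
    """Normalize one feature across a mini-batch following steps 1-4 above."""
    m = len(xs)
    mu = sum(xs) / m                                   # step 1: mini-batch mean
    var = sum((x - mu) ** 2 for x in xs) / m           # step 2: biased variance
    x_hat = [(x - mu) / math.sqrt(var + eps) for x in xs]  # step 3: normalize
    return [gamma * xh + beta for xh in x_hat]         # step 4: scale and shift

out = batch_norm_1d([1.0, 2.0, 3.0, 4.0])
print([round(v, 4) for v in out])
```

With gamma = 1 and beta = 0 the output has zero mean and a variance just under 1. The small shortfall comes from epsilon in the denominator.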

import torch
import torch.nn as nn

class BatchNormNet(nn.Module):
    def __init__(self):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(784, 512),
            nn.BatchNorm1d(512),
            nn.ReLU(),
            nn.Linear(512, 256),
            nn.BatchNorm1d(256),
            nn.ReLU(),
            nn.Linear(256, 10)
        )

    def forward(self, x):
        return self.net(x)

# BatchNorm after a convolutional layer
class ConvBNNet(nn.Module):
    def __init__(self):
        super().__init__()
        self.conv1 = nn.Conv2d(3, 64, 3, padding=1)
        self.bn1 = nn.BatchNorm2d(64)
        self.relu = nn.ReLU(inplace=True)

    def forward(self, x):
        return self.relu(self.bn1(self.conv1(x)))

# Manual implementation (for 2D inputs of shape (N, num_features))
class BatchNorm(nn.Module):
    def __init__(self, num_features, eps=1e-5, momentum=0.1):
        super().__init__()
        self.gamma = nn.Parameter(torch.ones(num_features))
        self.beta = nn.Parameter(torch.zeros(num_features))
        self.eps = eps
        self.momentum = momentum

        self.register_buffer('running_mean', torch.zeros(num_features))
        self.register_buffer('running_var', torch.ones(num_features))

    def forward(self, x):
        if self.training:
            mean = x.mean(dim=0)
            var = x.var(dim=0, unbiased=False)

            # Update the running statistics in place, outside the autograd graph
            with torch.no_grad():
                n = x.size(0)
                unbiased_var = var * n / max(n - 1, 1)  # nn.BatchNorm1d tracks the unbiased variance
                self.running_mean.mul_(1 - self.momentum).add_(self.momentum * mean)
                self.running_var.mul_(1 - self.momentum).add_(self.momentum * unbiased_var)
        else:
            mean = self.running_mean
            var = self.running_var

        x_norm = (x - mean) / torch.sqrt(var + self.eps)
        return self.gamma * x_norm + self.beta

6.2 Layer Normalization (Used in Transformers)

Layer Normalization normalizes over the feature dimension rather than the batch dimension. Because it does not depend on batch size, it suits RNNs and Transformers.

class LayerNorm(nn.Module):
    def __init__(self, normalized_shape, eps=1e-5):
        super().__init__()
        if isinstance(normalized_shape, int):
            normalized_shape = (normalized_shape,)
        self.normalized_shape = normalized_shape
        self.gamma = nn.Parameter(torch.ones(normalized_shape))
        self.beta = nn.Parameter(torch.zeros(normalized_shape))
        self.eps = eps

    def forward(self, x):
        # Normalize over the last len(normalized_shape) dimensions
        dims = tuple(range(-len(self.normalized_shape), 0))
        mean = x.mean(dim=dims, keepdim=True)
        var = x.var(dim=dims, keepdim=True, unbiased=False)
        x_norm = (x - mean) / torch.sqrt(var + self.eps)
        return self.gamma * x_norm + self.beta

# Built-in PyTorch LayerNorm
layer_norm = nn.LayerNorm(512)

# Usage inside a Transformer block
class TransformerBlock(nn.Module):
    def __init__(self, d_model, nhead, dim_feedforward):
        super().__init__()
        self.attention = nn.MultiheadAttention(d_model, nhead)
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.ffn = nn.Sequential(
            nn.Linear(d_model, dim_feedforward),
            nn.GELU(),
            nn.Linear(dim_feedforward, d_model)
        )

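    def forward(self, x):
        # Pre-LayerNorm, as in GPT-style models
        # x has shape (seq_len, batch, d_model) because batch_first defaults to False
        h = self.norm1(x)  # compute the normalized input once and reuse it for q, k, v
        attn_out, _ = self.attention(h, h, h)
        x = x + attn_out
        x = x + self.ffn(self.norm2(x))
        return x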

6.3 Instance, Group, RMS Normalization

# Instance Normalization (normalizes each sample and each channel independently)
# Effective for style transfer
instance_norm = nn.InstanceNorm2d(64)

# Group Normalization (normalizes over groups of channels)
# An alternative to BN when the batch size is small
group_norm = nn.GroupNorm(num_groups=8, num_channels=64)

# RMS Normalization (used in LLaMA and T5)
# Drops LayerNorm's mean-centering step, which makes it cheaper to compute
class RMSNorm(nn.Module):
    def __init__(self, dim, eps=1e-6):
        super().__init__()
        self.eps = eps
        self.weight = nn.Parameter(torch.ones(dim))

    def _norm(self, x):
        return x * torch.rsqrt(x.pow(2).mean(-1, keepdim=True) + self.eps)

    def forward(self, x):
        return self.weight * self._norm(x.float()).type_as(x)

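# Summary of normalization methods
# BatchNorm: normalizes over the batch dimension, suits CNNs, depends on batch size
# LayerNorm: normalizes over the feature dimension, suits Transformers/RNNs
# InstanceNorm: normalizes per sample and per channel, suits style transfer
# GroupNorm: normalizes over channel groups, suits small batches
# RMSNorm: a lighter LayerNorm, suits LLMs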

7. Weight Initialization

7.1 Xavier/He Initialization

Weight initialization determines where training starts. A poor initialization can cause vanishing or exploding gradients.
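
As a quick reference, the standard deviations used by the normal variants of the two schemes can be computed directly. This sketch assumes a gain of 1 for Xavier and a ReLU nonlinearity with fan-in mode for He. The helper names are hypothetical.

```python
import math

def xavier_std(fan_in, fan_out):
    """Xavier/Glorot normal: std = sqrt(2 / (fan_in + fan_out)), gain assumed to be 1."""
    return math.sqrt(2.0 / (fan_in + fan_out))

def he_std(fan_in):
    """He/Kaiming normal for ReLU: std = sqrt(2 / fan_in)."""
    return math.sqrt(2.0 / fan_in)

print(f"256x256 layer: Xavier std = {xavier_std(256, 256):.4f}, He std = {he_std(256):.4f}")
```

For a square layer the He standard deviation is sqrt(2) times the Xavier one. The extra factor compensates for ReLU zeroing roughly half of the activations.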

import torch
import torch.nn as nn
import math

class WeightInitDemo(nn.Module):
    def __init__(self, init_method='xavier_uniform'):
        super().__init__()
        self.layers = nn.ModuleList([
            nn.Linear(256, 256) for _ in range(5)
        ])
        self.apply_init(init_method)

    def apply_init(self, method):
        for layer in self.layers:
            if method == 'zeros':
                nn.init.zeros_(layer.weight)  # bad initialization: symmetry is never broken
            elif method == 'random_small':
                nn.init.normal_(layer.weight, std=0.01)
            elif method == 'xavier_uniform':
                nn.init.xavier_uniform_(layer.weight)  # suits sigmoid/tanh activations
            elif method == 'xavier_normal':
                nn.init.xavier_normal_(layer.weight)
            elif method == 'kaiming_uniform':
                nn.init.kaiming_uniform_(layer.weight, mode='fan_in', nonlinearity='relu')
            elif method == 'kaiming_normal':
                nn.init.kaiming_normal_(layer.weight, mode='fan_out', nonlinearity='relu')  # suits ReLU
            else:
                raise ValueError(f"Unknown init method: {method}")
            nn.init.zeros_(layer.bias)

    def forward(self, x):
        for layer in self.layers:
            x = torch.relu(layer(x))
        return x

# Compare initialization schemes
import matplotlib.pyplot as plt

def check_activations(model, x):
    activations = []
    hooks = []

    def hook(module, input, output):
        activations.append(output.detach())

    for layer in model.layers:
        hooks.append(layer.register_forward_hook(hook))

    with torch.no_grad():
        model(x)

    for h in hooks:  # use a distinct name so the hook function above is not shadowed
        h.remove()

    return activations

x = torch.randn(100, 256)
for method in ['zeros', 'random_small', 'xavier_uniform', 'kaiming_normal']:
    model = WeightInitDemo(method)
    acts = check_activations(model, x)
    print(f"{method}:")
    for i, act in enumerate(acts):
        print(f"  Layer {i+1}: mean={act.mean():.4f}, std={act.std():.4f}")

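8. Fixing Gradient Problems

8.1 Vanishing and Exploding Gradients

Vanishing gradient: during backpropagation the gradient shrinks toward zero as it passes through the layers, so the early layers stop learning. It is most common with saturating activations such as sigmoid and tanh.

Exploding gradient: the gradient grows exponentially and produces NaN or Inf values. It occurs frequently in RNNs.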

# Gradient Clipping
import torch.nn.utils as utils

# Method 1: clip by gradient norm
max_norm = 1.0
total_norm = utils.clip_grad_norm_(model.parameters(), max_norm)
print(f"Gradient norm: {total_norm:.4f}")

# Method 2: clip by gradient value
utils.clip_grad_value_(model.parameters(), clip_value=0.5)

# Usage inside a training loop
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)

for batch in train_loader:
    optimizer.zero_grad()
    loss = criterion(model(batch[0]), batch[1])
    loss.backward()

    # Clip after backward() and before optimizer.step()
    torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)

    optimizer.step()
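
To show what norm clipping does to the numbers, here is a plain-Python sketch of clipping by global norm. It follows the usual recipe (scale every gradient by max_norm / total_norm when the total exceeds max_norm). The helper name `clip_by_global_norm`, the 1e-6 stabilizer, and the sample gradients are assumptions for illustration.

```python
import math

def clip_by_global_norm(grads, max_norm):
    """Rescale a list of gradient vectors so their combined L2 norm is at most max_norm."""
    total_norm = math.sqrt(sum(g * g for vec in grads for g in vec))
    scale = min(1.0, max_norm / (total_norm + 1e-6))  # never scale gradients up
    return [[g * scale for g in vec] for vec in grads], total_norm

clipped, norm_before = clip_by_global_norm([[3.0, 4.0], [12.0]], max_norm=1.0)
norm_after = math.sqrt(sum(g * g for vec in clipped for g in vec))
print(f"norm before: {norm_before:.1f}, norm after: {norm_after:.4f}")
```

All gradients are scaled by the same factor, so the update direction is preserved and only its length changes.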

8.2 Residual Connection (Skip Connection)

class ResidualBlock(nn.Module):
    def __init__(self, in_channels, out_channels, stride=1):
        super().__init__()
        self.conv1 = nn.Conv2d(in_channels, out_channels, 3, stride=stride, padding=1, bias=False)
        self.bn1 = nn.BatchNorm2d(out_channels)
        self.relu = nn.ReLU(inplace=True)
        self.conv2 = nn.Conv2d(out_channels, out_channels, 3, padding=1, bias=False)
        self.bn2 = nn.BatchNorm2d(out_channels)

        # Use a projection shortcut when the dimensions differ
        self.shortcut = nn.Sequential()
        if stride != 1 or in_channels != out_channels:
            self.shortcut = nn.Sequential(
                nn.Conv2d(in_channels, out_channels, 1, stride=stride, bias=False),
                nn.BatchNorm2d(out_channels)
            )

    def forward(self, x):
        out = self.relu(self.bn1(self.conv1(x)))
        out = self.bn2(self.conv2(out))
        out += self.shortcut(x)  # Skip Connection
        out = self.relu(out)
        return out

8.3 Gradient Checkpointing

To save memory in very deep models, gradient checkpointing skips storing some activations and recomputes them during the backward pass.

from torch.utils.checkpoint import checkpoint, checkpoint_sequential

class DeepModel(nn.Module):
    def __init__(self):
        super().__init__()
        self.layers = nn.Sequential(*[
            nn.Sequential(nn.Linear(512, 512), nn.ReLU())
            for _ in range(20)
        ])

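    def forward(self, x):
        # Default: store every activation (memory O(N))
        # return self.layers(x)

        # Gradient checkpointing: roughly O(sqrt(N)) memory when segments is about sqrt(N)
        # use_reentrant=False is the mode recommended by recent PyTorch releases
        return checkpoint_sequential(self.layers, segments=5, input=x, use_reentrant=False)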

9. Transfer Learning & Fine-tuning

9.1 Feature Extraction vs Fine-tuning

import torchvision.models as models

# Feature extraction: freeze the pretrained weights
def feature_extraction(num_classes):
    # torchvision >= 0.13 uses the weights argument; pretrained=True is deprecated
    model = models.resnet50(weights=models.ResNet50_Weights.DEFAULT)

    # Freeze all parameters
    for param in model.parameters():
        param.requires_grad = False

    # Replace only the final classification layer (trainable)
    model.fc = nn.Linear(model.fc.in_features, num_classes)

    return model

# Fine-tuning: train some or all layers
def fine_tuning(num_classes, unfreeze_layers=None):
    model = models.resnet50(weights=models.ResNet50_Weights.DEFAULT)

    # Start with everything frozen
    for param in model.parameters():
        param.requires_grad = False

    # Replace the final layer (new layers are trainable by default)
    model.fc = nn.Linear(model.fc.in_features, num_classes)

    # Unfreeze the requested layers
    if unfreeze_layers:
        for name, param in model.named_parameters():
            for layer in unfreeze_layers:
                if layer in name:
                    param.requires_grad = True

    return model

9.2 Progressive Layer Unfreezing & Discriminative Learning Rates

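def progressive_unfreezing_setup(model, base_lr=1e-4):
    # Layer groups of ResNet50, ordered from input to output
    # (the stem, conv1/bn1, belongs to no group and stays frozen)
    layer_groups = [
        list(model.layer1.parameters()),
        list(model.layer2.parameters()),
        list(model.layer3.parameters()),
        list(model.layer4.parameters()),
        list(model.fc.parameters())
    ]

    # Train fc only at first: freeze everything, then re-enable the head.
    # Freezing only the listed groups would leave conv1/bn1 trainable.
    for p in model.parameters():
        p.requires_grad = False
    for p in layer_groups[-1]:
        p.requires_grad = True

    # The caller unfreezes earlier groups one at a time (last to first) as training progresses
    return layer_groups

def discriminative_lr_optimizer(model, base_lr=1e-4, lr_multiplier=10):
    # Per-layer learning rates: low for early layers, high for later layers.
    # The stem (conv1/bn1) is not passed to the optimizer, so it is never updated.
    param_groups = [
        {'params': model.layer1.parameters(), 'lr': base_lr / (lr_multiplier**3)},
        {'params': model.layer2.parameters(), 'lr': base_lr / (lr_multiplier**2)},
        {'params': model.layer3.parameters(), 'lr': base_lr / lr_multiplier},
        {'params': model.layer4.parameters(), 'lr': base_lr},
        {'params': model.fc.parameters(), 'lr': base_lr * lr_multiplier},
    ]

    return torch.optim.Adam(param_groups)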

9.3 LoRA (Low-Rank Adaptation)

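LoRA is a parameter-efficient fine-tuning technique for large language models. It freezes the original weight matrix and learns a low-rank decomposition of the weight update instead.

For an original weight matrix W of size d x k, LoRA learns W' = W + BA, where B is d x r, A is r x k, and the rank r is chosen to be much smaller than both d and k. Only A and B are trained, so the number of trainable parameters per matrix drops from d*k to r*(d + k).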

import torch
import torch.nn as nn

class LoRALayer(nn.Module):
    def __init__(self, in_features, out_features, rank=4, alpha=1.0):
        super().__init__()
        self.rank = rank
        self.alpha = alpha
        self.scaling = alpha / rank

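        # Original weight (frozen). Random here for illustration;
        # in practice it is copied from the pretrained layer.
        self.weight = nn.Parameter(
            torch.randn(out_features, in_features),
            requires_grad=False
        )

        # LoRA matrix A (small random initialization)
        self.lora_A = nn.Parameter(torch.randn(rank, in_features) * 0.01)
        # LoRA matrix B (zero initialization, so training starts from the original model)
        self.lora_B = nn.Parameter(torch.zeros(out_features, rank))
        # Bias of the original layer (frozen, like the weight)
        self.bias = nn.Parameter(torch.zeros(out_features), requires_grad=False)

    def forward(self, x):
        # Original output + scaled low-rank update: x W^T + (alpha / r) * x A^T B^T
        base_output = nn.functional.linear(x, self.weight, self.bias)
        lora_output = (x @ self.lora_A.T @ self.lora_B.T) * self.scaling
        return base_output + lora_output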

# Using the HuggingFace PEFT library (for real LLM fine-tuning)
from peft import get_peft_model, LoraConfig, TaskType

lora_config = LoraConfig(
    task_type=TaskType.CAUSAL_LM,
    r=8,                          # LoRA rank
    lora_alpha=32,                # scaling factor (the update is scaled by lora_alpha / r)
    target_modules=["q_proj", "v_proj"],  # modules to adapt
    lora_dropout=0.05,
    bias="none"
)

# peft_model = get_peft_model(base_model, lora_config)
# peft_model.print_trainable_parameters()
# trainable params: 4,194,304 || all params: 6,742,609,920 || trainable%: 0.062
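
The trainable-parameter count in the comment above can be reproduced by hand. The sketch below assumes a model with 32 Transformer layers and hidden size 4096, where q_proj and v_proj are both 4096 x 4096. Those dimensions are an assumption chosen because they reproduce the printed numbers, not something the PEFT output states, and `lora_params` is an illustrative helper.

```python
def lora_params(d: int, k: int, r: int) -> int:
    """Trainable parameters LoRA adds to one d x k weight: B (d x r) plus A (r x k)."""
    return r * (d + k)

# Assumed architecture (hypothetical, chosen to match the printed output)
hidden_size = 4096
num_layers = 32
adapted_modules_per_layer = 2  # q_proj and v_proj
rank = 8

per_module = lora_params(hidden_size, hidden_size, rank)
trainable = per_module * adapted_modules_per_layer * num_layers
all_params = 6_742_609_920  # total reported above (base model + LoRA)

print(f"per module: {per_module:,}")
print(f"trainable:  {trainable:,}")
print(f"trainable%: {100 * trainable / all_params:.3f}")
print(f"full d x k: {hidden_size * hidden_size:,}")
```

Each adapted matrix adds r*(d + k) parameters instead of the d*k a full update would need, which is where the sub-0.1% trainable fraction comes from.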

10. Hyperparameter Tuning

10.1 Bayesian Optimization with Optuna

import optuna
import torch
import torch.nn as nn

def objective(trial):
    # Define the hyperparameter search space
    lr = trial.suggest_float('lr', 1e-5, 1e-1, log=True)
    n_layers = trial.suggest_int('n_layers', 1, 5)
    n_units = trial.suggest_categorical('n_units', [64, 128, 256, 512])
    dropout_rate = trial.suggest_float('dropout', 0.0, 0.5)
    optimizer_name = trial.suggest_categorical('optimizer', ['Adam', 'AdamW', 'SGD'])

    # Build the model
    layers = []
    in_dim = 784
    for _ in range(n_layers):
        layers.extend([
            nn.Linear(in_dim, n_units),
            nn.ReLU(),
            nn.Dropout(dropout_rate)
        ])
        in_dim = n_units
    layers.append(nn.Linear(in_dim, 10))
    model = nn.Sequential(*layers)

    # Choose the optimizer
    if optimizer_name == 'Adam':
        optimizer = torch.optim.Adam(model.parameters(), lr=lr)
    elif optimizer_name == 'AdamW':
        optimizer = torch.optim.AdamW(model.parameters(), lr=lr, weight_decay=0.01)
    else:
        optimizer = torch.optim.SGD(model.parameters(), lr=lr, momentum=0.9)

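    # Training and validation
    # ... (training loop omitted; for MedianPruner to have any effect, the loop must
    # call trial.report(val_accuracy, epoch) and raise optuna.TrialPruned() when
    # trial.should_prune() returns True)
    val_accuracy = 0.95  # placeholder; compute this on the validation set after training

    return val_accuracy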

# Create and run the Optuna study
study = optuna.create_study(
    direction='maximize',
    sampler=optuna.samplers.TPESampler(),    # Tree-structured Parzen Estimator
    pruner=optuna.pruners.MedianPruner()     # stops unpromising trials early
)

study.optimize(objective, n_trials=100, timeout=3600)

print(f"Best trial: {study.best_trial.value:.4f}")
print(f"Best params: {study.best_trial.params}")

# Visualize the results
# optuna.visualization.plot_optimization_history(study)
# optuna.visualization.plot_param_importances(study)

11. Mixed Precision Training

11.1 FP32 vs FP16 vs BF16

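| Format | Exponent bits | Mantissa bits | Representable range | Primary use |
|--------|---------------|---------------|---------------------|-------------|
| FP32 | 8 | 23 | ±3.4e38 | Default training |
| FP16 | 5 | 10 | ±65504 | Inference / training (watch for overflow) |
| BF16 | 8 | 7 | ±3.4e38 | LLM training (A100, TPU) |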
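
The ranges in the table follow directly from the bit counts. As a rough sketch, assuming IEEE-style layouts in which the all-ones exponent is reserved for infinity and NaN, the largest finite value and the spacing just above 1.0 can be computed as follows. The helper name `float_format_stats` is illustrative.

```python
def float_format_stats(exp_bits: int, mantissa_bits: int):
    """Largest finite value and machine epsilon of an IEEE-style binary format."""
    bias = 2 ** (exp_bits - 1) - 1
    max_exponent = bias  # the all-ones exponent is reserved for inf/NaN
    max_value = (2 - 2 ** -mantissa_bits) * 2.0 ** max_exponent
    epsilon = 2.0 ** -mantissa_bits  # spacing between 1.0 and the next value
    return max_value, epsilon

formats = {"FP32": (8, 23), "FP16": (5, 10), "BF16": (8, 7)}
stats = {name: float_format_stats(e, m) for name, (e, m) in formats.items()}

for name, (max_value, epsilon) in stats.items():
    print(f"{name}: max = {max_value:.4g}, epsilon = {epsilon:.3g}")
```

BF16 keeps the FP32 exponent range but has a much coarser epsilon, while FP16 has finer precision but a narrow range. That narrow range is why loss scaling is associated with FP16 in particular.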

11.2 PyTorch AMP (Automatic Mixed Precision)

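import torch
import torch.nn as nn
# Newer PyTorch releases expose the same classes under torch.amp
from torch.cuda.amp import autocast, GradScaler

# GradScaler: loss scaling that prevents FP16 gradient underflow
scaler = GradScaler()

# MyModel, train_loader, and num_epochs are placeholders defined elsewhere
model = MyModel().cuda()
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)
criterion = nn.CrossEntropyLoss()

for epoch in range(num_epochs):
    for inputs, labels in train_loader:
        inputs, labels = inputs.cuda(), labels.cuda()

        optimizer.zero_grad()

        # Run the forward pass in FP16 inside the autocast context
        with autocast(dtype=torch.float16):
            outputs = model(inputs)
            loss = criterion(outputs, labels)

        # Scale the loss, then backpropagate
        scaler.scale(loss).backward()

        # Gradient clipping (unscale first so the threshold applies to the true gradients)
        scaler.unscale_(optimizer)
        torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)

        # Optimizer step (skipped automatically if the gradients contain NaN/Inf)
        scaler.step(optimizer)
        scaler.update()

# Using BF16 (more stable, requires an Ampere or newer GPU).
# BF16 has the same exponent range as FP32, so GradScaler is usually unnecessary.
with autocast(dtype=torch.bfloat16):
    outputs = model(inputs)
    loss = criterion(outputs, labels)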

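12. Distributed Training

12.1 Data Parallelism

The data is split across multiple GPUs. Each GPU holds a full copy of the model, runs the forward and backward pass independently on its own shard, and the gradients are then aggregated across GPUs.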

import torch
import torch.nn as nn
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.utils.data.distributed import DistributedSampler
from torch.cuda.amp import autocast, GradScaler

def setup(rank, world_size):
    import os
    os.environ['MASTER_ADDR'] = 'localhost'
    os.environ['MASTER_PORT'] = '12355'
    dist.init_process_group('nccl', rank=rank, world_size=world_size)

def cleanup():
    dist.destroy_process_group()

# num_epochs is given a default here as an assumption; the original left it undefined
def train_ddp(rank, world_size, model_class, dataset, num_epochs=10):
    setup(rank, world_size)

    # Each process drives its own GPU
    torch.cuda.set_device(rank)
    device = torch.device(f'cuda:{rank}')
    model = model_class().to(device)

    # Wrap with DDP (gradients are all-reduced across processes during backward)
    model = DDP(model, device_ids=[rank])

    # DistributedSampler: each process samples a different shard of the data
    sampler = DistributedSampler(
        dataset,
        num_replicas=world_size,
        rank=rank,
        shuffle=True
    )

    loader = torch.utils.data.DataLoader(
        dataset,
        batch_size=32,
        sampler=sampler,
        num_workers=4,
        pin_memory=True
    )

    # Scaling the LR linearly with world_size is a heuristic; tune it for your setup
    optimizer = torch.optim.Adam(model.parameters(), lr=1e-3 * world_size)
    criterion = nn.CrossEntropyLoss()
    scaler = GradScaler()  # loss scaling for FP16 autocast

    for epoch in range(num_epochs):
        sampler.set_epoch(epoch)  # change the shuffle seed every epoch

        for inputs, labels in loader:
            inputs, labels = inputs.to(device), labels.to(device)
            optimizer.zero_grad()

            with autocast():
                outputs = model(inputs)
                loss = criterion(outputs, labels)

            scaler.scale(loss).backward()
            scaler.step(optimizer)
            scaler.update()

        if rank == 0:
            # Loss of the last batch on rank 0 only (not averaged across processes)
            print(f"Epoch {epoch}: Loss = {loss.item():.4f}")

    cleanup()

# Launch one process per GPU
import torch.multiprocessing as mp

if __name__ == '__main__':
    world_size = torch.cuda.device_count()
    mp.spawn(train_ddp, args=(world_size, MyModel, dataset), nprocs=world_size, join=True)

12.2 FSDP (Fully Sharded Data Parallel)

FSDP shards the model parameters, gradients, and optimizer state across all GPUs to save memory. It is suited to training very large models at GPT-3 scale.

import functools

import torch
from torch.distributed.fsdp import FullyShardedDataParallel as FSDP
from torch.distributed.fsdp import ShardingStrategy, MixedPrecision
from torch.distributed.fsdp.wrap import transformer_auto_wrap_policy

# Mixed-precision settings
bf16_policy = MixedPrecision(
    param_dtype=torch.bfloat16,
    reduce_dtype=torch.bfloat16,
    buffer_dtype=torch.bfloat16
)

# Auto-wrap policy: shard at the granularity of Transformer layers.
# TransformerBlock is a placeholder for your model's own layer class.
auto_wrap_policy = functools.partial(
    transformer_auto_wrap_policy,
    transformer_layer_cls={TransformerBlock}
)

# Build the FSDP model. `model` and `rank` come from the surrounding training
# script, after the process group has been initialized.
model = FSDP(
    model,
    sharding_strategy=ShardingStrategy.FULL_SHARD,    # shard parameters, gradients, and optimizer state
    mixed_precision=bf16_policy,
    auto_wrap_policy=auto_wrap_policy,
    device_id=rank
)

12.3 Gradient Accumulation

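When GPU memory is too small for the desired batch size, gradient accumulation runs several small batches and sums their gradients before each optimizer step, which gives the effect of one large batch.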

from torch.cuda.amp import autocast, GradScaler

model = MyModel().cuda()
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)
criterion = nn.CrossEntropyLoss()
scaler = GradScaler()

# Effective batch size = micro_batch_size * accumulation_steps
micro_batch_size = 8    # batch size of train_loader
accumulation_steps = 8  # effective batch size: 64

optimizer.zero_grad()
for step, (inputs, labels) in enumerate(train_loader):
    inputs, labels = inputs.cuda(), labels.cuda()

    with autocast():
        outputs = model(inputs)
        loss = criterion(outputs, labels)
        # Divide by accumulation_steps so the summed gradients equal the large-batch average
        loss = loss / accumulation_steps

    # backward() adds to .grad, so gradients accumulate across micro-batches
    scaler.scale(loss).backward()

    if (step + 1) % accumulation_steps == 0:
        scaler.unscale_(optimizer)
        torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
        scaler.step(optimizer)
        scaler.update()
        optimizer.zero_grad()
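
Why dividing the loss by `accumulation_steps` preserves the average can be seen on a toy problem. The following is a minimal pure-Python sketch, assuming a one-parameter linear model with a mean-squared-error loss and equally sized micro-batches. The data values and the helper `mse_grad` are made up for illustration.

```python
def mse_grad(w, xs, ys):
    """Gradient of mean((w*x - y)^2) with respect to the scalar weight w."""
    n = len(xs)
    return sum(2 * (w * x - y) * x for x, y in zip(xs, ys)) / n

xs = [0.5, -1.0, 2.0, 1.5, -0.5, 3.0, 1.0, -2.0]
ys = [1.0, -2.5, 4.0, 2.5, -1.0, 6.5, 2.0, -3.5]
w = 0.3

# One large batch of 8 samples
full_batch_grad = mse_grad(w, xs, ys)

# Four micro-batches of 2 samples; each micro-batch gradient is divided by
# accumulation_steps before being added, mirroring `loss / accumulation_steps`
micro_batch_size = 2
accumulation_steps = len(xs) // micro_batch_size
accumulated_grad = 0.0
for i in range(0, len(xs), micro_batch_size):
    g = mse_grad(w, xs[i:i + micro_batch_size], ys[i:i + micro_batch_size])
    accumulated_grad += g / accumulation_steps

print(full_batch_grad, accumulated_grad)
```

The two gradients agree up to floating-point rounding. The equivalence relies on equal micro-batch sizes and does not extend to layers such as BatchNorm, whose statistics are computed per micro-batch.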

13. Training Techniques for Large Language Models

13.1 Instruction Tuning

Instruction tuning trains a model to follow natural-language instructions. It played a key role in the success of FLAN, InstructGPT, and LLaMA-2.

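# Example instruction-tuning data format
instruction_data = [
    {
        "instruction": "Analyze the sentiment of the following text.",
        "input": "The weather is so nice today that I feel fantastic!",
        "output": "The sentiment is positive. The text expresses satisfaction with the weather and happiness."
    },
    {
        "instruction": "Write a summary based on the given information.",
        "input": "...(long text)...",
        "output": "...(summary)..."
    }
]

# Build the prompt in the Alpaca format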
def format_instruction(sample):
    if sample.get('input'):
        return f"""### Instruction:
{sample['instruction']}

### Input:
{sample['input']}

### Response:
{sample['output']}"""
    else:
        return f"""### Instruction:
{sample['instruction']}

### Response:
{sample['output']}"""

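13.2 RLHF (Reinforcement Learning from Human Feedback)

RLHF consists of three stages.

  • Stage 1: SFT (Supervised Fine-tuning) - fine-tune on high-quality, human-written responses
  • Stage 2: Reward model training - train a model to score preferred responses above rejected ones
  • Stage 3: Policy optimization with PPO - run reinforcement learning against the reward model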

# Stage 2: Reward model (Bradley-Terry model)
class RewardModel(nn.Module):
    def __init__(self, base_model):
        super().__init__()
        self.base_model = base_model
        self.reward_head = nn.Linear(base_model.config.hidden_size, 1)

    def forward(self, input_ids, attention_mask):
        outputs = self.base_model(input_ids=input_ids, attention_mask=attention_mask)
        # Use the hidden state at the last position. This is the last real token only
        # for unpadded or left-padded batches; with right padding, pick the last
        # non-padding position from attention_mask instead.
        last_hidden = outputs.last_hidden_state[:, -1, :]
        reward = self.reward_head(last_hidden).squeeze(-1)
        return reward

# Reward model training (pairwise preference loss)
def preference_loss(reward_chosen, reward_rejected):
    # Bradley-Terry model: p(chosen > rejected) = sigmoid(r_chosen - r_rejected)
    # logsigmoid is numerically more stable than log(sigmoid(...))
    return -nn.functional.logsigmoid(reward_chosen - reward_rejected).mean()
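
To get a feel for the preference loss, it helps to evaluate it for a few reward margins. The scalar sketch below uses only the standard library. `preference_loss_scalar` is an illustrative helper, not part of any library, and it evaluates -log(sigmoid(margin)) for a single pair.

```python
import math

def preference_loss_scalar(reward_chosen: float, reward_rejected: float) -> float:
    """-log(sigmoid(r_chosen - r_rejected)) for a single pair, in a stable form."""
    margin = reward_chosen - reward_rejected
    # -log(sigmoid(m)) == log(1 + exp(-m)); branch to avoid overflow in exp
    if margin >= 0:
        return math.log1p(math.exp(-margin))
    return -margin + math.log1p(math.exp(margin))

for margin in (-2.0, 0.0, 2.0, 5.0):
    loss = preference_loss_scalar(margin, 0.0)
    prob = 1 / (1 + math.exp(-margin))
    print(f"margin={margin:+.1f}  p(chosen > rejected)={prob:.3f}  loss={loss:.4f}")
```

With a zero margin the model is indifferent between the two responses and the loss equals log 2. The loss shrinks toward zero as the chosen response is scored further above the rejected one, and grows roughly linearly when the ranking is wrong.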

13.3 DPO (Direct Preference Optimization)

DPO simplifies RLHF by removing the separate reward model and the PPO loop. It optimizes the policy directly on preference data.

import torch
import torch.nn.functional as F

def dpo_loss(
    policy_chosen_logps,       # log-probs of the chosen responses under the policy
    policy_rejected_logps,     # log-probs of the rejected responses under the policy
    reference_chosen_logps,    # log-probs of the chosen responses under the reference model
    reference_rejected_logps,  # log-probs of the rejected responses under the reference model
    beta=0.1                   # strength of the implicit KL penalty toward the reference
):
    # Implicit rewards: beta times the policy/reference log ratio
    chosen_rewards = beta * (policy_chosen_logps - reference_chosen_logps)
    rejected_rewards = beta * (policy_rejected_logps - reference_rejected_logps)

    # DPO loss: -log(sigmoid(chosen_rewards - rejected_rewards))
    loss = -F.logsigmoid(chosen_rewards - rejected_rewards).mean()

    # Rewards for logging only (detached from the graph)
    chosen_reward = chosen_rewards.detach().mean()
    rejected_reward = rejected_rewards.detach().mean()
    reward_accuracy = (chosen_rewards > rejected_rewards).float().mean()

    return loss, chosen_reward, rejected_reward, reward_accuracy
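
Written out, the function above is the batch-mean estimate of the DPO objective. In the notation below, which is introduced here for illustration, x is the prompt, y_w and y_l are the chosen and rejected responses, pi_theta is the policy being trained, and pi_ref is the frozen reference model:

```latex
\mathcal{L}_{\mathrm{DPO}}(\theta)
  = -\,\mathbb{E}_{(x,\,y_w,\,y_l)}\left[
      \log \sigma\!\left(
        \beta \log \frac{\pi_\theta(y_w \mid x)}{\pi_{\mathrm{ref}}(y_w \mid x)}
        - \beta \log \frac{\pi_\theta(y_l \mid x)}{\pi_{\mathrm{ref}}(y_l \mid x)}
      \right)
    \right]
```

The two log ratios correspond to `chosen_rewards` and `rejected_rewards` in the code, so the loss has the same Bradley-Terry form as the reward-model loss, with the reward replaced by a scaled log-probability ratio.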

14. Putting It Together: A Practical Training Pipeline

14.1 Complete Training Loop

import torch
import torch.nn as nn
from torch.cuda.amp import autocast, GradScaler
import wandb  # experiment tracking

class Trainer:
    def __init__(
        self,
        model,
        train_loader,
        val_loader,
        optimizer,
        scheduler,
        criterion,
        device='cuda',
        use_amp=True,
        grad_clip=1.0,
        accumulation_steps=1,
        log_wandb=False
    ):
        self.model = model.to(device)
        self.train_loader = train_loader
        self.val_loader = val_loader
        self.optimizer = optimizer
        self.scheduler = scheduler
        self.criterion = criterion
        self.device = device
        self.use_amp = use_amp
        self.grad_clip = grad_clip
        self.accumulation_steps = accumulation_steps
        self.scaler = GradScaler() if use_amp else None
        self.log_wandb = log_wandb

        if log_wandb:
            wandb.watch(model, log='all', log_freq=100)

    def train_epoch(self):
        self.model.train()
        total_loss = 0
        self.optimizer.zero_grad()

        for step, (inputs, labels) in enumerate(self.train_loader):
            inputs, labels = inputs.to(self.device), labels.to(self.device)

            if self.use_amp:
                with autocast():
                    outputs = self.model(inputs)
                    loss = self.criterion(outputs, labels) / self.accumulation_steps
                self.scaler.scale(loss).backward()
            else:
                outputs = self.model(inputs)
                loss = self.criterion(outputs, labels) / self.accumulation_steps
                loss.backward()

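            # Step after every accumulation_steps micro-batches, and also on the last
            # batch so that leftover gradients are not silently discarded
            is_last_batch = (step + 1) == len(self.train_loader)
            if (step + 1) % self.accumulation_steps == 0 or is_last_batch: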
                if self.use_amp:
                    self.scaler.unscale_(self.optimizer)

                if self.grad_clip:
                    nn.utils.clip_grad_norm_(self.model.parameters(), self.grad_clip)

                if self.use_amp:
                    self.scaler.step(self.optimizer)
                    self.scaler.update()
                else:
                    self.optimizer.step()

                if self.scheduler:
                    self.scheduler.step()

                self.optimizer.zero_grad()

            total_loss += loss.item() * self.accumulation_steps

        return total_loss / len(self.train_loader)

    @torch.no_grad()
    def evaluate(self):
        self.model.eval()
        total_loss = 0
        correct = 0
        total = 0

        for inputs, labels in self.val_loader:
            inputs, labels = inputs.to(self.device), labels.to(self.device)

            with autocast() if self.use_amp else torch.no_grad():
                outputs = self.model(inputs)
                loss = self.criterion(outputs, labels)

            total_loss += loss.item()
            _, predicted = outputs.max(1)
            total += labels.size(0)
            correct += predicted.eq(labels).sum().item()

        return total_loss / len(self.val_loader), 100. * correct / total

    def fit(self, epochs, save_path=None):
        best_val_acc = 0
        early_stopping = EarlyStopping(patience=10)

        for epoch in range(epochs):
            train_loss = self.train_epoch()
            val_loss, val_acc = self.evaluate()

            print(f"Epoch {epoch+1}/{epochs}: "
                  f"Train Loss: {train_loss:.4f}, "
                  f"Val Loss: {val_loss:.4f}, "
                  f"Val Acc: {val_acc:.2f}%")

            if self.log_wandb:
                wandb.log({
                    'train_loss': train_loss,
                    'val_loss': val_loss,
                    'val_acc': val_acc,
                    'lr': self.optimizer.param_groups[0]['lr']
                })

            if val_acc > best_val_acc:
                best_val_acc = val_acc
                if save_path:
                    torch.save(self.model.state_dict(), save_path)

            early_stopping(val_loss, self.model)
            if early_stopping.early_stop:
                print("Early stopping!")
                break

        return best_val_acc

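Conclusion and Best Practices

The core principles to keep in mind when training deep learning models are summarized below.

Optimizer selection

  • General tasks: AdamW (lr=1e-3 to 1e-4, weight_decay=0.01)
  • Transformer: AdamW + Warmup + Cosine Schedule
  • Large batches: LAMB or LARS
  • Memory-constrained: Lion

Regularization strategy

  • Dropout rates of 0.1 to 0.5 are typical
  • Small datasets: strong regularization (larger weight decay, higher dropout)
  • Large datasets: weak regularization or none

Learning rate scheduling

  • CNN: OneCycleLR or Step Decay
  • Transformer: Warmup + Cosine or Inverse Square Root

Mixed precision

  • Use AMP by default on GPUs that support it (typically 1.5x to 3x faster with up to about half the memory, depending on the model and hardware)
  • A100/H100 or newer: BF16 recommended
  • Older GPUs: FP16 + loss scaling

Distributed training

  • Multiple GPUs on a single server: DDP + NCCL
  • Models with billions of parameters: FSDP
  • Use gradient accumulation when the target effective batch size does not fit in memory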


Deep Learning Training Methods Complete Guide: From Optimization to Distributed Training

Introduction

Over the past decade, deep learning has achieved revolutionary results in virtually every AI domain — computer vision, natural language processing, speech recognition, and reinforcement learning. However, simply designing a neural network architecture is not enough to build a high-performing model. How you train it is the decisive factor.

This guide systematically covers every technique for effectively training deep learning models. Starting from the fundamentals of gradient descent, we progress through advanced optimizers, learning rate scheduling, regularization, transfer learning, mixed precision training, and large-scale distributed training — all with practical code examples.


1. Gradient Descent Fundamentals

1.1 Understanding the Loss Function

In deep learning, the loss function quantifies the discrepancy between model predictions and ground-truth labels. The goal of training is to find model parameters (weights) that minimize this loss value.

The loss function L depends on model parameters theta and data (x, y). Expressed mathematically:

L(theta) = (1/N) * sum_{i=1}^{N} l(f(x_i; theta), y_i)

Here f is the model function, l is the per-sample loss, and N is the dataset size.

1.2 Intuitive Understanding of Gradient Descent

A helpful analogy for gradient descent is a hiker descending a mountain with eyes closed. At each step, the hiker moves in the direction of steepest descent (opposite to the gradient). Repeating this process eventually leads to the valley floor (minimum).

Mathematically, the update rule is:

theta_{t+1} = theta_t - lr * grad_L(theta_t)

Here lr is the learning rate and grad_L is the gradient of the loss function.

1.3 Batch GD vs Mini-batch GD vs SGD

Batch Gradient Descent

  • Computes gradients over the entire dataset
  • Stable but memory-intensive and slow
  • Impractical for large datasets

Stochastic Gradient Descent (SGD)

  • Computes gradients from a single sample
  • Fast but noisy and unstable
  • Suitable for online learning

Mini-batch Gradient Descent

  • Typically uses 32–512 samples per gradient computation
  • Combines advantages of both Batch GD and SGD
  • The most widely used approach in practice

import torch
import torch.nn as nn
import numpy as np

# Gradient descent implementation with simple linear regression
class LinearRegression(nn.Module):
    def __init__(self, input_dim):
        super().__init__()
        self.linear = nn.Linear(input_dim, 1)

    def forward(self, x):
        return self.linear(x)

# Mini-batch gradient descent
def train_minibatch(model, X, y, batch_size=32, lr=0.01, epochs=100):
    optimizer = torch.optim.SGD(model.parameters(), lr=lr)
    criterion = nn.MSELoss()
    losses = []

    N = len(X)
    for epoch in range(epochs):
        perm = torch.randperm(N)
        X_shuffled = X[perm]
        y_shuffled = y[perm]

        epoch_loss = 0
        for i in range(0, N, batch_size):
            x_batch = X_shuffled[i:i+batch_size]
            y_batch = y_shuffled[i:i+batch_size]

            optimizer.zero_grad()
            pred = model(x_batch)
            loss = criterion(pred, y_batch)
            loss.backward()
            optimizer.step()

            epoch_loss += loss.item()

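        # Average over the batches actually processed (the last batch may be partial,
        # so N // batch_size would undercount)
        num_batches = (N + batch_size - 1) // batch_size
        losses.append(epoch_loss / num_batches)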
        if epoch % 10 == 0:
            print(f"Epoch {epoch}: Loss = {losses[-1]:.4f}")

    return losses

torch.manual_seed(42)
X = torch.randn(1000, 10)
true_w = torch.randn(10, 1)
y = X @ true_w + 0.1 * torch.randn(1000, 1)

model = LinearRegression(10)
losses = train_minibatch(model, X, y)

1.4 The Critical Role of Learning Rate

The learning rate is one of the most important hyperparameters in deep learning.

  • Too large: Loss diverges or oscillates around the minimum
  • Too small: Training is extremely slow and may get stuck in local minima
  • Just right: Fast convergence to a good minimum

Common starting values are 0.1, 0.01, and 0.001, though the optimal value depends on network architecture and data.
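
The three regimes listed above are easy to reproduce on a one-dimensional toy problem. The sketch below is only an illustration: it minimizes f(theta) = theta^2 with plain gradient descent, and the specific learning rates are chosen to make each regime visible rather than taken from any recommendation.

```python
def run_gd(lr: float, steps: int = 50, theta0: float = 1.0) -> float:
    """Minimize f(theta) = theta^2 with plain gradient descent; return final |theta|."""
    theta = theta0
    for _ in range(steps):
        grad = 2 * theta          # f'(theta) = 2 * theta
        theta = theta - lr * grad
    return abs(theta)

results = {lr: run_gd(lr) for lr in (1.1, 0.001, 0.1)}
for lr, final in results.items():
    print(f"lr={lr}: |theta| after 50 steps = {final:.3e}")
```

Each step multiplies theta by (1 - 2 * lr). A learning rate above 1.0 makes that factor larger than 1 in magnitude, so the iterate diverges. A very small rate barely moves it, and a moderate rate shrinks it quickly.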

1.5 Mathematical Derivation (Partial Derivatives, Chain Rule)

Backpropagation in neural networks uses the chain rule to compute gradients for each layer.

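For a network with one hidden layer (two weight matrices, W1 and W2):

forward: x -> z1=W1*x -> a1=relu(z1) -> z2=W2*a1 -> output
loss: L = MSE(output, y)

backward (chain rule):
dL/dW2 = dL/d_output * d_output/dz2 * dz2/dW2
dL/dW1 = dL/d_output * d_output/dz2 * dz2/da1 * da1/dz1 * dz1/dW1

The NumPy implementation below has the same structure but uses a sigmoid activation in place of ReLU, so da1/dz1 = sigmoid(z1) * (1 - sigmoid(z1)).

import numpy as np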

def sigmoid(x):
    return 1 / (1 + np.exp(-x))

def sigmoid_deriv(x):
    s = sigmoid(x)
    return s * (1 - s)

class SimpleNet:
    def __init__(self, input_dim, hidden_dim, output_dim):
        self.W1 = np.random.randn(input_dim, hidden_dim) * np.sqrt(2/input_dim)
        self.b1 = np.zeros(hidden_dim)
        self.W2 = np.random.randn(hidden_dim, output_dim) * np.sqrt(2/hidden_dim)
        self.b2 = np.zeros(output_dim)

    def forward(self, x):
        self.x = x
        self.z1 = x @ self.W1 + self.b1
        self.a1 = sigmoid(self.z1)
        self.z2 = self.a1 @ self.W2 + self.b2
        return self.z2

    def backward(self, y, lr=0.01):
        N = len(y)
        dL_dz2 = 2 * (self.z2 - y.reshape(-1, 1)) / N

        dL_dW2 = self.a1.T @ dL_dz2
        dL_db2 = dL_dz2.sum(axis=0)

        dL_da1 = dL_dz2 @ self.W2.T
        dL_dz1 = dL_da1 * sigmoid_deriv(self.z1)

        dL_dW1 = self.x.T @ dL_dz1
        dL_db1 = dL_dz1.sum(axis=0)

        self.W2 -= lr * dL_dW2
        self.b2 -= lr * dL_db2
        self.W1 -= lr * dL_dW1
        self.b1 -= lr * dL_db1

# Test
net = SimpleNet(10, 32, 1)
X_np = np.random.randn(100, 10)
y_np = np.random.randn(100)

for i in range(100):
    pred = net.forward(X_np)
    loss = np.mean((pred.flatten() - y_np) ** 2)
    net.backward(y_np)
    if i % 20 == 0:
        print(f"Step {i}: MSE = {loss:.4f}")
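
A common way to validate a hand-written backward pass is to compare it against finite differences. The sketch below does this for a scalar version of the same network (one weight per layer, sigmoid activation, squared error). The function names and input values are illustrative and not part of the `SimpleNet` class above.

```python
import math

def sigmoid(z: float) -> float:
    return 1.0 / (1.0 + math.exp(-z))

def loss_fn(w1: float, w2: float, x: float, t: float) -> float:
    """Scalar network: z1 = w1*x, a1 = sigmoid(z1), out = w2*a1, L = (out - t)^2."""
    return (w2 * sigmoid(w1 * x) - t) ** 2

def analytic_grads(w1, w2, x, t):
    """Gradients from the chain rule, one factor per arrow in the forward pass."""
    z1 = w1 * x
    a1 = sigmoid(z1)
    out = w2 * a1
    dL_dout = 2 * (out - t)
    dL_dw2 = dL_dout * a1                  # d_out/d_w2 = a1
    dL_da1 = dL_dout * w2                  # d_out/d_a1 = w2
    dL_dz1 = dL_da1 * a1 * (1 - a1)        # sigmoid'(z1) = a1 * (1 - a1)
    dL_dw1 = dL_dz1 * x                    # d_z1/d_w1 = x
    return dL_dw1, dL_dw2

def numeric_grads(w1, w2, x, t, h=1e-6):
    """Central finite differences as an independent reference."""
    g1 = (loss_fn(w1 + h, w2, x, t) - loss_fn(w1 - h, w2, x, t)) / (2 * h)
    g2 = (loss_fn(w1, w2 + h, x, t) - loss_fn(w1, w2 - h, x, t)) / (2 * h)
    return g1, g2

analytic = analytic_grads(0.7, -1.3, 2.0, 0.5)
numeric = numeric_grads(0.7, -1.3, 2.0, 0.5)
print("analytic:", analytic)
print("numeric: ", numeric)
```

If the two results disagree by more than a small tolerance, one of the chain-rule factors is wrong. The same check scales to matrix-valued layers by perturbing one entry at a time.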

2. Advanced Optimizers

2.1 Momentum SGD

Plain SGD follows gradients directly, causing zigzag movement in narrow valley-shaped loss landscapes. Momentum introduces the physics concept of inertia, allowing the optimizer to remember previous movement directions.

v_t = beta * v_{t-1} + (1 - beta) * grad_t
theta_{t+1} = theta_t - lr * v_t

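The momentum coefficient (beta) is typically set to 0.9. Note that PyTorch's SGD implements the variant v_t = momentum * v_{t-1} + grad_t (no (1 - beta) factor, with the default dampening=0), which matches the form above up to a rescaling of the learning rate by 1 / (1 - beta).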
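
Momentum is written in two conventions: the moving-average form shown in the formula above, and an accumulated form without the (1 - beta) factor, which is what `torch.optim.SGD` uses with its default `dampening=0`. The sketch below checks on a toy quadratic (an assumption made for illustration) that the two produce the same trajectory once the learning rate is rescaled by 1 / (1 - beta).

```python
def grad(theta: float) -> float:
    """Gradient of f(theta) = 2 * theta^2 (a simple quadratic)."""
    return 4.0 * theta

def ema_momentum(lr, beta, steps, theta=1.0):
    """v_t = beta * v_{t-1} + (1 - beta) * g_t (moving-average form)."""
    v = 0.0
    for _ in range(steps):
        v = beta * v + (1 - beta) * grad(theta)
        theta -= lr * v
    return theta

def accumulated_momentum(lr, beta, steps, theta=1.0):
    """v_t = beta * v_{t-1} + g_t (accumulated form, no (1 - beta) factor)."""
    v = 0.0
    for _ in range(steps):
        v = beta * v + grad(theta)
        theta -= lr * v
    return theta

beta, lr = 0.9, 0.01
theta_acc = accumulated_momentum(lr, beta, steps=100)
theta_ema = ema_momentum(lr / (1 - beta), beta, steps=100)
print(theta_acc, theta_ema)
```

In practice this means a learning rate tuned for one convention has to be scaled by a factor of 1 / (1 - beta), ten for beta = 0.9, before it is reused with the other.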

import torch
import torch.optim as optim

# Momentum SGD
optimizer_momentum = optim.SGD(
    model.parameters(),
    lr=0.01,
    momentum=0.9,
    nesterov=False
)

# Nesterov Accelerated Gradient (NAG) - look-ahead gradient
optimizer_nag = optim.SGD(
    model.parameters(),
    lr=0.01,
    momentum=0.9,
    nesterov=True
)

2.2 Adagrad (Adaptive Learning Rate)

Adagrad applies an individual learning rate to each parameter. Parameters that have accumulated large gradients get a smaller effective learning rate, while rarely updated parameters keep a larger one.

G_t = G_{t-1} + grad_t^2
theta_{t+1} = theta_t - (lr / sqrt(G_t + epsilon)) * grad_t

Adagrad is effective for sparse data, but because G_t only ever grows, the effective learning rate shrinks toward zero over long training runs.

optimizer_adagrad = optim.Adagrad(
    model.parameters(),
    lr=0.01,
    eps=1e-8,
    weight_decay=0
)

2.3 RMSprop

RMSprop resolves Adagrad's learning rate decay problem by using an exponential moving average of squared gradients.

E[g^2]_t = rho * E[g^2]_{t-1} + (1 - rho) * grad_t^2
theta_{t+1} = theta_t - (lr / sqrt(E[g^2]_t + epsilon)) * grad_t

optimizer_rmsprop = optim.RMSprop(
    model.parameters(),
    lr=0.001,
    alpha=0.99,
    eps=1e-8,
    momentum=0,
    centered=False
)
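
The difference between the two accumulators is easiest to see with a constant gradient. The sketch below applies the Adagrad and RMSprop formulas from these sections to a scalar gradient of 1.0. This is an artificial case chosen to isolate the effect, and the hyperparameter values are illustrative.

```python
import math

lr, eps, rho = 0.01, 1e-8, 0.99
g = 1.0  # constant gradient, an artificial case chosen to isolate the effect

adagrad_G = 0.0
rmsprop_E = 0.0
history = {}
for t in range(1, 10001):
    adagrad_G += g ** 2                               # running sum of squared gradients
    rmsprop_E = rho * rmsprop_E + (1 - rho) * g ** 2  # exponential moving average
    if t in (1, 100, 10000):
        history[t] = (
            lr / math.sqrt(adagrad_G + eps),
            lr / math.sqrt(rmsprop_E + eps),
        )

for t, (ada_lr, rms_lr) in history.items():
    print(f"t={t:>5}  Adagrad lr={ada_lr:.6f}  RMSprop lr={rms_lr:.6f}")
```

Adagrad's effective rate keeps falling as lr / sqrt(t), while RMSprop's settles at lr once the moving average has warmed up. The early RMSprop values are inflated because the average starts from zero, which is the bias that Adam's correction terms address.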

2.4 Adam (Adaptive Moment Estimation)

Adam combines Momentum and RMSprop, tracking exponential moving averages of both the gradient (first moment, the mean) and the squared gradient (second moment, the uncentered variance). It is one of the most widely used optimizers today.

The algorithm:

m_t = beta1 * m_{t-1} + (1 - beta1) * g_t       # 1st moment (before bias correction)
v_t = beta2 * v_{t-1} + (1 - beta2) * g_t^2     # 2nd moment (before bias correction)

m_hat = m_t / (1 - beta1^t)                       # bias correction
v_hat = v_t / (1 - beta2^t)                       # bias correction

theta_{t+1} = theta_t - lr * m_hat / (sqrt(v_hat) + epsilon)

Default hyperparameters: lr=0.001, beta1=0.9, beta2=0.999, epsilon=1e-8

optimizer_adam = optim.Adam(
    model.parameters(),
    lr=1e-3,
    betas=(0.9, 0.999),
    eps=1e-8,
    weight_decay=0
)
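
One consequence of the bias-correction terms is that the very first Adam update has magnitude close to lr, regardless of the gradient's scale. The scalar sketch below works through the algorithm for t = 1. `adam_first_step` is an illustrative helper, and the gradient values are arbitrary.

```python
import math

def adam_first_step(g: float, lr=1e-3, beta1=0.9, beta2=0.999, eps=1e-8) -> float:
    """Size of the first Adam update for a scalar gradient g (t = 1)."""
    m = (1 - beta1) * g            # m_1, starting from m_0 = 0
    v = (1 - beta2) * g ** 2       # v_1, starting from v_0 = 0
    m_hat = m / (1 - beta1 ** 1)   # bias correction recovers g
    v_hat = v / (1 - beta2 ** 1)   # bias correction recovers g^2
    return lr * m_hat / (math.sqrt(v_hat) + eps)

steps = {g: adam_first_step(g) for g in (1e-3, 1.0, 1e3)}
for g, step in steps.items():
    print(f"gradient={g:g}  first update={step:.6e}")

# Without bias correction the same step would be lr * 0.1*g / sqrt(0.001*g^2)
uncorrected = 1e-3 * 0.1 / math.sqrt(0.001)
print(f"uncorrected first update = {uncorrected:.6e}")
```

Without the correction, the first step would be roughly three times larger than lr with these defaults, because the two moving averages start from zero and decay at different rates.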

2.5 AdamW (Decoupled Weight Decay)

In standard Adam, L2 regularization is coupled with gradients and thus affected by the adaptive learning rate. AdamW applies weight decay directly to parameter updates, decoupled from the gradient-based update.

theta_{t+1} = theta_t - lr * (m_hat / (sqrt(v_hat) + epsilon) + lambda * theta_t)

AdamW has become the standard for training Transformer models (BERT, GPT, etc.).

optimizer_adamw = optim.AdamW(
    model.parameters(),
    lr=1e-4,
    betas=(0.9, 0.999),
    eps=1e-8,
    weight_decay=0.01
)

2.6 LARS and LAMB (Large-Batch Training)

When using very large batch sizes (thousands), standard Adam degrades in performance. LARS (Layer-wise Adaptive Rate Scaling) and LAMB adjust learning rates per layer.

LARS: lr_l = lr * ||w_l|| / (||g_l|| + lambda * ||w_l||)
LAMB: applies a per-layer trust ratio to the Adam update
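
The effect of the LARS formula can be illustrated with two layers whose gradients differ by three orders of magnitude. This is a minimal sketch of the ratio exactly as written above. Practical implementations also multiply by a trust coefficient, which is omitted here, and the weight and gradient values are hypothetical.

```python
import math

def l2_norm(values):
    return math.sqrt(sum(v * v for v in values))

def lars_local_lr(weights, grads, base_lr=0.1, weight_decay=1e-4):
    """Layer-wise rate from the formula above: lr * ||w|| / (||g|| + lambda * ||w||)."""
    w_norm = l2_norm(weights)
    g_norm = l2_norm(grads)
    return base_lr * w_norm / (g_norm + weight_decay * w_norm)

# Two hypothetical layers with the same weights but very different gradient scales
layer_weights = [0.5, -0.5, 0.5, -0.5]          # ||w|| = 1.0
small_grads = [0.001, 0.001, -0.001, 0.001]     # ||g|| = 0.002
large_grads = [1.0, -1.0, 1.0, 1.0]             # ||g|| = 2.0

lr_small = lars_local_lr(layer_weights, small_grads)
lr_large = lars_local_lr(layer_weights, large_grads)
print(f"layer with small gradients: lr = {lr_small:.4f}")
print(f"layer with large gradients: lr = {lr_large:.4f}")
```

Although the per-layer rates differ widely, the resulting update norms (rate times gradient norm) are nearly equal. That normalization is what keeps every layer moving at a comparable relative pace under large batches.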

2.7 Lion Optimizer (2023)

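Google Brain's Lion (EvoLved Sign Momentum) applies only the sign of its interpolated momentum update, so every parameter moves by the same magnitude per step. Because it tracks a single momentum buffer rather than Adam's two moment estimates, it uses less optimizer memory while delivering competitive performance.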

class Lion(torch.optim.Optimizer):
    def __init__(self, params, lr=1e-4, betas=(0.9, 0.99), weight_decay=0.0):
        defaults = dict(lr=lr, betas=betas, weight_decay=weight_decay)
        super().__init__(params, defaults)

    def step(self, closure=None):
        loss = None
        if closure is not None:
            with torch.enable_grad():
                loss = closure()

        for group in self.param_groups:
            for p in group['params']:
                if p.grad is None:
                    continue

                grad = p.grad
                lr = group['lr']
                beta1, beta2 = group['betas']
                wd = group['weight_decay']

                state = self.state[p]
                if len(state) == 0:
                    state['exp_avg'] = torch.zeros_like(p)

                exp_avg = state['exp_avg']

                # Lion update
                update = exp_avg * beta1 + grad * (1 - beta1)
                p.data.mul_(1 - lr * wd)
                p.data.add_(update.sign_(), alpha=-lr)

                # Momentum update
                exp_avg.mul_(beta2).add_(grad, alpha=1 - beta2)

        return loss

2.8 Optimizer Comparison Experiment

import torch
import torch.nn as nn

class MLP(nn.Module):
    def __init__(self):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(2, 64),
            nn.ReLU(),
            nn.Linear(64, 64),
            nn.ReLU(),
            nn.Linear(64, 1)
        )

    def forward(self, x):
        return self.net(x)

def train_and_compare(optimizers_dict, X, y, epochs=200):
    results = {}

    for name, opt_fn in optimizers_dict.items():
        model = MLP()
        optimizer = opt_fn(model.parameters())
        criterion = nn.MSELoss()
        losses = []

        for epoch in range(epochs):
            optimizer.zero_grad()
            pred = model(X)
            loss = criterion(pred, y)
            loss.backward()
            optimizer.step()
            losses.append(loss.item())

        results[name] = losses
        print(f"{name}: Final Loss = {losses[-1]:.4f}")

    return results

X = torch.randn(500, 2)
y = (X[:, 0] * 2 + X[:, 1] * 3 + torch.randn(500) * 0.1).unsqueeze(1)

optimizers = {
    'SGD': lambda p: torch.optim.SGD(p, lr=0.01),
    'SGD+Momentum': lambda p: torch.optim.SGD(p, lr=0.01, momentum=0.9),
    'Adam': lambda p: torch.optim.Adam(p, lr=0.001),
    'AdamW': lambda p: torch.optim.AdamW(p, lr=0.001, weight_decay=0.01),
    'RMSprop': lambda p: torch.optim.RMSprop(p, lr=0.001),
}

results = train_and_compare(optimizers, X, y)

3. Learning Rate Scheduling

A fixed learning rate is rarely optimal. Learning rate scheduling dynamically adjusts the rate during training to achieve faster convergence and better final performance.

3.1 Step and Exponential Decay

import torch
import torch.optim as optim

model = MLP()
optimizer = optim.SGD(model.parameters(), lr=0.1)

# Step Decay: reduce by gamma every step_size epochs
step_scheduler = optim.lr_scheduler.StepLR(
    optimizer,
    step_size=30,
    gamma=0.1
)

# MultiStep Decay: reduce at specified milestones
multistep_scheduler = optim.lr_scheduler.MultiStepLR(
    optimizer,
    milestones=[30, 60, 80],
    gamma=0.1
)

# Exponential Decay: reduce exponentially every epoch
exp_scheduler = optim.lr_scheduler.ExponentialLR(
    optimizer,
    gamma=0.95
)

3.2 Cosine Annealing

Cosine Annealing smoothly decreases the learning rate following a cosine curve. Cosine Annealing with Warm Restarts periodically resets the learning rate for exploration.

# Cosine Annealing
cosine_scheduler = optim.lr_scheduler.CosineAnnealingLR(
    optimizer,
    T_max=100,
    eta_min=1e-6
)

# Cosine Annealing with Warm Restarts (SGDR)
cosine_restart = optim.lr_scheduler.CosineAnnealingWarmRestarts(
    optimizer,
    T_0=10,
    T_mult=2,
    eta_min=1e-6
)

3.3 Warmup + Cosine Schedule

This is a standard schedule for training Transformer models. The learning rate increases linearly during warmup, then decreases following a cosine curve.

import math
from torch.optim.lr_scheduler import LambdaLR

def get_cosine_schedule_with_warmup(optimizer, num_warmup_steps, num_training_steps, num_cycles=0.5):
    def lr_lambda(current_step):
        if current_step < num_warmup_steps:
            return float(current_step) / float(max(1, num_warmup_steps))
        progress = float(current_step - num_warmup_steps) / float(
            max(1, num_training_steps - num_warmup_steps)
        )
        return max(0.0, 0.5 * (1.0 + math.cos(math.pi * float(num_cycles) * 2.0 * progress)))

    return LambdaLR(optimizer, lr_lambda)

optimizer = optim.AdamW(model.parameters(), lr=5e-5)
scheduler = get_cosine_schedule_with_warmup(
    optimizer,
    num_warmup_steps=1000,
    num_training_steps=10000
)
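
To see the shape of this schedule without building an optimizer, the multiplier can be evaluated directly. The sketch below repeats the arithmetic of `lr_lambda` as a standalone function. The name `warmup_cosine_multiplier` and the sampled steps are chosen here for illustration.

```python
import math

def warmup_cosine_multiplier(step, num_warmup_steps=1000, num_training_steps=10000, num_cycles=0.5):
    """Same arithmetic as lr_lambda above, written as a standalone function."""
    if step < num_warmup_steps:
        return step / max(1, num_warmup_steps)
    progress = (step - num_warmup_steps) / max(1, num_training_steps - num_warmup_steps)
    return max(0.0, 0.5 * (1.0 + math.cos(math.pi * num_cycles * 2.0 * progress)))

base_lr = 5e-5
for step in (0, 500, 1000, 5500, 10000):
    mult = warmup_cosine_multiplier(step)
    print(f"step {step:>5}: multiplier = {mult:.3f}, lr = {base_lr * mult:.2e}")
```

The multiplier rises linearly from 0 to 1 over the warmup steps, passes through 0.5 halfway through the decay phase, and reaches 0 at the final step. `LambdaLR` multiplies the optimizer's base learning rate by this value.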

3.4 OneCycleLR

OneCycleLR aggressively ramps the learning rate up and then down for fast convergence. It was introduced by Leslie Smith and popularized by FastAI.

optimizer = optim.SGD(model.parameters(), lr=0.01)
scheduler = optim.lr_scheduler.OneCycleLR(
    optimizer,
    max_lr=0.1,
    steps_per_epoch=len(train_loader),
    epochs=10,
    pct_start=0.3,
    anneal_strategy='cos',
    div_factor=25.0,
    final_div_factor=1e4
)

for epoch in range(10):
    for batch in train_loader:
        optimizer.zero_grad()
        loss = criterion(model(batch[0]), batch[1])
        loss.backward()
        optimizer.step()
        scheduler.step()  # OneCycleLR steps per batch

3.5 Learning Rate Finder

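The LR range test trains for a small number of iterations while increasing the learning rate from a very small value to a very large one, recording the loss at each step. The resulting loss curve suggests an appropriate learning rate range before full training.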

from torch_lr_finder import LRFinder

model = MLP()
optimizer = optim.SGD(model.parameters(), lr=1e-7, weight_decay=1e-2)
criterion = nn.MSELoss()

lr_finder = LRFinder(model, optimizer, criterion, device="cuda")
lr_finder.range_test(train_loader, end_lr=100, num_iter=100)
lr_finder.plot()
lr_finder.reset()

# Pick the LR where the loss is falling fastest (steepest negative slope)
# Common heuristic: use 1/10 to 1/3 of the LR at which the loss reaches its minimum
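The "steepest decline" heuristic can be sketched in a few lines of plain Python. The helper `suggest_lr` and the recorded values are hypothetical and only illustrate the selection rule; they are not output from a real run or part of `torch_lr_finder`.

```python
import math

def suggest_lr(lrs, losses):
    """Return the LR at the steepest loss decline, measured against log10(lr)."""
    best_idx, best_slope = None, 0.0
    for i in range(1, len(lrs)):
        slope = (losses[i] - losses[i - 1]) / (math.log10(lrs[i]) - math.log10(lrs[i - 1]))
        if slope < best_slope:
            best_idx, best_slope = i, slope
    return None if best_idx is None else lrs[best_idx]

# Made-up range-test history: loss falls, bottoms out, then diverges
lrs = [1e-5, 1e-4, 1e-3, 1e-2, 1e-1, 1.0]
losses = [2.30, 2.28, 2.10, 1.20, 0.90, 5.00]

suggested = suggest_lr(lrs, losses)
lr_at_min_loss = lrs[losses.index(min(losses))]
```

In this toy history the loss drops fastest when moving to `1e-2`, while the minimum sits at `1e-1`, so both heuristics point to a value around `1e-2`.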

4. Loss Functions

4.1 Regression Loss Functions

import torch
import torch.nn as nn
import torch.nn.functional as F

# MSE - sensitive to outliers
mse_loss = nn.MSELoss()

# MAE - robust to outliers
mae_loss = nn.L1Loss()

# Huber Loss - compromise between MSE and MAE
# |y - y_hat| < delta: 0.5 * (y - y_hat)^2
# |y - y_hat| >= delta: delta * (|y - y_hat| - 0.5 * delta)
huber_loss = nn.HuberLoss(delta=1.0)

def huber_loss_manual(pred, target, delta=1.0):
    residual = torch.abs(pred - target)
    condition = residual < delta
    squared_loss = 0.5 * residual ** 2
    linear_loss = delta * residual - 0.5 * delta ** 2
    return torch.where(condition, squared_loss, linear_loss).mean()

4.2 Classification Loss Functions

# Cross-Entropy Loss (multi-class)
ce_loss = nn.CrossEntropyLoss()

# Binary Cross-Entropy (binary classification)
bce_loss = nn.BCEWithLogitsLoss()

# Label Smoothing Cross-Entropy (reduces overconfidence)
ce_smooth = nn.CrossEntropyLoss(label_smoothing=0.1)

# Focal Loss (addresses class imbalance)
class FocalLoss(nn.Module):
    def __init__(self, gamma=2.0, alpha=None, reduction='mean'):
        super().__init__()
        self.gamma = gamma
        self.alpha = alpha
        self.reduction = reduction

    def forward(self, inputs, targets):
        ce_loss = F.cross_entropy(inputs, targets, reduction='none')
        pt = torch.exp(-ce_loss)
        focal_loss = ((1 - pt) ** self.gamma) * ce_loss

        if self.alpha is not None:
            alpha_t = self.alpha[targets]
            focal_loss = alpha_t * focal_loss

        if self.reduction == 'mean':
            return focal_loss.mean()
        elif self.reduction == 'sum':
            return focal_loss.sum()
        return focal_loss
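To see why the `(1 - pt) ** gamma` factor helps with imbalance, it is enough to compare an easy and a hard example. This is a scalar sketch of the same formula in plain Python; `focal_term` is a name introduced here for illustration.

```python
import math

def focal_term(pt, gamma=2.0):
    """Focal loss for one sample whose true class has predicted probability pt."""
    ce = -math.log(pt)
    return (1.0 - pt) ** gamma * ce

easy = focal_term(0.9)  # confident, correct prediction
hard = focal_term(0.1)  # badly misclassified sample

# Fraction of the plain cross-entropy that survives the modulating factor
easy_weight = easy / -math.log(0.9)
hard_weight = hard / -math.log(0.1)
```

With `gamma=2`, the easy sample keeps only about 1% of its cross-entropy while the hard sample keeps about 81%, so well-classified examples contribute far less to the total loss.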

4.3 Segmentation Loss Functions

def bce_loss_fn(pred, target):
    return F.binary_cross_entropy_with_logits(pred, target)

# Dice Loss (robust to class imbalance)
def dice_loss(pred, target, smooth=1.0):
    pred = torch.sigmoid(pred)
    # reshape also works for non-contiguous tensors, where view would raise an error
    pred_flat = pred.reshape(-1)
    target_flat = target.reshape(-1)

    intersection = (pred_flat * target_flat).sum()
    dice = (2. * intersection + smooth) / (pred_flat.sum() + target_flat.sum() + smooth)
    return 1 - dice

# BCE + Dice combination (common in segmentation)
def bce_dice_loss(pred, target, bce_weight=0.5):
    bce = bce_loss_fn(pred, target)
    dice = dice_loss(pred, target)
    return bce_weight * bce + (1 - bce_weight) * dice

4.4 Metric Learning Loss Functions

# Contrastive Loss (pull similar pairs together, push dissimilar apart)
class ContrastiveLoss(nn.Module):
    def __init__(self, margin=1.0):
        super().__init__()
        self.margin = margin

    def forward(self, output1, output2, label):
        # label=1: same class, label=0: different class
        euclidean_dist = F.pairwise_distance(output1, output2)
        loss = (label * euclidean_dist.pow(2) +
                (1 - label) * F.relu(self.margin - euclidean_dist).pow(2))
        return loss.mean()

# Triplet Loss (anchor, positive, negative)
class TripletLoss(nn.Module):
    def __init__(self, margin=0.3):
        super().__init__()
        self.margin = margin

    def forward(self, anchor, positive, negative):
        pos_dist = F.pairwise_distance(anchor, positive)
        neg_dist = F.pairwise_distance(anchor, negative)
        loss = F.relu(pos_dist - neg_dist + self.margin)
        return loss.mean()

5. Regularization Techniques

Techniques to prevent overfitting and improve generalization.

5.1 L1/L2 Regularization

# L2 regularization via weight_decay
# (Adam adds the L2 penalty to the gradient; AdamW applies decoupled weight decay instead)
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3, weight_decay=1e-4)

# L1 Regularization (manual implementation)
def l1_regularization(model, lambda_l1):
    l1_penalty = 0
    for param in model.parameters():
        l1_penalty += torch.abs(param).sum()
    return lambda_l1 * l1_penalty

# Elastic Net (L1 + L2)
def elastic_net_loss(model, criterion, outputs, targets, lambda_l1=1e-5, lambda_l2=1e-4):
    base_loss = criterion(outputs, targets)
    l1_penalty = sum(torch.abs(p).sum() for p in model.parameters())
    l2_penalty = sum((p ** 2).sum() for p in model.parameters())
    return base_loss + lambda_l1 * l1_penalty + lambda_l2 * l2_penalty

5.2 Dropout

Dropout randomly deactivates neurons during training to prevent co-adaptation. Inverted Dropout divides by the keep probability during training, so no scaling is needed at inference.

class ModelWithDropout(nn.Module):
    def __init__(self, dropout_rate=0.5):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(784, 512),
            nn.ReLU(),
            nn.Dropout(p=dropout_rate),
            nn.Linear(512, 256),
            nn.ReLU(),
            nn.Dropout(p=dropout_rate),
            nn.Linear(256, 10)
        )

    def forward(self, x):
        return self.net(x)

# Training mode: dropout is active
model.train()

# Inference mode: dropout is disabled
model.eval()
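The inverted-dropout scaling described above can be checked with a small plain-Python sketch. The function below is an illustrative stand-in for what `nn.Dropout` does in training mode, not its actual implementation.

```python
import random

def inverted_dropout(values, p, rng):
    """Zero each value with probability p and scale the survivors by 1 / (1 - p)."""
    keep_prob = 1.0 - p
    return [v / keep_prob if rng.random() < keep_prob else 0.0 for v in values]

rng = random.Random(0)
x = [1.0] * 10000
y = inverted_dropout(x, p=0.5, rng=rng)

# The expected activation is unchanged, so inference needs no rescaling
mean_y = sum(y) / len(y)
```

Each surviving unit is doubled when `p=0.5`, so the average activation stays close to 1.0, which is why the layer can simply act as the identity in `eval()` mode.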

5.3 Data Augmentation

import numpy as np
import torch
from torchvision import transforms

train_transform = transforms.Compose([
    transforms.RandomHorizontalFlip(p=0.5),
    transforms.RandomCrop(32, padding=4),
    transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2, hue=0.1),
    transforms.RandomRotation(degrees=15),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

# Mixup Augmentation
def mixup_data(x, y, alpha=1.0):
    if alpha > 0:
        lam = np.random.beta(alpha, alpha)
    else:
        lam = 1

    batch_size = x.size(0)
    # Create the permutation on the same device as x to avoid a device mismatch
    index = torch.randperm(batch_size, device=x.device)

    mixed_x = lam * x + (1 - lam) * x[index]
    y_a, y_b = y, y[index]
    return mixed_x, y_a, y_b, lam

def mixup_criterion(criterion, pred, y_a, y_b, lam):
    return lam * criterion(pred, y_a) + (1 - lam) * criterion(pred, y_b)

# CutMix Augmentation
def cutmix_data(x, y, alpha=1.0):
    lam = np.random.beta(alpha, alpha)
    batch_size, C, H, W = x.size()
    # Create the permutation on the same device as x to avoid a device mismatch
    index = torch.randperm(batch_size, device=x.device)

    cut_ratio = np.sqrt(1. - lam)
    cut_w = int(W * cut_ratio)
    cut_h = int(H * cut_ratio)

    cx = np.random.randint(W)
    cy = np.random.randint(H)

    bbx1 = np.clip(cx - cut_w // 2, 0, W)
    bby1 = np.clip(cy - cut_h // 2, 0, H)
    bbx2 = np.clip(cx + cut_w // 2, 0, W)
    bby2 = np.clip(cy + cut_h // 2, 0, H)

    mixed_x = x.clone()
    mixed_x[:, :, bby1:bby2, bbx1:bbx2] = x[index, :, bby1:bby2, bbx1:bbx2]
    lam = 1 - ((bbx2 - bbx1) * (bby2 - bby1) / (W * H))

    return mixed_x, y, y[index], lam

5.4 Early Stopping

class EarlyStopping:
    def __init__(self, patience=10, min_delta=0.001, restore_best_weights=True):
        self.patience = patience
        self.min_delta = min_delta
        self.restore_best_weights = restore_best_weights
        self.counter = 0
        self.best_loss = None
        self.best_weights = None
        self.early_stop = False

    def __call__(self, val_loss, model):
        if self.best_loss is None:
            self.best_loss = val_loss
            self.best_weights = {k: v.clone() for k, v in model.state_dict().items()}
        elif val_loss > self.best_loss - self.min_delta:
            self.counter += 1
            print(f"EarlyStopping counter: {self.counter}/{self.patience}")
            if self.counter >= self.patience:
                self.early_stop = True
        else:
            self.best_loss = val_loss
            self.best_weights = {k: v.clone() for k, v in model.state_dict().items()}
            self.counter = 0

    def restore(self, model):
        if self.restore_best_weights and self.best_weights:
            model.load_state_dict(self.best_weights)
            print("Restored best model weights")

6. Normalization Layers

6.1 Batch Normalization

Proposed by Sergey Ioffe and Christian Szegedy in 2015, Batch Normalization normalizes features within each mini-batch to address the internal covariate shift problem.

The process:

1. Mini-batch mean: mu_B = (1/m) * sum(x_i)
2. Mini-batch variance: sigma_B^2 = (1/m) * sum((x_i - mu_B)^2)
3. Normalize: x_hat_i = (x_i - mu_B) / sqrt(sigma_B^2 + epsilon)
4. Scale and shift: y_i = gamma * x_hat_i + beta

gamma (scale) and beta (shift) are learnable parameters.
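The four steps can be traced on a tiny batch of scalars. This is a plain-Python sketch for a single feature; `batch_norm_1d` is a name introduced here for illustration.

```python
import math

def batch_norm_1d(xs, gamma=1.0, beta=0.0, eps=1e-5):
    """Apply the four BatchNorm steps to one feature across a mini-batch."""
    m = len(xs)
    mu = sum(xs) / m                                          # 1. mini-batch mean
    var = sum((x - mu) ** 2 for x in xs) / m                  # 2. mini-batch variance (biased)
    x_hat = [(x - mu) / math.sqrt(var + eps) for x in xs]     # 3. normalize
    return [gamma * v + beta for v in x_hat]                  # 4. scale and shift

out = batch_norm_1d([1.0, 2.0, 3.0, 4.0])
out_mean = sum(out) / len(out)
out_var = sum(v * v for v in out) / len(out) - out_mean ** 2
```

With `gamma=1` and `beta=0`, the output has zero mean and a variance just under 1 (slightly less because of `eps` in the denominator).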

import torch
import torch.nn as nn

class BatchNormNet(nn.Module):
    def __init__(self):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(784, 512),
            nn.BatchNorm1d(512),
            nn.ReLU(),
            nn.Linear(512, 256),
            nn.BatchNorm1d(256),
            nn.ReLU(),
            nn.Linear(256, 10)
        )

    def forward(self, x):
        return self.net(x)

# Manual BatchNorm implementation
class BatchNorm(nn.Module):
    def __init__(self, num_features, eps=1e-5, momentum=0.1):
        super().__init__()
        self.gamma = nn.Parameter(torch.ones(num_features))
        self.beta = nn.Parameter(torch.zeros(num_features))
        self.eps = eps
        self.momentum = momentum

        self.register_buffer('running_mean', torch.zeros(num_features))
        self.register_buffer('running_var', torch.ones(num_features))

    def forward(self, x):
        if self.training:
            mean = x.mean(dim=0)
            var = x.var(dim=0, unbiased=False)
            # Update running statistics in place, outside the autograd graph
            with torch.no_grad():
                self.running_mean.mul_(1 - self.momentum).add_(self.momentum * mean)
                self.running_var.mul_(1 - self.momentum).add_(self.momentum * var)
        else:
            mean = self.running_mean
            var = self.running_var

        x_norm = (x - mean) / torch.sqrt(var + self.eps)
        return self.gamma * x_norm + self.beta

6.2 Layer Normalization (Transformer Standard)

Layer Normalization normalizes across the feature dimension rather than the batch dimension. It is independent of batch size, making it suitable for RNNs and Transformers.

class LayerNorm(nn.Module):
    def __init__(self, normalized_shape, eps=1e-5):
        super().__init__()
        if isinstance(normalized_shape, int):
            normalized_shape = (normalized_shape,)
        self.normalized_shape = normalized_shape
        self.gamma = nn.Parameter(torch.ones(normalized_shape))
        self.beta = nn.Parameter(torch.zeros(normalized_shape))
        self.eps = eps

    def forward(self, x):
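        # Normalize over the trailing dimensions given by normalized_shape
        dims = tuple(range(-len(self.normalized_shape), 0))
        mean = x.mean(dim=dims, keepdim=True)
        var = x.var(dim=dims, keepdim=True, unbiased=False)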
        x_norm = (x - mean) / torch.sqrt(var + self.eps)
        return self.gamma * x_norm + self.beta

# Transformer Block with Pre-LayerNorm (modern GPT-style)
class TransformerBlock(nn.Module):
    def __init__(self, d_model, nhead, dim_feedforward):
        super().__init__()
        self.attention = nn.MultiheadAttention(d_model, nhead)
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.ffn = nn.Sequential(
            nn.Linear(d_model, dim_feedforward),
            nn.GELU(),
            nn.Linear(dim_feedforward, d_model)
        )

    def forward(self, x):
        h = self.norm1(x)  # Pre-LN: normalize once, reuse as query, key, and value
        attn_out, _ = self.attention(h, h, h)
        x = x + attn_out
        x = x + self.ffn(self.norm2(x))
        return x

6.3 Instance, Group, and RMS Normalization

# Instance Normalization (per-sample, per-channel)
# Effective for style transfer
instance_norm = nn.InstanceNorm2d(64)

# Group Normalization (normalize within channel groups)
# Alternative to BN when batch size is small
group_norm = nn.GroupNorm(num_groups=8, num_channels=64)

# RMS Normalization (used in LLaMA, T5)
# Removes mean centering from LayerNorm for speed
class RMSNorm(nn.Module):
    def __init__(self, dim, eps=1e-6):
        super().__init__()
        self.eps = eps
        self.weight = nn.Parameter(torch.ones(dim))

    def _norm(self, x):
        return x * torch.rsqrt(x.pow(2).mean(-1, keepdim=True) + self.eps)

    def forward(self, x):
        return self.weight * self._norm(x.float()).type_as(x)

# Summary of when to use each normalization:
# BatchNorm: CNN, batch-dependent, best with batch size >= 16
# LayerNorm: Transformer/RNN, batch-independent
# InstanceNorm: style transfer, per-sample per-channel
# GroupNorm: small batches, detection/segmentation
# RMSNorm: LLMs, lightweight LayerNorm alternative

7. Weight Initialization

7.1 Xavier/He Initialization

Weight initialization sets the starting point for training. Poor initialization can trigger vanishing or exploding gradients.

import torch
import torch.nn as nn

class WeightInitDemo(nn.Module):
    def __init__(self, init_method='xavier_uniform'):  # default must match a branch in apply_init
        super().__init__()
        self.layers = nn.ModuleList([
            nn.Linear(256, 256) for _ in range(5)
        ])
        self.apply_init(init_method)

    def apply_init(self, method):
        for layer in self.layers:
            if method == 'zeros':
                nn.init.zeros_(layer.weight)        # Bad: symmetry problem
            elif method == 'random_small':
                nn.init.normal_(layer.weight, std=0.01)
            elif method == 'xavier_uniform':
                nn.init.xavier_uniform_(layer.weight)  # For sigmoid/tanh
            elif method == 'xavier_normal':
                nn.init.xavier_normal_(layer.weight)
            elif method == 'kaiming_uniform':
                nn.init.kaiming_uniform_(layer.weight, mode='fan_in', nonlinearity='relu')
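            elif method == 'kaiming_normal':
                # fan_in matches the std = sqrt(2 / fan_in) formula; fan_out preserves backward-pass variance instead
                nn.init.kaiming_normal_(layer.weight, mode='fan_in', nonlinearity='relu')  # For ReLU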
            nn.init.zeros_(layer.bias)

    def forward(self, x):
        for layer in self.layers:
            x = torch.relu(layer(x))
        return x

# Initialization comparison
x = torch.randn(100, 256)
for method in ['zeros', 'random_small', 'xavier_uniform', 'kaiming_normal']:
    model = WeightInitDemo(method)
    with torch.no_grad():
        out = model(x)
    print(f"{method}: output mean={out.mean():.4f}, std={out.std():.4f}")

Xavier/Glorot Initialization is designed for sigmoid/tanh activations:

  • Uniform: weights drawn from Uniform(-limit, +limit) where limit = sqrt(6 / (fan_in + fan_out))

He/Kaiming Initialization is designed for ReLU activations:

  • Normal: weights drawn from a zero-mean normal distribution with standard deviation sqrt(2 / fan_in)
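For the 256-by-256 layers used in the demo above, the two formulas give concrete numbers. This is a small arithmetic sketch in plain Python; the helper names are introduced here for illustration.

```python
import math

def xavier_uniform_limit(fan_in, fan_out):
    """Bound of the Uniform(-limit, +limit) distribution used by Xavier/Glorot."""
    return math.sqrt(6.0 / (fan_in + fan_out))

def he_normal_std(fan_in):
    """Standard deviation of the zero-mean normal used by He/Kaiming (ReLU gain)."""
    return math.sqrt(2.0 / fan_in)

limit = xavier_uniform_limit(256, 256)  # about 0.108
std = he_normal_std(256)                # about 0.088

# Variance of Uniform(-a, a) is a^2 / 3, which gives 2 / (fan_in + fan_out) for Xavier
xavier_var = limit ** 2 / 3.0
he_var = std ** 2
```

With equal fan-in and fan-out, the He variance (2 / 256) is twice the Xavier variance (1 / 256), which compensates for ReLU zeroing roughly half of the activations.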

8. Gradient Problem Solutions

8.1 Vanishing and Exploding Gradients

Vanishing Gradient: Gradients shrink toward zero as they propagate back through layers, preventing early layers from learning. Common with sigmoid and tanh activations in deep networks.

Exploding Gradient: Gradients grow exponentially, causing NaN or Inf values. Common in RNNs with long sequences.

import torch.nn.utils as utils

# Method 1: Gradient norm clipping
max_norm = 1.0
total_norm = utils.clip_grad_norm_(model.parameters(), max_norm)
print(f"Gradient norm: {total_norm:.4f}")

# Method 2: Gradient value clipping
utils.clip_grad_value_(model.parameters(), clip_value=0.5)

# Usage in training loop
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)

for batch in train_loader:
    optimizer.zero_grad()
    loss = criterion(model(batch[0]), batch[1])
    loss.backward()

    # Clip after backward, before optimizer step
    torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)

    optimizer.step()
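Norm clipping rescales the whole gradient vector rather than truncating individual entries. The sketch below reproduces that idea in plain Python for a flat list of gradient values; it is an illustration of the rescaling rule, not the library code.

```python
import math

def clip_by_global_norm(grads, max_norm, eps=1e-6):
    """Scale all gradients by the same factor so their L2 norm is at most max_norm."""
    total_norm = math.sqrt(sum(g * g for g in grads))
    scale = min(1.0, max_norm / (total_norm + eps))
    return [g * scale for g in grads], total_norm

clipped, norm_before = clip_by_global_norm([3.0, 4.0], max_norm=1.0)
norm_after = math.sqrt(sum(g * g for g in clipped))

# Gradients already inside the limit are left untouched
unchanged, _ = clip_by_global_norm([0.3, 0.4], max_norm=1.0)
```

The direction of the update is preserved (3:4 stays 3:4) and only its length shrinks, whereas value clipping with `clip_grad_value_` can change the direction.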

8.2 Residual Connections (Skip Connections)

class ResidualBlock(nn.Module):
    def __init__(self, in_channels, out_channels, stride=1):
        super().__init__()
        self.conv1 = nn.Conv2d(in_channels, out_channels, 3, stride=stride, padding=1, bias=False)
        self.bn1 = nn.BatchNorm2d(out_channels)
        self.relu = nn.ReLU(inplace=True)
        self.conv2 = nn.Conv2d(out_channels, out_channels, 3, padding=1, bias=False)
        self.bn2 = nn.BatchNorm2d(out_channels)

        self.shortcut = nn.Sequential()
        if stride != 1 or in_channels != out_channels:
            self.shortcut = nn.Sequential(
                nn.Conv2d(in_channels, out_channels, 1, stride=stride, bias=False),
                nn.BatchNorm2d(out_channels)
            )

    def forward(self, x):
        out = self.relu(self.bn1(self.conv1(x)))
        out = self.bn2(self.conv2(out))
        out += self.shortcut(x)  # Skip Connection
        out = self.relu(out)
        return out

8.3 Gradient Checkpointing

For very deep models, trade compute for memory: discard intermediate activations and recompute them during the backward pass.

from torch.utils.checkpoint import checkpoint_sequential

class DeepModel(nn.Module):
    def __init__(self):
        super().__init__()
        self.layers = nn.Sequential(*[
            nn.Sequential(nn.Linear(512, 512), nn.ReLU())
            for _ in range(20)
        ])

    def forward(self, x):
        # Standard: stores all activations O(N) memory
        # return self.layers(x)

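        # Gradient Checkpointing: roughly O(sqrt(N)) memory when segments is about sqrt(N)
        # use_reentrant=False is the recommended mode in recent PyTorch 2.x releases
        return checkpoint_sequential(self.layers, segments=5, input=x, use_reentrant=False)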
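The O(sqrt(N)) figure comes from a simple trade-off, sketched below in plain Python. The counting model is a deliberate simplification (one stored activation per segment boundary plus the activations of the single segment being recomputed) and is an assumption made for illustration, not a measurement.

```python
import math

def stored_activations(num_layers, segments):
    """Rough count: one checkpoint per segment plus one segment's worth of activations."""
    return segments + math.ceil(num_layers / segments)

# 20 layers, as in DeepModel above
costs = {k: stored_activations(20, k) for k in (1, 2, 4, 5, 10, 20)}
best = min(costs, key=costs.get)
```

Under this model the count is lowest when the number of segments is close to sqrt(20), about 4.5, which is consistent with choosing `segments=5` for the 20-layer model.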

9. Transfer Learning and Fine-tuning

9.1 Feature Extraction vs Fine-tuning

import torchvision.models as models

# Feature Extraction: freeze pretrained weights
def feature_extraction(num_classes):
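    # pretrained=True is deprecated since torchvision 0.13; use the weights argument
    model = models.resnet50(weights=models.ResNet50_Weights.DEFAULT)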
    for param in model.parameters():
        param.requires_grad = False

    # Replace only the classifier head
    model.fc = nn.Linear(model.fc.in_features, num_classes)
    return model

# Fine-tuning: selectively unfreeze layers
def fine_tuning(num_classes, unfreeze_layers=None):
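    # pretrained=True is deprecated since torchvision 0.13; use the weights argument
    model = models.resnet50(weights=models.ResNet50_Weights.DEFAULT)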
    for param in model.parameters():
        param.requires_grad = False

    model.fc = nn.Linear(model.fc.in_features, num_classes)

    if unfreeze_layers:
        for name, param in model.named_parameters():
            for layer in unfreeze_layers:
                if layer in name:
                    param.requires_grad = True

    return model

9.2 Progressive Unfreezing and Discriminative Learning Rates

def discriminative_lr_optimizer(model, base_lr=1e-4, lr_multiplier=10):
    # Assign lower LR to early layers, higher LR to later layers
    param_groups = [
        {'params': model.layer1.parameters(), 'lr': base_lr / (lr_multiplier**3)},
        {'params': model.layer2.parameters(), 'lr': base_lr / (lr_multiplier**2)},
        {'params': model.layer3.parameters(), 'lr': base_lr / lr_multiplier},
        {'params': model.layer4.parameters(), 'lr': base_lr},
        {'params': model.fc.parameters(), 'lr': base_lr * lr_multiplier},
    ]
    return torch.optim.Adam(param_groups)
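The optimizer above covers discriminative learning rates. Progressive unfreezing can be sketched as a schedule that decides which layer groups are trainable at a given epoch. The helper `unfrozen_groups`, the group ordering, and the two-epoch stage length are all assumptions made for illustration.

```python
def unfrozen_groups(epoch, groups, epochs_per_stage=1):
    """Groups are ordered from the output head toward the input.

    One additional group becomes trainable at the start of every stage.
    """
    n = min(len(groups), 1 + epoch // epochs_per_stage)
    return groups[:n]

# ResNet-style group names, head first (hypothetical ordering for this sketch)
groups = ["fc", "layer4", "layer3", "layer2", "layer1"]

# Which groups are trainable at epochs 0, 2, 4, 6, 8 when a stage lasts two epochs
plan = [unfrozen_groups(e, groups, epochs_per_stage=2) for e in range(0, 10, 2)]
```

In a training loop, one would set `requires_grad = True` for the parameters of every group returned for the current epoch and leave the rest frozen, so the head adapts first and earlier layers are only adjusted later.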

9.3 LoRA (Low-Rank Adaptation)

LoRA is a parameter-efficient fine-tuning technique for large language models. It freezes the original weight matrices and learns a low-rank decomposition.

For an original weight matrix W with shape d by k, LoRA learns W' = W + BA, where B has shape d by r and A has shape r by k. The rank r is chosen to be much smaller than both d and k.

class LoRALayer(nn.Module):
    def __init__(self, in_features, out_features, rank=4, alpha=1.0):
        super().__init__()
        self.rank = rank
        self.alpha = alpha
        self.scaling = alpha / rank

        # Frozen original weights
        self.weight = nn.Parameter(
            torch.randn(out_features, in_features),
            requires_grad=False
        )

        # LoRA matrix A (random init)
        self.lora_A = nn.Parameter(torch.randn(rank, in_features) * 0.01)
        # LoRA matrix B (zero init -> identical to original at start)
        self.lora_B = nn.Parameter(torch.zeros(out_features, rank))
        self.bias = nn.Parameter(torch.zeros(out_features))

    def forward(self, x):
        base_output = nn.functional.linear(x, self.weight, self.bias)
        lora_output = (x @ self.lora_A.T @ self.lora_B.T) * self.scaling
        return base_output + lora_output

# Using HuggingFace PEFT library
from peft import get_peft_model, LoraConfig, TaskType

lora_config = LoraConfig(
    task_type=TaskType.CAUSAL_LM,
    r=8,
    lora_alpha=32,
    target_modules=["q_proj", "v_proj"],
    lora_dropout=0.05,
    bias="none"
)

# peft_model = get_peft_model(base_model, lora_config)
# peft_model.print_trainable_parameters()
# trainable params: 4,194,304 || all params: 6,742,609,920 || trainable%: 0.062
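The trainable-parameter count printed above can be reproduced with simple arithmetic. The sketch assumes a model with 32 Transformer layers and 4096-by-4096 `q_proj` and `v_proj` matrices; these dimensions are an assumption and are not stated in the configuration itself.

```python
def lora_params(d, k, r):
    """Parameters in B (d x r) plus A (r x k)."""
    return r * (d + k)

d = k = 4096           # assumed projection size
r = 8                  # rank from LoraConfig above
num_layers = 32        # assumed number of Transformer layers
adapted_per_layer = 2  # q_proj and v_proj

per_matrix = lora_params(d, k, r)  # LoRA parameters per adapted matrix
full_matrix = d * k                # parameters in the frozen matrix itself
total_trainable = per_matrix * adapted_per_layer * num_layers
ratio = per_matrix / full_matrix
```

Each adapted matrix gains 65,536 trainable parameters next to 16,777,216 frozen ones (about 0.39%), and 64 adapted matrices add up to the 4,194,304 trainable parameters reported above.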

10. Hyperparameter Tuning

10.1 Bayesian Optimization with Optuna

import optuna
import torch
import torch.nn as nn

def objective(trial):
    lr = trial.suggest_float('lr', 1e-5, 1e-1, log=True)
    n_layers = trial.suggest_int('n_layers', 1, 5)
    n_units = trial.suggest_categorical('n_units', [64, 128, 256, 512])
    dropout_rate = trial.suggest_float('dropout', 0.0, 0.5)
    optimizer_name = trial.suggest_categorical('optimizer', ['Adam', 'AdamW', 'SGD'])

    layers = []
    in_dim = 784
    for _ in range(n_layers):
        layers.extend([
            nn.Linear(in_dim, n_units),
            nn.ReLU(),
            nn.Dropout(dropout_rate)
        ])
        in_dim = n_units
    layers.append(nn.Linear(in_dim, 10))
    model = nn.Sequential(*layers)

    if optimizer_name == 'Adam':
        optimizer = torch.optim.Adam(model.parameters(), lr=lr)
    elif optimizer_name == 'AdamW':
        optimizer = torch.optim.AdamW(model.parameters(), lr=lr, weight_decay=0.01)
    else:
        optimizer = torch.optim.SGD(model.parameters(), lr=lr, momentum=0.9)

    val_accuracy = 0.95  # replace with actual training
    return val_accuracy

study = optuna.create_study(
    direction='maximize',
    sampler=optuna.samplers.TPESampler(),
    pruner=optuna.pruners.MedianPruner()
)

study.optimize(objective, n_trials=100, timeout=3600)

print(f"Best trial: {study.best_trial.value:.4f}")
print(f"Best params: {study.best_trial.params}")

11. Mixed Precision Training

11.1 FP32 vs FP16 vs BF16

| Format | Exponent bits | Mantissa bits | Range | Primary use |
| --- | --- | --- | --- | --- |
| FP32 | 8 | 23 | ±3.4e38 | Default training |
| FP16 | 5 | 10 | ±65504 | Inference / training (overflow risk) |
| BF16 | 8 | 7 | ±3.4e38 | LLM training (A100, TPU) |
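The ranges in the table follow from the bit layouts. The sketch below computes the largest finite value of an IEEE-style format from its exponent and mantissa widths; `max_finite` is a helper introduced here for illustration.

```python
def max_finite(exponent_bits, mantissa_bits):
    """Largest finite value of a binary floating-point format with an implicit leading 1."""
    bias = 2 ** (exponent_bits - 1) - 1
    # The all-ones exponent is reserved for inf/NaN, so the largest usable one is 2^e - 2
    max_exponent = (2 ** exponent_bits - 2) - bias
    return (2 - 2 ** -mantissa_bits) * 2.0 ** max_exponent

fp16_max = max_finite(5, 10)   # 65504.0
bf16_max = max_finite(8, 7)    # about 3.39e38
fp32_max = max_finite(8, 23)   # about 3.40e38
```

BF16 keeps the 8 exponent bits of FP32, so it covers almost the same range and rarely overflows, while its 7 mantissa bits give it less precision than FP16.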

11.2 PyTorch AMP (Automatic Mixed Precision)

import torch
from torch.cuda.amp import autocast, GradScaler

scaler = GradScaler()

model = MyModel().cuda()
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)
criterion = nn.CrossEntropyLoss()

for epoch in range(num_epochs):
    for inputs, labels in train_loader:
        inputs, labels = inputs.cuda(), labels.cuda()
        optimizer.zero_grad()

        # FP16 computation within autocast context
        with autocast(dtype=torch.float16):
            outputs = model(inputs)
            loss = criterion(outputs, labels)

        # Scale loss, then backprop
        scaler.scale(loss).backward()

        # Unscale for gradient clipping
        scaler.unscale_(optimizer)
        torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)

        # Step (skips if NaN/Inf gradients detected)
        scaler.step(optimizer)
        scaler.update()

# BF16 (more stable, requires Ampere or newer GPU)
# BF16 has the same exponent range as FP32, so loss scaling with GradScaler is usually unnecessary
with autocast(dtype=torch.bfloat16):
    outputs = model(inputs)
    loss = criterion(outputs, labels)

Key benefits of AMP:

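  • Memory reduction: activations are stored in FP16/BF16, which can roughly halve activation memory (master weights and optimizer state stay in FP32)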
  • Speed improvement: 1.5x–3x on Tensor Core GPUs
  • Near-identical accuracy to FP32 in most tasks
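The FP16 loop above scales the loss because small gradients underflow to zero in half precision. The sketch below demonstrates this with the standard library's half-precision packing; the gradient value is made up, and 65536 is used as the scale because it is `GradScaler`'s default initial scale.

```python
import struct

def to_fp16(value):
    """Round a Python float to the nearest IEEE half-precision value."""
    return struct.unpack('<e', struct.pack('<e', value))[0]

tiny_grad = 1e-8  # hypothetical gradient magnitude
scale = 65536.0   # 2**16

unscaled = to_fp16(tiny_grad)                   # underflows to 0.0 in FP16
recovered = to_fp16(tiny_grad * scale) / scale  # scale up, store in FP16, unscale in FP32
```

Without scaling the gradient is lost entirely, while the scaled value survives the FP16 round trip with only a small relative error. This is the step `scaler.scale(loss)` and `scaler.unscale_(optimizer)` perform around the backward pass.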

12. Distributed Training

12.1 Data Parallelism with DDP

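DistributedDataParallel (DDP) replicates the model on every GPU and gives each process a different shard of the data. Each process runs its own forward and backward pass, and gradients are averaged across processes with an all-reduce during the backward pass, so every replica applies the same update.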

import torch
import torch.nn as nn
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.utils.data.distributed import DistributedSampler
from torch.cuda.amp import autocast, GradScaler  # used in the training loop below

def setup(rank, world_size):
    import os
    os.environ['MASTER_ADDR'] = 'localhost'
    os.environ['MASTER_PORT'] = '12355'
    dist.init_process_group('nccl', rank=rank, world_size=world_size)

def cleanup():
    dist.destroy_process_group()

def train_ddp(rank, world_size, model_class, dataset, num_epochs=10):
    setup(rank, world_size)

    torch.cuda.set_device(rank)  # bind this process to its own GPU
    device = torch.device(f'cuda:{rank}')
    model = model_class().to(device)
    model = DDP(model, device_ids=[rank])

    sampler = DistributedSampler(
        dataset,
        num_replicas=world_size,
        rank=rank,
        shuffle=True
    )

    loader = torch.utils.data.DataLoader(
        dataset,
        batch_size=32,
        sampler=sampler,
        num_workers=4,
        pin_memory=True
    )

    optimizer = torch.optim.Adam(model.parameters(), lr=1e-3 * world_size)
    criterion = nn.CrossEntropyLoss()
    scaler = GradScaler()

    for epoch in range(num_epochs):
        sampler.set_epoch(epoch)  # Different shuffle per epoch

        for inputs, labels in loader:
            inputs, labels = inputs.to(device), labels.to(device)
            optimizer.zero_grad()

            with autocast():
                outputs = model(inputs)
                loss = criterion(outputs, labels)

            scaler.scale(loss).backward()
            scaler.step(optimizer)
            scaler.update()

        if rank == 0:
            print(f"Epoch {epoch}: Loss = {loss.item():.4f}")

    cleanup()

import torch.multiprocessing as mp

if __name__ == '__main__':
    world_size = torch.cuda.device_count()
    mp.spawn(train_ddp, args=(world_size, MyModel, dataset), nprocs=world_size, join=True)

12.2 FSDP (Fully Sharded Data Parallel)

FSDP shards model parameters, gradients, and optimizer states across all GPUs. Essential for training models with billions of parameters that exceed single-GPU memory.

from torch.distributed.fsdp import FullyShardedDataParallel as FSDP
from torch.distributed.fsdp import ShardingStrategy, MixedPrecision
from torch.distributed.fsdp.wrap import transformer_auto_wrap_policy
import functools

bf16_policy = MixedPrecision(
    param_dtype=torch.bfloat16,
    reduce_dtype=torch.bfloat16,
    buffer_dtype=torch.bfloat16
)

auto_wrap_policy = functools.partial(
    transformer_auto_wrap_policy,
    transformer_layer_cls={TransformerBlock}
)

model = FSDP(
    model,
    sharding_strategy=ShardingStrategy.FULL_SHARD,
    mixed_precision=bf16_policy,
    auto_wrap_policy=auto_wrap_policy,
    device_id=rank
)

FSDP sharding strategies:

  • FULL_SHARD: shard params, gradients, and optimizer states (maximum memory savings)
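  • SHARD_GRAD_OP: shard gradients and optimizer states; parameters are kept unsharded between the forward and backward pass (less communication, more memory than FULL_SHARD)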
  • NO_SHARD: equivalent to DDP
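A back-of-the-envelope estimate shows how much the strategies differ. The sketch assumes mixed-precision Adam at 16 bytes per parameter (2 for half-precision parameters, 2 for gradients, 12 for FP32 master weights and the two Adam moments), ignores activations and buffers, and treats SHARD_GRAD_OP as holding a full parameter copy. All of these are simplifying assumptions rather than measured numbers.

```python
def per_gpu_state_gb(num_params, num_gpus, strategy):
    """Approximate per-GPU memory for parameters, gradients, and optimizer state."""
    params = 2 * num_params   # bytes, half-precision parameters
    grads = 2 * num_params    # bytes, half-precision gradients
    optim = 12 * num_params   # bytes, FP32 master weights + Adam moments
    if strategy == "FULL_SHARD":
        total = (params + grads + optim) / num_gpus
    elif strategy == "SHARD_GRAD_OP":
        total = params + (grads + optim) / num_gpus
    elif strategy == "NO_SHARD":
        total = params + grads + optim
    else:
        raise ValueError(f"unknown strategy: {strategy}")
    return total / 1e9

# Hypothetical setup: 7B parameters on 8 GPUs
estimates = {
    s: per_gpu_state_gb(7e9, 8, s)
    for s in ("FULL_SHARD", "SHARD_GRAD_OP", "NO_SHARD")
}
```

Under these assumptions a 7B-parameter model needs about 112 GB of state per GPU without sharding, about 26 GB with SHARD_GRAD_OP, and about 14 GB with FULL_SHARD.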

12.3 Gradient Accumulation

Simulate large batch sizes on limited GPU memory by accumulating gradients across multiple micro-batches.

model = MyModel().cuda()
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)
criterion = nn.CrossEntropyLoss()
scaler = GradScaler()

micro_batch_size = 8
accumulation_steps = 8  # Effective batch size: 8 * 8 = 64 (assumes train_loader yields micro_batch_size samples)

optimizer.zero_grad()
for step, (inputs, labels) in enumerate(train_loader):
    inputs, labels = inputs.cuda(), labels.cuda()

    with autocast():
        outputs = model(inputs)
        loss = criterion(outputs, labels)
        loss = loss / accumulation_steps  # Normalize loss

    scaler.scale(loss).backward()

    if (step + 1) % accumulation_steps == 0:
        scaler.unscale_(optimizer)
        torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
        scaler.step(optimizer)
        scaler.update()
        optimizer.zero_grad()
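Dividing each micro-batch loss by `accumulation_steps` is what makes the accumulated gradient match a single large batch. The plain-Python sketch below checks this for a one-parameter linear model with a mean-squared-error loss; the data and the model are made up for illustration.

```python
def mse_grad(w, xs, ys):
    """Gradient of mean((w * x - y)^2) with respect to w."""
    return sum(2.0 * (w * x - y) * x for x, y in zip(xs, ys)) / len(xs)

w = 0.5
xs = [1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0]
ys = [2.0 * x for x in xs]

# Gradient from one batch of 8 samples
full_grad = mse_grad(w, xs, ys)

# Same data as 4 micro-batches of 2, each gradient divided by accumulation_steps
accumulation_steps = 4
micro = len(xs) // accumulation_steps
accumulated = 0.0
for i in range(accumulation_steps):
    sl = slice(i * micro, (i + 1) * micro)
    accumulated += mse_grad(w, xs[sl], ys[sl]) / accumulation_steps
```

The two gradients agree because all micro-batches have the same size. If the last micro-batch of an epoch is smaller, the match is only approximate.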

13. Large Language Model Training Techniques

13.1 Instruction Tuning

Instruction tuning trains models to follow natural language instructions. It was central to the success of FLAN, InstructGPT, and LLaMA-2.

# Instruction tuning data format
instruction_data = [
    {
        "instruction": "Analyze the sentiment of the following text.",
        "input": "The weather today is absolutely gorgeous, I feel wonderful!",
        "output": "Positive sentiment. The text expresses satisfaction with the weather and a feeling of happiness."
    },
    {
        "instruction": "Summarize the following article.",
        "input": "...(long text)...",
        "output": "...(summary)..."
    }
]

# Alpaca-style prompt template
def format_instruction(sample):
    if sample.get('input'):
        return f"""### Instruction:
{sample['instruction']}

### Input:
{sample['input']}

### Response:
{sample['output']}"""
    else:
        return f"""### Instruction:
{sample['instruction']}

### Response:
{sample['output']}"""

13.2 RLHF (Reinforcement Learning from Human Feedback)

RLHF involves three stages:

  • Stage 1, SFT (Supervised Fine-tuning): fine-tune on high-quality human demonstrations
  • Stage 2, Reward Model: train a reward model to predict human preferences
  • Stage 3, PPO: optimize the policy with RL using the reward model

# Stage 2: Reward Model (Bradley-Terry preference model)
class RewardModel(nn.Module):
    def __init__(self, base_model):
        super().__init__()
        self.base_model = base_model
        self.reward_head = nn.Linear(base_model.config.hidden_size, 1)

    def forward(self, input_ids, attention_mask):
        outputs = self.base_model(input_ids=input_ids, attention_mask=attention_mask)
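        # Use the hidden state of the last non-padding token (assumes right padding)
        last_idx = attention_mask.sum(dim=1) - 1
        batch_idx = torch.arange(input_ids.size(0), device=input_ids.device)
        last_hidden = outputs.last_hidden_state[batch_idx, last_idx, :]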
        reward = self.reward_head(last_hidden).squeeze(-1)
        return reward

# Preference loss (Bradley-Terry model)
def preference_loss(reward_chosen, reward_rejected):
    # p(chosen > rejected) = sigmoid(r_chosen - r_rejected)
    # logsigmoid is numerically more stable than log(sigmoid(x)) for large negative margins
    return -torch.nn.functional.logsigmoid(reward_chosen - reward_rejected).mean()

13.3 DPO (Direct Preference Optimization)

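DPO simplifies RLHF by eliminating the need for PPO, directly optimizing the policy on preference data using a closed-form reparameterization of the reward. It still requires a frozen reference model (typically the SFT model) to compute the log-probability ratios.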

import torch
import torch.nn.functional as F

def dpo_loss(
    policy_chosen_logps,
    policy_rejected_logps,
    reference_chosen_logps,
    reference_rejected_logps,
    beta=0.1
):
    # Log ratios between policy and reference model
    chosen_rewards = beta * (policy_chosen_logps - reference_chosen_logps)
    rejected_rewards = beta * (policy_rejected_logps - reference_rejected_logps)

    # DPO loss: -log(sigmoid(chosen_rewards - rejected_rewards))
    loss = -F.logsigmoid(chosen_rewards - rejected_rewards).mean()

    chosen_reward = chosen_rewards.detach().mean()
    rejected_reward = rejected_rewards.detach().mean()
    reward_accuracy = (chosen_rewards > rejected_rewards).float().mean()

    return loss, chosen_reward, rejected_reward, reward_accuracy
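A scalar version of the same loss makes its behavior easy to check by hand. The sketch below uses plain Python for a single preference pair; the log-probabilities are made-up numbers and `dpo_pair_loss` is a name introduced for illustration.

```python
import math

def dpo_pair_loss(policy_chosen, policy_rejected, ref_chosen, ref_rejected, beta=0.1):
    """DPO loss for one pair: -log(sigmoid(margin)) written as log(1 + exp(-margin))."""
    margin = beta * ((policy_chosen - ref_chosen) - (policy_rejected - ref_rejected))
    return math.log1p(math.exp(-margin))

# Policy identical to the reference: margin is 0, loss is log(2)
start_loss = dpo_pair_loss(-12.0, -13.0, -12.0, -13.0)

# Policy raised the chosen log-prob by 2 and lowered the rejected one by 2
improved_loss = dpo_pair_loss(-10.0, -15.0, -12.0, -13.0)
```

Training starts at log(2), about 0.693, because the policy is initialized from the reference model. The loss then decreases as the policy assigns relatively more probability to the chosen response than the reference does.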

DPO advantages over RLHF:

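  • No need to train a separate reward model
  • No PPO hyperparameter tuning and no sampling from the policy during training
  • Training uses a simple classification-style loss, which tends to be more stable than PPO
  • Alignment results reported as comparable to or better than PPO-based RLHF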

14. Complete Training Pipeline

14.1 Production-Grade Trainer

import torch
import torch.nn as nn
from torch.cuda.amp import autocast, GradScaler

class Trainer:
    def __init__(
        self,
        model,
        train_loader,
        val_loader,
        optimizer,
        scheduler,
        criterion,
        device='cuda',
        use_amp=True,
        grad_clip=1.0,
        accumulation_steps=1
    ):
        self.model = model.to(device)
        self.train_loader = train_loader
        self.val_loader = val_loader
        self.optimizer = optimizer
        self.scheduler = scheduler
        self.criterion = criterion
        self.device = device
        self.use_amp = use_amp
        self.grad_clip = grad_clip
        self.accumulation_steps = accumulation_steps
        self.scaler = GradScaler() if use_amp else None

    def train_epoch(self):
        self.model.train()
        total_loss = 0
        self.optimizer.zero_grad()

        for step, (inputs, labels) in enumerate(self.train_loader):
            inputs, labels = inputs.to(self.device), labels.to(self.device)

            if self.use_amp:
                with autocast():
                    outputs = self.model(inputs)
                    loss = self.criterion(outputs, labels) / self.accumulation_steps
                self.scaler.scale(loss).backward()
            else:
                outputs = self.model(inputs)
                loss = self.criterion(outputs, labels) / self.accumulation_steps
                loss.backward()

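            # Step on every full accumulation window, and also on the final batch
            # so a trailing partial window is not silently discarded
            is_last_batch = (step + 1) == len(self.train_loader)
            if (step + 1) % self.accumulation_steps == 0 or is_last_batch: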
                if self.use_amp:
                    self.scaler.unscale_(self.optimizer)

                if self.grad_clip:
                    nn.utils.clip_grad_norm_(self.model.parameters(), self.grad_clip)

                if self.use_amp:
                    self.scaler.step(self.optimizer)
                    self.scaler.update()
                else:
                    self.optimizer.step()

                if self.scheduler:
                    self.scheduler.step()

                self.optimizer.zero_grad()

            total_loss += loss.item() * self.accumulation_steps

        return total_loss / len(self.train_loader)

    @torch.no_grad()
    def evaluate(self):
        self.model.eval()
        total_loss = 0
        correct = 0
        total = 0

        for inputs, labels in self.val_loader:
            inputs, labels = inputs.to(self.device), labels.to(self.device)

            # autocast is a no-op when use_amp is False; no_grad comes from the decorator
            with autocast(enabled=self.use_amp):
                outputs = self.model(inputs)
                loss = self.criterion(outputs, labels)

            total_loss += loss.item()
            _, predicted = outputs.max(1)
            total += labels.size(0)
            correct += predicted.eq(labels).sum().item()

        return total_loss / len(self.val_loader), 100. * correct / total

    def fit(self, epochs, save_path=None):
        best_val_acc = 0
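        # Assumes an EarlyStopping class is already in scope; it is not defined in this block.
        # It must be callable as early_stopping(val_loss, model) and expose .early_stop
        early_stopping = EarlyStopping(patience=10)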

        for epoch in range(epochs):
            train_loss = self.train_epoch()
            val_loss, val_acc = self.evaluate()

            print(f"Epoch {epoch+1}/{epochs}: "
                  f"Train Loss: {train_loss:.4f}, "
                  f"Val Loss: {val_loss:.4f}, "
                  f"Val Acc: {val_acc:.2f}%")

            if val_acc > best_val_acc:
                best_val_acc = val_acc
                if save_path:
                    torch.save(self.model.state_dict(), save_path)

            early_stopping(val_loss, self.model)
            if early_stopping.early_stop:
                print("Early stopping triggered!")
                break

        return best_val_acc
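
The `fit` method depends on an `EarlyStopping` helper that is not defined in the block above. If no such class is at hand, a minimal stand-in matching the way `fit` calls it could look like the following. This is a sketch under assumptions: the `min_delta` parameter and the decision to ignore the `model` argument are choices made here, not part of the trainer.

```python
class EarlyStopping:
    """Hypothetical minimal helper: stop when val loss stops improving."""

    def __init__(self, patience=10, min_delta=0.0):
        self.patience = patience      # epochs to wait without improvement
        self.min_delta = min_delta    # minimum decrease that counts as improvement
        self.best_loss = None
        self.counter = 0
        self.early_stop = False

    def __call__(self, val_loss, model=None):
        # `model` is accepted only to match the call in `fit`; it is unused here
        if self.best_loss is None or val_loss < self.best_loss - self.min_delta:
            self.best_loss = val_loss
            self.counter = 0
        else:
            self.counter += 1
            if self.counter >= self.patience:
                self.early_stop = True


# Usage: with patience=2, two consecutive non-improving epochs trigger the stop
stopper = EarlyStopping(patience=2)
for val_loss in [1.0, 0.9, 0.95, 0.96]:
    stopper(val_loss)
print(stopper.early_stop, stopper.best_loss)
```

Note that `fit` saves checkpoints on the best validation accuracy while early stopping watches validation loss; the two criteria can disagree, so it is worth deciding which one should drive both.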

Conclusion and Best Practices

A summary of core principles for effective deep learning training:

Optimizer selection

  • General tasks: AdamW (lr=1e-3 to 1e-4, weight_decay=0.01)
  • Transformers: AdamW + Warmup + Cosine Schedule
  • Large-batch training: LAMB or LARS
  • Memory-constrained: Lion

Regularization strategy

  • Dropout is typically set between 0.1 and 0.5
  • Small datasets: stronger regularization (larger weight decay, higher dropout)
  • Large datasets: weak or no regularization

Learning rate scheduling

  • CNNs: OneCycleLR or Step Decay
  • Transformers: Warmup + Cosine or Inverse Square Root

Mixed precision

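  • Enable AMP by default on GPUs with Tensor Cores (typically 1.5x to 3x speedup and up to roughly 2x lower activation memory; actual gains depend on the model and hardware)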
  • A100/H100 and newer: prefer BF16
  • Older GPUs: use FP16 + Loss Scaling
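
The BF16-versus-FP16 choice can be made at runtime. The sketch below picks a dtype from what the hardware reports and enables loss scaling only for FP16. `pick_amp_dtype` is a hypothetical helper, and the CPU fallback exists only so the example runs without a GPU.

```python
import torch

def pick_amp_dtype():
    """Hypothetical helper: prefer BF16 where supported, else FP16 on GPU."""
    if torch.cuda.is_available():
        if torch.cuda.is_bf16_supported():
            return torch.bfloat16
        return torch.float16
    return torch.bfloat16  # CPU autocast supports bfloat16

device_type = "cuda" if torch.cuda.is_available() else "cpu"
amp_dtype = pick_amp_dtype()

# Loss scaling is only needed for FP16, whose exponent range is narrow;
# BF16 shares FP32's exponent range
needs_loss_scaling = amp_dtype == torch.float16

model = torch.nn.Linear(16, 4).to(device_type)  # placeholder model
x = torch.randn(8, 16, device=device_type)

with torch.autocast(device_type=device_type, dtype=amp_dtype):
    out = model(x)

print(amp_dtype, out.dtype, "loss scaling:", needs_loss_scaling)
```

Inside the autocast region the linear layer runs in the reduced-precision dtype while the parameters themselves stay in FP32.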

Distributed training

  • Multi-GPU single server: DDP + NCCL
  • Billion-parameter models: FSDP
  • Use Gradient Accumulation when the target effective batch size does not fit in GPU memory
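
Gradient accumulation works because summing the gradients of scaled micro-batch losses reproduces the full-batch gradient. The check below is a minimal sketch with made-up data and a tiny linear model; it assumes equally sized micro-batches and a mean-reduced loss.

```python
import torch

torch.manual_seed(0)
model = torch.nn.Linear(4, 1)            # placeholder model
x, y = torch.randn(8, 4), torch.randn(8, 1)
criterion = torch.nn.MSELoss()           # mean reduction

# Reference: gradient from one full batch of 8 samples
model.zero_grad()
criterion(model(x), y).backward()
full_grad = model.weight.grad.clone()

# Accumulated: 4 micro-batches of 2 samples, each loss divided by 4
accumulation_steps = 4
model.zero_grad()
for xb, yb in zip(x.chunk(accumulation_steps), y.chunk(accumulation_steps)):
    loss = criterion(model(xb), yb) / accumulation_steps
    loss.backward()                      # gradients add up in .grad
accum_grad = model.weight.grad.clone()

print(torch.allclose(full_grad, accum_grad, atol=1e-6))
```

With data parallelism on top, the effective batch size is the micro-batch size times `accumulation_steps` times the number of GPUs, so the learning rate should be chosen for that combined value.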

References