- Authors

- Name
- Youngju Kim
- @fjvbn20031
서론
딥러닝은 지난 10여 년간 컴퓨터 비전, 자연어 처리, 음성 인식, 강화학습 등 거의 모든 AI 분야에서 혁명적인 성과를 이루어냈습니다. 하지만 단순히 신경망 구조를 설계하는 것만으로는 좋은 모델을 만들 수 없습니다. 어떻게 학습시키느냐가 모델의 성능을 결정짓는 핵심 요소입니다.
이 글에서는 딥러닝 모델을 효과적으로 학습시키기 위한 모든 기법을 체계적으로 다룹니다. 기초적인 경사 하강법부터 시작해 최신 옵티마이저, 학습률 스케줄링, 다양한 정규화 기법, 전이학습, 혼합 정밀도 학습, 그리고 대규모 분산 학습까지 실전 코드와 함께 배웁니다.
1. 경사 하강법(Gradient Descent) 기초
1.1 손실 함수(Loss Function)의 개념
딥러닝에서 **손실 함수(Loss Function)**는 모델의 예측값과 실제 정답 사이의 오차를 수치로 표현하는 함수입니다. 모델 학습의 목표는 이 손실값을 최소화하는 파라미터(가중치)를 찾는 것입니다.
손실 함수 L은 모델 파라미터 theta와 데이터 (x, y)에 의존합니다. 수식으로 표현하면 다음과 같습니다.
L(theta) = (1/N) * sum_{i=1}^{N} l(f(x_i; theta), y_i)
여기서 f는 모델 함수, l은 개별 샘플의 손실, N은 데이터 개수입니다.
1.2 경사 하강법 직관적 이해
경사 하강법을 직관적으로 이해하려면 산에서 눈을 감고 내려오는 등산객을 상상하면 됩니다. 등산객은 현재 위치에서 가장 가파른 내리막 방향(기울기의 반대 방향)으로 한 걸음씩 이동합니다. 이를 반복하면 결국 골짜기(최솟값)에 도달하게 됩니다.
수학적으로는 다음 업데이트 규칙을 따릅니다.
theta_{t+1} = theta_t - lr * grad_L(theta_t)
여기서 lr은 학습률(learning rate), grad_L은 손실 함수의 그래디언트입니다.
1.3 Batch GD vs Mini-batch GD vs SGD
Batch Gradient Descent (전체 배치)
- 전체 데이터셋으로 그래디언트 계산
- 안정적이지만 메모리 소모가 크고 느림
- 대규모 데이터셋에서 비실용적
Stochastic Gradient Descent (확률적 경사 하강법, SGD)
- 샘플 1개로 그래디언트 계산
- 빠르지만 노이즈가 많아 불안정
- 온라인 학습에 적합
Mini-batch Gradient Descent (미니배치)
- 보통 32~512개 샘플로 그래디언트 계산
- 배치 GD와 SGD의 장점을 결합
- 실제로 가장 많이 사용되는 방식
import torch
import torch.nn as nn
import numpy as np
# 간단한 선형 회귀로 경사 하강법 구현
class LinearRegression(nn.Module):
def __init__(self, input_dim):
super().__init__()
self.linear = nn.Linear(input_dim, 1)
def forward(self, x):
return self.linear(x)
# 데이터 생성
torch.manual_seed(42)
X = torch.randn(1000, 10)
true_w = torch.randn(10, 1)
y = X @ true_w + 0.1 * torch.randn(1000, 1)
# 미니배치 경사 하강법 구현
def train_minibatch(model, X, y, batch_size=32, lr=0.01, epochs=100):
optimizer = torch.optim.SGD(model.parameters(), lr=lr)
criterion = nn.MSELoss()
losses = []
N = len(X)
for epoch in range(epochs):
# 셔플
perm = torch.randperm(N)
X_shuffled = X[perm]
y_shuffled = y[perm]
epoch_loss = 0
for i in range(0, N, batch_size):
x_batch = X_shuffled[i:i+batch_size]
y_batch = y_shuffled[i:i+batch_size]
optimizer.zero_grad()
pred = model(x_batch)
loss = criterion(pred, y_batch)
loss.backward()
optimizer.step()
epoch_loss += loss.item()
losses.append(epoch_loss / (N // batch_size))
if epoch % 10 == 0:
print(f"Epoch {epoch}: Loss = {losses[-1]:.4f}")
return losses
model = LinearRegression(10)
losses = train_minibatch(model, X, y)
1.4 학습률(Learning Rate)의 중요성
학습률은 딥러닝에서 가장 중요한 하이퍼파라미터 중 하나입니다.
- 학습률이 너무 크면: 손실값이 발산하거나 최솟값 주변에서 진동
- 학습률이 너무 작으면: 학습이 매우 느리고, 지역 최솟값에 갇힐 가능성 증가
- 적절한 학습률: 빠르게 수렴하면서 좋은 최솟값에 도달
일반적으로 0.1, 0.01, 0.001 등의 값에서 시작하며, 네트워크 구조와 데이터에 따라 달라집니다.
1.5 수학적 유도 (편미분, 체인 룰)
신경망에서 역전파(Backpropagation)는 체인 룰(Chain Rule)을 이용해 각 레이어의 그래디언트를 계산합니다.
3레이어 네트워크를 예로 들면 다음과 같습니다.
forward: x -> z1=W1*x -> a1=relu(z1) -> z2=W2*a1 -> output
loss: L = MSE(output, y)
backward (chain rule):
dL/dW2 = dL/d_output * d_output/dz2 * dz2/dW2
dL/dW1 = dL/d_output * ... * da1/dz1 * dz1/dW1
# NumPy로 직접 역전파 구현
import numpy as np
def sigmoid(x):
return 1 / (1 + np.exp(-x))
def sigmoid_deriv(x):
s = sigmoid(x)
return s * (1 - s)
class SimpleNet:
def __init__(self, input_dim, hidden_dim, output_dim):
# He 초기화
self.W1 = np.random.randn(input_dim, hidden_dim) * np.sqrt(2/input_dim)
self.b1 = np.zeros(hidden_dim)
self.W2 = np.random.randn(hidden_dim, output_dim) * np.sqrt(2/hidden_dim)
self.b2 = np.zeros(output_dim)
def forward(self, x):
self.x = x
self.z1 = x @ self.W1 + self.b1
self.a1 = sigmoid(self.z1)
self.z2 = self.a1 @ self.W2 + self.b2
return self.z2
def backward(self, y, lr=0.01):
N = len(y)
# 출력층 그래디언트 (MSE loss)
dL_dz2 = 2 * (self.z2 - y.reshape(-1, 1)) / N
# W2, b2 그래디언트
dL_dW2 = self.a1.T @ dL_dz2
dL_db2 = dL_dz2.sum(axis=0)
# 은닉층으로 역전파
dL_da1 = dL_dz2 @ self.W2.T
dL_dz1 = dL_da1 * sigmoid_deriv(self.z1)
# W1, b1 그래디언트
dL_dW1 = self.x.T @ dL_dz1
dL_db1 = dL_dz1.sum(axis=0)
# 파라미터 업데이트
self.W2 -= lr * dL_dW2
self.b2 -= lr * dL_db2
self.W1 -= lr * dL_dW1
self.b1 -= lr * dL_db1
# 테스트
net = SimpleNet(10, 32, 1)
X_np = np.random.randn(100, 10)
y_np = np.random.randn(100)
for i in range(100):
pred = net.forward(X_np)
loss = np.mean((pred.flatten() - y_np) ** 2)
net.backward(y_np)
if i % 20 == 0:
print(f"Step {i}: MSE = {loss:.4f}")
2. 고급 옵티마이저(Optimizers)
2.1 Momentum SGD
일반 SGD는 기울기 방향으로만 이동하므로, 좁은 계곡 모양의 손실 지형에서 지그재그 이동을 하게 됩니다. Momentum은 물리학의 관성 개념을 도입해 이전 이동 방향을 기억하게 합니다.
v_t = beta * v_{t-1} + (1 - beta) * grad_t
theta_{t+1} = theta_t - lr * v_t
베타(momentum) 값은 보통 0.9를 사용합니다.
import torch
import torch.nn as nn
# Momentum SGD
optimizer_momentum = torch.optim.SGD(
model.parameters(),
lr=0.01,
momentum=0.9,
nesterov=False # Nesterov Momentum 사용 여부
)
# Nesterov Momentum (NAG) - 더 정확한 방향 예측
optimizer_nag = torch.optim.SGD(
model.parameters(),
lr=0.01,
momentum=0.9,
nesterov=True
)
2.2 Adagrad (적응적 학습률)
Adagrad는 각 파라미터마다 개별 학습률을 적용합니다. 자주 업데이트되는 파라미터는 학습률을 줄이고, 드물게 업데이트되는 파라미터는 학습률을 유지합니다.
G_t = G_{t-1} + grad_t^2
theta_{t+1} = theta_t - (lr / sqrt(G_t + epsilon)) * grad_t
희소(sparse) 데이터에 효과적이지만, G_t가 계속 누적되어 학습률이 0에 수렴하는 문제가 있습니다.
optimizer_adagrad = torch.optim.Adagrad(
model.parameters(),
lr=0.01,
eps=1e-8,
weight_decay=0
)
2.3 RMSprop
Adagrad의 학습률 소멸 문제를 해결하기 위해 **지수이동평균(Exponential Moving Average)**을 사용합니다.
E[g^2]_t = rho * E[g^2]_{t-1} + (1 - rho) * grad_t^2
theta_{t+1} = theta_t - (lr / sqrt(E[g^2]_t + epsilon)) * grad_t
optimizer_rmsprop = torch.optim.RMSprop(
model.parameters(),
lr=0.001,
alpha=0.99, # rho (decay factor)
eps=1e-8,
momentum=0,
centered=False
)
2.4 Adam (Adaptive Moment Estimation)
Adam은 Momentum과 RMSprop을 결합한 옵티마이저로, 현재 가장 널리 사용됩니다. 1차 모멘트(평균)와 2차 모멘트(분산)를 모두 추적합니다.
알고리즘 수식은 다음과 같습니다.
m_t = beta1 * m_{t-1} + (1 - beta1) * g_t # 1차 모멘트 (편향 보정 전)
v_t = beta2 * v_{t-1} + (1 - beta2) * g_t^2 # 2차 모멘트 (편향 보정 전)
m_hat = m_t / (1 - beta1^t) # 편향 보정
v_hat = v_t / (1 - beta2^t) # 편향 보정
theta_{t+1} = theta_t - lr * m_hat / (sqrt(v_hat) + epsilon)
기본 하이퍼파라미터: lr=0.001, beta1=0.9, beta2=0.999, epsilon=1e-8
optimizer_adam = torch.optim.Adam(
model.parameters(),
lr=1e-3,
betas=(0.9, 0.999),
eps=1e-8,
weight_decay=0
)
2.5 AdamW (Weight Decay 분리)
표준 Adam에서 L2 정규화는 그래디언트와 결합되어 있어 적응적 학습률의 영향을 받습니다. AdamW는 가중치 감쇠(weight decay)를 파라미터 업데이트에 직접 적용합니다.
theta_{t+1} = theta_t - lr * (m_hat / (sqrt(v_hat) + epsilon) + lambda * theta_t)
Transformer 모델 학습에서 AdamW가 표준이 되었습니다 (BERT, GPT 등).
optimizer_adamw = torch.optim.AdamW(
model.parameters(),
lr=1e-4,
betas=(0.9, 0.999),
eps=1e-8,
weight_decay=0.01 # L2 정규화 강도
)
2.6 LARS와 LAMB (대규모 배치 학습)
대규모 배치(수천 개)를 사용할 때 일반 Adam은 성능이 저하됩니다. **LARS(Layer-wise Adaptive Rate Scaling)**와 LAMB는 레이어별로 학습률을 조정합니다.
LARS: lr_l = lr * ||w_l|| / (||g_l|| + lambda * ||w_l||)
LAMB: Adam 업데이트에 레이어별 신뢰 비율 적용
# pip install lars (또는 직접 구현)
# LAMB는 Hugging Face transformers에 포함
from transformers import optimization
# LAMB optimizer (transformers 라이브러리 활용)
# 또는 apex 라이브러리의 FusedLAMB 사용
2.7 Lion Optimizer (2023)
Google Brain이 2023년 발표한 **Lion(EvoLved Sign Momentum)**은 Adam보다 메모리를 적게 사용하면서 경쟁력 있는 성능을 보입니다. 부호(sign)만 사용하므로 업데이트가 항상 같은 크기입니다.
class Lion(torch.optim.Optimizer):
def __init__(self, params, lr=1e-4, betas=(0.9, 0.99), weight_decay=0.0):
defaults = dict(lr=lr, betas=betas, weight_decay=weight_decay)
super().__init__(params, defaults)
def step(self, closure=None):
loss = None
if closure is not None:
with torch.enable_grad():
loss = closure()
for group in self.param_groups:
for p in group['params']:
if p.grad is None:
continue
grad = p.grad
lr = group['lr']
beta1, beta2 = group['betas']
wd = group['weight_decay']
state = self.state[p]
if len(state) == 0:
state['exp_avg'] = torch.zeros_like(p)
exp_avg = state['exp_avg']
# Lion 업데이트
update = exp_avg * beta1 + grad * (1 - beta1)
p.data.mul_(1 - lr * wd)
p.data.add_(update.sign_(), alpha=-lr)
# 모멘텀 업데이트
exp_avg.mul_(beta2).add_(grad, alpha=1 - beta2)
return loss
2.8 옵티마이저 비교 실험
import torch
import torch.nn as nn
import matplotlib.pyplot as plt
# 간단한 모델로 옵티마이저 비교
class MLP(nn.Module):
def __init__(self):
super().__init__()
self.net = nn.Sequential(
nn.Linear(2, 64),
nn.ReLU(),
nn.Linear(64, 64),
nn.ReLU(),
nn.Linear(64, 1)
)
def forward(self, x):
return self.net(x)
def train_and_compare(optimizers_dict, X, y, epochs=200):
results = {}
for name, opt_fn in optimizers_dict.items():
model = MLP()
optimizer = opt_fn(model.parameters())
criterion = nn.MSELoss()
losses = []
for epoch in range(epochs):
optimizer.zero_grad()
pred = model(X)
loss = criterion(pred, y)
loss.backward()
optimizer.step()
losses.append(loss.item())
results[name] = losses
print(f"{name}: Final Loss = {losses[-1]:.4f}")
return results
# 데이터 생성
X = torch.randn(500, 2)
y = (X[:, 0] * 2 + X[:, 1] * 3 + torch.randn(500) * 0.1).unsqueeze(1)
optimizers = {
'SGD': lambda p: torch.optim.SGD(p, lr=0.01),
'SGD+Momentum': lambda p: torch.optim.SGD(p, lr=0.01, momentum=0.9),
'Adam': lambda p: torch.optim.Adam(p, lr=0.001),
'AdamW': lambda p: torch.optim.AdamW(p, lr=0.001, weight_decay=0.01),
'RMSprop': lambda p: torch.optim.RMSprop(p, lr=0.001),
}
results = train_and_compare(optimizers, X, y)
3. 학습률 스케줄링(LR Scheduling)
고정된 학습률은 최적이 아닙니다. 학습률 스케줄링을 통해 학습 과정에서 학습률을 동적으로 조절하면 더 빠른 수렴과 더 좋은 성능을 얻을 수 있습니다.
3.1 Step Decay와 Exponential Decay
import torch
import torch.optim as optim
model = MLP()
optimizer = optim.SGD(model.parameters(), lr=0.1)
# Step Decay: 일정 에폭마다 학습률을 gamma배로 감소
step_scheduler = optim.lr_scheduler.StepLR(
optimizer,
step_size=30, # 30 에폭마다
gamma=0.1 # 10배 감소
)
# MultiStep Decay: 지정된 에폭에서 감소
multistep_scheduler = optim.lr_scheduler.MultiStepLR(
optimizer,
milestones=[30, 60, 80],
gamma=0.1
)
# Exponential Decay: 매 에폭마다 지수적으로 감소
exp_scheduler = optim.lr_scheduler.ExponentialLR(
optimizer,
gamma=0.95 # 매 에폭 5% 감소
)
3.2 Cosine Annealing
Cosine Annealing은 학습률을 코사인 함수를 따라 부드럽게 감소시킵니다. 주기적으로 학습률을 재시작하는 Cosine Annealing with Warm Restarts도 자주 사용됩니다.
# Cosine Annealing
cosine_scheduler = optim.lr_scheduler.CosineAnnealingLR(
optimizer,
T_max=100, # 주기 (에폭 수)
eta_min=1e-6 # 최소 학습률
)
# Cosine Annealing with Warm Restarts (SGDR)
cosine_restart = optim.lr_scheduler.CosineAnnealingWarmRestarts(
optimizer,
T_0=10, # 초기 주기
T_mult=2, # 주기 배수
eta_min=1e-6
)
3.3 Warmup + Cosine Schedule
Transformer 모델 학습에서 표준이 된 스케줄입니다. 초기에는 학습률을 선형적으로 증가(워밍업)시키고, 이후 코사인 스케줄로 감소시킵니다.
import math
from torch.optim.lr_scheduler import LambdaLR
def get_cosine_schedule_with_warmup(optimizer, num_warmup_steps, num_training_steps, num_cycles=0.5):
def lr_lambda(current_step):
# Warmup 구간
if current_step < num_warmup_steps:
return float(current_step) / float(max(1, num_warmup_steps))
# Cosine 감소 구간
progress = float(current_step - num_warmup_steps) / float(
max(1, num_training_steps - num_warmup_steps)
)
return max(0.0, 0.5 * (1.0 + math.cos(math.pi * float(num_cycles) * 2.0 * progress)))
return LambdaLR(optimizer, lr_lambda)
# 사용 예시
optimizer = optim.AdamW(model.parameters(), lr=5e-5)
scheduler = get_cosine_schedule_with_warmup(
optimizer,
num_warmup_steps=1000,
num_training_steps=10000
)
3.4 OneCycleLR
OneCycleLR은 빠른 수렴을 위한 스케줄로, 학습률을 빠르게 올렸다가 내리는 방식을 사용합니다. Leslie Smith의 논문에서 소개되었으며 FastAI에서 대중화되었습니다.
optimizer = optim.SGD(model.parameters(), lr=0.01)
scheduler = optim.lr_scheduler.OneCycleLR(
optimizer,
max_lr=0.1,
steps_per_epoch=len(train_loader),
epochs=10,
pct_start=0.3, # warmup 비율
anneal_strategy='cos', # 감소 방식
div_factor=25.0, # 초기 lr = max_lr / div_factor
final_div_factor=1e4 # 최종 lr = max_lr / (div_factor * final_div_factor)
)
# 학습 루프
for epoch in range(10):
for batch in train_loader:
optimizer.zero_grad()
loss = criterion(model(batch[0]), batch[1])
loss.backward()
optimizer.step()
scheduler.step() # OneCycleLR은 배치마다 호출
3.5 Learning Rate Finder
Learning Rate Finder는 적절한 학습률 범위를 자동으로 찾아주는 기법입니다.
from torch_lr_finder import LRFinder
model = MLP()
optimizer = optim.SGD(model.parameters(), lr=1e-7, weight_decay=1e-2)
criterion = nn.MSELoss()
# LR Finder 실행
lr_finder = LRFinder(model, optimizer, criterion, device="cuda")
lr_finder.range_test(train_loader, end_lr=100, num_iter=100)
lr_finder.plot() # 손실-학습률 그래프 표시
lr_finder.reset() # 옵티마이저를 초기 상태로 복원
# 그래프에서 손실이 가장 가파르게 하락하는 지점의 학습률을 선택
# 일반적으로 최솟값의 1/10 ~ 1/3 정도를 사용
4. 손실 함수(Loss Functions)
4.1 회귀 손실 함수
import torch
import torch.nn as nn
import torch.nn.functional as F
# MSE (Mean Squared Error) - 이상치에 민감
mse_loss = nn.MSELoss()
# MAE (Mean Absolute Error) - 이상치에 강건
mae_loss = nn.L1Loss()
# Huber Loss - MSE와 MAE의 절충점
# |y - y_hat| < delta: 0.5 * (y - y_hat)^2
# |y - y_hat| >= delta: delta * (|y - y_hat| - 0.5 * delta)
huber_loss = nn.HuberLoss(delta=1.0)
# 직접 구현
def huber_loss_manual(pred, target, delta=1.0):
residual = torch.abs(pred - target)
condition = residual < delta
squared_loss = 0.5 * residual ** 2
linear_loss = delta * residual - 0.5 * delta ** 2
return torch.where(condition, squared_loss, linear_loss).mean()
4.2 분류 손실 함수
# Cross-Entropy Loss (다중 분류)
ce_loss = nn.CrossEntropyLoss()
# Binary Cross-Entropy (이진 분류)
bce_loss = nn.BCEWithLogitsLoss()
# Label Smoothing Cross-Entropy (과적합 방지)
ce_smooth = nn.CrossEntropyLoss(label_smoothing=0.1)
# Focal Loss (클래스 불균형 해결)
class FocalLoss(nn.Module):
def __init__(self, gamma=2.0, alpha=None, reduction='mean'):
super().__init__()
self.gamma = gamma
self.alpha = alpha
self.reduction = reduction
def forward(self, inputs, targets):
# inputs: (N, C) logits, targets: (N,) class indices
ce_loss = F.cross_entropy(inputs, targets, reduction='none')
pt = torch.exp(-ce_loss) # p_t = 모델이 정답 클래스에 할당한 확률
focal_loss = ((1 - pt) ** self.gamma) * ce_loss
if self.alpha is not None:
alpha_t = self.alpha[targets]
focal_loss = alpha_t * focal_loss
if self.reduction == 'mean':
return focal_loss.mean()
elif self.reduction == 'sum':
return focal_loss.sum()
return focal_loss
4.3 세그멘테이션 손실 함수
# BCE Loss for binary segmentation
def bce_loss(pred, target):
return F.binary_cross_entropy_with_logits(pred, target)
# Dice Loss (클래스 불균형에 강건)
def dice_loss(pred, target, smooth=1.0):
pred = torch.sigmoid(pred)
pred_flat = pred.view(-1)
target_flat = target.view(-1)
intersection = (pred_flat * target_flat).sum()
dice = (2. * intersection + smooth) / (pred_flat.sum() + target_flat.sum() + smooth)
return 1 - dice
# BCE + Dice 결합 (세그멘테이션에서 자주 사용)
def bce_dice_loss(pred, target, bce_weight=0.5):
bce = bce_loss(pred, target)
dice = dice_loss(pred, target)
return bce_weight * bce + (1 - bce_weight) * dice
4.4 메트릭 학습 손실 함수
# Contrastive Loss (유사한 샘플은 가깝게, 다른 샘플은 멀게)
class ContrastiveLoss(nn.Module):
def __init__(self, margin=1.0):
super().__init__()
self.margin = margin
def forward(self, output1, output2, label):
# label=1: 같은 클래스, label=0: 다른 클래스
euclidean_dist = F.pairwise_distance(output1, output2)
loss = (label * euclidean_dist.pow(2) +
(1 - label) * F.relu(self.margin - euclidean_dist).pow(2))
return loss.mean()
# Triplet Loss (anchor, positive, negative)
class TripletLoss(nn.Module):
def __init__(self, margin=0.3):
super().__init__()
self.margin = margin
def forward(self, anchor, positive, negative):
pos_dist = F.pairwise_distance(anchor, positive)
neg_dist = F.pairwise_distance(anchor, negative)
loss = F.relu(pos_dist - neg_dist + self.margin)
return loss.mean()
5. 정규화 기법(Regularization)
과적합(Overfitting)을 방지하고 모델의 일반화 능력을 높이는 기법들입니다.
5.1 L1/L2 정규화
# L2 Regularization (Weight Decay)
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3, weight_decay=1e-4)
# L1 Regularization (직접 구현)
def l1_regularization(model, lambda_l1):
l1_penalty = 0
for param in model.parameters():
l1_penalty += torch.abs(param).sum()
return lambda_l1 * l1_penalty
# L1 + L2 (Elastic Net)
def elastic_net_loss(model, criterion, outputs, targets, lambda_l1=1e-5, lambda_l2=1e-4):
# 기본 손실
base_loss = criterion(outputs, targets)
# L1 패널티
l1_penalty = sum(torch.abs(p).sum() for p in model.parameters())
# L2 패널티
l2_penalty = sum((p ** 2).sum() for p in model.parameters())
return base_loss + lambda_l1 * l1_penalty + lambda_l2 * l2_penalty
5.2 Dropout
Dropout은 학습 중 무작위로 뉴런을 비활성화하여 공동 적응(co-adaptation)을 방지합니다. Inverted Dropout은 추론 시 스케일링이 필요 없도록 학습 시 p로 나눕니다.
class ModelWithDropout(nn.Module):
def __init__(self, dropout_rate=0.5):
super().__init__()
self.net = nn.Sequential(
nn.Linear(784, 512),
nn.ReLU(),
nn.Dropout(p=dropout_rate), # Inverted Dropout (PyTorch 기본)
nn.Linear(512, 256),
nn.ReLU(),
nn.Dropout(p=dropout_rate),
nn.Linear(256, 10)
)
def forward(self, x):
return self.net(x)
# 학습 모드: dropout 활성화
model.train()
# 추론 모드: dropout 비활성화
model.eval()
# DropConnect (가중치를 무작위로 0으로 설정)
class DropConnect(nn.Module):
def __init__(self, p=0.5):
super().__init__()
self.p = p
def forward(self, x):
if not self.training:
return x
# 가중치 마스킹은 nn.Linear 레이어 수준에서 적용
mask = torch.bernoulli(torch.ones_like(x) * (1 - self.p))
return x * mask / (1 - self.p)
5.3 Data Augmentation
from torchvision import transforms
import torchvision.transforms.functional as TF
# 기본 이미지 증강
train_transform = transforms.Compose([
transforms.RandomHorizontalFlip(p=0.5),
transforms.RandomCrop(32, padding=4),
transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2, hue=0.1),
transforms.RandomRotation(degrees=15),
transforms.ToTensor(),
transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])
# Mixup Augmentation
def mixup_data(x, y, alpha=1.0):
if alpha > 0:
lam = np.random.beta(alpha, alpha)
else:
lam = 1
batch_size = x.size()[0]
index = torch.randperm(batch_size)
mixed_x = lam * x + (1 - lam) * x[index]
y_a, y_b = y, y[index]
return mixed_x, y_a, y_b, lam
def mixup_criterion(criterion, pred, y_a, y_b, lam):
return lam * criterion(pred, y_a) + (1 - lam) * criterion(pred, y_b)
# CutMix Augmentation
def cutmix_data(x, y, alpha=1.0):
lam = np.random.beta(alpha, alpha)
batch_size, C, H, W = x.size()
index = torch.randperm(batch_size)
# 랜덤 박스 좌표 계산
cut_ratio = np.sqrt(1. - lam)
cut_w = int(W * cut_ratio)
cut_h = int(H * cut_ratio)
cx = np.random.randint(W)
cy = np.random.randint(H)
bbx1 = np.clip(cx - cut_w // 2, 0, W)
bby1 = np.clip(cy - cut_h // 2, 0, H)
bbx2 = np.clip(cx + cut_w // 2, 0, W)
bby2 = np.clip(cy + cut_h // 2, 0, H)
mixed_x = x.clone()
mixed_x[:, :, bby1:bby2, bbx1:bbx2] = x[index, :, bby1:bby2, bbx1:bbx2]
# 실제 박스 크기에 맞게 lambda 재계산
lam = 1 - ((bbx2 - bbx1) * (bby2 - bby1) / (W * H))
return mixed_x, y, y[index], lam
5.4 Early Stopping
class EarlyStopping:
def __init__(self, patience=10, min_delta=0.001, restore_best_weights=True):
self.patience = patience
self.min_delta = min_delta
self.restore_best_weights = restore_best_weights
self.counter = 0
self.best_loss = None
self.best_weights = None
self.early_stop = False
def __call__(self, val_loss, model):
if self.best_loss is None:
self.best_loss = val_loss
self.best_weights = {k: v.clone() for k, v in model.state_dict().items()}
elif val_loss > self.best_loss - self.min_delta:
self.counter += 1
print(f"EarlyStopping counter: {self.counter}/{self.patience}")
if self.counter >= self.patience:
self.early_stop = True
else:
self.best_loss = val_loss
self.best_weights = {k: v.clone() for k, v in model.state_dict().items()}
self.counter = 0
def restore(self, model):
if self.restore_best_weights and self.best_weights:
model.load_state_dict(self.best_weights)
print("Restored best model weights")
# 사용 예시
early_stopping = EarlyStopping(patience=10)
for epoch in range(max_epochs):
train_loss = train_one_epoch(model, train_loader, optimizer)
val_loss = evaluate(model, val_loader)
early_stopping(val_loss, model)
if early_stopping.early_stop:
print("Early stopping triggered!")
early_stopping.restore(model)
break
6. 정규화 레이어(Normalization Layers)
6.1 Batch Normalization
**Batch Normalization(배치 정규화)**은 2015년 Sergey Ioffe와 Christian Szegedy가 제안했습니다. 각 미니배치 내에서 특성을 정규화하여 내부 공변량 이동(Internal Covariate Shift) 문제를 해결합니다.
배치 정규화 과정은 다음과 같습니다.
1. 미니배치 평균: mu_B = (1/m) * sum(x_i)
2. 미니배치 분산: sigma_B^2 = (1/m) * sum((x_i - mu_B)^2)
3. 정규화: x_hat_i = (x_i - mu_B) / sqrt(sigma_B^2 + epsilon)
4. 스케일 및 이동: y_i = gamma * x_hat_i + beta
여기서 gamma(스케일)와 beta(이동)는 학습 가능한 파라미터입니다.
import torch
import torch.nn as nn
class BatchNormNet(nn.Module):
def __init__(self):
super().__init__()
self.net = nn.Sequential(
nn.Linear(784, 512),
nn.BatchNorm1d(512),
nn.ReLU(),
nn.Linear(512, 256),
nn.BatchNorm1d(256),
nn.ReLU(),
nn.Linear(256, 10)
)
def forward(self, x):
return self.net(x)
# Conv 레이어에서 BatchNorm 사용
class ConvBNNet(nn.Module):
def __init__(self):
super().__init__()
self.conv1 = nn.Conv2d(3, 64, 3, padding=1)
self.bn1 = nn.BatchNorm2d(64)
self.relu = nn.ReLU(inplace=True)
def forward(self, x):
return self.relu(self.bn1(self.conv1(x)))
# 직접 구현
class BatchNorm(nn.Module):
def __init__(self, num_features, eps=1e-5, momentum=0.1):
super().__init__()
self.gamma = nn.Parameter(torch.ones(num_features))
self.beta = nn.Parameter(torch.zeros(num_features))
self.eps = eps
self.momentum = momentum
self.register_buffer('running_mean', torch.zeros(num_features))
self.register_buffer('running_var', torch.ones(num_features))
def forward(self, x):
if self.training:
mean = x.mean(dim=0)
var = x.var(dim=0, unbiased=False)
# 이동 평균 업데이트
self.running_mean = (1 - self.momentum) * self.running_mean + self.momentum * mean
self.running_var = (1 - self.momentum) * self.running_var + self.momentum * var
else:
mean = self.running_mean
var = self.running_var
x_norm = (x - mean) / torch.sqrt(var + self.eps)
return self.gamma * x_norm + self.beta
6.2 Layer Normalization (Transformer에서 사용)
Layer Normalization은 배치 차원이 아닌 특성 차원에서 정규화합니다. 배치 크기에 독립적이어서 RNN, Transformer에 적합합니다.
class LayerNorm(nn.Module):
def __init__(self, normalized_shape, eps=1e-5):
super().__init__()
if isinstance(normalized_shape, int):
normalized_shape = (normalized_shape,)
self.normalized_shape = normalized_shape
self.gamma = nn.Parameter(torch.ones(normalized_shape))
self.beta = nn.Parameter(torch.zeros(normalized_shape))
self.eps = eps
def forward(self, x):
# 마지막 len(normalized_shape)개 차원에서 정규화
mean = x.mean(dim=-1, keepdim=True)
var = x.var(dim=-1, keepdim=True, unbiased=False)
x_norm = (x - mean) / torch.sqrt(var + self.eps)
return self.gamma * x_norm + self.beta
# PyTorch 내장 LayerNorm
layer_norm = nn.LayerNorm(512)
# Transformer 블록에서 사용
class TransformerBlock(nn.Module):
def __init__(self, d_model, nhead, dim_feedforward):
super().__init__()
self.attention = nn.MultiheadAttention(d_model, nhead)
self.norm1 = nn.LayerNorm(d_model)
self.norm2 = nn.LayerNorm(d_model)
self.ffn = nn.Sequential(
nn.Linear(d_model, dim_feedforward),
nn.GELU(),
nn.Linear(dim_feedforward, d_model)
)
def forward(self, x):
# Pre-LayerNorm (최신 GPT 스타일)
attn_out, _ = self.attention(self.norm1(x), self.norm1(x), self.norm1(x))
x = x + attn_out
x = x + self.ffn(self.norm2(x))
return x
6.3 Instance, Group, RMS Normalization
# Instance Normalization (각 샘플, 각 채널 독립적으로 정규화)
# 스타일 전이(Style Transfer)에 효과적
instance_norm = nn.InstanceNorm2d(64)
# Group Normalization (채널을 그룹으로 나누어 정규화)
# 배치 크기가 작을 때 BN 대안
group_norm = nn.GroupNorm(num_groups=8, num_channels=64)
# RMS Normalization (LLaMA, T5에서 사용)
# LayerNorm에서 평균 제거, 학습 속도 향상
class RMSNorm(nn.Module):
def __init__(self, dim, eps=1e-6):
super().__init__()
self.eps = eps
self.weight = nn.Parameter(torch.ones(dim))
def _norm(self, x):
return x * torch.rsqrt(x.pow(2).mean(-1, keepdim=True) + self.eps)
def forward(self, x):
return self.weight * self._norm(x.float()).type_as(x)
# 정규화 방법 비교 요약
# BatchNorm: 배치 차원 정규화, CNN에 적합, 배치 크기에 의존
# LayerNorm: 특성 차원 정규화, Transformer/RNN에 적합
# InstanceNorm: 채널별 정규화, 스타일 전이에 적합
# GroupNorm: 채널 그룹 정규화, 작은 배치에 적합
# RMSNorm: 경량화된 LayerNorm, LLM에 적합
7. 가중치 초기화(Weight Initialization)
7.1 Xavier/He 초기화
가중치 초기화는 학습의 시작점을 결정합니다. 잘못된 초기화는 기울기 소실이나 폭발을 유발할 수 있습니다.
import torch
import torch.nn as nn
import math
class WeightInitDemo(nn.Module):
def __init__(self, init_method='xavier'):
super().__init__()
self.layers = nn.ModuleList([
nn.Linear(256, 256) for _ in range(5)
])
self.apply_init(init_method)
def apply_init(self, method):
for layer in self.layers:
if method == 'zeros':
nn.init.zeros_(layer.weight) # 나쁜 초기화: 대칭성 문제
elif method == 'random_small':
nn.init.normal_(layer.weight, std=0.01)
elif method == 'xavier_uniform':
nn.init.xavier_uniform_(layer.weight) # sigmoid/tanh 활성화에 적합
elif method == 'xavier_normal':
nn.init.xavier_normal_(layer.weight)
elif method == 'kaiming_uniform':
nn.init.kaiming_uniform_(layer.weight, mode='fan_in', nonlinearity='relu')
elif method == 'kaiming_normal':
nn.init.kaiming_normal_(layer.weight, mode='fan_out', nonlinearity='relu') # ReLU에 적합
nn.init.zeros_(layer.bias)
def forward(self, x):
for layer in self.layers:
x = torch.relu(layer(x))
return x
# 초기화 비교 실험
import matplotlib.pyplot as plt
def check_activations(model, x):
activations = []
hooks = []
def hook(module, input, output):
activations.append(output.detach())
for layer in model.layers:
hooks.append(layer.register_forward_hook(hook))
with torch.no_grad():
model(x)
for hook in hooks:
hook.remove()
return activations
x = torch.randn(100, 256)
for method in ['zeros', 'random_small', 'xavier_uniform', 'kaiming_normal']:
model = WeightInitDemo(method)
acts = check_activations(model, x)
print(f"{method}:")
for i, act in enumerate(acts):
print(f" Layer {i+1}: mean={act.mean():.4f}, std={act.std():.4f}")
8. 그래디언트 문제 해결
8.1 기울기 소실과 폭발
기울기 소실(Vanishing Gradient): 역전파 시 기울기가 레이어를 거칠수록 0에 가까워져 초기 레이어가 학습되지 않는 문제입니다. sigmoid나 tanh 활성화에서 주로 발생합니다.
기울기 폭발(Exploding Gradient): 기울기가 기하급수적으로 커져 NaN이나 Inf가 발생하는 문제입니다. RNN에서 자주 발생합니다.
# Gradient Clipping
import torch.nn.utils as utils
# 방법 1: 기울기 노름(norm) 클리핑
max_norm = 1.0
total_norm = utils.clip_grad_norm_(model.parameters(), max_norm)
print(f"Gradient norm: {total_norm:.4f}")
# 방법 2: 기울기 값 클리핑
utils.clip_grad_value_(model.parameters(), clip_value=0.5)
# 학습 루프에서의 사용
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)
for batch in train_loader:
optimizer.zero_grad()
loss = criterion(model(batch[0]), batch[1])
loss.backward()
# 역전파 후, 옵티마이저 step 전에 클리핑
torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
optimizer.step()
8.2 Residual Connection (Skip Connection)
class ResidualBlock(nn.Module):
def __init__(self, in_channels, out_channels, stride=1):
super().__init__()
self.conv1 = nn.Conv2d(in_channels, out_channels, 3, stride=stride, padding=1, bias=False)
self.bn1 = nn.BatchNorm2d(out_channels)
self.relu = nn.ReLU(inplace=True)
self.conv2 = nn.Conv2d(out_channels, out_channels, 3, padding=1, bias=False)
self.bn2 = nn.BatchNorm2d(out_channels)
# 차원이 다를 경우 shortcut
self.shortcut = nn.Sequential()
if stride != 1 or in_channels != out_channels:
self.shortcut = nn.Sequential(
nn.Conv2d(in_channels, out_channels, 1, stride=stride, bias=False),
nn.BatchNorm2d(out_channels)
)
def forward(self, x):
out = self.relu(self.bn1(self.conv1(x)))
out = self.bn2(self.conv2(out))
out += self.shortcut(x) # Skip Connection
out = self.relu(out)
return out
8.3 Gradient Checkpointing
매우 깊은 모델에서 메모리를 절약하기 위해 일부 활성화를 저장하지 않고, 역전파 시 재계산합니다.
from torch.utils.checkpoint import checkpoint, checkpoint_sequential
class DeepModel(nn.Module):
def __init__(self):
super().__init__()
self.layers = nn.Sequential(*[
nn.Sequential(nn.Linear(512, 512), nn.ReLU())
for _ in range(20)
])
def forward(self, x):
# 일반: 모든 활성화 저장 (메모리 O(N))
# return self.layers(x)
# Gradient Checkpointing: 메모리 O(sqrt(N))
return checkpoint_sequential(self.layers, segments=5, input=x)
9. 전이학습(Transfer Learning) & 파인튜닝
9.1 Feature Extraction vs Fine-tuning
import torchvision.models as models
# Feature Extraction: 사전학습 가중치 동결
def feature_extraction(num_classes):
model = models.resnet50(pretrained=True)
# 모든 파라미터 동결
for param in model.parameters():
param.requires_grad = False
# 마지막 분류 레이어만 교체 (학습 가능)
model.fc = nn.Linear(model.fc.in_features, num_classes)
return model
# Fine-tuning: 일부 또는 전체 레이어 학습
def fine_tuning(num_classes, unfreeze_layers=None):
model = models.resnet50(pretrained=True)
# 처음에는 모두 동결
for param in model.parameters():
param.requires_grad = False
# 마지막 레이어 교체 및 활성화
model.fc = nn.Linear(model.fc.in_features, num_classes)
# 지정된 레이어 활성화
if unfreeze_layers:
for name, param in model.named_parameters():
for layer in unfreeze_layers:
if layer in name:
param.requires_grad = True
return model
9.2 점진적 레이어 해동 & Discriminative Learning Rates
def progressive_unfreezing_setup(model, base_lr=1e-4):
# ResNet50의 레이어 그룹
layer_groups = [
list(model.layer1.parameters()),
list(model.layer2.parameters()),
list(model.layer3.parameters()),
list(model.layer4.parameters()),
list(model.fc.parameters())
]
# 처음에는 fc만 학습
for group in layer_groups[:-1]:
for p in group:
p.requires_grad = False
return layer_groups
def discriminative_lr_optimizer(model, base_lr=1e-4, lr_multiplier=10):
# 레이어별 다른 학습률 설정 (초기 레이어는 낮은 lr, 후기 레이어는 높은 lr)
param_groups = [
{'params': model.layer1.parameters(), 'lr': base_lr / (lr_multiplier**3)},
{'params': model.layer2.parameters(), 'lr': base_lr / (lr_multiplier**2)},
{'params': model.layer3.parameters(), 'lr': base_lr / lr_multiplier},
{'params': model.layer4.parameters(), 'lr': base_lr},
{'params': model.fc.parameters(), 'lr': base_lr * lr_multiplier},
]
return torch.optim.Adam(param_groups)
9.3 LoRA (Low-Rank Adaptation)
LoRA는 대형 언어모델의 파인튜닝을 위한 파라미터 효율적 기법입니다. 원래 가중치 행렬을 동결하고, 낮은 랭크의 행렬 분해를 학습합니다.
원래 가중치 행렬 W의 크기가 d x k일 때, LoRA는 W' = W + BA를 학습합니다. 여기서 B는 d x r, A는 r x k 행렬이며 랭크 r은 d와 k보다 훨씬 작게 설정됩니다.
import torch
import torch.nn as nn
class LoRALayer(nn.Module):
def __init__(self, in_features, out_features, rank=4, alpha=1.0):
super().__init__()
self.rank = rank
self.alpha = alpha
self.scaling = alpha / rank
# 원래 가중치 (동결)
self.weight = nn.Parameter(
torch.randn(out_features, in_features),
requires_grad=False
)
# LoRA 행렬 A (랜덤 초기화)
self.lora_A = nn.Parameter(torch.randn(rank, in_features) * 0.01)
# LoRA 행렬 B (0으로 초기화 -> 학습 시작 시 원래 모델과 동일)
self.lora_B = nn.Parameter(torch.zeros(out_features, rank))
self.bias = nn.Parameter(torch.zeros(out_features))
def forward(self, x):
# 원래 출력 + LoRA 변화량
base_output = nn.functional.linear(x, self.weight, self.bias)
lora_output = (x @ self.lora_A.T @ self.lora_B.T) * self.scaling
return base_output + lora_output
# HuggingFace PEFT 라이브러리 사용 (실제 LLM 파인튜닝)
from peft import get_peft_model, LoraConfig, TaskType
lora_config = LoraConfig(
task_type=TaskType.CAUSAL_LM,
r=8, # LoRA 랭크
lora_alpha=32, # 스케일링 팩터
target_modules=["q_proj", "v_proj"], # 적용할 모듈
lora_dropout=0.05,
bias="none"
)
# peft_model = get_peft_model(base_model, lora_config)
# peft_model.print_trainable_parameters()
# trainable params: 4,194,304 || all params: 6,742,609,920 || trainable%: 0.062
10. 하이퍼파라미터 튜닝
10.1 Optuna를 활용한 베이지안 최적화
import optuna
import torch
import torch.nn as nn
def objective(trial):
# 하이퍼파라미터 탐색 공간 정의
lr = trial.suggest_float('lr', 1e-5, 1e-1, log=True)
n_layers = trial.suggest_int('n_layers', 1, 5)
n_units = trial.suggest_categorical('n_units', [64, 128, 256, 512])
dropout_rate = trial.suggest_float('dropout', 0.0, 0.5)
optimizer_name = trial.suggest_categorical('optimizer', ['Adam', 'AdamW', 'SGD'])
# 모델 생성
layers = []
in_dim = 784
for _ in range(n_layers):
layers.extend([
nn.Linear(in_dim, n_units),
nn.ReLU(),
nn.Dropout(dropout_rate)
])
in_dim = n_units
layers.append(nn.Linear(in_dim, 10))
model = nn.Sequential(*layers)
# 옵티마이저 선택
if optimizer_name == 'Adam':
optimizer = torch.optim.Adam(model.parameters(), lr=lr)
elif optimizer_name == 'AdamW':
optimizer = torch.optim.AdamW(model.parameters(), lr=lr, weight_decay=0.01)
else:
optimizer = torch.optim.SGD(model.parameters(), lr=lr, momentum=0.9)
# 학습 및 검증
# ... (학습 루프 생략)
val_accuracy = 0.95 # 실제로는 학습 후 계산
return val_accuracy
# Optuna 스터디 생성 및 실행
study = optuna.create_study(
direction='maximize',
sampler=optuna.samplers.TPESampler(), # Tree-structured Parzen Estimator
pruner=optuna.pruners.MedianPruner() # 성능 나쁜 트라이얼 조기 종료
)
study.optimize(objective, n_trials=100, timeout=3600)
print(f"Best trial: {study.best_trial.value:.4f}")
print(f"Best params: {study.best_trial.params}")
# 결과 시각화
# optuna.visualization.plot_optimization_history(study)
# optuna.visualization.plot_param_importances(study)
11. 혼합 정밀도 학습(Mixed Precision Training)
11.1 FP32 vs FP16 vs BF16
| 형식 | 지수 비트 | 가수 비트 | 표현 범위 | 주 용도 |
|---|---|---|---|---|
| FP32 | 8 | 23 | ±3.4e38 | 기본 학습 |
| FP16 | 5 | 10 | ±65504 | 추론/학습 (오버플로우 주의) |
| BF16 | 8 | 7 | ±3.4e38 | LLM 학습 (A100, TPU) |
11.2 PyTorch AMP (Automatic Mixed Precision)
import torch
from torch.cuda.amp import autocast, GradScaler
# GradScaler: FP16 언더플로우 방지를 위한 손실 스케일링
scaler = GradScaler()
model = MyModel().cuda()
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)
criterion = nn.CrossEntropyLoss()
for epoch in range(num_epochs):
for inputs, labels in train_loader:
inputs, labels = inputs.cuda(), labels.cuda()
optimizer.zero_grad()
# autocast 컨텍스트에서 FP16 연산
with autocast(dtype=torch.float16):
outputs = model(inputs)
loss = criterion(outputs, labels)
# 손실 스케일링 후 역전파
scaler.scale(loss).backward()
# 그래디언트 클리핑 (스케일 조정 후)
scaler.unscale_(optimizer)
torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
# 옵티마이저 스텝 (NaN/Inf 그래디언트 자동 건너뜀)
scaler.step(optimizer)
scaler.update()
# BF16 사용 (더 안정적, Ampere 이상 GPU 필요)
with autocast(dtype=torch.bfloat16):
outputs = model(inputs)
loss = criterion(outputs, labels)
12. 분산 학습(Distributed Training)
12.1 데이터 병렬화 (Data Parallelism)
데이터를 여러 GPU에 분산시켜 각 GPU가 독립적으로 순전파와 역전파를 수행하고, 그래디언트를 집계합니다.
import torch
import torch.nn as nn
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.utils.data.distributed import DistributedSampler
def setup(rank, world_size):
import os
os.environ['MASTER_ADDR'] = 'localhost'
os.environ['MASTER_PORT'] = '12355'
dist.init_process_group('nccl', rank=rank, world_size=world_size)
def cleanup():
dist.destroy_process_group()
def train_ddp(rank, world_size, model_class, dataset):
setup(rank, world_size)
# 각 프로세스가 자신의 GPU 사용
device = torch.device(f'cuda:{rank}')
model = model_class().to(device)
# DDP로 감싸기
model = DDP(model, device_ids=[rank])
# DistributedSampler: 각 프로세스가 다른 데이터 샘플링
sampler = DistributedSampler(
dataset,
num_replicas=world_size,
rank=rank,
shuffle=True
)
loader = torch.utils.data.DataLoader(
dataset,
batch_size=32,
sampler=sampler,
num_workers=4,
pin_memory=True
)
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3 * world_size)
criterion = nn.CrossEntropyLoss()
for epoch in range(num_epochs):
sampler.set_epoch(epoch) # 에폭마다 셔플 시드 변경
for inputs, labels in loader:
inputs, labels = inputs.to(device), labels.to(device)
optimizer.zero_grad()
with autocast():
outputs = model(inputs)
loss = criterion(outputs, labels)
scaler.scale(loss).backward()
scaler.step(optimizer)
scaler.update()
if rank == 0:
print(f"Epoch {epoch}: Loss = {loss.item():.4f}")
cleanup()
# 멀티프로세스 실행
import torch.multiprocessing as mp
if __name__ == '__main__':
world_size = torch.cuda.device_count()
mp.spawn(train_ddp, args=(world_size, MyModel, dataset), nprocs=world_size, join=True)
12.2 FSDP (Fully Sharded Data Parallel)
FSDP는 모델 파라미터, 그래디언트, 옵티마이저 상태를 모든 GPU에 분산시켜 메모리를 절약합니다. GPT-3급 초대형 모델 학습에 적합합니다.
from torch.distributed.fsdp import FullyShardedDataParallel as FSDP
from torch.distributed.fsdp import ShardingStrategy, MixedPrecision
from torch.distributed.fsdp.wrap import transformer_auto_wrap_policy
import functools
# Mixed Precision 설정
bf16_policy = MixedPrecision(
param_dtype=torch.bfloat16,
reduce_dtype=torch.bfloat16,
buffer_dtype=torch.bfloat16
)
# Transformer 레이어 자동 래핑 정책
auto_wrap_policy = functools.partial(
transformer_auto_wrap_policy,
transformer_layer_cls={TransformerBlock}
)
# FSDP 모델 생성
model = FSDP(
model,
sharding_strategy=ShardingStrategy.FULL_SHARD, # 완전 분산
mixed_precision=bf16_policy,
auto_wrap_policy=auto_wrap_policy,
device_id=rank
)
12.3 Gradient Accumulation
GPU 메모리가 부족할 때 작은 배치를 여러 번 사용하여 큰 배치 효과를 냅니다.
model = MyModel().cuda()
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)
criterion = nn.CrossEntropyLoss()
# 유효 배치 크기 = micro_batch_size * accumulation_steps
micro_batch_size = 8
accumulation_steps = 8 # 유효 배치 크기: 64
optimizer.zero_grad()
for step, (inputs, labels) in enumerate(train_loader):
inputs, labels = inputs.cuda(), labels.cuda()
with autocast():
outputs = model(inputs)
loss = criterion(outputs, labels)
# 손실을 accumulation_steps로 나눔 (평균 유지)
loss = loss / accumulation_steps
scaler.scale(loss).backward()
if (step + 1) % accumulation_steps == 0:
scaler.unscale_(optimizer)
torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
scaler.step(optimizer)
scaler.update()
optimizer.zero_grad()
13. 대규모 언어모델 학습 기법
13.1 Instruction Tuning
Instruction Tuning은 모델이 자연어 지시문을 따르도록 학습시키는 기법입니다. FLAN, InstructGPT, LLaMA-2의 성공에 핵심적인 역할을 했습니다.
# Instruction Tuning 데이터 포맷 예시
instruction_data = [
{
"instruction": "다음 텍스트의 감정을 분석하세요.",
"input": "오늘 날씨가 너무 좋아서 기분이 최고예요!",
"output": "긍정적인 감정입니다. 날씨에 대한 만족감과 행복감이 표현되어 있습니다."
},
{
"instruction": "주어진 정보를 바탕으로 요약문을 작성하세요.",
"input": "...(긴 텍스트)...",
"output": "...(요약)..."
}
]
# Alpaca 형식으로 프롬프트 구성
def format_instruction(sample):
if sample.get('input'):
return f"""### Instruction:
{sample['instruction']}
### Input:
{sample['input']}
### Response:
{sample['output']}"""
else:
return f"""### Instruction:
{sample['instruction']}
### Response:
{sample['output']}"""
13.2 RLHF (인간 피드백 강화학습)
RLHF는 세 단계로 이루어집니다.
1단계: SFT(Supervised Fine-tuning) - 인간이 작성한 고품질 응답으로 파인튜닝 2단계: Reward Model 학습 - 여러 응답 중 좋은 응답을 선호하도록 학습 3단계: PPO로 정책 최적화 - Reward Model을 사용해 강화학습
# 단계 2: Reward Model (Bradley-Terry 모델)
class RewardModel(nn.Module):
def __init__(self, base_model):
super().__init__()
self.base_model = base_model
self.reward_head = nn.Linear(base_model.config.hidden_size, 1)
def forward(self, input_ids, attention_mask):
outputs = self.base_model(input_ids=input_ids, attention_mask=attention_mask)
# 마지막 토큰의 hidden state 사용
last_hidden = outputs.last_hidden_state[:, -1, :]
reward = self.reward_head(last_hidden).squeeze(-1)
return reward
# Reward Model 학습 (선호도 손실)
def preference_loss(reward_chosen, reward_rejected):
# Bradley-Terry 모델: p(chosen > rejected) = sigmoid(r_chosen - r_rejected)
return -torch.log(torch.sigmoid(reward_chosen - reward_rejected)).mean()
13.3 DPO (Direct Preference Optimization)
DPO는 RLHF의 복잡한 PPO 학습을 단순화하여, 선호도 데이터를 직접 최적화합니다.
import torch
import torch.nn.functional as F
def dpo_loss(
policy_chosen_logps, # 정책 모델의 선호 응답 로그 확률
policy_rejected_logps, # 정책 모델의 비선호 응답 로그 확률
reference_chosen_logps, # 참조 모델의 선호 응답 로그 확률
reference_rejected_logps, # 참조 모델의 비선호 응답 로그 확률
beta=0.1 # KL 페널티 강도
):
# 정책과 참조 모델 간의 log ratio
chosen_rewards = beta * (policy_chosen_logps - reference_chosen_logps)
rejected_rewards = beta * (policy_rejected_logps - reference_rejected_logps)
# DPO 손실: -log(sigmoid(chosen_rewards - rejected_rewards))
loss = -F.logsigmoid(chosen_rewards - rejected_rewards).mean()
# 로깅용 보상
chosen_reward = chosen_rewards.detach().mean()
rejected_reward = rejected_rewards.detach().mean()
reward_accuracy = (chosen_rewards > rejected_rewards).float().mean()
return loss, chosen_reward, rejected_reward, reward_accuracy
14. 실전 학습 파이프라인 완성
14.1 종합 학습 루프
import torch
import torch.nn as nn
from torch.cuda.amp import autocast, GradScaler
import wandb # 실험 추적
class Trainer:
def __init__(
self,
model,
train_loader,
val_loader,
optimizer,
scheduler,
criterion,
device='cuda',
use_amp=True,
grad_clip=1.0,
accumulation_steps=1,
log_wandb=False
):
self.model = model.to(device)
self.train_loader = train_loader
self.val_loader = val_loader
self.optimizer = optimizer
self.scheduler = scheduler
self.criterion = criterion
self.device = device
self.use_amp = use_amp
self.grad_clip = grad_clip
self.accumulation_steps = accumulation_steps
self.scaler = GradScaler() if use_amp else None
self.log_wandb = log_wandb
if log_wandb:
wandb.watch(model, log='all', log_freq=100)
def train_epoch(self):
self.model.train()
total_loss = 0
self.optimizer.zero_grad()
for step, (inputs, labels) in enumerate(self.train_loader):
inputs, labels = inputs.to(self.device), labels.to(self.device)
if self.use_amp:
with autocast():
outputs = self.model(inputs)
loss = self.criterion(outputs, labels) / self.accumulation_steps
self.scaler.scale(loss).backward()
else:
outputs = self.model(inputs)
loss = self.criterion(outputs, labels) / self.accumulation_steps
loss.backward()
if (step + 1) % self.accumulation_steps == 0:
if self.use_amp:
self.scaler.unscale_(self.optimizer)
if self.grad_clip:
nn.utils.clip_grad_norm_(self.model.parameters(), self.grad_clip)
if self.use_amp:
self.scaler.step(self.optimizer)
self.scaler.update()
else:
self.optimizer.step()
if self.scheduler:
self.scheduler.step()
self.optimizer.zero_grad()
total_loss += loss.item() * self.accumulation_steps
return total_loss / len(self.train_loader)
@torch.no_grad()
def evaluate(self):
self.model.eval()
total_loss = 0
correct = 0
total = 0
for inputs, labels in self.val_loader:
inputs, labels = inputs.to(self.device), labels.to(self.device)
with autocast() if self.use_amp else torch.no_grad():
outputs = self.model(inputs)
loss = self.criterion(outputs, labels)
total_loss += loss.item()
_, predicted = outputs.max(1)
total += labels.size(0)
correct += predicted.eq(labels).sum().item()
return total_loss / len(self.val_loader), 100. * correct / total
def fit(self, epochs, save_path=None):
best_val_acc = 0
early_stopping = EarlyStopping(patience=10)
for epoch in range(epochs):
train_loss = self.train_epoch()
val_loss, val_acc = self.evaluate()
print(f"Epoch {epoch+1}/{epochs}: "
f"Train Loss: {train_loss:.4f}, "
f"Val Loss: {val_loss:.4f}, "
f"Val Acc: {val_acc:.2f}%")
if self.log_wandb:
wandb.log({
'train_loss': train_loss,
'val_loss': val_loss,
'val_acc': val_acc,
'lr': self.optimizer.param_groups[0]['lr']
})
if val_acc > best_val_acc:
best_val_acc = val_acc
if save_path:
torch.save(self.model.state_dict(), save_path)
early_stopping(val_loss, self.model)
if early_stopping.early_stop:
print("Early stopping!")
break
return best_val_acc
결론 및 베스트 프랙티스
딥러닝 학습에서 고려해야 할 핵심 원칙을 정리하면 다음과 같습니다.
옵티마이저 선택
- 일반 작업: AdamW (lr=1e-3 ~ 1e-4, weight_decay=0.01)
- Transformer: AdamW + Warmup + Cosine Schedule
- 대규모 배치: LAMB 또는 LARS
- 메모리 제약: Lion
정규화 전략
- Dropout은 주로 0.1 ~ 0.5 사용
- 작은 데이터셋: 강한 정규화 (더 큰 weight decay, 더 높은 dropout)
- 대용량 데이터: 약한 정규화 또는 없음
학습률 스케줄링
- CNN: OneCycleLR 또는 Step Decay
- Transformer: Warmup + Cosine 또는 Inverse Square Root
혼합 정밀도
- 항상 AMP 사용 (속도 1.5~3배, 메모리 2배 절약)
- A100/H100 이상: BF16 권장
- 이전 GPU: FP16 + Loss Scaling
분산 학습
- 다중 GPU 단일 서버: DDP + NCCL
- 수십억 파라미터 모델: FSDP
- 항상 Gradient Accumulation으로 유효 배치 크기 키우기