- Authors

- Name
- Youngju Kim
- @fjvbn20031
들어가며
GPT, Llama, Mistral 같은 대규모 언어 모델(LLM)은 현재 AI 혁명의 중심에 있습니다. 하지만 이 모델들이 어떻게 동작하는지 코드 수준에서 이해하는 사람은 많지 않습니다. 이 가이드에서는 "miniGPT"라는 소형 GPT를 처음부터 PyTorch로 직접 구현하며, 현대 LLM의 모든 핵심 개념을 마스터합니다.
1. LLM 전체 아키텍처 개요
1.1 사전학습 언어 모델의 파이프라인
현대 LLM의 개발 파이프라인은 세 단계로 나뉩니다.
-
사전학습 (Pretraining): 수십억 개의 텍스트 토큰으로 다음 토큰 예측(next token prediction)을 학습합니다. 이 과정에서 모델은 언어의 통계적 패턴, 지식, 추론 능력을 습득합니다.
-
지시 파인튜닝 (Instruction Fine-Tuning / SFT): 사람이 작성한 질문-답변 쌍으로 모델이 지시를 따르도록 학습합니다.
-
선호도 학습 (RLHF / DPO): 사람의 선호도를 학습하여 더 유용하고 안전한 응답을 생성하도록 개선합니다.
1.2 miniGPT 사양
이 가이드에서 구현할 miniGPT의 사양:
| 파라미터 | 값 |
|---|---|
| 어휘 크기 | 50,257 (GPT-2와 동일) |
| 컨텍스트 길이 | 1024 |
| 임베딩 차원 | 768 |
| 레이어 수 | 12 |
| Attention 헤드 수 | 12 |
| FFN 확장 비율 | 4x |
| 총 파라미터 | 약 124M |
이는 GPT-2 Small과 동일한 사양으로, 단일 GPU에서 학습 가능합니다.
2. 토크나이저 구현
2.1 BPE 알고리즘
BPE(Byte-Pair Encoding)는 대부분의 현대 LLM이 사용하는 토크나이저 알고리즘입니다.
from collections import defaultdict
from typing import List, Tuple, Dict
class SimpleBPETokenizer:
"""BPE 토크나이저 처음부터 구현"""
def __init__(self):
self.vocab = {}
self.merges = {}
self.special_tokens = {
"<|endoftext|>": 50256,
"<|padding|>": 50257,
}
def get_stats(self, vocab: Dict[str, int]) -> Dict[Tuple, int]:
"""인접한 토큰 쌍의 빈도를 계산"""
pairs = defaultdict(int)
for word, freq in vocab.items():
symbols = word.split()
for i in range(len(symbols) - 1):
pairs[(symbols[i], symbols[i+1])] += freq
return pairs
def merge_vocab(self, pair: Tuple, v_in: Dict) -> Dict:
"""가장 빈번한 쌍을 병합"""
v_out = {}
bigram = " ".join(pair)
replacement = "".join(pair)
for word in v_in:
w_out = word.replace(bigram, replacement)
v_out[w_out] = v_in[word]
return v_out
def train(self, corpus: List[str], vocab_size: int = 1000):
"""BPE 어휘 학습"""
# 1. 문자 단위로 초기 어휘 구성
word_freq = defaultdict(int)
for text in corpus:
for word in text.split():
word_freq[" ".join(list(word)) + " </w>"] += 1
vocab = dict(word_freq)
# 2. vocab_size에 도달할 때까지 반복 병합
for i in range(vocab_size):
pairs = self.get_stats(vocab)
if not pairs:
break
best = max(pairs, key=pairs.get)
vocab = self.merge_vocab(best, vocab)
self.merges[best] = i
print(f"Step {i}: merged {best} (freq={pairs[best]})")
# 3. 최종 어휘 구성
for word in vocab:
for token in word.split():
if token not in self.vocab:
self.vocab[token] = len(self.vocab)
return self.vocab, self.merges
2.2 tiktoken 호환 토크나이저 사용
실제로는 OpenAI의 tiktoken 라이브러리를 사용하는 것이 훨씬 효율적입니다.
import tiktoken
import torch
from typing import List
class GPTTokenizer:
"""tiktoken 기반 GPT 토크나이저 래퍼"""
def __init__(self, encoding_name: str = "gpt2"):
self.enc = tiktoken.get_encoding(encoding_name)
self.vocab_size = self.enc.n_vocab
self.eot_token = self.enc.eot_token # 50256
def encode(self, text: str, add_special_tokens: bool = True) -> List[int]:
"""텍스트를 토큰 ID 리스트로 변환"""
ids = self.enc.encode(text, allowed_special={"<|endoftext|>"})
if add_special_tokens:
ids = [self.eot_token] + ids
return ids
def decode(self, ids: List[int]) -> str:
"""토큰 ID 리스트를 텍스트로 변환"""
return self.enc.decode(ids)
def encode_batch(self, texts: List[str]) -> List[List[int]]:
"""배치 인코딩"""
return [self.encode(t) for t in texts]
def __len__(self):
return self.vocab_size
# 사용 예시
tokenizer = GPTTokenizer()
text = "안녕하세요! GPT를 처음부터 만들어봅시다."
ids = tokenizer.encode(text)
print(f"토큰 수: {len(ids)}")
print(f"토큰 ID: {ids}")
decoded = tokenizer.decode(ids)
print(f"디코딩: {decoded}")
3. 데이터 준비
3.1 텍스트 데이터셋 클래스
import torch
from torch.utils.data import Dataset, DataLoader
import numpy as np
import os
class TextDataset(Dataset):
"""슬라이딩 윈도우 방식의 언어 모델링 데이터셋"""
def __init__(
self,
data_path: str,
tokenizer: GPTTokenizer,
seq_len: int = 1024,
stride: int = 512
):
self.seq_len = seq_len
self.stride = stride
# 텍스트 파일 로딩 및 토크나이징
print(f"데이터 로딩: {data_path}")
with open(data_path, "r", encoding="utf-8") as f:
text = f.read()
print(f"총 문자 수: {len(text):,}")
self.tokens = tokenizer.encode(text, add_special_tokens=False)
print(f"총 토큰 수: {len(self.tokens):,}")
# numpy 배열로 변환 (메모리 효율)
self.tokens = np.array(self.tokens, dtype=np.int32)
def __len__(self):
return max(0, (len(self.tokens) - self.seq_len) // self.stride)
def __getitem__(self, idx):
start = idx * self.stride
end = start + self.seq_len + 1 # +1은 레이블용
chunk = self.tokens[start:end]
# 입력: tokens[0:seq_len], 레이블: tokens[1:seq_len+1]
x = torch.tensor(chunk[:-1], dtype=torch.long)
y = torch.tensor(chunk[1:], dtype=torch.long)
return x, y
def create_dataloader(
data_path: str,
tokenizer: GPTTokenizer,
batch_size: int = 8,
seq_len: int = 1024,
stride: int = 512,
num_workers: int = 4,
shuffle: bool = True
) -> DataLoader:
dataset = TextDataset(data_path, tokenizer, seq_len, stride)
print(f"데이터셋 크기: {len(dataset):,} 샘플")
return DataLoader(
dataset,
batch_size=batch_size,
shuffle=shuffle,
num_workers=num_workers,
pin_memory=True,
drop_last=True
)
4. 모델 아키텍처 구현
4.1 설정 클래스
from dataclasses import dataclass
@dataclass
class GPTConfig:
"""GPT 모델 설정"""
vocab_size: int = 50257
max_seq_len: int = 1024
n_embd: int = 768
n_layer: int = 12
n_head: int = 12
dropout: float = 0.1
bias: bool = False # bias 없이도 좋은 성능
# FFN 확장 비율
ffn_mult: int = 4
# SwiGLU 사용 여부
use_swiglu: bool = True
# RoPE 사용 여부
use_rope: bool = True
4.2 RMSNorm
import torch
import torch.nn as nn
import torch.nn.functional as F
import math
class RMSNorm(nn.Module):
"""Root Mean Square Layer Normalization (Llama 스타일)"""
def __init__(self, dim: int, eps: float = 1e-6):
super().__init__()
self.eps = eps
self.weight = nn.Parameter(torch.ones(dim))
def _norm(self, x):
return x * torch.rsqrt(x.pow(2).mean(-1, keepdim=True) + self.eps)
def forward(self, x):
return self.weight * self._norm(x.float()).type_as(x)
4.3 Rotary Position Embedding (RoPE)
class RotaryEmbedding(nn.Module):
"""Rotary Position Embedding (RoPE)"""
def __init__(self, dim: int, max_seq_len: int = 2048, base: int = 10000):
super().__init__()
self.dim = dim
self.max_seq_len = max_seq_len
# 주파수 계산: theta_i = base^(-2i/dim)
inv_freq = 1.0 / (base ** (torch.arange(0, dim, 2).float() / dim))
self.register_buffer("inv_freq", inv_freq)
# 코사인/사인 캐시 미리 계산
self._build_cache(max_seq_len)
def _build_cache(self, seq_len: int):
t = torch.arange(seq_len, device=self.inv_freq.device).float()
freqs = torch.einsum("i,j->ij", t, self.inv_freq)
emb = torch.cat([freqs, freqs], dim=-1)
self.register_buffer("cos_cache", emb.cos()[None, None, :, :])
self.register_buffer("sin_cache", emb.sin()[None, None, :, :])
def rotate_half(self, x):
"""텐서의 절반을 회전"""
x1 = x[..., :x.shape[-1] // 2]
x2 = x[..., x.shape[-1] // 2:]
return torch.cat([-x2, x1], dim=-1)
def forward(self, q: torch.Tensor, k: torch.Tensor, seq_len: int):
cos = self.cos_cache[:, :, :seq_len, :].to(q.dtype)
sin = self.sin_cache[:, :, :seq_len, :].to(q.dtype)
q_rot = (q * cos) + (self.rotate_half(q) * sin)
k_rot = (k * cos) + (self.rotate_half(k) * sin)
return q_rot, k_rot
4.4 Multi-Head Causal Attention with KV Cache
class CausalSelfAttention(nn.Module):
"""인과 자기 어텐션 (KV 캐시 지원)"""
def __init__(self, config: GPTConfig):
super().__init__()
assert config.n_embd % config.n_head == 0
self.n_head = config.n_head
self.n_embd = config.n_embd
self.head_dim = config.n_embd // config.n_head
# Q, K, V 프로젝션 (한 번에 처리)
self.qkv_proj = nn.Linear(config.n_embd, 3 * config.n_embd, bias=config.bias)
self.out_proj = nn.Linear(config.n_embd, config.n_embd, bias=config.bias)
self.attn_dropout = nn.Dropout(config.dropout)
self.resid_dropout = nn.Dropout(config.dropout)
# RoPE
if config.use_rope:
self.rope = RotaryEmbedding(self.head_dim, config.max_seq_len)
else:
self.rope = None
# 인과 마스크 (하삼각 행렬)
self.register_buffer(
"causal_mask",
torch.tril(torch.ones(config.max_seq_len, config.max_seq_len))
.view(1, 1, config.max_seq_len, config.max_seq_len)
)
# KV 캐시
self.use_cache = False
self.cache_k = None
self.cache_v = None
def forward(
self,
x: torch.Tensor,
use_cache: bool = False,
past_kv=None
):
B, T, C = x.shape # batch, seq_len, n_embd
# Q, K, V 분리
qkv = self.qkv_proj(x)
q, k, v = qkv.split(self.n_embd, dim=2)
# 헤드 분할: [B, T, C] -> [B, n_head, T, head_dim]
q = q.view(B, T, self.n_head, self.head_dim).transpose(1, 2)
k = k.view(B, T, self.n_head, self.head_dim).transpose(1, 2)
v = v.view(B, T, self.n_head, self.head_dim).transpose(1, 2)
# RoPE 적용
if self.rope is not None:
q, k = self.rope(q, k, T)
# KV 캐시 처리
if past_kv is not None:
past_k, past_v = past_kv
k = torch.cat([past_k, k], dim=2)
v = torch.cat([past_v, v], dim=2)
present_kv = (k, v) if use_cache else None
kv_len = k.shape[2]
# Scaled Dot-Product Attention
# PyTorch 2.0+ Flash Attention 자동 활용
if hasattr(F, "scaled_dot_product_attention"):
# Flash Attention 사용 (더 빠르고 메모리 효율적)
y = F.scaled_dot_product_attention(
q, k, v,
attn_mask=None,
dropout_p=self.attn_dropout.p if self.training else 0.0,
is_causal=True # 자동으로 인과 마스크 적용
)
else:
# 수동 구현
scale = 1.0 / math.sqrt(self.head_dim)
attn = (q @ k.transpose(-2, -1)) * scale
mask = self.causal_mask[:, :, :T, :kv_len]
attn = attn.masked_fill(mask == 0, float("-inf"))
attn = F.softmax(attn, dim=-1)
attn = self.attn_dropout(attn)
y = attn @ v
# 헤드 합치기: [B, n_head, T, head_dim] -> [B, T, C]
y = y.transpose(1, 2).contiguous().view(B, T, C)
y = self.resid_dropout(self.out_proj(y))
return y, present_kv
4.5 SwiGLU Feed-Forward Network
class SwiGLUFFN(nn.Module):
"""SwiGLU 활성화 함수를 사용하는 FFN (Llama 스타일)"""
def __init__(self, config: GPTConfig):
super().__init__()
hidden_dim = int(config.n_embd * config.ffn_mult * 2 / 3)
# 64의 배수로 반올림 (하드웨어 최적화)
hidden_dim = ((hidden_dim + 63) // 64) * 64
self.gate_proj = nn.Linear(config.n_embd, hidden_dim, bias=False)
self.up_proj = nn.Linear(config.n_embd, hidden_dim, bias=False)
self.down_proj = nn.Linear(hidden_dim, config.n_embd, bias=False)
self.dropout = nn.Dropout(config.dropout)
def forward(self, x: torch.Tensor) -> torch.Tensor:
# SwiGLU: gate * SiLU(x)
gate = F.silu(self.gate_proj(x))
up = self.up_proj(x)
return self.dropout(self.down_proj(gate * up))
class GPTFFN(nn.Module):
"""원래 GPT 스타일 GELU FFN"""
def __init__(self, config: GPTConfig):
super().__init__()
hidden_dim = config.n_embd * config.ffn_mult
self.fc1 = nn.Linear(config.n_embd, hidden_dim, bias=config.bias)
self.fc2 = nn.Linear(hidden_dim, config.n_embd, bias=config.bias)
self.dropout = nn.Dropout(config.dropout)
def forward(self, x: torch.Tensor) -> torch.Tensor:
x = F.gelu(self.fc1(x))
x = self.dropout(self.fc2(x))
return x
4.6 Transformer Block
class TransformerBlock(nn.Module):
"""Pre-RMSNorm Transformer 블록"""
def __init__(self, config: GPTConfig):
super().__init__()
self.norm1 = RMSNorm(config.n_embd)
self.attn = CausalSelfAttention(config)
self.norm2 = RMSNorm(config.n_embd)
if config.use_swiglu:
self.ffn = SwiGLUFFN(config)
else:
self.ffn = GPTFFN(config)
def forward(self, x: torch.Tensor, use_cache: bool = False, past_kv=None):
# Pre-Norm + Attention + Residual
attn_out, present_kv = self.attn(
self.norm1(x),
use_cache=use_cache,
past_kv=past_kv
)
x = x + attn_out
# Pre-Norm + FFN + Residual
x = x + self.ffn(self.norm2(x))
return x, present_kv
4.7 전체 GPT 모델
class MiniGPT(nn.Module):
"""miniGPT 전체 모델"""
def __init__(self, config: GPTConfig):
super().__init__()
self.config = config
# 임베딩 레이어
self.token_emb = nn.Embedding(config.vocab_size, config.n_embd)
# RoPE를 사용하지 않는 경우 학습 가능한 위치 임베딩
if not config.use_rope:
self.pos_emb = nn.Embedding(config.max_seq_len, config.n_embd)
self.emb_dropout = nn.Dropout(config.dropout)
# Transformer 레이어
self.layers = nn.ModuleList([
TransformerBlock(config) for _ in range(config.n_layer)
])
# 최종 정규화
self.norm_final = RMSNorm(config.n_embd)
# 언어 모델 헤드 (어휘 크기로 투영)
self.lm_head = nn.Linear(config.n_embd, config.vocab_size, bias=False)
# 가중치 공유 (입력 임베딩 == 출력 임베딩)
self.lm_head.weight = self.token_emb.weight
# 가중치 초기화
self.apply(self._init_weights)
# 특별 초기화: 잔차 스트림 스케일링
for pn, p in self.named_parameters():
if pn.endswith("out_proj.weight") or pn.endswith("down_proj.weight"):
nn.init.normal_(p, mean=0.0, std=0.02 / math.sqrt(2 * config.n_layer))
print(f"파라미터 수: {self.count_params() / 1e6:.1f}M")
def _init_weights(self, module):
if isinstance(module, nn.Linear):
nn.init.normal_(module.weight, mean=0.0, std=0.02)
if module.bias is not None:
nn.init.zeros_(module.bias)
elif isinstance(module, nn.Embedding):
nn.init.normal_(module.weight, mean=0.0, std=0.02)
def count_params(self) -> int:
return sum(p.numel() for p in self.parameters())
def forward(
self,
input_ids: torch.Tensor,
targets: torch.Tensor = None,
use_cache: bool = False,
past_kvs: list = None
):
B, T = input_ids.shape
device = input_ids.device
# 토큰 임베딩
x = self.token_emb(input_ids) # [B, T, n_embd]
# 위치 임베딩 (RoPE 미사용 시)
if not self.config.use_rope:
positions = torch.arange(T, device=device)
x = x + self.pos_emb(positions)
x = self.emb_dropout(x)
# Transformer 레이어 통과
present_kvs = []
for i, layer in enumerate(self.layers):
past_kv = past_kvs[i] if past_kvs is not None else None
x, present_kv = layer(x, use_cache=use_cache, past_kv=past_kv)
present_kvs.append(present_kv)
# 최종 정규화
x = self.norm_final(x)
if targets is not None:
# 학습: 전체 시퀀스에 대해 손실 계산
logits = self.lm_head(x)
loss = F.cross_entropy(
logits.view(-1, logits.size(-1)),
targets.view(-1),
ignore_index=-1
)
return logits, loss
else:
# 추론: 마지막 토큰의 로짓만 필요
logits = self.lm_head(x[:, [-1], :])
return logits, present_kvs
@classmethod
def from_pretrained(cls, model_path: str):
"""저장된 체크포인트에서 모델 로딩"""
checkpoint = torch.load(model_path, map_location="cpu")
config = checkpoint["config"]
model = cls(config)
model.load_state_dict(checkpoint["model"])
return model
5. 사전학습 구현
5.1 학습률 스케줄러
import math
def get_cosine_schedule_with_warmup(
optimizer,
warmup_steps: int,
total_steps: int,
min_lr_ratio: float = 0.1
):
"""Cosine LR 스케줄 with warmup"""
def lr_lambda(current_step: int):
if current_step < warmup_steps:
# Linear warmup
return current_step / max(1, warmup_steps)
else:
# Cosine decay
progress = (current_step - warmup_steps) / max(1, total_steps - warmup_steps)
cosine_decay = 0.5 * (1.0 + math.cos(math.pi * progress))
return max(min_lr_ratio, cosine_decay)
from torch.optim.lr_scheduler import LambdaLR
return LambdaLR(optimizer, lr_lambda)
5.2 학습 루프
import torch
import torch.nn as nn
from torch.optim import AdamW
import os
import time
from contextlib import nullcontext
class Trainer:
"""miniGPT 사전학습 트레이너"""
def __init__(
self,
model: MiniGPT,
train_dataloader,
val_dataloader,
config: dict
):
self.model = model
self.train_dl = train_dataloader
self.val_dl = val_dataloader
self.config = config
self.device = config.get("device", "cuda" if torch.cuda.is_available() else "cpu")
# Mixed Precision 설정
self.dtype = torch.bfloat16 if torch.cuda.is_bf16_supported() else torch.float16
self.scaler = torch.cuda.amp.GradScaler(enabled=(self.dtype == torch.float16))
self.ctx = torch.amp.autocast(device_type="cuda", dtype=self.dtype)
# 옵티마이저 설정
self.optimizer = self._create_optimizer()
# 학습률 스케줄러
total_steps = len(train_dataloader) * config["num_epochs"]
warmup_steps = int(total_steps * config.get("warmup_ratio", 0.05))
self.scheduler = get_cosine_schedule_with_warmup(
self.optimizer,
warmup_steps=warmup_steps,
total_steps=total_steps
)
self.step = 0
self.best_val_loss = float("inf")
def _create_optimizer(self):
"""Weight decay를 적용할 파라미터와 그렇지 않을 파라미터 분리"""
decay_params = []
no_decay_params = []
for name, param in self.model.named_parameters():
if not param.requires_grad:
continue
# 1D 텐서 (bias, norm weight)는 weight decay 적용 안함
if param.dim() == 1 or "emb" in name:
no_decay_params.append(param)
else:
decay_params.append(param)
optim_groups = [
{"params": decay_params, "weight_decay": self.config.get("weight_decay", 0.1)},
{"params": no_decay_params, "weight_decay": 0.0}
]
return AdamW(
optim_groups,
lr=self.config["learning_rate"],
betas=(0.9, 0.95),
fused=True # CUDA fused 옵티마이저
)
def compute_val_loss(self) -> float:
"""검증 손실 계산"""
self.model.eval()
total_loss = 0.0
num_batches = min(len(self.val_dl), 50) # 최대 50 배치만 평가
with torch.no_grad():
for i, (x, y) in enumerate(self.val_dl):
if i >= num_batches:
break
x, y = x.to(self.device), y.to(self.device)
with self.ctx:
_, loss = self.model(x, y)
total_loss += loss.item()
self.model.train()
return total_loss / num_batches
def save_checkpoint(self, path: str):
"""체크포인트 저장"""
os.makedirs(os.path.dirname(path), exist_ok=True)
torch.save({
"step": self.step,
"model": self.model.state_dict(),
"optimizer": self.optimizer.state_dict(),
"scheduler": self.scheduler.state_dict(),
"config": self.model.config,
"best_val_loss": self.best_val_loss
}, path)
print(f"체크포인트 저장: {path}")
def train(self):
"""메인 학습 루프"""
self.model.to(self.device)
self.model.train()
num_epochs = self.config["num_epochs"]
grad_accum = self.config.get("gradient_accumulation_steps", 1)
max_grad_norm = self.config.get("max_grad_norm", 1.0)
log_interval = self.config.get("log_interval", 100)
save_interval = self.config.get("save_interval", 1000)
eval_interval = self.config.get("eval_interval", 500)
print(f"학습 시작: {self.device}, dtype={self.dtype}")
for epoch in range(num_epochs):
epoch_loss = 0.0
t0 = time.time()
for batch_idx, (x, y) in enumerate(self.train_dl):
x, y = x.to(self.device), y.to(self.device)
# Mixed Precision Forward
with self.ctx:
_, loss = self.model(x, y)
loss = loss / grad_accum
# Backward
self.scaler.scale(loss).backward()
# 그래디언트 누적
if (batch_idx + 1) % grad_accum == 0:
# Gradient Clipping
self.scaler.unscale_(self.optimizer)
torch.nn.utils.clip_grad_norm_(
self.model.parameters(), max_grad_norm
)
self.scaler.step(self.optimizer)
self.scaler.update()
self.scheduler.step()
self.optimizer.zero_grad(set_to_none=True)
self.step += 1
epoch_loss += loss.item() * grad_accum
# 로깅
if self.step % log_interval == 0:
lr = self.optimizer.param_groups[0]["lr"]
elapsed = time.time() - t0
tokens_per_sec = (
log_interval * x.shape[0] * x.shape[1] / elapsed
)
print(
f"Epoch {epoch} | Step {self.step} | "
f"Loss: {loss.item() * grad_accum:.4f} | "
f"LR: {lr:.6f} | "
f"Tokens/s: {tokens_per_sec:.0f}"
)
t0 = time.time()
# 평가
if self.step % eval_interval == 0:
val_loss = self.compute_val_loss()
print(f"Val Loss: {val_loss:.4f} | Perplexity: {math.exp(val_loss):.2f}")
if val_loss < self.best_val_loss:
self.best_val_loss = val_loss
self.save_checkpoint("./checkpoints/best_model.pt")
# 주기적 저장
if self.step % save_interval == 0:
self.save_checkpoint(f"./checkpoints/step_{self.step}.pt")
avg_loss = epoch_loss / len(self.train_dl)
print(f"\n=== Epoch {epoch} 완료 | 평균 Loss: {avg_loss:.4f} ===\n")
6. 텍스트 생성
6.1 다양한 디코딩 전략
import torch
import torch.nn.functional as F
from typing import Optional, List
class TextGenerator:
"""miniGPT 텍스트 생성기"""
def __init__(self, model: MiniGPT, tokenizer: GPTTokenizer, device: str = "cuda"):
self.model = model.to(device)
self.tokenizer = tokenizer
self.device = device
self.model.eval()
@torch.no_grad()
def generate(
self,
prompt: str,
max_new_tokens: int = 200,
strategy: str = "top_p",
temperature: float = 0.8,
top_k: int = 50,
top_p: float = 0.9,
repetition_penalty: float = 1.1,
num_beams: int = 4
) -> str:
"""텍스트 생성 메인 함수"""
input_ids = self.tokenizer.encode(prompt, add_special_tokens=False)
input_ids = torch.tensor(input_ids, dtype=torch.long, device=self.device).unsqueeze(0)
if strategy == "greedy":
generated = self._greedy_decode(input_ids, max_new_tokens)
elif strategy == "temperature":
generated = self._temperature_sampling(input_ids, max_new_tokens, temperature)
elif strategy == "top_k":
generated = self._top_k_sampling(input_ids, max_new_tokens, temperature, top_k)
elif strategy == "top_p":
generated = self._top_p_sampling(input_ids, max_new_tokens, temperature, top_p)
elif strategy == "beam":
generated = self._beam_search(input_ids, max_new_tokens, num_beams)
else:
raise ValueError(f"알 수 없는 전략: {strategy}")
# 생성된 토큰만 디코딩
new_tokens = generated[0][input_ids.shape[1]:].tolist()
return self.tokenizer.decode(new_tokens)
def _greedy_decode(self, input_ids: torch.Tensor, max_new_tokens: int) -> torch.Tensor:
"""탐욕적 디코딩: 항상 가장 높은 확률의 토큰 선택"""
current_ids = input_ids.clone()
for _ in range(max_new_tokens):
logits, _ = self.model(current_ids)
next_token = logits[:, -1, :].argmax(dim=-1, keepdim=True)
current_ids = torch.cat([current_ids, next_token], dim=1)
if next_token.item() == self.tokenizer.eot_token:
break
return current_ids
def _temperature_sampling(
self,
input_ids: torch.Tensor,
max_new_tokens: int,
temperature: float
) -> torch.Tensor:
"""Temperature 샘플링: 확률 분포를 조정하여 다양성 조절"""
current_ids = input_ids.clone()
for _ in range(max_new_tokens):
logits, _ = self.model(current_ids)
logits = logits[:, -1, :] / temperature # 온도로 나누기
probs = F.softmax(logits, dim=-1)
next_token = torch.multinomial(probs, num_samples=1)
current_ids = torch.cat([current_ids, next_token], dim=1)
if next_token.item() == self.tokenizer.eot_token:
break
return current_ids
def _top_k_sampling(
self,
input_ids: torch.Tensor,
max_new_tokens: int,
temperature: float,
top_k: int
) -> torch.Tensor:
"""Top-k 샘플링: 상위 k개 토큰 중에서만 샘플링"""
current_ids = input_ids.clone()
for _ in range(max_new_tokens):
logits, _ = self.model(current_ids)
logits = logits[:, -1, :] / temperature
# 상위 k개만 유지, 나머지는 -inf
top_k_logits, top_k_indices = torch.topk(logits, k=top_k, dim=-1)
filtered_logits = torch.full_like(logits, float("-inf"))
filtered_logits.scatter_(1, top_k_indices, top_k_logits)
probs = F.softmax(filtered_logits, dim=-1)
next_token = torch.multinomial(probs, num_samples=1)
current_ids = torch.cat([current_ids, next_token], dim=1)
if next_token.item() == self.tokenizer.eot_token:
break
return current_ids
def _top_p_sampling(
self,
input_ids: torch.Tensor,
max_new_tokens: int,
temperature: float,
top_p: float
) -> torch.Tensor:
"""Top-p (Nucleus) 샘플링: 누적 확률 p 이내의 토큰에서 샘플링"""
current_ids = input_ids.clone()
for _ in range(max_new_tokens):
logits, _ = self.model(current_ids)
logits = logits[:, -1, :] / temperature
# 내림차순 정렬
sorted_logits, sorted_indices = torch.sort(logits, descending=True)
cumulative_probs = torch.cumsum(F.softmax(sorted_logits, dim=-1), dim=-1)
# 누적 확률이 top_p를 초과하는 토큰 제거
sorted_indices_to_remove = cumulative_probs - F.softmax(sorted_logits, dim=-1) > top_p
sorted_logits[sorted_indices_to_remove] = float("-inf")
# 원래 순서로 복원
logits = torch.zeros_like(logits).scatter_(1, sorted_indices, sorted_logits)
probs = F.softmax(logits, dim=-1)
next_token = torch.multinomial(probs, num_samples=1)
current_ids = torch.cat([current_ids, next_token], dim=1)
if next_token.item() == self.tokenizer.eot_token:
break
return current_ids
def _beam_search(
self,
input_ids: torch.Tensor,
max_new_tokens: int,
num_beams: int
) -> torch.Tensor:
"""빔 서치: 여러 후보를 동시에 탐색"""
B = input_ids.shape[0]
assert B == 1, "빔 서치는 배치 크기 1만 지원"
# 빔 초기화
beams = [(input_ids, 0.0)] # (ids, score)
for _ in range(max_new_tokens):
all_candidates = []
for beam_ids, beam_score in beams:
logits, _ = self.model(beam_ids)
log_probs = F.log_softmax(logits[:, -1, :], dim=-1)
# 상위 num_beams 토큰 선택
top_log_probs, top_ids = log_probs.topk(num_beams)
for i in range(num_beams):
token = top_ids[0, i].unsqueeze(0).unsqueeze(0)
new_ids = torch.cat([beam_ids, token], dim=1)
new_score = beam_score + top_log_probs[0, i].item()
all_candidates.append((new_ids, new_score))
# 길이 정규화 후 상위 num_beams 선택
all_candidates.sort(key=lambda x: x[1] / x[0].shape[1], reverse=True)
beams = all_candidates[:num_beams]
# EOT 토큰으로 끝난 빔이 있으면 중단
if beams[0][0][0, -1].item() == self.tokenizer.eot_token:
break
return beams[0][0] # 최고 점수 빔 반환
7. Perplexity 계산
import math
import torch
@torch.no_grad()
def compute_perplexity(
model: MiniGPT,
tokenizer: GPTTokenizer,
text: str,
device: str = "cuda",
seq_len: int = 512
) -> float:
"""텍스트의 Perplexity 계산"""
model.eval()
model.to(device)
tokens = tokenizer.encode(text, add_special_tokens=False)
total_loss = 0.0
total_tokens = 0
for i in range(0, len(tokens) - seq_len, seq_len):
chunk = tokens[i:i + seq_len + 1]
if len(chunk) < seq_len + 1:
break
x = torch.tensor(chunk[:-1], dtype=torch.long, device=device).unsqueeze(0)
y = torch.tensor(chunk[1:], dtype=torch.long, device=device).unsqueeze(0)
_, loss = model(x, y)
total_loss += loss.item() * seq_len
total_tokens += seq_len
avg_loss = total_loss / total_tokens
perplexity = math.exp(avg_loss)
return perplexity
# 사용 예시
model = MiniGPT.from_pretrained("./checkpoints/best_model.pt")
tokenizer = GPTTokenizer()
generator = TextGenerator(model, tokenizer)
test_text = "The quick brown fox jumps over the lazy dog."
ppl = compute_perplexity(model, tokenizer, test_text)
print(f"Perplexity: {ppl:.2f}")
# 텍스트 생성 테스트
for strategy in ["greedy", "top_k", "top_p"]:
generated = generator.generate(
prompt="Once upon a time",
max_new_tokens=100,
strategy=strategy,
temperature=0.8
)
print(f"\n[{strategy}]: {generated}")
8. 규모 확장
8.1 Flash Attention 2
# pip install flash-attn
from flash_attn import flash_attn_qkvpacked_func, flash_attn_func
class FlashCausalAttention(nn.Module):
"""Flash Attention 2를 사용한 고속 어텐션"""
def __init__(self, config: GPTConfig):
super().__init__()
self.n_head = config.n_head
self.n_embd = config.n_embd
self.head_dim = config.n_embd // config.n_head
self.qkv_proj = nn.Linear(config.n_embd, 3 * config.n_embd, bias=False)
self.out_proj = nn.Linear(config.n_embd, config.n_embd, bias=False)
self.dropout = config.dropout
def forward(self, x: torch.Tensor):
B, T, C = x.shape
qkv = self.qkv_proj(x)
# Flash Attention은 [B, T, 3, n_head, head_dim] 형식 필요
qkv = qkv.view(B, T, 3, self.n_head, self.head_dim)
# Flash Attention 호출 (자동으로 인과 마스크 적용)
attn_out = flash_attn_qkvpacked_func(
qkv,
dropout_p=self.dropout if self.training else 0.0,
causal=True # 인과 마스크
) # [B, T, n_head, head_dim]
attn_out = attn_out.reshape(B, T, C)
return self.out_proj(attn_out)
8.2 FSDP 분산 학습
import torch
import torch.distributed as dist
from torch.distributed.fsdp import FullyShardedDataParallel as FSDP
from torch.distributed.fsdp.wrap import transformer_auto_wrap_policy
import functools
def setup_fsdp_training():
"""FSDP 분산 학습 설정"""
dist.init_process_group(backend="nccl")
rank = dist.get_rank()
world_size = dist.get_world_size()
torch.cuda.set_device(rank)
config = GPTConfig(
n_embd=1024,
n_layer=24,
n_head=16,
vocab_size=50257
)
model = MiniGPT(config)
# FSDP 래핑 정책: TransformerBlock 단위로 샤딩
auto_wrap_policy = functools.partial(
transformer_auto_wrap_policy,
transformer_layer_cls={TransformerBlock}
)
model = FSDP(
model,
auto_wrap_policy=auto_wrap_policy,
mixed_precision=torch.distributed.fsdp.MixedPrecision(
param_dtype=torch.bfloat16,
reduce_dtype=torch.bfloat16,
buffer_dtype=torch.bfloat16
),
sharding_strategy="FULL_SHARD", # ZeRO-3 동급
device_id=rank
)
return model, rank, world_size
8.3 Chinchilla 스케일링 법칙
Chinchilla 논문(Hoffmann et al., 2022)에 따르면, 최적의 모델 크기와 학습 토큰 수의 관계:
최적 토큰 수 = 20 x 파라미터 수
def chinchilla_optimal_tokens(model_params: int) -> int:
"""Chinchilla 법칙에 따른 최적 학습 토큰 수"""
return 20 * model_params
# 예시:
models = {
"124M (GPT-2 Small)": 124e6,
"1.3B": 1.3e9,
"7B (Llama-2 7B)": 7e9,
"13B": 13e9,
"70B (Llama-2 70B)": 70e9,
}
print("모델 크기별 권장 학습 토큰 수:")
for name, params in models.items():
optimal_tokens = chinchilla_optimal_tokens(int(params))
print(f" {name}: {optimal_tokens/1e9:.1f}B 토큰")
9. Instruction Tuning
9.1 SFT 데이터 형식
# ChatML 형식
CHATML_SYSTEM = "<|im_start|>system\n{system}<|im_end|>\n"
CHATML_USER = "<|im_start|>user\n{user}<|im_end|>\n"
CHATML_ASSISTANT = "<|im_start|>assistant\n{assistant}<|im_end|>\n"
def format_chatml(
system: str,
user: str,
assistant: str,
add_generation_prompt: bool = False
) -> str:
"""ChatML 형식으로 대화 포맷팅"""
formatted = CHATML_SYSTEM.format(system=system)
formatted += CHATML_USER.format(user=user)
formatted += CHATML_ASSISTANT.format(assistant=assistant)
if add_generation_prompt:
formatted += "<|im_start|>assistant\n"
return formatted
# Llama-3 형식
def format_llama3(
system: str,
user: str,
assistant: str,
add_generation_prompt: bool = False
) -> str:
"""Llama-3 형식으로 대화 포맷팅"""
formatted = f"<|begin_of_text|><|start_header_id|>system<|end_header_id|>\n\n{system}<|eot_id|>\n"
formatted += f"<|start_header_id|>user<|end_header_id|>\n\n{user}<|eot_id|>\n"
formatted += f"<|start_header_id|>assistant<|end_header_id|>\n\n{assistant}<|eot_id|>"
if add_generation_prompt:
formatted += "\n<|start_header_id|>assistant<|end_header_id|>\n\n"
return formatted
9.2 SFT 데이터셋 클래스
class InstructionDataset(Dataset):
"""지시 파인튜닝 데이터셋 (어시스턴트 응답에만 손실 계산)"""
def __init__(
self,
data: list,
tokenizer: GPTTokenizer,
max_seq_len: int = 2048,
format_func = format_chatml
):
self.samples = []
for item in data:
# 전체 대화 토크나이징
full_text = format_func(
system=item.get("system", "You are a helpful assistant."),
user=item["instruction"],
assistant=item["response"]
)
full_ids = tokenizer.encode(full_text, add_special_tokens=False)
# 프롬프트 부분 (손실 마스킹 대상)
prompt_text = format_func(
system=item.get("system", "You are a helpful assistant."),
user=item["instruction"],
assistant="",
add_generation_prompt=True
)
prompt_ids = tokenizer.encode(prompt_text, add_special_tokens=False)
prompt_len = len(prompt_ids)
if len(full_ids) > max_seq_len:
full_ids = full_ids[:max_seq_len]
# 레이블: 프롬프트 부분은 -100으로 마스킹 (손실 계산 제외)
labels = full_ids.copy()
labels[:prompt_len] = [-100] * prompt_len
self.samples.append({
"input_ids": full_ids,
"labels": labels
})
def __len__(self):
return len(self.samples)
def __getitem__(self, idx):
sample = self.samples[idx]
return (
torch.tensor(sample["input_ids"], dtype=torch.long),
torch.tensor(sample["labels"], dtype=torch.long)
)
9.3 SFT 학습 루프
def train_sft(
base_model_path: str,
data_path: str,
output_dir: str,
num_epochs: int = 3,
learning_rate: float = 2e-5,
batch_size: int = 4
):
"""지시 파인튜닝 (SFT)"""
import json
# 데이터 로딩
with open(data_path) as f:
data = json.load(f)
tokenizer = GPTTokenizer()
dataset = InstructionDataset(data, tokenizer, max_seq_len=2048)
# 패딩 콜레이터
def collate_fn(batch):
input_ids = [item[0] for item in batch]
labels = [item[1] for item in batch]
# 배치 내 최대 길이로 패딩
max_len = max(len(x) for x in input_ids)
padded_ids = torch.zeros(len(batch), max_len, dtype=torch.long)
padded_labels = torch.full((len(batch), max_len), -100, dtype=torch.long)
for i, (ids, lbls) in enumerate(zip(input_ids, labels)):
padded_ids[i, :len(ids)] = ids
padded_labels[i, :len(lbls)] = lbls
return padded_ids, padded_labels
dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=True, collate_fn=collate_fn)
# 모델 로딩
model = MiniGPT.from_pretrained(base_model_path)
device = "cuda"
model.to(device)
optimizer = AdamW(model.parameters(), lr=learning_rate, weight_decay=0.01)
total_steps = len(dataloader) * num_epochs
scheduler = get_cosine_schedule_with_warmup(optimizer, int(total_steps * 0.1), total_steps)
# 학습
model.train()
for epoch in range(num_epochs):
for step, (x, y) in enumerate(dataloader):
x, y = x.to(device), y.to(device)
logits, loss = model(x, y)
loss.backward()
torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
optimizer.step()
scheduler.step()
optimizer.zero_grad()
if step % 50 == 0:
print(f"Epoch {epoch}, Step {step}, Loss: {loss.item():.4f}")
# 저장
os.makedirs(output_dir, exist_ok=True)
torch.save({
"model": model.state_dict(),
"config": model.config
}, f"{output_dir}/sft_model.pt")
print(f"SFT 완료. 모델 저장: {output_dir}")
10. 오픈소스 LLM 분석
10.1 Llama 3 구조 분석
Llama 3는 Meta에서 공개한 오픈소스 LLM으로, 다음과 같은 특징을 가집니다.
| 특징 | 설명 |
|---|---|
| 아키텍처 | Decoder-only Transformer |
| 정규화 | Pre-RMSNorm |
| 활성화 | SwiGLU |
| 위치 인코딩 | RoPE (theta=500,000) |
| 어휘 크기 | 128,256 |
| 컨텍스트 길이 | 128K (Llama 3.1+) |
| GQA | Grouped Query Attention (8B, 70B) |
# Llama 3 스타일 설정 (8B 모델 기준)
llama3_config = GPTConfig(
vocab_size=128256,
max_seq_len=8192,
n_embd=4096,
n_layer=32,
n_head=32,
use_rope=True,
use_swiglu=True,
bias=False
)
10.2 Grouped Query Attention (GQA)
class GroupedQueryAttention(nn.Module):
"""Grouped Query Attention (Llama 3, Mistral 사용)"""
def __init__(self, config: GPTConfig, n_kv_heads: int = 8):
super().__init__()
self.n_head = config.n_head # Query 헤드 수
self.n_kv_heads = n_kv_heads # KV 헤드 수 (더 적음)
self.n_rep = config.n_head // n_kv_heads # 각 KV 헤드를 몇 번 반복할지
self.head_dim = config.n_embd // config.n_head
self.q_proj = nn.Linear(config.n_embd, config.n_head * self.head_dim, bias=False)
self.k_proj = nn.Linear(config.n_embd, n_kv_heads * self.head_dim, bias=False)
self.v_proj = nn.Linear(config.n_embd, n_kv_heads * self.head_dim, bias=False)
self.out_proj = nn.Linear(config.n_embd, config.n_embd, bias=False)
self.rope = RotaryEmbedding(self.head_dim, config.max_seq_len, base=500000)
def forward(self, x: torch.Tensor):
B, T, C = x.shape
q = self.q_proj(x).view(B, T, self.n_head, self.head_dim).transpose(1, 2)
k = self.k_proj(x).view(B, T, self.n_kv_heads, self.head_dim).transpose(1, 2)
v = self.v_proj(x).view(B, T, self.n_kv_heads, self.head_dim).transpose(1, 2)
# RoPE 적용
q, k = self.rope(q, k, T)
# KV 헤드 반복 (GQA의 핵심)
# [B, n_kv_heads, T, head_dim] -> [B, n_head, T, head_dim]
k = k.unsqueeze(2).expand(B, self.n_kv_heads, self.n_rep, T, self.head_dim)
k = k.reshape(B, self.n_head, T, self.head_dim)
v = v.unsqueeze(2).expand(B, self.n_kv_heads, self.n_rep, T, self.head_dim)
v = v.reshape(B, self.n_head, T, self.head_dim)
# Flash Attention
y = F.scaled_dot_product_attention(q, k, v, is_causal=True)
y = y.transpose(1, 2).contiguous().view(B, T, C)
return self.out_proj(y)
10.3 DeepSeek 혁신
DeepSeek은 여러 기술적 혁신을 도입했습니다.
MLA (Multi-head Latent Attention): K, V를 저차원 잠재 공간으로 압축하여 KV 캐시 메모리를 획기적으로 줄입니다.
DeepSeekMoE: 전문가 혼합(MoE) 아키텍처를 미세화된 방식으로 적용합니다.
class MoEFFN(nn.Module):
"""Mixture of Experts FFN (간략화 버전)"""
def __init__(self, config: GPTConfig, num_experts: int = 8, top_k: int = 2):
super().__init__()
self.num_experts = num_experts
self.top_k = top_k
# 라우터
self.router = nn.Linear(config.n_embd, num_experts, bias=False)
# 각 전문가 FFN
self.experts = nn.ModuleList([
SwiGLUFFN(config) for _ in range(num_experts)
])
def forward(self, x: torch.Tensor) -> torch.Tensor:
B, T, C = x.shape
x_flat = x.view(B * T, C)
# 라우팅 점수 계산
router_logits = self.router(x_flat) # [B*T, num_experts]
router_probs = F.softmax(router_logits, dim=-1)
# 상위 k 전문가 선택
top_k_probs, top_k_indices = torch.topk(router_probs, self.top_k, dim=-1)
top_k_probs = top_k_probs / top_k_probs.sum(dim=-1, keepdim=True) # 정규화
# 전문가 출력 계산
output = torch.zeros_like(x_flat)
for k in range(self.top_k):
expert_idx = top_k_indices[:, k] # [B*T]
expert_prob = top_k_probs[:, k].unsqueeze(-1) # [B*T, 1]
for e in range(self.num_experts):
mask = (expert_idx == e)
if mask.any():
expert_out = self.experts[e](x_flat[mask])
output[mask] += expert_prob[mask] * expert_out
return output.view(B, T, C)
마치며
이 가이드에서 miniGPT를 처음부터 완전히 구현하며 현대 LLM의 모든 핵심 요소를 살펴보았습니다.
핵심 컴포넌트 요약:
- 토크나이저: BPE 알고리즘으로 텍스트를 서브워드 토큰으로 분리
- 임베딩: 토큰 임베딩 + RoPE 위치 인코딩
- Multi-Head Causal Attention: Q/K/V 투영, 인과 마스크, KV 캐시
- FFN: SwiGLU 활성화로 비선형성 추가
- Pre-RMSNorm: 학습 안정성
- 텍스트 생성: Greedy, Temperature, Top-k, Top-p, Beam Search
- 사전학습: Cosine LR Schedule, Gradient Clipping, Mixed Precision
- SFT: 응답 부분에만 손실 계산
이 기반 위에 Llama 3의 GQA, DeepSeek의 MoE 같은 현대적 기법들이 추가됩니다. 직접 구현하며 이해한 내용은 HuggingFace 라이브러리를 더 깊이 활용하는 데 큰 도움이 될 것입니다.
참고 자료
- Karpathy의 nanoGPT: https://github.com/karpathy/nanoGPT
- Chinchilla 스케일링 법칙 (Hoffmann et al., 2022): https://arxiv.org/abs/2203.15556
- Llama 3 기술 보고서: https://ai.meta.com/blog/meta-llama-3
- Flash Attention: https://github.com/Dao-AILab/flash-attention
- OpenAI tiktoken: https://github.com/openai/tiktoken
- Attention is All You Need (Vaswani et al., 2017)