Skip to content

필사 모드: LLM 처음부터 만들기: 코드로 이해하는 GPT 완전 구현 가이드

한국어
0%
정확도 0%
💡 왼쪽 원문을 읽으면서 오른쪽에 따라 써보세요. Tab 키로 힌트를 받을 수 있습니다.
원문 렌더가 준비되기 전까지 텍스트 가이드로 표시합니다.

들어가며

GPT, Llama, Mistral 같은 대규모 언어 모델(LLM)은 현재 AI 혁명의 중심에 있습니다. 하지만 이 모델들이 어떻게 동작하는지 코드 수준에서 이해하는 사람은 많지 않습니다. 이 가이드에서는 "miniGPT"라는 소형 GPT를 처음부터 PyTorch로 직접 구현하며, 현대 LLM의 모든 핵심 개념을 마스터합니다.

1. LLM 전체 아키텍처 개요

1.1 사전학습 언어 모델의 파이프라인

현대 LLM의 개발 파이프라인은 세 단계로 나뉩니다.

1. **사전학습 (Pretraining)**: 수십억 개의 텍스트 토큰으로 다음 토큰 예측(next token prediction)을 학습합니다. 이 과정에서 모델은 언어의 통계적 패턴, 지식, 추론 능력을 습득합니다.

2. **지시 파인튜닝 (Instruction Fine-Tuning / SFT)**: 사람이 작성한 질문-답변 쌍으로 모델이 지시를 따르도록 학습합니다.

3. **선호도 학습 (RLHF / DPO)**: 사람의 선호도를 학습하여 더 유용하고 안전한 응답을 생성하도록 개선합니다.

1.2 miniGPT 사양

이 가이드에서 구현할 miniGPT의 사양:

| 파라미터 | 값 |

| ----------------- | --------------------- |

| 어휘 크기 | 50,257 (GPT-2와 동일) |

| 컨텍스트 길이 | 1024 |

| 임베딩 차원 | 768 |

| 레이어 수 | 12 |

| Attention 헤드 수 | 12 |

| FFN 확장 비율 | 4x |

| 총 파라미터 | 약 124M |

이는 GPT-2 Small과 동일한 사양으로, 단일 GPU에서 학습 가능합니다.

2. 토크나이저 구현

2.1 BPE 알고리즘

BPE(Byte-Pair Encoding)는 대부분의 현대 LLM이 사용하는 토크나이저 알고리즘입니다.

from collections import defaultdict

from typing import List, Tuple, Dict

class SimpleBPETokenizer:

"""BPE 토크나이저 처음부터 구현"""

def __init__(self):

self.vocab = {}

self.merges = {}

self.special_tokens = {

"<|endoftext|>": 50256,

"<|padding|>": 50257,

}

def get_stats(self, vocab: Dict[str, int]) -> Dict[Tuple, int]:

"""인접한 토큰 쌍의 빈도를 계산"""

pairs = defaultdict(int)

for word, freq in vocab.items():

symbols = word.split()

for i in range(len(symbols) - 1):

pairs[(symbols[i], symbols[i+1])] += freq

return pairs

def merge_vocab(self, pair: Tuple, v_in: Dict) -> Dict:

"""가장 빈번한 쌍을 병합"""

v_out = {}

bigram = " ".join(pair)

replacement = "".join(pair)

for word in v_in:

w_out = word.replace(bigram, replacement)

v_out[w_out] = v_in[word]

return v_out

def train(self, corpus: List[str], vocab_size: int = 1000):

"""BPE 어휘 학습"""

1. 문자 단위로 초기 어휘 구성

word_freq = defaultdict(int)

for text in corpus:

for word in text.split():

word_freq[" ".join(list(word)) + " </w>"] += 1

vocab = dict(word_freq)

2. vocab_size에 도달할 때까지 반복 병합

for i in range(vocab_size):

pairs = self.get_stats(vocab)

if not pairs:

break

best = max(pairs, key=pairs.get)

vocab = self.merge_vocab(best, vocab)

self.merges[best] = i

print(f"Step {i}: merged {best} (freq={pairs[best]})")

3. 최종 어휘 구성

for word in vocab:

for token in word.split():

if token not in self.vocab:

self.vocab[token] = len(self.vocab)

return self.vocab, self.merges

2.2 tiktoken 호환 토크나이저 사용

실제로는 OpenAI의 tiktoken 라이브러리를 사용하는 것이 훨씬 효율적입니다.

from typing import List

class GPTTokenizer:

"""tiktoken 기반 GPT 토크나이저 래퍼"""

def __init__(self, encoding_name: str = "gpt2"):

self.enc = tiktoken.get_encoding(encoding_name)

self.vocab_size = self.enc.n_vocab

self.eot_token = self.enc.eot_token # 50256

def encode(self, text: str, add_special_tokens: bool = True) -> List[int]:

"""텍스트를 토큰 ID 리스트로 변환"""

ids = self.enc.encode(text, allowed_special={"<|endoftext|>"})

if add_special_tokens:

ids = [self.eot_token] + ids

return ids

def decode(self, ids: List[int]) -> str:

"""토큰 ID 리스트를 텍스트로 변환"""

return self.enc.decode(ids)

def encode_batch(self, texts: List[str]) -> List[List[int]]:

"""배치 인코딩"""

return [self.encode(t) for t in texts]

def __len__(self):

return self.vocab_size

사용 예시

tokenizer = GPTTokenizer()

text = "안녕하세요! GPT를 처음부터 만들어봅시다."

ids = tokenizer.encode(text)

print(f"토큰 수: {len(ids)}")

print(f"토큰 ID: {ids}")

decoded = tokenizer.decode(ids)

print(f"디코딩: {decoded}")

3. 데이터 준비

3.1 텍스트 데이터셋 클래스

from torch.utils.data import Dataset, DataLoader

class TextDataset(Dataset):

"""슬라이딩 윈도우 방식의 언어 모델링 데이터셋"""

def __init__(

self,

data_path: str,

tokenizer: GPTTokenizer,

seq_len: int = 1024,

stride: int = 512

):

self.seq_len = seq_len

self.stride = stride

텍스트 파일 로딩 및 토크나이징

print(f"데이터 로딩: {data_path}")

with open(data_path, "r", encoding="utf-8") as f:

text = f.read()

print(f"총 문자 수: {len(text):,}")

self.tokens = tokenizer.encode(text, add_special_tokens=False)

print(f"총 토큰 수: {len(self.tokens):,}")

numpy 배열로 변환 (메모리 효율)

self.tokens = np.array(self.tokens, dtype=np.int32)

def __len__(self):

return max(0, (len(self.tokens) - self.seq_len) // self.stride)

def __getitem__(self, idx):

start = idx * self.stride

end = start + self.seq_len + 1 # +1은 레이블용

chunk = self.tokens[start:end]

입력: tokens[0:seq_len], 레이블: tokens[1:seq_len+1]

x = torch.tensor(chunk[:-1], dtype=torch.long)

y = torch.tensor(chunk[1:], dtype=torch.long)

return x, y

def create_dataloader(

data_path: str,

tokenizer: GPTTokenizer,

batch_size: int = 8,

seq_len: int = 1024,

stride: int = 512,

num_workers: int = 4,

shuffle: bool = True

) -> DataLoader:

dataset = TextDataset(data_path, tokenizer, seq_len, stride)

print(f"데이터셋 크기: {len(dataset):,} 샘플")

return DataLoader(

dataset,

batch_size=batch_size,

shuffle=shuffle,

num_workers=num_workers,

pin_memory=True,

drop_last=True

)

4. 모델 아키텍처 구현

4.1 설정 클래스

from dataclasses import dataclass

@dataclass

class GPTConfig:

"""GPT 모델 설정"""

vocab_size: int = 50257

max_seq_len: int = 1024

n_embd: int = 768

n_layer: int = 12

n_head: int = 12

dropout: float = 0.1

bias: bool = False # bias 없이도 좋은 성능

FFN 확장 비율

ffn_mult: int = 4

SwiGLU 사용 여부

use_swiglu: bool = True

RoPE 사용 여부

use_rope: bool = True

4.2 RMSNorm

class RMSNorm(nn.Module):

"""Root Mean Square Layer Normalization (Llama 스타일)"""

def __init__(self, dim: int, eps: float = 1e-6):

super().__init__()

self.eps = eps

self.weight = nn.Parameter(torch.ones(dim))

def _norm(self, x):

return x * torch.rsqrt(x.pow(2).mean(-1, keepdim=True) + self.eps)

def forward(self, x):

return self.weight * self._norm(x.float()).type_as(x)

4.3 Rotary Position Embedding (RoPE)

class RotaryEmbedding(nn.Module):

"""Rotary Position Embedding (RoPE)"""

def __init__(self, dim: int, max_seq_len: int = 2048, base: int = 10000):

super().__init__()

self.dim = dim

self.max_seq_len = max_seq_len

주파수 계산: theta_i = base^(-2i/dim)

inv_freq = 1.0 / (base ** (torch.arange(0, dim, 2).float() / dim))

self.register_buffer("inv_freq", inv_freq)

코사인/사인 캐시 미리 계산

self._build_cache(max_seq_len)

def _build_cache(self, seq_len: int):

t = torch.arange(seq_len, device=self.inv_freq.device).float()

freqs = torch.einsum("i,j->ij", t, self.inv_freq)

emb = torch.cat([freqs, freqs], dim=-1)

self.register_buffer("cos_cache", emb.cos()[None, None, :, :])

self.register_buffer("sin_cache", emb.sin()[None, None, :, :])

def rotate_half(self, x):

"""텐서의 절반을 회전"""

x1 = x[..., :x.shape[-1] // 2]

x2 = x[..., x.shape[-1] // 2:]

return torch.cat([-x2, x1], dim=-1)

def forward(self, q: torch.Tensor, k: torch.Tensor, seq_len: int):

cos = self.cos_cache[:, :, :seq_len, :].to(q.dtype)

sin = self.sin_cache[:, :, :seq_len, :].to(q.dtype)

q_rot = (q * cos) + (self.rotate_half(q) * sin)

k_rot = (k * cos) + (self.rotate_half(k) * sin)

return q_rot, k_rot

4.4 Multi-Head Causal Attention with KV Cache

class CausalSelfAttention(nn.Module):

"""인과 자기 어텐션 (KV 캐시 지원)"""

def __init__(self, config: GPTConfig):

super().__init__()

assert config.n_embd % config.n_head == 0

self.n_head = config.n_head

self.n_embd = config.n_embd

self.head_dim = config.n_embd // config.n_head

Q, K, V 프로젝션 (한 번에 처리)

self.qkv_proj = nn.Linear(config.n_embd, 3 * config.n_embd, bias=config.bias)

self.out_proj = nn.Linear(config.n_embd, config.n_embd, bias=config.bias)

self.attn_dropout = nn.Dropout(config.dropout)

self.resid_dropout = nn.Dropout(config.dropout)

RoPE

if config.use_rope:

self.rope = RotaryEmbedding(self.head_dim, config.max_seq_len)

else:

self.rope = None

인과 마스크 (하삼각 행렬)

self.register_buffer(

"causal_mask",

torch.tril(torch.ones(config.max_seq_len, config.max_seq_len))

.view(1, 1, config.max_seq_len, config.max_seq_len)

)

KV 캐시

self.use_cache = False

self.cache_k = None

self.cache_v = None

def forward(

self,

x: torch.Tensor,

use_cache: bool = False,

past_kv=None

):

B, T, C = x.shape # batch, seq_len, n_embd

Q, K, V 분리

qkv = self.qkv_proj(x)

q, k, v = qkv.split(self.n_embd, dim=2)

헤드 분할: [B, T, C] -> [B, n_head, T, head_dim]

q = q.view(B, T, self.n_head, self.head_dim).transpose(1, 2)

k = k.view(B, T, self.n_head, self.head_dim).transpose(1, 2)

v = v.view(B, T, self.n_head, self.head_dim).transpose(1, 2)

RoPE 적용

if self.rope is not None:

q, k = self.rope(q, k, T)

KV 캐시 처리

if past_kv is not None:

past_k, past_v = past_kv

k = torch.cat([past_k, k], dim=2)

v = torch.cat([past_v, v], dim=2)

present_kv = (k, v) if use_cache else None

kv_len = k.shape[2]

Scaled Dot-Product Attention

PyTorch 2.0+ Flash Attention 자동 활용

if hasattr(F, "scaled_dot_product_attention"):

Flash Attention 사용 (더 빠르고 메모리 효율적)

y = F.scaled_dot_product_attention(

q, k, v,

attn_mask=None,

dropout_p=self.attn_dropout.p if self.training else 0.0,

is_causal=True # 자동으로 인과 마스크 적용

)

else:

수동 구현

scale = 1.0 / math.sqrt(self.head_dim)

attn = (q @ k.transpose(-2, -1)) * scale

mask = self.causal_mask[:, :, :T, :kv_len]

attn = attn.masked_fill(mask == 0, float("-inf"))

attn = F.softmax(attn, dim=-1)

attn = self.attn_dropout(attn)

y = attn @ v

헤드 합치기: [B, n_head, T, head_dim] -> [B, T, C]

y = y.transpose(1, 2).contiguous().view(B, T, C)

y = self.resid_dropout(self.out_proj(y))

return y, present_kv

4.5 SwiGLU Feed-Forward Network

class SwiGLUFFN(nn.Module):

"""SwiGLU 활성화 함수를 사용하는 FFN (Llama 스타일)"""

def __init__(self, config: GPTConfig):

super().__init__()

hidden_dim = int(config.n_embd * config.ffn_mult * 2 / 3)

64의 배수로 반올림 (하드웨어 최적화)

hidden_dim = ((hidden_dim + 63) // 64) * 64

self.gate_proj = nn.Linear(config.n_embd, hidden_dim, bias=False)

self.up_proj = nn.Linear(config.n_embd, hidden_dim, bias=False)

self.down_proj = nn.Linear(hidden_dim, config.n_embd, bias=False)

self.dropout = nn.Dropout(config.dropout)

def forward(self, x: torch.Tensor) -> torch.Tensor:

SwiGLU: gate * SiLU(x)

gate = F.silu(self.gate_proj(x))

up = self.up_proj(x)

return self.dropout(self.down_proj(gate * up))

class GPTFFN(nn.Module):

"""원래 GPT 스타일 GELU FFN"""

def __init__(self, config: GPTConfig):

super().__init__()

hidden_dim = config.n_embd * config.ffn_mult

self.fc1 = nn.Linear(config.n_embd, hidden_dim, bias=config.bias)

self.fc2 = nn.Linear(hidden_dim, config.n_embd, bias=config.bias)

self.dropout = nn.Dropout(config.dropout)

def forward(self, x: torch.Tensor) -> torch.Tensor:

x = F.gelu(self.fc1(x))

x = self.dropout(self.fc2(x))

return x

4.6 Transformer Block

class TransformerBlock(nn.Module):

"""Pre-RMSNorm Transformer 블록"""

def __init__(self, config: GPTConfig):

super().__init__()

self.norm1 = RMSNorm(config.n_embd)

self.attn = CausalSelfAttention(config)

self.norm2 = RMSNorm(config.n_embd)

if config.use_swiglu:

self.ffn = SwiGLUFFN(config)

else:

self.ffn = GPTFFN(config)

def forward(self, x: torch.Tensor, use_cache: bool = False, past_kv=None):

Pre-Norm + Attention + Residual

attn_out, present_kv = self.attn(

self.norm1(x),

use_cache=use_cache,

past_kv=past_kv

)

x = x + attn_out

Pre-Norm + FFN + Residual

x = x + self.ffn(self.norm2(x))

return x, present_kv

4.7 전체 GPT 모델

class MiniGPT(nn.Module):

"""miniGPT 전체 모델"""

def __init__(self, config: GPTConfig):

super().__init__()

self.config = config

임베딩 레이어

self.token_emb = nn.Embedding(config.vocab_size, config.n_embd)

RoPE를 사용하지 않는 경우 학습 가능한 위치 임베딩

if not config.use_rope:

self.pos_emb = nn.Embedding(config.max_seq_len, config.n_embd)

self.emb_dropout = nn.Dropout(config.dropout)

Transformer 레이어

self.layers = nn.ModuleList([

TransformerBlock(config) for _ in range(config.n_layer)

])

최종 정규화

self.norm_final = RMSNorm(config.n_embd)

언어 모델 헤드 (어휘 크기로 투영)

self.lm_head = nn.Linear(config.n_embd, config.vocab_size, bias=False)

가중치 공유 (입력 임베딩 == 출력 임베딩)

self.lm_head.weight = self.token_emb.weight

가중치 초기화

self.apply(self._init_weights)

특별 초기화: 잔차 스트림 스케일링

for pn, p in self.named_parameters():

if pn.endswith("out_proj.weight") or pn.endswith("down_proj.weight"):

nn.init.normal_(p, mean=0.0, std=0.02 / math.sqrt(2 * config.n_layer))

print(f"파라미터 수: {self.count_params() / 1e6:.1f}M")

def _init_weights(self, module):

if isinstance(module, nn.Linear):

nn.init.normal_(module.weight, mean=0.0, std=0.02)

if module.bias is not None:

nn.init.zeros_(module.bias)

elif isinstance(module, nn.Embedding):

nn.init.normal_(module.weight, mean=0.0, std=0.02)

def count_params(self) -> int:

return sum(p.numel() for p in self.parameters())

def forward(

self,

input_ids: torch.Tensor,

targets: torch.Tensor = None,

use_cache: bool = False,

past_kvs: list = None

):

B, T = input_ids.shape

device = input_ids.device

토큰 임베딩

x = self.token_emb(input_ids) # [B, T, n_embd]

위치 임베딩 (RoPE 미사용 시)

if not self.config.use_rope:

positions = torch.arange(T, device=device)

x = x + self.pos_emb(positions)

x = self.emb_dropout(x)

Transformer 레이어 통과

present_kvs = []

for i, layer in enumerate(self.layers):

past_kv = past_kvs[i] if past_kvs is not None else None

x, present_kv = layer(x, use_cache=use_cache, past_kv=past_kv)

present_kvs.append(present_kv)

최종 정규화

x = self.norm_final(x)

if targets is not None:

학습: 전체 시퀀스에 대해 손실 계산

logits = self.lm_head(x)

loss = F.cross_entropy(

logits.view(-1, logits.size(-1)),

targets.view(-1),

ignore_index=-1

)

return logits, loss

else:

추론: 마지막 토큰의 로짓만 필요

logits = self.lm_head(x[:, [-1], :])

return logits, present_kvs

@classmethod

def from_pretrained(cls, model_path: str):

"""저장된 체크포인트에서 모델 로딩"""

checkpoint = torch.load(model_path, map_location="cpu")

config = checkpoint["config"]

model = cls(config)

model.load_state_dict(checkpoint["model"])

return model

5. 사전학습 구현

5.1 학습률 스케줄러

def get_cosine_schedule_with_warmup(

optimizer,

warmup_steps: int,

total_steps: int,

min_lr_ratio: float = 0.1

):

"""Cosine LR 스케줄 with warmup"""

def lr_lambda(current_step: int):

if current_step < warmup_steps:

Linear warmup

return current_step / max(1, warmup_steps)

else:

Cosine decay

progress = (current_step - warmup_steps) / max(1, total_steps - warmup_steps)

cosine_decay = 0.5 * (1.0 + math.cos(math.pi * progress))

return max(min_lr_ratio, cosine_decay)

from torch.optim.lr_scheduler import LambdaLR

return LambdaLR(optimizer, lr_lambda)

5.2 학습 루프

from torch.optim import AdamW

from contextlib import nullcontext

class Trainer:

"""miniGPT 사전학습 트레이너"""

def __init__(

self,

model: MiniGPT,

train_dataloader,

val_dataloader,

config: dict

):

self.model = model

self.train_dl = train_dataloader

self.val_dl = val_dataloader

self.config = config

self.device = config.get("device", "cuda" if torch.cuda.is_available() else "cpu")

Mixed Precision 설정

self.dtype = torch.bfloat16 if torch.cuda.is_bf16_supported() else torch.float16

self.scaler = torch.cuda.amp.GradScaler(enabled=(self.dtype == torch.float16))

self.ctx = torch.amp.autocast(device_type="cuda", dtype=self.dtype)

옵티마이저 설정

self.optimizer = self._create_optimizer()

학습률 스케줄러

total_steps = len(train_dataloader) * config["num_epochs"]

warmup_steps = int(total_steps * config.get("warmup_ratio", 0.05))

self.scheduler = get_cosine_schedule_with_warmup(

self.optimizer,

warmup_steps=warmup_steps,

total_steps=total_steps

)

self.step = 0

self.best_val_loss = float("inf")

def _create_optimizer(self):

"""Weight decay를 적용할 파라미터와 그렇지 않을 파라미터 분리"""

decay_params = []

no_decay_params = []

for name, param in self.model.named_parameters():

if not param.requires_grad:

continue

1D 텐서 (bias, norm weight)는 weight decay 적용 안함

if param.dim() == 1 or "emb" in name:

no_decay_params.append(param)

else:

decay_params.append(param)

optim_groups = [

{"params": decay_params, "weight_decay": self.config.get("weight_decay", 0.1)},

{"params": no_decay_params, "weight_decay": 0.0}

]

return AdamW(

optim_groups,

lr=self.config["learning_rate"],

betas=(0.9, 0.95),

fused=True # CUDA fused 옵티마이저

)

def compute_val_loss(self) -> float:

"""검증 손실 계산"""

self.model.eval()

total_loss = 0.0

num_batches = min(len(self.val_dl), 50) # 최대 50 배치만 평가

with torch.no_grad():

for i, (x, y) in enumerate(self.val_dl):

if i >= num_batches:

break

x, y = x.to(self.device), y.to(self.device)

with self.ctx:

_, loss = self.model(x, y)

total_loss += loss.item()

self.model.train()

return total_loss / num_batches

def save_checkpoint(self, path: str):

"""체크포인트 저장"""

os.makedirs(os.path.dirname(path), exist_ok=True)

torch.save({

"step": self.step,

"model": self.model.state_dict(),

"optimizer": self.optimizer.state_dict(),

"scheduler": self.scheduler.state_dict(),

"config": self.model.config,

"best_val_loss": self.best_val_loss

}, path)

print(f"체크포인트 저장: {path}")

def train(self):

"""메인 학습 루프"""

self.model.to(self.device)

self.model.train()

num_epochs = self.config["num_epochs"]

grad_accum = self.config.get("gradient_accumulation_steps", 1)

max_grad_norm = self.config.get("max_grad_norm", 1.0)

log_interval = self.config.get("log_interval", 100)

save_interval = self.config.get("save_interval", 1000)

eval_interval = self.config.get("eval_interval", 500)

print(f"학습 시작: {self.device}, dtype={self.dtype}")

for epoch in range(num_epochs):

epoch_loss = 0.0

t0 = time.time()

for batch_idx, (x, y) in enumerate(self.train_dl):

x, y = x.to(self.device), y.to(self.device)

Mixed Precision Forward

with self.ctx:

_, loss = self.model(x, y)

loss = loss / grad_accum

Backward

self.scaler.scale(loss).backward()

그래디언트 누적

if (batch_idx + 1) % grad_accum == 0:

Gradient Clipping

self.scaler.unscale_(self.optimizer)

torch.nn.utils.clip_grad_norm_(

self.model.parameters(), max_grad_norm

)

self.scaler.step(self.optimizer)

self.scaler.update()

self.scheduler.step()

self.optimizer.zero_grad(set_to_none=True)

self.step += 1

epoch_loss += loss.item() * grad_accum

로깅

if self.step % log_interval == 0:

lr = self.optimizer.param_groups[0]["lr"]

elapsed = time.time() - t0

tokens_per_sec = (

log_interval * x.shape[0] * x.shape[1] / elapsed

)

print(

f"Epoch {epoch} | Step {self.step} | "

f"Loss: {loss.item() * grad_accum:.4f} | "

f"LR: {lr:.6f} | "

f"Tokens/s: {tokens_per_sec:.0f}"

)

t0 = time.time()

평가

if self.step % eval_interval == 0:

val_loss = self.compute_val_loss()

print(f"Val Loss: {val_loss:.4f} | Perplexity: {math.exp(val_loss):.2f}")

if val_loss < self.best_val_loss:

self.best_val_loss = val_loss

self.save_checkpoint("./checkpoints/best_model.pt")

주기적 저장

if self.step % save_interval == 0:

self.save_checkpoint(f"./checkpoints/step_{self.step}.pt")

avg_loss = epoch_loss / len(self.train_dl)

print(f"\n=== Epoch {epoch} 완료 | 평균 Loss: {avg_loss:.4f} ===\n")

6. 텍스트 생성

6.1 다양한 디코딩 전략

from typing import Optional, List

class TextGenerator:

"""miniGPT 텍스트 생성기"""

def __init__(self, model: MiniGPT, tokenizer: GPTTokenizer, device: str = "cuda"):

self.model = model.to(device)

self.tokenizer = tokenizer

self.device = device

self.model.eval()

@torch.no_grad()

def generate(

self,

prompt: str,

max_new_tokens: int = 200,

strategy: str = "top_p",

temperature: float = 0.8,

top_k: int = 50,

top_p: float = 0.9,

repetition_penalty: float = 1.1,

num_beams: int = 4

) -> str:

"""텍스트 생성 메인 함수"""

input_ids = self.tokenizer.encode(prompt, add_special_tokens=False)

input_ids = torch.tensor(input_ids, dtype=torch.long, device=self.device).unsqueeze(0)

if strategy == "greedy":

generated = self._greedy_decode(input_ids, max_new_tokens)

elif strategy == "temperature":

generated = self._temperature_sampling(input_ids, max_new_tokens, temperature)

elif strategy == "top_k":

generated = self._top_k_sampling(input_ids, max_new_tokens, temperature, top_k)

elif strategy == "top_p":

generated = self._top_p_sampling(input_ids, max_new_tokens, temperature, top_p)

elif strategy == "beam":

generated = self._beam_search(input_ids, max_new_tokens, num_beams)

else:

raise ValueError(f"알 수 없는 전략: {strategy}")

생성된 토큰만 디코딩

new_tokens = generated[0][input_ids.shape[1]:].tolist()

return self.tokenizer.decode(new_tokens)

def _greedy_decode(self, input_ids: torch.Tensor, max_new_tokens: int) -> torch.Tensor:

"""탐욕적 디코딩: 항상 가장 높은 확률의 토큰 선택"""

current_ids = input_ids.clone()

for _ in range(max_new_tokens):

logits, _ = self.model(current_ids)

next_token = logits[:, -1, :].argmax(dim=-1, keepdim=True)

current_ids = torch.cat([current_ids, next_token], dim=1)

if next_token.item() == self.tokenizer.eot_token:

break

return current_ids

def _temperature_sampling(

self,

input_ids: torch.Tensor,

max_new_tokens: int,

temperature: float

) -> torch.Tensor:

"""Temperature 샘플링: 확률 분포를 조정하여 다양성 조절"""

current_ids = input_ids.clone()

for _ in range(max_new_tokens):

logits, _ = self.model(current_ids)

logits = logits[:, -1, :] / temperature # 온도로 나누기

probs = F.softmax(logits, dim=-1)

next_token = torch.multinomial(probs, num_samples=1)

current_ids = torch.cat([current_ids, next_token], dim=1)

if next_token.item() == self.tokenizer.eot_token:

break

return current_ids

def _top_k_sampling(

self,

input_ids: torch.Tensor,

max_new_tokens: int,

temperature: float,

top_k: int

) -> torch.Tensor:

"""Top-k 샘플링: 상위 k개 토큰 중에서만 샘플링"""

current_ids = input_ids.clone()

for _ in range(max_new_tokens):

logits, _ = self.model(current_ids)

logits = logits[:, -1, :] / temperature

상위 k개만 유지, 나머지는 -inf

top_k_logits, top_k_indices = torch.topk(logits, k=top_k, dim=-1)

filtered_logits = torch.full_like(logits, float("-inf"))

filtered_logits.scatter_(1, top_k_indices, top_k_logits)

probs = F.softmax(filtered_logits, dim=-1)

next_token = torch.multinomial(probs, num_samples=1)

current_ids = torch.cat([current_ids, next_token], dim=1)

if next_token.item() == self.tokenizer.eot_token:

break

return current_ids

def _top_p_sampling(

self,

input_ids: torch.Tensor,

max_new_tokens: int,

temperature: float,

top_p: float

) -> torch.Tensor:

"""Top-p (Nucleus) 샘플링: 누적 확률 p 이내의 토큰에서 샘플링"""

current_ids = input_ids.clone()

for _ in range(max_new_tokens):

logits, _ = self.model(current_ids)

logits = logits[:, -1, :] / temperature

내림차순 정렬

sorted_logits, sorted_indices = torch.sort(logits, descending=True)

cumulative_probs = torch.cumsum(F.softmax(sorted_logits, dim=-1), dim=-1)

누적 확률이 top_p를 초과하는 토큰 제거

sorted_indices_to_remove = cumulative_probs - F.softmax(sorted_logits, dim=-1) > top_p

sorted_logits[sorted_indices_to_remove] = float("-inf")

원래 순서로 복원

logits = torch.zeros_like(logits).scatter_(1, sorted_indices, sorted_logits)

probs = F.softmax(logits, dim=-1)

next_token = torch.multinomial(probs, num_samples=1)

current_ids = torch.cat([current_ids, next_token], dim=1)

if next_token.item() == self.tokenizer.eot_token:

break

return current_ids

def _beam_search(

self,

input_ids: torch.Tensor,

max_new_tokens: int,

num_beams: int

) -> torch.Tensor:

"""빔 서치: 여러 후보를 동시에 탐색"""

B = input_ids.shape[0]

assert B == 1, "빔 서치는 배치 크기 1만 지원"

빔 초기화

beams = [(input_ids, 0.0)] # (ids, score)

for _ in range(max_new_tokens):

all_candidates = []

for beam_ids, beam_score in beams:

logits, _ = self.model(beam_ids)

log_probs = F.log_softmax(logits[:, -1, :], dim=-1)

상위 num_beams 토큰 선택

top_log_probs, top_ids = log_probs.topk(num_beams)

for i in range(num_beams):

token = top_ids[0, i].unsqueeze(0).unsqueeze(0)

new_ids = torch.cat([beam_ids, token], dim=1)

new_score = beam_score + top_log_probs[0, i].item()

all_candidates.append((new_ids, new_score))

길이 정규화 후 상위 num_beams 선택

all_candidates.sort(key=lambda x: x[1] / x[0].shape[1], reverse=True)

beams = all_candidates[:num_beams]

EOT 토큰으로 끝난 빔이 있으면 중단

if beams[0][0][0, -1].item() == self.tokenizer.eot_token:

break

return beams[0][0] # 최고 점수 빔 반환

7. Perplexity 계산

@torch.no_grad()

def compute_perplexity(

model: MiniGPT,

tokenizer: GPTTokenizer,

text: str,

device: str = "cuda",

seq_len: int = 512

) -> float:

"""텍스트의 Perplexity 계산"""

model.eval()

model.to(device)

tokens = tokenizer.encode(text, add_special_tokens=False)

total_loss = 0.0

total_tokens = 0

for i in range(0, len(tokens) - seq_len, seq_len):

chunk = tokens[i:i + seq_len + 1]

if len(chunk) < seq_len + 1:

break

x = torch.tensor(chunk[:-1], dtype=torch.long, device=device).unsqueeze(0)

y = torch.tensor(chunk[1:], dtype=torch.long, device=device).unsqueeze(0)

_, loss = model(x, y)

total_loss += loss.item() * seq_len

total_tokens += seq_len

avg_loss = total_loss / total_tokens

perplexity = math.exp(avg_loss)

return perplexity

사용 예시

model = MiniGPT.from_pretrained("./checkpoints/best_model.pt")

tokenizer = GPTTokenizer()

generator = TextGenerator(model, tokenizer)

test_text = "The quick brown fox jumps over the lazy dog."

ppl = compute_perplexity(model, tokenizer, test_text)

print(f"Perplexity: {ppl:.2f}")

텍스트 생성 테스트

for strategy in ["greedy", "top_k", "top_p"]:

generated = generator.generate(

prompt="Once upon a time",

max_new_tokens=100,

strategy=strategy,

temperature=0.8

)

print(f"\n[{strategy}]: {generated}")

8. 규모 확장

8.1 Flash Attention 2

pip install flash-attn

from flash_attn import flash_attn_qkvpacked_func, flash_attn_func

class FlashCausalAttention(nn.Module):

"""Flash Attention 2를 사용한 고속 어텐션"""

def __init__(self, config: GPTConfig):

super().__init__()

self.n_head = config.n_head

self.n_embd = config.n_embd

self.head_dim = config.n_embd // config.n_head

self.qkv_proj = nn.Linear(config.n_embd, 3 * config.n_embd, bias=False)

self.out_proj = nn.Linear(config.n_embd, config.n_embd, bias=False)

self.dropout = config.dropout

def forward(self, x: torch.Tensor):

B, T, C = x.shape

qkv = self.qkv_proj(x)

Flash Attention은 [B, T, 3, n_head, head_dim] 형식 필요

qkv = qkv.view(B, T, 3, self.n_head, self.head_dim)

Flash Attention 호출 (자동으로 인과 마스크 적용)

attn_out = flash_attn_qkvpacked_func(

qkv,

dropout_p=self.dropout if self.training else 0.0,

causal=True # 인과 마스크

) # [B, T, n_head, head_dim]

attn_out = attn_out.reshape(B, T, C)

return self.out_proj(attn_out)

8.2 FSDP 분산 학습

from torch.distributed.fsdp import FullyShardedDataParallel as FSDP

from torch.distributed.fsdp.wrap import transformer_auto_wrap_policy

def setup_fsdp_training():

"""FSDP 분산 학습 설정"""

dist.init_process_group(backend="nccl")

rank = dist.get_rank()

world_size = dist.get_world_size()

torch.cuda.set_device(rank)

config = GPTConfig(

n_embd=1024,

n_layer=24,

n_head=16,

vocab_size=50257

)

model = MiniGPT(config)

FSDP 래핑 정책: TransformerBlock 단위로 샤딩

auto_wrap_policy = functools.partial(

transformer_auto_wrap_policy,

transformer_layer_cls={TransformerBlock}

)

model = FSDP(

model,

auto_wrap_policy=auto_wrap_policy,

mixed_precision=torch.distributed.fsdp.MixedPrecision(

param_dtype=torch.bfloat16,

reduce_dtype=torch.bfloat16,

buffer_dtype=torch.bfloat16

),

sharding_strategy="FULL_SHARD", # ZeRO-3 동급

device_id=rank

)

return model, rank, world_size

8.3 Chinchilla 스케일링 법칙

Chinchilla 논문(Hoffmann et al., 2022)에 따르면, 최적의 모델 크기와 학습 토큰 수의 관계:

**최적 토큰 수 = 20 x 파라미터 수**

def chinchilla_optimal_tokens(model_params: int) -> int:

"""Chinchilla 법칙에 따른 최적 학습 토큰 수"""

return 20 * model_params

예시:

models = {

"124M (GPT-2 Small)": 124e6,

"1.3B": 1.3e9,

"7B (Llama-2 7B)": 7e9,

"13B": 13e9,

"70B (Llama-2 70B)": 70e9,

}

print("모델 크기별 권장 학습 토큰 수:")

for name, params in models.items():

optimal_tokens = chinchilla_optimal_tokens(int(params))

print(f" {name}: {optimal_tokens/1e9:.1f}B 토큰")

9. Instruction Tuning

9.1 SFT 데이터 형식

ChatML 형식

CHATML_SYSTEM = "<|im_start|>system\n{system}<|im_end|>\n"

CHATML_USER = "<|im_start|>user\n{user}<|im_end|>\n"

CHATML_ASSISTANT = "<|im_start|>assistant\n{assistant}<|im_end|>\n"

def format_chatml(

system: str,

user: str,

assistant: str,

add_generation_prompt: bool = False

) -> str:

"""ChatML 형식으로 대화 포맷팅"""

formatted = CHATML_SYSTEM.format(system=system)

formatted += CHATML_USER.format(user=user)

formatted += CHATML_ASSISTANT.format(assistant=assistant)

if add_generation_prompt:

formatted += "<|im_start|>assistant\n"

return formatted

Llama-3 형식

def format_llama3(

system: str,

user: str,

assistant: str,

add_generation_prompt: bool = False

) -> str:

"""Llama-3 형식으로 대화 포맷팅"""

formatted = f"<|begin_of_text|><|start_header_id|>system<|end_header_id|>\n\n{system}<|eot_id|>\n"

formatted += f"<|start_header_id|>user<|end_header_id|>\n\n{user}<|eot_id|>\n"

formatted += f"<|start_header_id|>assistant<|end_header_id|>\n\n{assistant}<|eot_id|>"

if add_generation_prompt:

formatted += "\n<|start_header_id|>assistant<|end_header_id|>\n\n"

return formatted

9.2 SFT 데이터셋 클래스

class InstructionDataset(Dataset):

"""지시 파인튜닝 데이터셋 (어시스턴트 응답에만 손실 계산)"""

def __init__(

self,

data: list,

tokenizer: GPTTokenizer,

max_seq_len: int = 2048,

format_func = format_chatml

):

self.samples = []

for item in data:

전체 대화 토크나이징

full_text = format_func(

system=item.get("system", "You are a helpful assistant."),

user=item["instruction"],

assistant=item["response"]

)

full_ids = tokenizer.encode(full_text, add_special_tokens=False)

프롬프트 부분 (손실 마스킹 대상)

prompt_text = format_func(

system=item.get("system", "You are a helpful assistant."),

user=item["instruction"],

assistant="",

add_generation_prompt=True

)

prompt_ids = tokenizer.encode(prompt_text, add_special_tokens=False)

prompt_len = len(prompt_ids)

if len(full_ids) > max_seq_len:

full_ids = full_ids[:max_seq_len]

레이블: 프롬프트 부분은 -100으로 마스킹 (손실 계산 제외)

labels = full_ids.copy()

labels[:prompt_len] = [-100] * prompt_len

self.samples.append({

"input_ids": full_ids,

"labels": labels

})

def __len__(self):

return len(self.samples)

def __getitem__(self, idx):

sample = self.samples[idx]

return (

torch.tensor(sample["input_ids"], dtype=torch.long),

torch.tensor(sample["labels"], dtype=torch.long)

)

9.3 SFT 학습 루프

def train_sft(

base_model_path: str,

data_path: str,

output_dir: str,

num_epochs: int = 3,

learning_rate: float = 2e-5,

batch_size: int = 4

):

"""지시 파인튜닝 (SFT)"""

데이터 로딩

with open(data_path) as f:

data = json.load(f)

tokenizer = GPTTokenizer()

dataset = InstructionDataset(data, tokenizer, max_seq_len=2048)

패딩 콜레이터

def collate_fn(batch):

input_ids = [item[0] for item in batch]

labels = [item[1] for item in batch]

배치 내 최대 길이로 패딩

max_len = max(len(x) for x in input_ids)

padded_ids = torch.zeros(len(batch), max_len, dtype=torch.long)

padded_labels = torch.full((len(batch), max_len), -100, dtype=torch.long)

for i, (ids, lbls) in enumerate(zip(input_ids, labels)):

padded_ids[i, :len(ids)] = ids

padded_labels[i, :len(lbls)] = lbls

return padded_ids, padded_labels

dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=True, collate_fn=collate_fn)

모델 로딩

model = MiniGPT.from_pretrained(base_model_path)

device = "cuda"

model.to(device)

optimizer = AdamW(model.parameters(), lr=learning_rate, weight_decay=0.01)

total_steps = len(dataloader) * num_epochs

scheduler = get_cosine_schedule_with_warmup(optimizer, int(total_steps * 0.1), total_steps)

학습

model.train()

for epoch in range(num_epochs):

for step, (x, y) in enumerate(dataloader):

x, y = x.to(device), y.to(device)

logits, loss = model(x, y)

loss.backward()

torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)

optimizer.step()

scheduler.step()

optimizer.zero_grad()

if step % 50 == 0:

print(f"Epoch {epoch}, Step {step}, Loss: {loss.item():.4f}")

저장

os.makedirs(output_dir, exist_ok=True)

torch.save({

"model": model.state_dict(),

"config": model.config

}, f"{output_dir}/sft_model.pt")

print(f"SFT 완료. 모델 저장: {output_dir}")

10. 오픈소스 LLM 분석

10.1 Llama 3 구조 분석

Llama 3는 Meta에서 공개한 오픈소스 LLM으로, 다음과 같은 특징을 가집니다.

| 특징 | 설명 |

| ------------- | --------------------------------- |

| 아키텍처 | Decoder-only Transformer |

| 정규화 | Pre-RMSNorm |

| 활성화 | SwiGLU |

| 위치 인코딩 | RoPE (theta=500,000) |

| 어휘 크기 | 128,256 |

| 컨텍스트 길이 | 128K (Llama 3.1+) |

| GQA | Grouped Query Attention (8B, 70B) |

Llama 3 스타일 설정 (8B 모델 기준)

llama3_config = GPTConfig(

vocab_size=128256,

max_seq_len=8192,

n_embd=4096,

n_layer=32,

n_head=32,

use_rope=True,

use_swiglu=True,

bias=False

)

10.2 Grouped Query Attention (GQA)

class GroupedQueryAttention(nn.Module):

"""Grouped Query Attention (Llama 3, Mistral 사용)"""

def __init__(self, config: GPTConfig, n_kv_heads: int = 8):

super().__init__()

self.n_head = config.n_head # Query 헤드 수

self.n_kv_heads = n_kv_heads # KV 헤드 수 (더 적음)

self.n_rep = config.n_head // n_kv_heads # 각 KV 헤드를 몇 번 반복할지

self.head_dim = config.n_embd // config.n_head

self.q_proj = nn.Linear(config.n_embd, config.n_head * self.head_dim, bias=False)

self.k_proj = nn.Linear(config.n_embd, n_kv_heads * self.head_dim, bias=False)

self.v_proj = nn.Linear(config.n_embd, n_kv_heads * self.head_dim, bias=False)

self.out_proj = nn.Linear(config.n_embd, config.n_embd, bias=False)

self.rope = RotaryEmbedding(self.head_dim, config.max_seq_len, base=500000)

def forward(self, x: torch.Tensor):

B, T, C = x.shape

q = self.q_proj(x).view(B, T, self.n_head, self.head_dim).transpose(1, 2)

k = self.k_proj(x).view(B, T, self.n_kv_heads, self.head_dim).transpose(1, 2)

v = self.v_proj(x).view(B, T, self.n_kv_heads, self.head_dim).transpose(1, 2)

RoPE 적용

q, k = self.rope(q, k, T)

KV 헤드 반복 (GQA의 핵심)

[B, n_kv_heads, T, head_dim] -> [B, n_head, T, head_dim]

k = k.unsqueeze(2).expand(B, self.n_kv_heads, self.n_rep, T, self.head_dim)

k = k.reshape(B, self.n_head, T, self.head_dim)

v = v.unsqueeze(2).expand(B, self.n_kv_heads, self.n_rep, T, self.head_dim)

v = v.reshape(B, self.n_head, T, self.head_dim)

Flash Attention

y = F.scaled_dot_product_attention(q, k, v, is_causal=True)

y = y.transpose(1, 2).contiguous().view(B, T, C)

return self.out_proj(y)

10.3 DeepSeek 혁신

DeepSeek은 여러 기술적 혁신을 도입했습니다.

**MLA (Multi-head Latent Attention)**: K, V를 저차원 잠재 공간으로 압축하여 KV 캐시 메모리를 획기적으로 줄입니다.

**DeepSeekMoE**: 전문가 혼합(MoE) 아키텍처를 미세화된 방식으로 적용합니다.

class MoEFFN(nn.Module):

"""Mixture of Experts FFN (간략화 버전)"""

def __init__(self, config: GPTConfig, num_experts: int = 8, top_k: int = 2):

super().__init__()

self.num_experts = num_experts

self.top_k = top_k

라우터

self.router = nn.Linear(config.n_embd, num_experts, bias=False)

각 전문가 FFN

self.experts = nn.ModuleList([

SwiGLUFFN(config) for _ in range(num_experts)

])

def forward(self, x: torch.Tensor) -> torch.Tensor:

B, T, C = x.shape

x_flat = x.view(B * T, C)

라우팅 점수 계산

router_logits = self.router(x_flat) # [B*T, num_experts]

router_probs = F.softmax(router_logits, dim=-1)

상위 k 전문가 선택

top_k_probs, top_k_indices = torch.topk(router_probs, self.top_k, dim=-1)

top_k_probs = top_k_probs / top_k_probs.sum(dim=-1, keepdim=True) # 정규화

전문가 출력 계산

output = torch.zeros_like(x_flat)

for k in range(self.top_k):

expert_idx = top_k_indices[:, k] # [B*T]

expert_prob = top_k_probs[:, k].unsqueeze(-1) # [B*T, 1]

for e in range(self.num_experts):

mask = (expert_idx == e)

if mask.any():

expert_out = self.experts[e](x_flat[mask])

output[mask] += expert_prob[mask] * expert_out

return output.view(B, T, C)

마치며

이 가이드에서 miniGPT를 처음부터 완전히 구현하며 현대 LLM의 모든 핵심 요소를 살펴보았습니다.

**핵심 컴포넌트 요약:**

- **토크나이저**: BPE 알고리즘으로 텍스트를 서브워드 토큰으로 분리

- **임베딩**: 토큰 임베딩 + RoPE 위치 인코딩

- **Multi-Head Causal Attention**: Q/K/V 투영, 인과 마스크, KV 캐시

- **FFN**: SwiGLU 활성화로 비선형성 추가

- **Pre-RMSNorm**: 학습 안정성

- **텍스트 생성**: Greedy, Temperature, Top-k, Top-p, Beam Search

- **사전학습**: Cosine LR Schedule, Gradient Clipping, Mixed Precision

- **SFT**: 응답 부분에만 손실 계산

이 기반 위에 Llama 3의 GQA, DeepSeek의 MoE 같은 현대적 기법들이 추가됩니다. 직접 구현하며 이해한 내용은 HuggingFace 라이브러리를 더 깊이 활용하는 데 큰 도움이 될 것입니다.

참고 자료

- Karpathy의 nanoGPT: https://github.com/karpathy/nanoGPT

- Chinchilla 스케일링 법칙 (Hoffmann et al., 2022): https://arxiv.org/abs/2203.15556

- Llama 3 기술 보고서: https://ai.meta.com/blog/meta-llama-3

- Flash Attention: https://github.com/Dao-AILab/flash-attention

- OpenAI tiktoken: https://github.com/openai/tiktoken

- Attention is All You Need (Vaswani et al., 2017)

현재 단락 (1/968)

GPT, Llama, Mistral 같은 대규모 언어 모델(LLM)은 현재 AI 혁명의 중심에 있습니다. 하지만 이 모델들이 어떻게 동작하는지 코드 수준에서 이해하는 사람은 많지 ...

작성 글자: 0원문 글자: 31,003작성 단락: 0/968