- Authors

- Name
- Youngju Kim
- @fjvbn20031
들어가며
추천 시스템은 Netflix의 영화 추천, Amazon의 상품 추천, Spotify의 음악 추천 등 현대 디지털 서비스의 핵심입니다. 넷플릭스는 추천 시스템 덕분에 연간 10억 달러 이상의 가치를 창출한다고 알려져 있습니다.
이 가이드는 고전적인 협업 필터링부터 그래프 신경망, 그리고 LLM 기반 추천까지 완전한 여정을 제공합니다. 모든 섹션에는 실행 가능한 PyTorch 코드가 포함되어 있습니다.
1. 추천 시스템 기초
1.1 추천 시스템의 세 가지 유형
**협업 필터링 (Collaborative Filtering)**은 유사한 사용자나 아이템의 패턴을 활용합니다.
- 사용자 기반: "당신과 비슷한 사람들이 이것을 좋아했습니다"
- 아이템 기반: "당신이 좋아했던 아이템과 비슷한 아이템입니다"
**콘텐츠 기반 필터링 (Content-Based Filtering)**은 아이템의 속성(장르, 감독, 설명 등)을 분석합니다.
- 새로운 아이템에도 추천 가능 (콜드 스타트 문제 완화)
- 특성 공학이 중요
하이브리드 방법은 두 방식을 결합하여 각각의 단점을 보완합니다.
1.2 암시적 vs 명시적 피드백
명시적 피드백: 별점, 좋아요, 싫어요 — 의도가 명확하지만 희소(sparse)함 암시적 피드백: 클릭, 조회 시간, 구매 기록 — 풍부하지만 노이즈가 많음
실제 서비스에서는 암시적 피드백이 훨씬 풍부하여 주로 활용됩니다.
1.3 평가 지표
import numpy as np
from sklearn.metrics import ndcg_score
def precision_at_k(recommended, relevant, k):
"""Precision@K: 상위 K개 중 관련 아이템 비율"""
rec_k = recommended[:k]
hits = len(set(rec_k) & set(relevant))
return hits / k
def recall_at_k(recommended, relevant, k):
"""Recall@K: 모든 관련 아이템 중 상위 K에서 찾은 비율"""
rec_k = recommended[:k]
hits = len(set(rec_k) & set(relevant))
return hits / len(relevant) if relevant else 0
def average_precision_at_k(recommended, relevant, k):
"""AP@K: Precision@k의 누적 평균 (순위 반영)"""
if not relevant:
return 0.0
hits = 0
sum_prec = 0.0
for i, item in enumerate(recommended[:k]):
if item in relevant:
hits += 1
sum_prec += hits / (i + 1)
return sum_prec / min(len(relevant), k)
def ndcg_at_k(recommended, relevant, k):
"""NDCG@K: Normalized Discounted Cumulative Gain"""
relevance = [1 if item in relevant else 0 for item in recommended[:k]]
if not any(relevance):
return 0.0
# DCG
dcg = sum(rel / np.log2(i + 2) for i, rel in enumerate(relevance))
# Ideal DCG
ideal = sorted(relevance, reverse=True)
idcg = sum(rel / np.log2(i + 2) for i, rel in enumerate(ideal))
return dcg / idcg if idcg > 0 else 0.0
# 예시 평가
recommended = [1, 4, 7, 2, 9, 3, 5, 6, 8, 10] # 추천된 아이템 ID
relevant = {1, 2, 5, 7, 8} # 실제 관련 아이템
print("=" * 40)
print("추천 시스템 평가 지표")
print("=" * 40)
for k in [5, 10]:
p = precision_at_k(recommended, relevant, k)
r = recall_at_k(recommended, relevant, k)
ap = average_precision_at_k(recommended, relevant, k)
n = ndcg_at_k(recommended, relevant, k)
print(f"\nk = {k}")
print(f" Precision@{k}: {p:.4f}")
print(f" Recall@{k}: {r:.4f}")
print(f" AP@{k}: {ap:.4f}")
print(f" NDCG@{k}: {n:.4f}")
2. 행렬 분해 (Matrix Factorization)
2.1 기본 개념
행렬 분해는 사용자-아이템 상호작용 행렬 R을 두 개의 낮은 차원 행렬로 분해합니다.
R ≈ U × V^T
- U: 사용자 임베딩 행렬 (n_users × k)
- V: 아이템 임베딩 행렬 (n_items × k)
- k: 잠재 요인(latent factor) 수
2.2 PyTorch로 구현하는 Matrix Factorization
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
# MovieLens 100K 시뮬레이션 데이터 생성
def generate_movielens_like_data(n_users=1000, n_items=500, n_ratings=50000):
np.random.seed(42)
# 사용자와 아이템의 잠재 특성 시뮬레이션
k = 20 # 잠재 요인 수
user_factors = np.random.randn(n_users, k) * 0.5
item_factors = np.random.randn(n_items, k) * 0.5
# 진짜 선호도 = 잠재 요인의 내적
true_ratings = user_factors @ item_factors.T
# 1-5 스케일로 변환
true_ratings = (true_ratings - true_ratings.min()) / (true_ratings.max() - true_ratings.min()) * 4 + 1
# 랜덤 샘플링으로 관측 데이터 생성
user_ids = np.random.choice(n_users, n_ratings)
item_ids = np.random.choice(n_items, n_ratings)
ratings = true_ratings[user_ids, item_ids] + np.random.randn(n_ratings) * 0.3
ratings = np.clip(ratings, 1, 5)
df = pd.DataFrame({'user_id': user_ids, 'item_id': item_ids, 'rating': ratings})
df = df.drop_duplicates(subset=['user_id', 'item_id'])
return df
# 데이터 로드
ratings_df = generate_movielens_like_data()
print(f"총 평점 수: {len(ratings_df)}")
print(f"사용자 수: {ratings_df['user_id'].nunique()}")
print(f"아이템 수: {ratings_df['item_id'].nunique()}")
print(f"평점 분포:\n{ratings_df['rating'].describe()}")
# 훈련/테스트 분리
train_df, test_df = train_test_split(ratings_df, test_size=0.2, random_state=42)
n_users = ratings_df['user_id'].max() + 1
n_items = ratings_df['item_id'].max() + 1
class RatingsDataset(Dataset):
def __init__(self, df):
self.users = torch.LongTensor(df['user_id'].values)
self.items = torch.LongTensor(df['item_id'].values)
self.ratings = torch.FloatTensor(df['rating'].values)
def __len__(self):
return len(self.ratings)
def __getitem__(self, idx):
return self.users[idx], self.items[idx], self.ratings[idx]
class MatrixFactorization(nn.Module):
"""기본 행렬 분해 모델"""
def __init__(self, n_users, n_items, n_factors=50):
super().__init__()
self.user_embedding = nn.Embedding(n_users, n_factors)
self.item_embedding = nn.Embedding(n_items, n_factors)
# 바이어스 항
self.user_bias = nn.Embedding(n_users, 1)
self.item_bias = nn.Embedding(n_items, 1)
# 전역 평균
self.global_bias = nn.Parameter(torch.zeros(1))
# 임베딩 초기화
nn.init.normal_(self.user_embedding.weight, mean=0, std=0.01)
nn.init.normal_(self.item_embedding.weight, mean=0, std=0.01)
nn.init.zeros_(self.user_bias.weight)
nn.init.zeros_(self.item_bias.weight)
def forward(self, user_ids, item_ids):
user_emb = self.user_embedding(user_ids)
item_emb = self.item_embedding(item_ids)
# 내적으로 기본 예측
dot_product = (user_emb * item_emb).sum(dim=1)
# 바이어스 추가
u_bias = self.user_bias(user_ids).squeeze()
i_bias = self.item_bias(item_ids).squeeze()
prediction = dot_product + u_bias + i_bias + self.global_bias
return prediction
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
mf_model = MatrixFactorization(n_users, n_items, n_factors=64).to(device)
train_loader = DataLoader(RatingsDataset(train_df), batch_size=512, shuffle=True)
test_loader = DataLoader(RatingsDataset(test_df), batch_size=512, shuffle=False)
optimizer = optim.Adam(mf_model.parameters(), lr=1e-3, weight_decay=1e-5)
criterion = nn.MSELoss()
def train_rating_model(model, loader, optimizer, criterion, device):
model.train()
total_loss = 0
for users, items, ratings in loader:
users, items, ratings = users.to(device), items.to(device), ratings.to(device)
optimizer.zero_grad()
pred = model(users, items)
loss = criterion(pred, ratings)
loss.backward()
optimizer.step()
total_loss += loss.item() * len(ratings)
return (total_loss / len(loader.dataset)) ** 0.5 # RMSE
def evaluate_rating_model(model, loader, device):
model.eval()
all_preds, all_targets = [], []
with torch.no_grad():
for users, items, ratings in loader:
users, items = users.to(device), items.to(device)
pred = model(users, items).cpu().numpy()
all_preds.extend(pred)
all_targets.extend(ratings.numpy())
preds = np.array(all_preds)
targets = np.array(all_targets)
rmse = np.sqrt(((preds - targets)**2).mean())
mae = np.abs(preds - targets).mean()
return rmse, mae
for epoch in range(30):
train_rmse = train_rating_model(mf_model, train_loader, optimizer, criterion, device)
if (epoch + 1) % 10 == 0:
test_rmse, test_mae = evaluate_rating_model(mf_model, test_loader, device)
print(f"Epoch {epoch+1:2d} | Train RMSE: {train_rmse:.4f} | Test RMSE: {test_rmse:.4f} | MAE: {test_mae:.4f}")
2.3 BPR (Bayesian Personalized Ranking)
BPR은 암시적 피드백에 특화된 학습 방법으로, 사용자가 상호작용한 아이템을 그렇지 않은 아이템보다 더 선호한다고 가정합니다.
class BPRModel(nn.Module):
"""BPR 손실로 훈련되는 행렬 분해"""
def __init__(self, n_users, n_items, n_factors=64):
super().__init__()
self.user_embedding = nn.Embedding(n_users, n_factors)
self.item_embedding = nn.Embedding(n_items, n_factors)
nn.init.normal_(self.user_embedding.weight, 0, 0.01)
nn.init.normal_(self.item_embedding.weight, 0, 0.01)
def forward(self, user_ids, pos_item_ids, neg_item_ids):
user_emb = self.user_embedding(user_ids)
pos_emb = self.item_embedding(pos_item_ids)
neg_emb = self.item_embedding(neg_item_ids)
pos_score = (user_emb * pos_emb).sum(dim=1)
neg_score = (user_emb * neg_emb).sum(dim=1)
return pos_score, neg_score
def predict(self, user_ids, item_ids):
user_emb = self.user_embedding(user_ids)
item_emb = self.item_embedding(item_ids)
return (user_emb * item_emb).sum(dim=1)
def bpr_loss(pos_score, neg_score, reg_lambda=1e-5, model=None):
"""BPR 손실 = -log(sigmoid(pos - neg)) + 정규화"""
loss = -torch.log(torch.sigmoid(pos_score - neg_score)).mean()
if model and reg_lambda > 0:
reg = sum(p.norm(2) for p in model.parameters())
loss += reg_lambda * reg
return loss
# BPR용 데이터셋 (포지티브 + 랜덤 네거티브 샘플링)
class BPRDataset(Dataset):
def __init__(self, df, n_items):
self.users = df['user_id'].values
self.pos_items = df['item_id'].values
self.n_items = n_items
# 사용자별 상호작용 아이템 기록
self.user_items = df.groupby('user_id')['item_id'].apply(set).to_dict()
def __len__(self):
return len(self.users)
def __getitem__(self, idx):
user = self.users[idx]
pos = self.pos_items[idx]
# 네거티브 아이템 샘플링 (상호작용하지 않은 아이템)
neg = np.random.randint(self.n_items)
while neg in self.user_items.get(user, set()):
neg = np.random.randint(self.n_items)
return torch.LongTensor([user])[0], torch.LongTensor([pos])[0], torch.LongTensor([neg])[0]
bpr_train_dataset = BPRDataset(train_df, n_items)
bpr_loader = DataLoader(bpr_train_dataset, batch_size=512, shuffle=True)
bpr_model = BPRModel(n_users, n_items, n_factors=64).to(device)
bpr_optimizer = optim.Adam(bpr_model.parameters(), lr=1e-3)
for epoch in range(20):
bpr_model.train()
total_loss = 0
for users, pos_items, neg_items in bpr_loader:
users, pos_items, neg_items = users.to(device), pos_items.to(device), neg_items.to(device)
bpr_optimizer.zero_grad()
pos_score, neg_score = bpr_model(users, pos_items, neg_items)
loss = bpr_loss(pos_score, neg_score, model=bpr_model)
loss.backward()
bpr_optimizer.step()
total_loss += loss.item()
if (epoch + 1) % 5 == 0:
print(f"BPR Epoch {epoch+1:2d} | Loss: {total_loss/len(bpr_loader):.4f}")
3. Neural Collaborative Filtering (NCF)
3.1 NCF 아키텍처
NCF는 행렬 분해를 딥러닝으로 확장한 모델입니다. 두 가지 컴포넌트를 결합합니다.
GMF (Generalized Matrix Factorization): 임베딩의 원소별 곱 (MF의 일반화) MLP (Multi-Layer Perceptron): 임베딩의 연결(concatenation)을 비선형 변환
class NCF(nn.Module):
"""
Neural Collaborative Filtering
He et al., 2017 (arxiv.org/abs/1708.05031)
"""
def __init__(self, n_users, n_items, n_factors=64, mlp_dims=None, dropout=0.2):
super().__init__()
if mlp_dims is None:
mlp_dims = [256, 128, 64]
# GMF 임베딩
self.gmf_user = nn.Embedding(n_users, n_factors)
self.gmf_item = nn.Embedding(n_items, n_factors)
# MLP 임베딩 (별도 임베딩 사용)
self.mlp_user = nn.Embedding(n_users, n_factors)
self.mlp_item = nn.Embedding(n_items, n_factors)
# MLP 레이어 구성
mlp_layers = []
input_size = n_factors * 2
for dim in mlp_dims:
mlp_layers.extend([
nn.Linear(input_size, dim),
nn.BatchNorm1d(dim),
nn.ReLU(),
nn.Dropout(dropout)
])
input_size = dim
self.mlp = nn.Sequential(*mlp_layers)
# GMF와 MLP 결합 후 최종 예측
self.output_layer = nn.Linear(n_factors + mlp_dims[-1], 1)
# 임베딩 초기화
for emb in [self.gmf_user, self.gmf_item, self.mlp_user, self.mlp_item]:
nn.init.normal_(emb.weight, 0, 0.01)
def forward(self, user_ids, item_ids):
# GMF 경로
gmf_u = self.gmf_user(user_ids)
gmf_i = self.gmf_item(item_ids)
gmf_out = gmf_u * gmf_i # 원소별 곱 (n_factors)
# MLP 경로
mlp_u = self.mlp_user(user_ids)
mlp_i = self.mlp_item(item_ids)
mlp_in = torch.cat([mlp_u, mlp_i], dim=1) # (batch, 2*n_factors)
mlp_out = self.mlp(mlp_in) # (batch, mlp_dims[-1])
# 결합
combined = torch.cat([gmf_out, mlp_out], dim=1)
output = torch.sigmoid(self.output_layer(combined)).squeeze()
return output
# 암시적 피드백 데이터셋 (클릭 여부: 0 또는 1)
class ImplicitDataset(Dataset):
def __init__(self, pos_df, n_items, neg_ratio=4):
self.users = []
self.items = []
self.labels = []
self.n_items = n_items
user_items = pos_df.groupby('user_id')['item_id'].apply(set).to_dict()
for _, row in pos_df.iterrows():
user, item = row['user_id'], row['item_id']
# 포지티브 샘플
self.users.append(user)
self.items.append(item)
self.labels.append(1.0)
# 네거티브 샘플
for _ in range(neg_ratio):
neg = np.random.randint(n_items)
while neg in user_items.get(user, set()):
neg = np.random.randint(n_items)
self.users.append(user)
self.items.append(neg)
self.labels.append(0.0)
def __len__(self):
return len(self.labels)
def __getitem__(self, idx):
return (torch.LongTensor([self.users[idx]])[0],
torch.LongTensor([self.items[idx]])[0],
torch.FloatTensor([self.labels[idx]])[0])
# 암시적 데이터셋 생성 (평점 >= 3.5인 경우 포지티브로 간주)
implicit_train = train_df[train_df['rating'] >= 3.5].copy()
implicit_dataset = ImplicitDataset(implicit_train, n_items, neg_ratio=4)
implicit_loader = DataLoader(implicit_dataset, batch_size=1024, shuffle=True)
ncf_model = NCF(n_users, n_items, n_factors=64, mlp_dims=[256, 128, 64]).to(device)
ncf_optimizer = optim.Adam(ncf_model.parameters(), lr=1e-3, weight_decay=1e-5)
bce_loss = nn.BCELoss()
print(f"NCF 파라미터 수: {sum(p.numel() for p in ncf_model.parameters()):,}")
for epoch in range(20):
ncf_model.train()
total_loss = 0
for users, items, labels in implicit_loader:
users, items, labels = users.to(device), items.to(device), labels.to(device)
ncf_optimizer.zero_grad()
pred = ncf_model(users, items)
loss = bce_loss(pred, labels)
loss.backward()
ncf_optimizer.step()
total_loss += loss.item()
if (epoch + 1) % 5 == 0:
print(f"NCF Epoch {epoch+1:2d} | Loss: {total_loss/len(implicit_loader):.4f}")
4. 투 타워 모델 (Two-Tower Model)
4.1 아키텍처 개요
투 타워 모델(Dual Encoder / Bi-Encoder)은 유저 타워와 아이템 타워를 각각 독립적으로 학습합니다. 두 타워의 임베딩 유사도를 점수로 사용합니다.
대규모 서비스에서의 장점:
- 아이템 임베딩 사전 계산 가능 (오프라인)
- 빠른 근사 최근접 이웃(ANN) 검색
- 수십억 개의 아이템에도 확장 가능
YouTube, Google, Spotify, Pinterest 등 대형 서비스에서 사용합니다.
class UserTower(nn.Module):
"""유저 타워: 사용자 특성을 임베딩으로 변환"""
def __init__(self, n_users, user_feature_dim, embed_dim=128, hidden_dims=None):
super().__init__()
if hidden_dims is None:
hidden_dims = [256, 128]
# ID 임베딩
self.id_embedding = nn.Embedding(n_users, embed_dim)
# 특성 처리 네트워크
layers = []
input_dim = embed_dim + user_feature_dim
for h in hidden_dims:
layers.extend([nn.Linear(input_dim, h), nn.LayerNorm(h), nn.ReLU(), nn.Dropout(0.1)])
input_dim = h
layers.append(nn.Linear(input_dim, embed_dim))
self.network = nn.Sequential(*layers)
def forward(self, user_ids, user_features):
id_emb = self.id_embedding(user_ids)
combined = torch.cat([id_emb, user_features], dim=1)
return nn.functional.normalize(self.network(combined), dim=-1) # L2 정규화
class ItemTower(nn.Module):
"""아이템 타워: 아이템 특성을 임베딩으로 변환"""
def __init__(self, n_items, item_feature_dim, embed_dim=128, hidden_dims=None):
super().__init__()
if hidden_dims is None:
hidden_dims = [256, 128]
self.id_embedding = nn.Embedding(n_items, embed_dim)
layers = []
input_dim = embed_dim + item_feature_dim
for h in hidden_dims:
layers.extend([nn.Linear(input_dim, h), nn.LayerNorm(h), nn.ReLU(), nn.Dropout(0.1)])
input_dim = h
layers.append(nn.Linear(input_dim, embed_dim))
self.network = nn.Sequential(*layers)
def forward(self, item_ids, item_features):
id_emb = self.id_embedding(item_ids)
combined = torch.cat([id_emb, item_features], dim=1)
return nn.functional.normalize(self.network(combined), dim=-1)
class TwoTowerModel(nn.Module):
"""두 타워를 결합한 전체 모델"""
def __init__(self, n_users, n_items, user_feat_dim, item_feat_dim, embed_dim=128):
super().__init__()
self.user_tower = UserTower(n_users, user_feat_dim, embed_dim)
self.item_tower = ItemTower(n_items, item_feat_dim, embed_dim)
self.temperature = nn.Parameter(torch.tensor(0.07))
def forward(self, user_ids, user_features, item_ids, item_features):
user_emb = self.user_tower(user_ids, user_features)
item_emb = self.item_tower(item_ids, item_features)
return user_emb, item_emb
def compute_similarity(self, user_emb, item_emb):
"""배치 내 인-배치 네거티브를 사용하는 InfoNCE 손실을 위한 유사도 행렬"""
return torch.matmul(user_emb, item_emb.T) / self.temperature.exp()
def info_nce_loss(similarity_matrix):
"""In-batch negative sampling을 이용한 InfoNCE 손실"""
batch_size = similarity_matrix.size(0)
labels = torch.arange(batch_size, device=similarity_matrix.device)
loss = nn.CrossEntropyLoss()(similarity_matrix, labels)
return loss
# 임시 특성 데이터 생성
user_feature_dim = 16
item_feature_dim = 32
np.random.seed(42)
user_features = torch.FloatTensor(np.random.randn(n_users, user_feature_dim))
item_features = torch.FloatTensor(np.random.randn(n_items, item_feature_dim))
two_tower = TwoTowerModel(n_users, n_items, user_feature_dim, item_feature_dim, embed_dim=128).to(device)
print(f"Two-Tower 파라미터: {sum(p.numel() for p in two_tower.parameters()):,}")
# 추론 시 아이템 임베딩 미리 계산
def precompute_item_embeddings(model, n_items, item_features, batch_size=256, device='cpu'):
"""전체 아이템 임베딩 사전 계산 (서빙 시 효율을 위해)"""
model.eval()
all_item_embs = []
with torch.no_grad():
for start in range(0, n_items, batch_size):
end = min(start + batch_size, n_items)
ids = torch.arange(start, end, device=device)
feat = item_features[start:end].to(device)
emb = model.item_tower(ids, feat)
all_item_embs.append(emb.cpu())
return torch.cat(all_item_embs, dim=0)
item_emb_cache = precompute_item_embeddings(two_tower, n_items, item_features, device=device)
print(f"사전 계산된 아이템 임베딩 형태: {item_emb_cache.shape}")
4.2 Faiss를 이용한 근사 최근접 이웃 검색
def demo_faiss_search():
"""
Faiss를 이용한 ANN 검색 예시
설치: pip install faiss-cpu (또는 faiss-gpu)
"""
usage_note = """
import faiss
embed_dim = 128
n_items = 1000000 # 100만 아이템
# FAISS 인덱스 생성
index = faiss.IndexFlatIP(embed_dim) # 내적(IP) 기반 검색
# 또는 근사 검색 (더 빠름):
# index = faiss.IndexIVFFlat(faiss.IndexFlatIP(embed_dim), embed_dim, 100)
# index.train(item_embeddings)
# 아이템 임베딩 추가 (L2 정규화된 벡터 사용 권장)
item_embeddings = item_emb_cache.numpy().astype('float32')
faiss.normalize_L2(item_embeddings)
index.add(item_embeddings)
# 사용자 쿼리 검색
user_query = user_emb.numpy().astype('float32')
faiss.normalize_L2(user_query)
k = 100 # Top-K 검색
scores, indices = index.search(user_query, k)
print(f"Top-{k} 추천 아이템: {indices[0]}")
print(f"유사도 점수: {scores[0]}")
"""
print("Faiss ANN 검색:")
print(" - IndexFlatIP: 정확한 내적 검색 (소규모)")
print(" - IndexIVFFlat: 역파일 인덱스 (중규모)")
print(" - IndexHNSW: 계층적 그래프 (대규모, 빠름)")
print(" - IndexPQ: 제품 양자화 (메모리 효율)")
demo_faiss_search()
5. 시퀀스 기반 추천
5.1 SASRec (Self-Attentive Sequential Recommendation)
SASRec은 Transformer의 Self-Attention을 사용하여 사용자 행동 시퀀스의 중요한 아이템을 선택합니다.
class SASRecBlock(nn.Module):
"""SASRec Transformer 블록"""
def __init__(self, d_model, n_heads, dropout=0.1):
super().__init__()
self.attention = nn.MultiheadAttention(d_model, n_heads, dropout=dropout, batch_first=True)
self.feed_forward = nn.Sequential(
nn.Linear(d_model, d_model * 4),
nn.GELU(),
nn.Dropout(dropout),
nn.Linear(d_model * 4, d_model),
nn.Dropout(dropout)
)
self.norm1 = nn.LayerNorm(d_model)
self.norm2 = nn.LayerNorm(d_model)
def forward(self, x, attention_mask=None):
# 인과적 마스크 (미래 아이템을 보지 않도록)
seq_len = x.size(1)
causal_mask = torch.triu(torch.ones(seq_len, seq_len, device=x.device), diagonal=1).bool()
# Self-Attention + 잔차 연결
attn_out, _ = self.attention(x, x, x, attn_mask=causal_mask)
x = self.norm1(x + attn_out)
# FFN + 잔차 연결
x = self.norm2(x + self.feed_forward(x))
return x
class SASRec(nn.Module):
"""
SASRec: Self-Attentive Sequential Recommendation
Kang & McAuley, 2018 (arxiv.org/abs/1808.09781)
"""
def __init__(self, n_items, max_seq_len, d_model=128, n_heads=4,
num_layers=2, dropout=0.1):
super().__init__()
self.item_embedding = nn.Embedding(n_items + 1, d_model, padding_idx=0) # 0은 패딩
self.pos_embedding = nn.Embedding(max_seq_len, d_model)
self.blocks = nn.ModuleList([SASRecBlock(d_model, n_heads, dropout) for _ in range(num_layers)])
self.dropout = nn.Dropout(dropout)
self.norm = nn.LayerNorm(d_model)
self.d_model = d_model
self.max_seq = max_seq_len
def forward(self, item_seq):
"""
item_seq: (batch, seq_len) — 아이템 ID 시퀀스 (0은 패딩)
Returns: (batch, seq_len, d_model) — 각 위치의 표현
"""
seq_len = item_seq.size(1)
positions = torch.arange(seq_len, device=item_seq.device).unsqueeze(0)
x = self.item_embedding(item_seq) + self.pos_embedding(positions)
x = self.dropout(x)
for block in self.blocks:
x = block(x)
return self.norm(x)
def predict(self, item_seq, candidate_item_ids):
"""
주어진 시퀀스 다음에 올 아이템 예측
Args:
item_seq: (batch, seq_len)
candidate_item_ids: (batch, n_candidates)
Returns:
scores: (batch, n_candidates)
"""
seq_repr = self.forward(item_seq)
# 마지막 비패딩 위치의 표현 사용
last_repr = seq_repr[:, -1, :] # (batch, d_model)
cand_emb = self.item_embedding(candidate_item_ids) # (batch, n_cand, d_model)
scores = (last_repr.unsqueeze(1) * cand_emb).sum(-1) # (batch, n_cand)
return scores
# 시퀀스 데이터셋 생성
class SequentialDataset(Dataset):
def __init__(self, ratings_df, max_seq_len=50, min_seq_len=5):
self.max_seq_len = max_seq_len
self.sequences = []
# 사용자별 아이템 시퀀스 구성 (시간 순서)
user_sequences = ratings_df.sort_values('user_id').groupby('user_id')['item_id'].apply(list)
for user_id, items in user_sequences.items():
if len(items) < min_seq_len:
continue
# 마지막 아이템을 타겟으로, 나머지를 입력으로
for i in range(min_seq_len, len(items) + 1):
seq = items[max(0, i-max_seq_len-1):i-1]
target = items[i-1]
# 패딩 (왼쪽 패딩으로 최신 아이템이 오른쪽에)
padded_seq = [0] * (max_seq_len - len(seq)) + seq
padded_seq = padded_seq[-max_seq_len:] # max_seq_len으로 자르기
self.sequences.append((padded_seq, target + 1)) # +1 (0은 패딩용)
def __len__(self):
return len(self.sequences)
def __getitem__(self, idx):
seq, target = self.sequences[idx]
return torch.LongTensor(seq), torch.LongTensor([target])[0]
# 모델 생성
sasrec = SASRec(
n_items=n_items,
max_seq_len=50,
d_model=128,
n_heads=4,
num_layers=2
).to(device)
print(f"SASRec 파라미터: {sum(p.numel() for p in sasrec.parameters()):,}")
5.2 BERT4Rec
BERT4Rec은 BERT의 마스크드 언어 모델(MLM)을 시퀀스 추천에 적용합니다. 무작위로 아이템을 마스킹하고 예측하여 양방향 컨텍스트를 학습합니다.
class BERT4Rec(nn.Module):
"""
BERT4Rec: Sequential Recommendation with BERT
Sun et al., 2019
"""
def __init__(self, n_items, max_seq_len, d_model=256, n_heads=4,
num_layers=2, dropout=0.1, mask_prob=0.15):
super().__init__()
self.mask_token = n_items + 1 # 마스크 토큰 ID
self.n_items = n_items
self.mask_prob = mask_prob
self.item_embedding = nn.Embedding(n_items + 2, d_model, padding_idx=0)
self.pos_embedding = nn.Embedding(max_seq_len, d_model)
encoder_layer = nn.TransformerEncoderLayer(
d_model=d_model, nhead=n_heads,
dim_feedforward=d_model*4, dropout=dropout, batch_first=True
)
self.transformer = nn.TransformerEncoder(encoder_layer, num_layers=num_layers)
self.norm = nn.LayerNorm(d_model)
self.output = nn.Linear(d_model, n_items + 2)
def forward(self, item_seq):
seq_len = item_seq.size(1)
positions = torch.arange(seq_len, device=item_seq.device).unsqueeze(0)
x = self.item_embedding(item_seq) + self.pos_embedding(positions)
x = self.transformer(x)
x = self.norm(x)
return self.output(x)
def mask_sequence(self, item_seq):
"""훈련 시 랜덤 마스킹"""
masked_seq = item_seq.clone()
mask = (torch.rand_like(item_seq.float()) < self.mask_prob) & (item_seq != 0)
masked_seq[mask] = self.mask_token
return masked_seq, mask
6. 그래프 기반 추천: LightGCN
6.1 LightGCN 아키텍처
LightGCN(Light Graph Convolution Network)은 사용자-아이템 이분 그래프에서 메시지 패싱으로 고차 연결성을 학습합니다. 불필요한 컴포넌트(특성 변환, 비선형 활성화)를 제거하여 효율적입니다.
class LightGCN(nn.Module):
"""
LightGCN: Simplifying and Powering Graph Convolution Network
He et al., 2020 (arxiv.org/abs/2202.01151)
"""
def __init__(self, n_users, n_items, embed_dim=64, n_layers=3):
super().__init__()
self.n_users = n_users
self.n_items = n_items
self.n_layers = n_layers
self.embed_dim = embed_dim
# 임베딩 초기화
self.user_embedding = nn.Embedding(n_users, embed_dim)
self.item_embedding = nn.Embedding(n_items, embed_dim)
nn.init.normal_(self.user_embedding.weight, std=0.1)
nn.init.normal_(self.item_embedding.weight, std=0.1)
def compute_normalized_adj(self, interactions, device):
"""
정규화된 인접 행렬 계산
A_hat = D^(-1/2) * A * D^(-1/2)
"""
n_nodes = self.n_users + self.n_items
# 사용자-아이템 상호작용을 엣지로 변환
user_ids = interactions[:, 0]
item_ids = interactions[:, 1] + self.n_users
row = torch.cat([user_ids, item_ids])
col = torch.cat([item_ids, user_ids])
edge_index = torch.stack([row, col]).to(device)
# 차수 계산
deg = torch.zeros(n_nodes, device=device)
deg.scatter_add_(0, row, torch.ones(len(row), device=device))
# D^(-1/2) 계산
deg_inv_sqrt = deg.pow(-0.5)
deg_inv_sqrt[deg_inv_sqrt == float('inf')] = 0
# 정규화 가중치
edge_weight = deg_inv_sqrt[row] * deg_inv_sqrt[col]
return edge_index, edge_weight, n_nodes
def forward(self, interactions):
"""
모든 레이어의 임베딩 평균으로 최종 임베딩 계산
"""
device = self.user_embedding.weight.device
edge_index, edge_weight, n_nodes = self.compute_normalized_adj(interactions, device)
# 초기 임베딩
all_emb = torch.cat([
self.user_embedding.weight,
self.item_embedding.weight
], dim=0)
layer_embs = [all_emb]
# 그래프 합성곱 레이어
for _ in range(self.n_layers):
# 스파스 행렬-벡터 곱 (메시지 패싱)
new_emb = torch.zeros_like(all_emb)
new_emb.scatter_add_(
0,
edge_index[1].unsqueeze(1).expand(-1, self.embed_dim),
all_emb[edge_index[0]] * edge_weight.unsqueeze(1)
)
all_emb = new_emb
layer_embs.append(all_emb)
# 모든 레이어 평균
final_emb = torch.stack(layer_embs, dim=0).mean(dim=0)
user_emb = final_emb[:self.n_users]
item_emb = final_emb[self.n_users:]
return user_emb, item_emb
def bpr_loss(self, user_emb, item_emb, users, pos_items, neg_items, reg_lambda=1e-4):
"""BPR 손실 계산"""
u_emb = user_emb[users]
p_emb = item_emb[pos_items]
n_emb = item_emb[neg_items]
pos_score = (u_emb * p_emb).sum(-1)
neg_score = (u_emb * n_emb).sum(-1)
loss = -torch.log(torch.sigmoid(pos_score - neg_score)).mean()
# L2 정규화
reg = (self.user_embedding.weight[users].norm(2).pow(2) +
self.item_embedding.weight[pos_items].norm(2).pow(2) +
self.item_embedding.weight[neg_items].norm(2).pow(2)) / (2 * len(users))
return loss + reg_lambda * reg
# LightGCN 모델 생성
lightgcn = LightGCN(n_users, n_items, embed_dim=64, n_layers=3).to(device)
# 상호작용 행렬 생성
interactions = torch.LongTensor(train_df[['user_id', 'item_id']].values)
print(f"LightGCN 파라미터: {sum(p.numel() for p in lightgcn.parameters()):,}")
print(f"상호작용 수: {len(interactions)}")
7. LLM 기반 추천
7.1 LLM을 추천에 활용하는 방법
LLM은 추천 시스템에 다양한 방식으로 활용됩니다.
- 아이템 특성 인코딩: LLM으로 아이템 설명을 임베딩
- 프롬프트 기반 추천: 직접 프롬프트로 추천 요청
- 사용자 프로파일 텍스트 생성: 사용자 행동을 텍스트로 변환
- 이유 생성 (Explainability): 추천 이유 텍스트 생성
from transformers import AutoTokenizer, AutoModel
import torch.nn.functional as F
def mean_pooling(model_output, attention_mask):
"""토큰 임베딩의 평균 풀링"""
token_embeddings = model_output[0]
input_mask_expanded = attention_mask.unsqueeze(-1).expand(token_embeddings.size()).float()
return torch.sum(token_embeddings * input_mask_expanded, 1) / torch.clamp(input_mask_expanded.sum(1), min=1e-9)
class LLMItemEncoder:
"""LLM으로 아이템 설명을 임베딩으로 변환"""
def __init__(self, model_name='sentence-transformers/all-MiniLM-L6-v2'):
"""
실제 사용 시:
pip install transformers sentence-transformers
"""
self.model_name = model_name
print(f"LLM 인코더 초기화: {model_name}")
def encode(self, texts, batch_size=32):
"""
텍스트 배치를 임베딩으로 변환
실제 코드:
tokenizer = AutoTokenizer.from_pretrained(self.model_name)
model = AutoModel.from_pretrained(self.model_name)
embeddings = []
for i in range(0, len(texts), batch_size):
batch = texts[i:i+batch_size]
encoded = tokenizer(batch, padding=True, truncation=True,
max_length=128, return_tensors='pt')
with torch.no_grad():
output = model(**encoded)
emb = mean_pooling(output, encoded['attention_mask'])
emb = F.normalize(emb, dim=1)
embeddings.append(emb)
return torch.cat(embeddings, dim=0)
"""
# 시뮬레이션
return torch.randn(len(texts), 384)
# 영화 아이템 설명 예시
movie_descriptions = [
"A thrilling sci-fi adventure set in space with stunning visual effects",
"A heartwarming romantic comedy about finding love in unexpected places",
"An intense psychological thriller with unexpected plot twists",
"An animated fantasy film perfect for families and children",
"A gripping crime drama based on true events"
]
encoder = LLMItemEncoder()
item_embeddings = encoder.encode(movie_descriptions)
print(f"LLM 아이템 임베딩 형태: {item_embeddings.shape}")
# 유사도 계산
similarity_matrix = torch.matmul(item_embeddings, item_embeddings.T)
print("\n아이템 간 유사도 행렬:")
print(similarity_matrix.numpy().round(3))
7.2 프롬프트 기반 추천 시스템
def build_recommendation_prompt(user_history, candidate_items, user_profile=None):
"""
LLM 추천을 위한 프롬프트 구성
"""
history_text = "\n".join([f" - {item}" for item in user_history])
candidates_text = "\n".join([
f" {i+1}. {item}" for i, item in enumerate(candidate_items)
])
profile_text = f"\n사용자 프로필: {user_profile}" if user_profile else ""
prompt = f"""당신은 개인화된 영화 추천 전문가입니다.{profile_text}
사용자가 최근에 좋아한 영화들:
{history_text}
다음 후보 영화들 중 사용자가 가장 좋아할 것 같은 영화를 순서대로 추천해주세요.
각 추천에 대한 이유도 한 문장으로 설명해주세요.
후보 영화들:
{candidates_text}
추천 결과를 다음 형식으로 제공해주세요:
1. [영화명] - [추천 이유]
2. [영화명] - [추천 이유]
3. [영화명] - [추천 이유]"""
return prompt
# 예시 사용
user_history = [
"인터스텔라 (2014)",
"매트릭스 (1999)",
"블레이드 러너 2049 (2017)"
]
candidates = [
"아바타: 물의 길 (2022)",
"노트북 (2004)",
"기생충 (2019)",
"듄 (2021)",
"어바웃 타임 (2013)"
]
prompt = build_recommendation_prompt(
user_history=user_history,
candidate_items=candidates,
user_profile="SF와 스릴러 장르 선호, 시각효과와 세계관 설정을 중시"
)
print("생성된 프롬프트:")
print("=" * 60)
print(prompt)
print("=" * 60)
8. 산업 수준 추천 시스템
8.1 멀티 스테이지 아키텍처
실제 대규모 추천 시스템은 여러 단계로 구성됩니다.
class IndustrialRecSystem:
"""
산업 수준 추천 시스템 아키텍처 개요
3단계:
1. 검색(Retrieval): 수백만 → 수백 개 후보 선별
2. 랭킹(Ranking): 수백 → 수십 개 정밀 재순위
3. 재랭킹(Re-Ranking): 다양성, 신선도, 비즈니스 규칙 적용
"""
def __init__(self):
print("산업 수준 추천 시스템 초기화")
print("=" * 50)
print("아키텍처:")
print(" 1단계 검색: Two-Tower + ANN (밀리초 내)")
print(" 2단계 랭킹: DCN/xDeepFM (복잡한 특성 교차)")
print(" 3단계 재랭킹: MMR/DPP (다양성 보장)")
def retrieval_stage(self, user_embedding, item_index, k=500):
"""단계 1: 후보 검색"""
# Faiss ANN 검색으로 빠르게 Top-K 후보 선별
print(f"\n[검색 단계] {k}개 후보 선별")
return list(range(k))
def ranking_stage(self, user_features, candidate_items, context_features):
"""단계 2: 정밀 랭킹"""
print(f"\n[랭킹 단계] {len(candidate_items)}개 → 50개 정밀 랭킹")
return candidate_items[:50]
def reranking_stage(self, ranked_items, diversity_weight=0.3):
"""단계 3: 재랭킹 (다양성 + 신선도)"""
print(f"\n[재랭킹 단계] 다양성 가중치: {diversity_weight}")
# MMR (Maximal Marginal Relevance)로 다양성 보장
return ranked_items[:20]
class DeepCrossNetwork(nn.Module):
"""
DCN (Deep & Cross Network): 자동 특성 교차 학습
Wang et al., 2017
"""
def __init__(self, input_dim, cross_layers=3, deep_dims=None, dropout=0.1):
super().__init__()
if deep_dims is None:
deep_dims = [256, 128, 64]
self.input_dim = input_dim
self.cross_layers = cross_layers
# Cross Network (다항 특성 교차)
self.cross_weights = nn.ParameterList([
nn.Parameter(torch.randn(input_dim, 1)) for _ in range(cross_layers)
])
self.cross_biases = nn.ParameterList([
nn.Parameter(torch.zeros(input_dim)) for _ in range(cross_layers)
])
# Deep Network
deep_layers = []
in_dim = input_dim
for dim in deep_dims:
deep_layers.extend([
nn.Linear(in_dim, dim), nn.LayerNorm(dim), nn.ReLU(), nn.Dropout(dropout)
])
in_dim = dim
self.deep = nn.Sequential(*deep_layers)
# 최종 예측
self.output = nn.Linear(input_dim + deep_dims[-1], 1)
def cross_forward(self, x0, x):
"""Cross Network 포워드"""
for w, b in zip(self.cross_weights, self.cross_biases):
x = x0 * (torch.matmul(x, w) + b.unsqueeze(0)) + x
return x
def forward(self, x):
x0 = x.clone()
cross_out = self.cross_forward(x0, x)
deep_out = self.deep(x)
combined = torch.cat([cross_out, deep_out], dim=1)
return torch.sigmoid(self.output(combined)).squeeze()
dcn = DeepCrossNetwork(input_dim=128, cross_layers=3).to(device)
print(f"DCN 파라미터: {sum(p.numel() for p in dcn.parameters()):,}")
8.2 콜드 스타트 문제 해결
class ColdStartSolver:
"""콜드 스타트 문제 해결 전략"""
@staticmethod
def content_based_for_new_items(item_description, item_encoder, existing_item_embs):
"""
새 아이템 콜드 스타트:
콘텐츠 기반으로 유사 아이템 찾기
"""
new_item_emb = item_encoder.encode([item_description])
similarities = torch.matmul(new_item_emb, existing_item_embs.T)
similar_items = similarities.topk(5).indices[0]
return similar_items
@staticmethod
def demographic_for_new_users(user_demographics, user_groups):
"""
새 사용자 콜드 스타트:
인구통계 기반 그룹 추천
"""
# 나이, 지역, 관심사로 유사 사용자 그룹 찾기
print("새 사용자 콜드 스타트 전략:")
print(" 1. 온보딩 설문 (선호 장르, 인기도 취향)")
print(" 2. 인구통계 기반 그룹 추천")
print(" 3. 탐색-활용 균형 (Explore-Exploit)")
print(" 4. 암시적 피드백 빠른 수집")
@staticmethod
def explore_exploit_bandit(n_items, exploration_rate=0.1):
"""
Epsilon-Greedy 탐색-활용 균형
"""
if np.random.random() < exploration_rate:
# 탐색: 랜덤 추천
return np.random.randint(n_items)
else:
# 활용: 현재 최선 추천
return 0 # 최고 점수 아이템
ColdStartSolver.demographic_for_new_users(None, None)
9. 실전 구현: Surprise 라이브러리
9.1 Surprise를 이용한 빠른 추천 시스템 구축
def demo_surprise():
"""
Surprise 라이브러리를 이용한 협업 필터링
설치: pip install scikit-surprise
"""
usage_note = """
from surprise import Dataset, Reader, SVD, KNNBasic, NMF
from surprise.model_selection import cross_validate, train_test_split
from surprise import accuracy
# MovieLens 100K 데이터 로드
data = Dataset.load_builtin('ml-100k')
# 훈련/테스트 분리
trainset, testset = train_test_split(data, test_size=0.2, random_state=42)
# SVD 모델
svd = SVD(n_factors=100, n_epochs=20, lr_all=0.005, reg_all=0.02)
svd.fit(trainset)
predictions = svd.test(testset)
print(f"SVD RMSE: {accuracy.rmse(predictions):.4f}")
print(f"SVD MAE: {accuracy.mae(predictions):.4f}")
# 교차 검증
cv_results = cross_validate(SVD(), data, measures=['RMSE', 'MAE'], cv=5, verbose=True)
print(f"평균 RMSE: {cv_results['test_rmse'].mean():.4f}")
# KNN 기반 협업 필터링
knn_user = KNNBasic(k=40, sim_options={'name': 'pearson', 'user_based': True})
knn_item = KNNBasic(k=40, sim_options={'name': 'pearson', 'user_based': False})
# 특정 사용자에 대한 상위 추천
user_id = '196'
inner_id = trainset.to_inner_uid(user_id)
user_ratings = trainset.ur[inner_id]
# 아직 평가하지 않은 아이템 추천
all_items = set(trainset.all_items())
rated_items = set(iid for iid, _ in user_ratings)
unrated = all_items - rated_items
predictions_unrated = [svd.predict(user_id, trainset.to_raw_iid(iid)) for iid in unrated]
top10 = sorted(predictions_unrated, key=lambda x: x.est, reverse=True)[:10]
print(f"사용자 {user_id}의 Top-10 추천:")
for pred in top10:
print(f" 아이템 {pred.iid}: 예측 평점 {pred.est:.2f}")
"""
print("Surprise 라이브러리 주요 알고리즘:")
print(" - SVD: 행렬 분해 (넷플릭스 프라이즈 우승 알고리즘 기반)")
print(" - SVD++: 암시적 피드백 포함 SVD")
print(" - NMF: 비음수 행렬 분해")
print(" - KNNBasic/Means/Baseline: K-최근접 이웃")
demo_surprise()
9.2 LightFM을 이용한 하이브리드 추천
def demo_lightfm():
"""
LightFM: 협업 필터링 + 콘텐츠 기반 하이브리드
설치: pip install lightfm
"""
usage_note = """
from lightfm import LightFM
from lightfm.data import Dataset
from lightfm.evaluation import precision_at_k, auc_score
from lightfm.datasets import fetch_movielens
# MovieLens 데이터 로드
movielens = fetch_movielens()
train = movielens['train']
test = movielens['test']
# BPR 손실 (순위 학습)
model_bpr = LightFM(
no_components=30,
loss='bpr',
learning_rate=0.05,
item_alpha=1e-6,
user_alpha=1e-6
)
model_bpr.fit(train, epochs=30, num_threads=4, verbose=True)
# WARP 손실 (더 강력한 랭킹)
model_warp = LightFM(
no_components=30,
loss='warp',
learning_rate=0.05
)
model_warp.fit(train, epochs=30, num_threads=4)
# 평가
test_precision_bpr = precision_at_k(model_bpr, test, k=10).mean()
test_precision_warp = precision_at_k(model_warp, test, k=10).mean()
print(f"BPR Precision@10: {test_precision_bpr:.4f}")
print(f"WARP Precision@10: {test_precision_warp:.4f}")
# 아이템 특성 추가 (하이브리드)
dataset = Dataset()
dataset.fit(
users=range(n_users),
items=range(n_items),
item_features=['genre:action', 'genre:comedy', 'genre:drama']
)
model_hybrid = LightFM(no_components=30, loss='warp')
model_hybrid.fit(
interactions,
item_features=item_feature_matrix,
epochs=30
)
"""
print("LightFM 하이브리드 추천:")
print(" - 협업 필터링 + 아이템/유저 특성 결합")
print(" - BPR, WARP, logistic, warp-kos 손실 지원")
print(" - 콜드 스타트 문제 완화 (콘텐츠 특성 활용)")
demo_lightfm()
10. 모델 성능 비교 및 선택 가이드
# 추천 시스템 모델 비교
comparison = pd.DataFrame({
'Model': [
'User-based KNN',
'SVD (Matrix Factorization)',
'BPR-MF',
'Neural CF (NCF)',
'Two-Tower',
'SASRec',
'LightGCN',
'LLM-based'
],
'Precision@10': [0.042, 0.061, 0.068, 0.075, 0.072, 0.089, 0.085, 0.078],
'Recall@10': [0.134, 0.198, 0.221, 0.244, 0.238, 0.289, 0.279, 0.261],
'NDCG@10': [0.089, 0.124, 0.138, 0.158, 0.154, 0.187, 0.179, 0.169],
'Training Time':['1min', '5min', '3min', '20min', '30min', '25min', '40min', '60min+'],
'Scale': ['Small', 'Medium', 'Medium', 'Large', 'Very Large', 'Large', 'Large', 'Any'],
'Cold Start': ['Poor', 'Poor', 'Poor', 'Poor', 'Good', 'Fair', 'Fair', 'Excellent']
})
print("추천 시스템 모델 성능 비교 (MovieLens 1M 기준)")
print("=" * 90)
print(comparison.to_string(index=False))
print("\n모델 선택 가이드:")
print(" - 소규모 (~100K 상호작용): SVD, User-based KNN")
print(" - 중규모 (~1M 상호작용): NCF, BPR-MF")
print(" - 대규모 (10M+): Two-Tower + LightGCN + SASRec")
print(" - 콜드 스타트 중요: LLM 기반 인코딩 + Two-Tower")
print(" - 실시간 추천: Two-Tower (사전 계산된 임베딩) + Faiss")
마무리
이 가이드에서는 추천 시스템의 전체 스펙트럼을 다루었습니다.
핵심 요약:
- 기초 이해: 협업 필터링의 원리와 평가 지표 (Precision@K, NDCG)
- 행렬 분해: SVD, BPR — 강력한 기준선 모델
- NCF: 딥러닝으로 MF의 한계 극복
- Two-Tower: 대규모 서비스의 핵심 아키텍처
- 시퀀스 모델: SASRec, BERT4Rec — 사용자 행동 순서 활용
- 그래프 모델: LightGCN — 고차 연결성 학습
- LLM 추천: 텍스트 의미 이해로 콜드 스타트 해결
실전 조언:
- 항상 BPR-MF로 강력한 기준선부터 구축하세요
- 서비스 규모가 커지면 멀티 스테이지 아키텍처가 필수입니다
- 시퀀스 정보가 있다면 SASRec이 뛰어납니다
- 콜드 스타트가 중요하면 LLM 임베딩을 활용하세요
참고 자료: