Skip to content

Split View: 딥러닝 추천 시스템 완전 가이드: 협업 필터링부터 LLM 기반 추천까지

|

딥러닝 추천 시스템 완전 가이드: 협업 필터링부터 LLM 기반 추천까지

들어가며

추천 시스템은 Netflix의 영화 추천, Amazon의 상품 추천, Spotify의 음악 추천 등 현대 디지털 서비스의 핵심입니다. 넷플릭스는 추천 시스템 덕분에 연간 10억 달러 이상의 가치를 창출한다고 알려져 있습니다.

이 가이드는 고전적인 협업 필터링부터 그래프 신경망, 그리고 LLM 기반 추천까지 완전한 여정을 제공합니다. 모든 섹션에는 실행 가능한 PyTorch 코드가 포함되어 있습니다.


1. 추천 시스템 기초

1.1 추천 시스템의 세 가지 유형

**협업 필터링 (Collaborative Filtering)**은 유사한 사용자나 아이템의 패턴을 활용합니다.

  • 사용자 기반: "당신과 비슷한 사람들이 이것을 좋아했습니다"
  • 아이템 기반: "당신이 좋아했던 아이템과 비슷한 아이템입니다"

**콘텐츠 기반 필터링 (Content-Based Filtering)**은 아이템의 속성(장르, 감독, 설명 등)을 분석합니다.

  • 새로운 아이템에도 추천 가능 (콜드 스타트 문제 완화)
  • 특성 공학이 중요

하이브리드 방법은 두 방식을 결합하여 각각의 단점을 보완합니다.

1.2 암시적 vs 명시적 피드백

명시적 피드백: 별점, 좋아요, 싫어요 — 의도가 명확하지만 희소(sparse)함 암시적 피드백: 클릭, 조회 시간, 구매 기록 — 풍부하지만 노이즈가 많음

실제 서비스에서는 암시적 피드백이 훨씬 풍부하여 주로 활용됩니다.

1.3 평가 지표

import numpy as np
from sklearn.metrics import ndcg_score

def precision_at_k(recommended, relevant, k):
    """Precision@K: 상위 K개 중 관련 아이템 비율"""
    rec_k = recommended[:k]
    hits = len(set(rec_k) & set(relevant))
    return hits / k

def recall_at_k(recommended, relevant, k):
    """Recall@K: 모든 관련 아이템 중 상위 K에서 찾은 비율"""
    rec_k = recommended[:k]
    hits = len(set(rec_k) & set(relevant))
    return hits / len(relevant) if relevant else 0

def average_precision_at_k(recommended, relevant, k):
    """AP@K: Precision@k의 누적 평균 (순위 반영)"""
    if not relevant:
        return 0.0
    hits = 0
    sum_prec = 0.0
    for i, item in enumerate(recommended[:k]):
        if item in relevant:
            hits += 1
            sum_prec += hits / (i + 1)
    return sum_prec / min(len(relevant), k)

def ndcg_at_k(recommended, relevant, k):
    """NDCG@K: Normalized Discounted Cumulative Gain"""
    relevance = [1 if item in relevant else 0 for item in recommended[:k]]
    if not any(relevance):
        return 0.0

    # DCG
    dcg = sum(rel / np.log2(i + 2) for i, rel in enumerate(relevance))

    # Ideal DCG
    ideal = sorted(relevance, reverse=True)
    idcg = sum(rel / np.log2(i + 2) for i, rel in enumerate(ideal))

    return dcg / idcg if idcg > 0 else 0.0

# 예시 평가
recommended = [1, 4, 7, 2, 9, 3, 5, 6, 8, 10]  # 추천된 아이템 ID
relevant    = {1, 2, 5, 7, 8}                    # 실제 관련 아이템

print("=" * 40)
print("추천 시스템 평가 지표")
print("=" * 40)
for k in [5, 10]:
    p = precision_at_k(recommended, relevant, k)
    r = recall_at_k(recommended, relevant, k)
    ap = average_precision_at_k(recommended, relevant, k)
    n = ndcg_at_k(recommended, relevant, k)
    print(f"\nk = {k}")
    print(f"  Precision@{k}: {p:.4f}")
    print(f"  Recall@{k}:    {r:.4f}")
    print(f"  AP@{k}:        {ap:.4f}")
    print(f"  NDCG@{k}:      {n:.4f}")

2. 행렬 분해 (Matrix Factorization)

2.1 기본 개념

행렬 분해는 사용자-아이템 상호작용 행렬 R을 두 개의 낮은 차원 행렬로 분해합니다.

R ≈ U × V^T

  • U: 사용자 임베딩 행렬 (n_users × k)
  • V: 아이템 임베딩 행렬 (n_items × k)
  • k: 잠재 요인(latent factor) 수

2.2 PyTorch로 구현하는 Matrix Factorization

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split

# MovieLens 100K 시뮬레이션 데이터 생성
def generate_movielens_like_data(n_users=1000, n_items=500, n_ratings=50000):
    np.random.seed(42)

    # 사용자와 아이템의 잠재 특성 시뮬레이션
    k = 20  # 잠재 요인 수
    user_factors = np.random.randn(n_users, k) * 0.5
    item_factors = np.random.randn(n_items, k) * 0.5

    # 진짜 선호도 = 잠재 요인의 내적
    true_ratings = user_factors @ item_factors.T
    # 1-5 스케일로 변환
    true_ratings = (true_ratings - true_ratings.min()) / (true_ratings.max() - true_ratings.min()) * 4 + 1

    # 랜덤 샘플링으로 관측 데이터 생성
    user_ids = np.random.choice(n_users, n_ratings)
    item_ids = np.random.choice(n_items, n_ratings)
    ratings  = true_ratings[user_ids, item_ids] + np.random.randn(n_ratings) * 0.3
    ratings  = np.clip(ratings, 1, 5)

    df = pd.DataFrame({'user_id': user_ids, 'item_id': item_ids, 'rating': ratings})
    df = df.drop_duplicates(subset=['user_id', 'item_id'])
    return df

# 데이터 로드
ratings_df = generate_movielens_like_data()
print(f"총 평점 수: {len(ratings_df)}")
print(f"사용자 수: {ratings_df['user_id'].nunique()}")
print(f"아이템 수: {ratings_df['item_id'].nunique()}")
print(f"평점 분포:\n{ratings_df['rating'].describe()}")

# 훈련/테스트 분리
train_df, test_df = train_test_split(ratings_df, test_size=0.2, random_state=42)

n_users = ratings_df['user_id'].max() + 1
n_items = ratings_df['item_id'].max() + 1


class RatingsDataset(Dataset):
    def __init__(self, df):
        self.users   = torch.LongTensor(df['user_id'].values)
        self.items   = torch.LongTensor(df['item_id'].values)
        self.ratings = torch.FloatTensor(df['rating'].values)

    def __len__(self):
        return len(self.ratings)

    def __getitem__(self, idx):
        return self.users[idx], self.items[idx], self.ratings[idx]


class MatrixFactorization(nn.Module):
    """기본 행렬 분해 모델"""
    def __init__(self, n_users, n_items, n_factors=50):
        super().__init__()
        self.user_embedding = nn.Embedding(n_users, n_factors)
        self.item_embedding = nn.Embedding(n_items, n_factors)

        # 바이어스 항
        self.user_bias = nn.Embedding(n_users, 1)
        self.item_bias = nn.Embedding(n_items, 1)

        # 전역 평균
        self.global_bias = nn.Parameter(torch.zeros(1))

        # 임베딩 초기화
        nn.init.normal_(self.user_embedding.weight, mean=0, std=0.01)
        nn.init.normal_(self.item_embedding.weight, mean=0, std=0.01)
        nn.init.zeros_(self.user_bias.weight)
        nn.init.zeros_(self.item_bias.weight)

    def forward(self, user_ids, item_ids):
        user_emb  = self.user_embedding(user_ids)
        item_emb  = self.item_embedding(item_ids)

        # 내적으로 기본 예측
        dot_product = (user_emb * item_emb).sum(dim=1)

        # 바이어스 추가
        u_bias = self.user_bias(user_ids).squeeze()
        i_bias = self.item_bias(item_ids).squeeze()

        prediction = dot_product + u_bias + i_bias + self.global_bias

        return prediction


device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
mf_model = MatrixFactorization(n_users, n_items, n_factors=64).to(device)

train_loader = DataLoader(RatingsDataset(train_df), batch_size=512, shuffle=True)
test_loader  = DataLoader(RatingsDataset(test_df), batch_size=512, shuffle=False)

optimizer = optim.Adam(mf_model.parameters(), lr=1e-3, weight_decay=1e-5)
criterion = nn.MSELoss()


def train_rating_model(model, loader, optimizer, criterion, device):
    model.train()
    total_loss = 0
    for users, items, ratings in loader:
        users, items, ratings = users.to(device), items.to(device), ratings.to(device)
        optimizer.zero_grad()
        pred = model(users, items)
        loss = criterion(pred, ratings)
        loss.backward()
        optimizer.step()
        total_loss += loss.item() * len(ratings)
    return (total_loss / len(loader.dataset)) ** 0.5  # RMSE


def evaluate_rating_model(model, loader, device):
    model.eval()
    all_preds, all_targets = [], []
    with torch.no_grad():
        for users, items, ratings in loader:
            users, items = users.to(device), items.to(device)
            pred = model(users, items).cpu().numpy()
            all_preds.extend(pred)
            all_targets.extend(ratings.numpy())
    preds   = np.array(all_preds)
    targets = np.array(all_targets)
    rmse = np.sqrt(((preds - targets)**2).mean())
    mae  = np.abs(preds - targets).mean()
    return rmse, mae


for epoch in range(30):
    train_rmse = train_rating_model(mf_model, train_loader, optimizer, criterion, device)
    if (epoch + 1) % 10 == 0:
        test_rmse, test_mae = evaluate_rating_model(mf_model, test_loader, device)
        print(f"Epoch {epoch+1:2d} | Train RMSE: {train_rmse:.4f} | Test RMSE: {test_rmse:.4f} | MAE: {test_mae:.4f}")

2.3 BPR (Bayesian Personalized Ranking)

BPR은 암시적 피드백에 특화된 학습 방법으로, 사용자가 상호작용한 아이템을 그렇지 않은 아이템보다 더 선호한다고 가정합니다.

class BPRModel(nn.Module):
    """BPR 손실로 훈련되는 행렬 분해"""
    def __init__(self, n_users, n_items, n_factors=64):
        super().__init__()
        self.user_embedding = nn.Embedding(n_users, n_factors)
        self.item_embedding = nn.Embedding(n_items, n_factors)
        nn.init.normal_(self.user_embedding.weight, 0, 0.01)
        nn.init.normal_(self.item_embedding.weight, 0, 0.01)

    def forward(self, user_ids, pos_item_ids, neg_item_ids):
        user_emb = self.user_embedding(user_ids)
        pos_emb  = self.item_embedding(pos_item_ids)
        neg_emb  = self.item_embedding(neg_item_ids)

        pos_score = (user_emb * pos_emb).sum(dim=1)
        neg_score = (user_emb * neg_emb).sum(dim=1)

        return pos_score, neg_score

    def predict(self, user_ids, item_ids):
        user_emb = self.user_embedding(user_ids)
        item_emb = self.item_embedding(item_ids)
        return (user_emb * item_emb).sum(dim=1)


def bpr_loss(pos_score, neg_score, reg_lambda=1e-5, model=None):
    """BPR 손실 = -log(sigmoid(pos - neg)) + 정규화"""
    loss = -torch.log(torch.sigmoid(pos_score - neg_score)).mean()
    if model and reg_lambda > 0:
        reg = sum(p.norm(2) for p in model.parameters())
        loss += reg_lambda * reg
    return loss


# BPR용 데이터셋 (포지티브 + 랜덤 네거티브 샘플링)
class BPRDataset(Dataset):
    def __init__(self, df, n_items):
        self.users     = df['user_id'].values
        self.pos_items = df['item_id'].values
        self.n_items   = n_items

        # 사용자별 상호작용 아이템 기록
        self.user_items = df.groupby('user_id')['item_id'].apply(set).to_dict()

    def __len__(self):
        return len(self.users)

    def __getitem__(self, idx):
        user = self.users[idx]
        pos  = self.pos_items[idx]

        # 네거티브 아이템 샘플링 (상호작용하지 않은 아이템)
        neg = np.random.randint(self.n_items)
        while neg in self.user_items.get(user, set()):
            neg = np.random.randint(self.n_items)

        return torch.LongTensor([user])[0], torch.LongTensor([pos])[0], torch.LongTensor([neg])[0]


bpr_train_dataset = BPRDataset(train_df, n_items)
bpr_loader = DataLoader(bpr_train_dataset, batch_size=512, shuffle=True)

bpr_model = BPRModel(n_users, n_items, n_factors=64).to(device)
bpr_optimizer = optim.Adam(bpr_model.parameters(), lr=1e-3)

for epoch in range(20):
    bpr_model.train()
    total_loss = 0
    for users, pos_items, neg_items in bpr_loader:
        users, pos_items, neg_items = users.to(device), pos_items.to(device), neg_items.to(device)
        bpr_optimizer.zero_grad()
        pos_score, neg_score = bpr_model(users, pos_items, neg_items)
        loss = bpr_loss(pos_score, neg_score, model=bpr_model)
        loss.backward()
        bpr_optimizer.step()
        total_loss += loss.item()

    if (epoch + 1) % 5 == 0:
        print(f"BPR Epoch {epoch+1:2d} | Loss: {total_loss/len(bpr_loader):.4f}")

3. Neural Collaborative Filtering (NCF)

3.1 NCF 아키텍처

NCF는 행렬 분해를 딥러닝으로 확장한 모델입니다. 두 가지 컴포넌트를 결합합니다.

GMF (Generalized Matrix Factorization): 임베딩의 원소별 곱 (MF의 일반화) MLP (Multi-Layer Perceptron): 임베딩의 연결(concatenation)을 비선형 변환

class NCF(nn.Module):
    """
    Neural Collaborative Filtering
    He et al., 2017 (arxiv.org/abs/1708.05031)
    """
    def __init__(self, n_users, n_items, n_factors=64, mlp_dims=None, dropout=0.2):
        super().__init__()

        if mlp_dims is None:
            mlp_dims = [256, 128, 64]

        # GMF 임베딩
        self.gmf_user = nn.Embedding(n_users, n_factors)
        self.gmf_item = nn.Embedding(n_items, n_factors)

        # MLP 임베딩 (별도 임베딩 사용)
        self.mlp_user = nn.Embedding(n_users, n_factors)
        self.mlp_item = nn.Embedding(n_items, n_factors)

        # MLP 레이어 구성
        mlp_layers = []
        input_size = n_factors * 2
        for dim in mlp_dims:
            mlp_layers.extend([
                nn.Linear(input_size, dim),
                nn.BatchNorm1d(dim),
                nn.ReLU(),
                nn.Dropout(dropout)
            ])
            input_size = dim
        self.mlp = nn.Sequential(*mlp_layers)

        # GMF와 MLP 결합 후 최종 예측
        self.output_layer = nn.Linear(n_factors + mlp_dims[-1], 1)

        # 임베딩 초기화
        for emb in [self.gmf_user, self.gmf_item, self.mlp_user, self.mlp_item]:
            nn.init.normal_(emb.weight, 0, 0.01)

    def forward(self, user_ids, item_ids):
        # GMF 경로
        gmf_u = self.gmf_user(user_ids)
        gmf_i = self.gmf_item(item_ids)
        gmf_out = gmf_u * gmf_i  # 원소별 곱 (n_factors)

        # MLP 경로
        mlp_u   = self.mlp_user(user_ids)
        mlp_i   = self.mlp_item(item_ids)
        mlp_in  = torch.cat([mlp_u, mlp_i], dim=1)  # (batch, 2*n_factors)
        mlp_out = self.mlp(mlp_in)  # (batch, mlp_dims[-1])

        # 결합
        combined = torch.cat([gmf_out, mlp_out], dim=1)
        output   = torch.sigmoid(self.output_layer(combined)).squeeze()

        return output


# 암시적 피드백 데이터셋 (클릭 여부: 0 또는 1)
class ImplicitDataset(Dataset):
    def __init__(self, pos_df, n_items, neg_ratio=4):
        self.users   = []
        self.items   = []
        self.labels  = []
        self.n_items = n_items

        user_items = pos_df.groupby('user_id')['item_id'].apply(set).to_dict()

        for _, row in pos_df.iterrows():
            user, item = row['user_id'], row['item_id']

            # 포지티브 샘플
            self.users.append(user)
            self.items.append(item)
            self.labels.append(1.0)

            # 네거티브 샘플
            for _ in range(neg_ratio):
                neg = np.random.randint(n_items)
                while neg in user_items.get(user, set()):
                    neg = np.random.randint(n_items)
                self.users.append(user)
                self.items.append(neg)
                self.labels.append(0.0)

    def __len__(self):
        return len(self.labels)

    def __getitem__(self, idx):
        return (torch.LongTensor([self.users[idx]])[0],
                torch.LongTensor([self.items[idx]])[0],
                torch.FloatTensor([self.labels[idx]])[0])


# 암시적 데이터셋 생성 (평점 >= 3.5인 경우 포지티브로 간주)
implicit_train = train_df[train_df['rating'] >= 3.5].copy()
implicit_dataset = ImplicitDataset(implicit_train, n_items, neg_ratio=4)
implicit_loader  = DataLoader(implicit_dataset, batch_size=1024, shuffle=True)

ncf_model = NCF(n_users, n_items, n_factors=64, mlp_dims=[256, 128, 64]).to(device)
ncf_optimizer = optim.Adam(ncf_model.parameters(), lr=1e-3, weight_decay=1e-5)
bce_loss = nn.BCELoss()

print(f"NCF 파라미터 수: {sum(p.numel() for p in ncf_model.parameters()):,}")

for epoch in range(20):
    ncf_model.train()
    total_loss = 0
    for users, items, labels in implicit_loader:
        users, items, labels = users.to(device), items.to(device), labels.to(device)
        ncf_optimizer.zero_grad()
        pred = ncf_model(users, items)
        loss = bce_loss(pred, labels)
        loss.backward()
        ncf_optimizer.step()
        total_loss += loss.item()

    if (epoch + 1) % 5 == 0:
        print(f"NCF Epoch {epoch+1:2d} | Loss: {total_loss/len(implicit_loader):.4f}")

4. 투 타워 모델 (Two-Tower Model)

4.1 아키텍처 개요

투 타워 모델(Dual Encoder / Bi-Encoder)은 유저 타워와 아이템 타워를 각각 독립적으로 학습합니다. 두 타워의 임베딩 유사도를 점수로 사용합니다.

대규모 서비스에서의 장점:

  • 아이템 임베딩 사전 계산 가능 (오프라인)
  • 빠른 근사 최근접 이웃(ANN) 검색
  • 수십억 개의 아이템에도 확장 가능

YouTube, Google, Spotify, Pinterest 등 대형 서비스에서 사용합니다.

class UserTower(nn.Module):
    """유저 타워: 사용자 특성을 임베딩으로 변환"""
    def __init__(self, n_users, user_feature_dim, embed_dim=128, hidden_dims=None):
        super().__init__()
        if hidden_dims is None:
            hidden_dims = [256, 128]

        # ID 임베딩
        self.id_embedding = nn.Embedding(n_users, embed_dim)

        # 특성 처리 네트워크
        layers = []
        input_dim = embed_dim + user_feature_dim
        for h in hidden_dims:
            layers.extend([nn.Linear(input_dim, h), nn.LayerNorm(h), nn.ReLU(), nn.Dropout(0.1)])
            input_dim = h
        layers.append(nn.Linear(input_dim, embed_dim))
        self.network = nn.Sequential(*layers)

    def forward(self, user_ids, user_features):
        id_emb = self.id_embedding(user_ids)
        combined = torch.cat([id_emb, user_features], dim=1)
        return nn.functional.normalize(self.network(combined), dim=-1)  # L2 정규화


class ItemTower(nn.Module):
    """아이템 타워: 아이템 특성을 임베딩으로 변환"""
    def __init__(self, n_items, item_feature_dim, embed_dim=128, hidden_dims=None):
        super().__init__()
        if hidden_dims is None:
            hidden_dims = [256, 128]

        self.id_embedding = nn.Embedding(n_items, embed_dim)

        layers = []
        input_dim = embed_dim + item_feature_dim
        for h in hidden_dims:
            layers.extend([nn.Linear(input_dim, h), nn.LayerNorm(h), nn.ReLU(), nn.Dropout(0.1)])
            input_dim = h
        layers.append(nn.Linear(input_dim, embed_dim))
        self.network = nn.Sequential(*layers)

    def forward(self, item_ids, item_features):
        id_emb = self.id_embedding(item_ids)
        combined = torch.cat([id_emb, item_features], dim=1)
        return nn.functional.normalize(self.network(combined), dim=-1)


class TwoTowerModel(nn.Module):
    """두 타워를 결합한 전체 모델"""
    def __init__(self, n_users, n_items, user_feat_dim, item_feat_dim, embed_dim=128):
        super().__init__()
        self.user_tower = UserTower(n_users, user_feat_dim, embed_dim)
        self.item_tower = ItemTower(n_items, item_feat_dim, embed_dim)
        self.temperature = nn.Parameter(torch.tensor(0.07))

    def forward(self, user_ids, user_features, item_ids, item_features):
        user_emb = self.user_tower(user_ids, user_features)
        item_emb = self.item_tower(item_ids, item_features)
        return user_emb, item_emb

    def compute_similarity(self, user_emb, item_emb):
        """배치 내 인-배치 네거티브를 사용하는 InfoNCE 손실을 위한 유사도 행렬"""
        return torch.matmul(user_emb, item_emb.T) / self.temperature.exp()


def info_nce_loss(similarity_matrix):
    """In-batch negative sampling을 이용한 InfoNCE 손실"""
    batch_size = similarity_matrix.size(0)
    labels = torch.arange(batch_size, device=similarity_matrix.device)
    loss = nn.CrossEntropyLoss()(similarity_matrix, labels)
    return loss


# 임시 특성 데이터 생성
user_feature_dim = 16
item_feature_dim = 32

np.random.seed(42)
user_features = torch.FloatTensor(np.random.randn(n_users, user_feature_dim))
item_features = torch.FloatTensor(np.random.randn(n_items, item_feature_dim))

two_tower = TwoTowerModel(n_users, n_items, user_feature_dim, item_feature_dim, embed_dim=128).to(device)

print(f"Two-Tower 파라미터: {sum(p.numel() for p in two_tower.parameters()):,}")

# 추론 시 아이템 임베딩 미리 계산
def precompute_item_embeddings(model, n_items, item_features, batch_size=256, device='cpu'):
    """전체 아이템 임베딩 사전 계산 (서빙 시 효율을 위해)"""
    model.eval()
    all_item_embs = []

    with torch.no_grad():
        for start in range(0, n_items, batch_size):
            end = min(start + batch_size, n_items)
            ids  = torch.arange(start, end, device=device)
            feat = item_features[start:end].to(device)
            emb  = model.item_tower(ids, feat)
            all_item_embs.append(emb.cpu())

    return torch.cat(all_item_embs, dim=0)

item_emb_cache = precompute_item_embeddings(two_tower, n_items, item_features, device=device)
print(f"사전 계산된 아이템 임베딩 형태: {item_emb_cache.shape}")

4.2 Faiss를 이용한 근사 최근접 이웃 검색

def demo_faiss_search():
    """
    Faiss를 이용한 ANN 검색 예시
    설치: pip install faiss-cpu (또는 faiss-gpu)
    """
    usage_note = """
    import faiss

    embed_dim = 128
    n_items = 1000000  # 100만 아이템

    # FAISS 인덱스 생성
    index = faiss.IndexFlatIP(embed_dim)  # 내적(IP) 기반 검색
    # 또는 근사 검색 (더 빠름):
    # index = faiss.IndexIVFFlat(faiss.IndexFlatIP(embed_dim), embed_dim, 100)
    # index.train(item_embeddings)

    # 아이템 임베딩 추가 (L2 정규화된 벡터 사용 권장)
    item_embeddings = item_emb_cache.numpy().astype('float32')
    faiss.normalize_L2(item_embeddings)
    index.add(item_embeddings)

    # 사용자 쿼리 검색
    user_query = user_emb.numpy().astype('float32')
    faiss.normalize_L2(user_query)

    k = 100  # Top-K 검색
    scores, indices = index.search(user_query, k)

    print(f"Top-{k} 추천 아이템: {indices[0]}")
    print(f"유사도 점수: {scores[0]}")
    """
    print("Faiss ANN 검색:")
    print("  - IndexFlatIP: 정확한 내적 검색 (소규모)")
    print("  - IndexIVFFlat: 역파일 인덱스 (중규모)")
    print("  - IndexHNSW: 계층적 그래프 (대규모, 빠름)")
    print("  - IndexPQ: 제품 양자화 (메모리 효율)")

demo_faiss_search()

5. 시퀀스 기반 추천

5.1 SASRec (Self-Attentive Sequential Recommendation)

SASRec은 Transformer의 Self-Attention을 사용하여 사용자 행동 시퀀스의 중요한 아이템을 선택합니다.

class SASRecBlock(nn.Module):
    """SASRec Transformer 블록"""
    def __init__(self, d_model, n_heads, dropout=0.1):
        super().__init__()
        self.attention = nn.MultiheadAttention(d_model, n_heads, dropout=dropout, batch_first=True)
        self.feed_forward = nn.Sequential(
            nn.Linear(d_model, d_model * 4),
            nn.GELU(),
            nn.Dropout(dropout),
            nn.Linear(d_model * 4, d_model),
            nn.Dropout(dropout)
        )
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)

    def forward(self, x, attention_mask=None):
        # 인과적 마스크 (미래 아이템을 보지 않도록)
        seq_len = x.size(1)
        causal_mask = torch.triu(torch.ones(seq_len, seq_len, device=x.device), diagonal=1).bool()

        # Self-Attention + 잔차 연결
        attn_out, _ = self.attention(x, x, x, attn_mask=causal_mask)
        x = self.norm1(x + attn_out)

        # FFN + 잔차 연결
        x = self.norm2(x + self.feed_forward(x))
        return x


class SASRec(nn.Module):
    """
    SASRec: Self-Attentive Sequential Recommendation
    Kang & McAuley, 2018 (arxiv.org/abs/1808.09781)
    """
    def __init__(self, n_items, max_seq_len, d_model=128, n_heads=4,
                 num_layers=2, dropout=0.1):
        super().__init__()

        self.item_embedding = nn.Embedding(n_items + 1, d_model, padding_idx=0)  # 0은 패딩
        self.pos_embedding  = nn.Embedding(max_seq_len, d_model)

        self.blocks   = nn.ModuleList([SASRecBlock(d_model, n_heads, dropout) for _ in range(num_layers)])
        self.dropout  = nn.Dropout(dropout)
        self.norm     = nn.LayerNorm(d_model)
        self.d_model  = d_model
        self.max_seq  = max_seq_len

    def forward(self, item_seq):
        """
        item_seq: (batch, seq_len) — 아이템 ID 시퀀스 (0은 패딩)
        Returns: (batch, seq_len, d_model) — 각 위치의 표현
        """
        seq_len = item_seq.size(1)
        positions = torch.arange(seq_len, device=item_seq.device).unsqueeze(0)

        x = self.item_embedding(item_seq) + self.pos_embedding(positions)
        x = self.dropout(x)

        for block in self.blocks:
            x = block(x)

        return self.norm(x)

    def predict(self, item_seq, candidate_item_ids):
        """
        주어진 시퀀스 다음에 올 아이템 예측

        Args:
            item_seq: (batch, seq_len)
            candidate_item_ids: (batch, n_candidates)
        Returns:
            scores: (batch, n_candidates)
        """
        seq_repr = self.forward(item_seq)
        # 마지막 비패딩 위치의 표현 사용
        last_repr = seq_repr[:, -1, :]  # (batch, d_model)

        cand_emb = self.item_embedding(candidate_item_ids)  # (batch, n_cand, d_model)
        scores = (last_repr.unsqueeze(1) * cand_emb).sum(-1)  # (batch, n_cand)
        return scores


# 시퀀스 데이터셋 생성
class SequentialDataset(Dataset):
    def __init__(self, ratings_df, max_seq_len=50, min_seq_len=5):
        self.max_seq_len = max_seq_len
        self.sequences = []

        # 사용자별 아이템 시퀀스 구성 (시간 순서)
        user_sequences = ratings_df.sort_values('user_id').groupby('user_id')['item_id'].apply(list)

        for user_id, items in user_sequences.items():
            if len(items) < min_seq_len:
                continue

            # 마지막 아이템을 타겟으로, 나머지를 입력으로
            for i in range(min_seq_len, len(items) + 1):
                seq = items[max(0, i-max_seq_len-1):i-1]
                target = items[i-1]

                # 패딩 (왼쪽 패딩으로 최신 아이템이 오른쪽에)
                padded_seq = [0] * (max_seq_len - len(seq)) + seq
                padded_seq = padded_seq[-max_seq_len:]  # max_seq_len으로 자르기

                self.sequences.append((padded_seq, target + 1))  # +1 (0은 패딩용)

    def __len__(self):
        return len(self.sequences)

    def __getitem__(self, idx):
        seq, target = self.sequences[idx]
        return torch.LongTensor(seq), torch.LongTensor([target])[0]


# 모델 생성
sasrec = SASRec(
    n_items=n_items,
    max_seq_len=50,
    d_model=128,
    n_heads=4,
    num_layers=2
).to(device)

print(f"SASRec 파라미터: {sum(p.numel() for p in sasrec.parameters()):,}")

5.2 BERT4Rec

BERT4Rec은 BERT의 마스크드 언어 모델(MLM)을 시퀀스 추천에 적용합니다. 무작위로 아이템을 마스킹하고 예측하여 양방향 컨텍스트를 학습합니다.

class BERT4Rec(nn.Module):
    """
    BERT4Rec: Sequential Recommendation with BERT
    Sun et al., 2019
    """
    def __init__(self, n_items, max_seq_len, d_model=256, n_heads=4,
                 num_layers=2, dropout=0.1, mask_prob=0.15):
        super().__init__()

        self.mask_token  = n_items + 1  # 마스크 토큰 ID
        self.n_items     = n_items
        self.mask_prob   = mask_prob

        self.item_embedding = nn.Embedding(n_items + 2, d_model, padding_idx=0)
        self.pos_embedding  = nn.Embedding(max_seq_len, d_model)

        encoder_layer = nn.TransformerEncoderLayer(
            d_model=d_model, nhead=n_heads,
            dim_feedforward=d_model*4, dropout=dropout, batch_first=True
        )
        self.transformer = nn.TransformerEncoder(encoder_layer, num_layers=num_layers)
        self.norm        = nn.LayerNorm(d_model)
        self.output      = nn.Linear(d_model, n_items + 2)

    def forward(self, item_seq):
        seq_len   = item_seq.size(1)
        positions = torch.arange(seq_len, device=item_seq.device).unsqueeze(0)

        x = self.item_embedding(item_seq) + self.pos_embedding(positions)
        x = self.transformer(x)
        x = self.norm(x)
        return self.output(x)

    def mask_sequence(self, item_seq):
        """훈련 시 랜덤 마스킹"""
        masked_seq = item_seq.clone()
        mask = (torch.rand_like(item_seq.float()) < self.mask_prob) & (item_seq != 0)
        masked_seq[mask] = self.mask_token
        return masked_seq, mask

6. 그래프 기반 추천: LightGCN

6.1 LightGCN 아키텍처

LightGCN(Light Graph Convolution Network)은 사용자-아이템 이분 그래프에서 메시지 패싱으로 고차 연결성을 학습합니다. 불필요한 컴포넌트(특성 변환, 비선형 활성화)를 제거하여 효율적입니다.

class LightGCN(nn.Module):
    """
    LightGCN: Simplifying and Powering Graph Convolution Network
    He et al., 2020 (arxiv.org/abs/2202.01151)
    """
    def __init__(self, n_users, n_items, embed_dim=64, n_layers=3):
        super().__init__()

        self.n_users   = n_users
        self.n_items   = n_items
        self.n_layers  = n_layers
        self.embed_dim = embed_dim

        # 임베딩 초기화
        self.user_embedding = nn.Embedding(n_users, embed_dim)
        self.item_embedding = nn.Embedding(n_items, embed_dim)

        nn.init.normal_(self.user_embedding.weight, std=0.1)
        nn.init.normal_(self.item_embedding.weight, std=0.1)

    def compute_normalized_adj(self, interactions, device):
        """
        정규화된 인접 행렬 계산

        A_hat = D^(-1/2) * A * D^(-1/2)
        """
        n_nodes = self.n_users + self.n_items

        # 사용자-아이템 상호작용을 엣지로 변환
        user_ids = interactions[:, 0]
        item_ids = interactions[:, 1] + self.n_users

        row = torch.cat([user_ids, item_ids])
        col = torch.cat([item_ids, user_ids])
        edge_index = torch.stack([row, col]).to(device)

        # 차수 계산
        deg = torch.zeros(n_nodes, device=device)
        deg.scatter_add_(0, row, torch.ones(len(row), device=device))

        # D^(-1/2) 계산
        deg_inv_sqrt = deg.pow(-0.5)
        deg_inv_sqrt[deg_inv_sqrt == float('inf')] = 0

        # 정규화 가중치
        edge_weight = deg_inv_sqrt[row] * deg_inv_sqrt[col]

        return edge_index, edge_weight, n_nodes

    def forward(self, interactions):
        """
        모든 레이어의 임베딩 평균으로 최종 임베딩 계산
        """
        device = self.user_embedding.weight.device
        edge_index, edge_weight, n_nodes = self.compute_normalized_adj(interactions, device)

        # 초기 임베딩
        all_emb = torch.cat([
            self.user_embedding.weight,
            self.item_embedding.weight
        ], dim=0)

        layer_embs = [all_emb]

        # 그래프 합성곱 레이어
        for _ in range(self.n_layers):
            # 스파스 행렬-벡터 곱 (메시지 패싱)
            new_emb = torch.zeros_like(all_emb)
            new_emb.scatter_add_(
                0,
                edge_index[1].unsqueeze(1).expand(-1, self.embed_dim),
                all_emb[edge_index[0]] * edge_weight.unsqueeze(1)
            )
            all_emb = new_emb
            layer_embs.append(all_emb)

        # 모든 레이어 평균
        final_emb = torch.stack(layer_embs, dim=0).mean(dim=0)

        user_emb = final_emb[:self.n_users]
        item_emb = final_emb[self.n_users:]

        return user_emb, item_emb

    def bpr_loss(self, user_emb, item_emb, users, pos_items, neg_items, reg_lambda=1e-4):
        """BPR 손실 계산"""
        u_emb = user_emb[users]
        p_emb = item_emb[pos_items]
        n_emb = item_emb[neg_items]

        pos_score = (u_emb * p_emb).sum(-1)
        neg_score = (u_emb * n_emb).sum(-1)

        loss = -torch.log(torch.sigmoid(pos_score - neg_score)).mean()

        # L2 정규화
        reg = (self.user_embedding.weight[users].norm(2).pow(2) +
               self.item_embedding.weight[pos_items].norm(2).pow(2) +
               self.item_embedding.weight[neg_items].norm(2).pow(2)) / (2 * len(users))

        return loss + reg_lambda * reg


# LightGCN 모델 생성
lightgcn = LightGCN(n_users, n_items, embed_dim=64, n_layers=3).to(device)

# 상호작용 행렬 생성
interactions = torch.LongTensor(train_df[['user_id', 'item_id']].values)

print(f"LightGCN 파라미터: {sum(p.numel() for p in lightgcn.parameters()):,}")
print(f"상호작용 수: {len(interactions)}")

7. LLM 기반 추천

7.1 LLM을 추천에 활용하는 방법

LLM은 추천 시스템에 다양한 방식으로 활용됩니다.

  1. 아이템 특성 인코딩: LLM으로 아이템 설명을 임베딩
  2. 프롬프트 기반 추천: 직접 프롬프트로 추천 요청
  3. 사용자 프로파일 텍스트 생성: 사용자 행동을 텍스트로 변환
  4. 이유 생성 (Explainability): 추천 이유 텍스트 생성
from transformers import AutoTokenizer, AutoModel
import torch.nn.functional as F

def mean_pooling(model_output, attention_mask):
    """토큰 임베딩의 평균 풀링"""
    token_embeddings = model_output[0]
    input_mask_expanded = attention_mask.unsqueeze(-1).expand(token_embeddings.size()).float()
    return torch.sum(token_embeddings * input_mask_expanded, 1) / torch.clamp(input_mask_expanded.sum(1), min=1e-9)


class LLMItemEncoder:
    """LLM으로 아이템 설명을 임베딩으로 변환"""

    def __init__(self, model_name='sentence-transformers/all-MiniLM-L6-v2'):
        """
        실제 사용 시:
        pip install transformers sentence-transformers
        """
        self.model_name = model_name
        print(f"LLM 인코더 초기화: {model_name}")

    def encode(self, texts, batch_size=32):
        """
        텍스트 배치를 임베딩으로 변환

        실제 코드:
        tokenizer = AutoTokenizer.from_pretrained(self.model_name)
        model = AutoModel.from_pretrained(self.model_name)

        embeddings = []
        for i in range(0, len(texts), batch_size):
            batch = texts[i:i+batch_size]
            encoded = tokenizer(batch, padding=True, truncation=True,
                               max_length=128, return_tensors='pt')
            with torch.no_grad():
                output = model(**encoded)
            emb = mean_pooling(output, encoded['attention_mask'])
            emb = F.normalize(emb, dim=1)
            embeddings.append(emb)

        return torch.cat(embeddings, dim=0)
        """
        # 시뮬레이션
        return torch.randn(len(texts), 384)


# 영화 아이템 설명 예시
movie_descriptions = [
    "A thrilling sci-fi adventure set in space with stunning visual effects",
    "A heartwarming romantic comedy about finding love in unexpected places",
    "An intense psychological thriller with unexpected plot twists",
    "An animated fantasy film perfect for families and children",
    "A gripping crime drama based on true events"
]

encoder = LLMItemEncoder()
item_embeddings = encoder.encode(movie_descriptions)
print(f"LLM 아이템 임베딩 형태: {item_embeddings.shape}")

# 유사도 계산
similarity_matrix = torch.matmul(item_embeddings, item_embeddings.T)
print("\n아이템 간 유사도 행렬:")
print(similarity_matrix.numpy().round(3))

7.2 프롬프트 기반 추천 시스템

def build_recommendation_prompt(user_history, candidate_items, user_profile=None):
    """
    LLM 추천을 위한 프롬프트 구성
    """
    history_text = "\n".join([f"  - {item}" for item in user_history])

    candidates_text = "\n".join([
        f"  {i+1}. {item}" for i, item in enumerate(candidate_items)
    ])

    profile_text = f"\n사용자 프로필: {user_profile}" if user_profile else ""

    prompt = f"""당신은 개인화된 영화 추천 전문가입니다.{profile_text}

사용자가 최근에 좋아한 영화들:
{history_text}

다음 후보 영화들 중 사용자가 가장 좋아할 것 같은 영화를 순서대로 추천해주세요.
각 추천에 대한 이유도 한 문장으로 설명해주세요.

후보 영화들:
{candidates_text}

추천 결과를 다음 형식으로 제공해주세요:
1. [영화명] - [추천 이유]
2. [영화명] - [추천 이유]
3. [영화명] - [추천 이유]"""

    return prompt


# 예시 사용
user_history = [
    "인터스텔라 (2014)",
    "매트릭스 (1999)",
    "블레이드 러너 2049 (2017)"
]

candidates = [
    "아바타: 물의 길 (2022)",
    "노트북 (2004)",
    "기생충 (2019)",
    "듄 (2021)",
    "어바웃 타임 (2013)"
]

prompt = build_recommendation_prompt(
    user_history=user_history,
    candidate_items=candidates,
    user_profile="SF와 스릴러 장르 선호, 시각효과와 세계관 설정을 중시"
)

print("생성된 프롬프트:")
print("=" * 60)
print(prompt)
print("=" * 60)

8. 산업 수준 추천 시스템

8.1 멀티 스테이지 아키텍처

실제 대규모 추천 시스템은 여러 단계로 구성됩니다.

class IndustrialRecSystem:
    """
    산업 수준 추천 시스템 아키텍처 개요

    3단계:
    1. 검색(Retrieval): 수백만 → 수백 개 후보 선별
    2. 랭킹(Ranking): 수백 → 수십 개 정밀 재순위
    3. 재랭킹(Re-Ranking): 다양성, 신선도, 비즈니스 규칙 적용
    """

    def __init__(self):
        print("산업 수준 추천 시스템 초기화")
        print("=" * 50)
        print("아키텍처:")
        print("  1단계 검색: Two-Tower + ANN (밀리초 내)")
        print("  2단계 랭킹: DCN/xDeepFM (복잡한 특성 교차)")
        print("  3단계 재랭킹: MMR/DPP (다양성 보장)")

    def retrieval_stage(self, user_embedding, item_index, k=500):
        """단계 1: 후보 검색"""
        # Faiss ANN 검색으로 빠르게 Top-K 후보 선별
        print(f"\n[검색 단계] {k}개 후보 선별")
        return list(range(k))

    def ranking_stage(self, user_features, candidate_items, context_features):
        """단계 2: 정밀 랭킹"""
        print(f"\n[랭킹 단계] {len(candidate_items)}개 → 50개 정밀 랭킹")
        return candidate_items[:50]

    def reranking_stage(self, ranked_items, diversity_weight=0.3):
        """단계 3: 재랭킹 (다양성 + 신선도)"""
        print(f"\n[재랭킹 단계] 다양성 가중치: {diversity_weight}")
        # MMR (Maximal Marginal Relevance)로 다양성 보장
        return ranked_items[:20]


class DeepCrossNetwork(nn.Module):
    """
    DCN (Deep & Cross Network): 자동 특성 교차 학습
    Wang et al., 2017
    """
    def __init__(self, input_dim, cross_layers=3, deep_dims=None, dropout=0.1):
        super().__init__()

        if deep_dims is None:
            deep_dims = [256, 128, 64]

        self.input_dim   = input_dim
        self.cross_layers = cross_layers

        # Cross Network (다항 특성 교차)
        self.cross_weights = nn.ParameterList([
            nn.Parameter(torch.randn(input_dim, 1)) for _ in range(cross_layers)
        ])
        self.cross_biases = nn.ParameterList([
            nn.Parameter(torch.zeros(input_dim)) for _ in range(cross_layers)
        ])

        # Deep Network
        deep_layers = []
        in_dim = input_dim
        for dim in deep_dims:
            deep_layers.extend([
                nn.Linear(in_dim, dim), nn.LayerNorm(dim), nn.ReLU(), nn.Dropout(dropout)
            ])
            in_dim = dim
        self.deep = nn.Sequential(*deep_layers)

        # 최종 예측
        self.output = nn.Linear(input_dim + deep_dims[-1], 1)

    def cross_forward(self, x0, x):
        """Cross Network 포워드"""
        for w, b in zip(self.cross_weights, self.cross_biases):
            x = x0 * (torch.matmul(x, w) + b.unsqueeze(0)) + x
        return x

    def forward(self, x):
        x0 = x.clone()
        cross_out = self.cross_forward(x0, x)
        deep_out  = self.deep(x)
        combined  = torch.cat([cross_out, deep_out], dim=1)
        return torch.sigmoid(self.output(combined)).squeeze()


dcn = DeepCrossNetwork(input_dim=128, cross_layers=3).to(device)
print(f"DCN 파라미터: {sum(p.numel() for p in dcn.parameters()):,}")

8.2 콜드 스타트 문제 해결

class ColdStartSolver:
    """콜드 스타트 문제 해결 전략"""

    @staticmethod
    def content_based_for_new_items(item_description, item_encoder, existing_item_embs):
        """
        새 아이템 콜드 스타트:
        콘텐츠 기반으로 유사 아이템 찾기
        """
        new_item_emb = item_encoder.encode([item_description])
        similarities = torch.matmul(new_item_emb, existing_item_embs.T)
        similar_items = similarities.topk(5).indices[0]
        return similar_items

    @staticmethod
    def demographic_for_new_users(user_demographics, user_groups):
        """
        새 사용자 콜드 스타트:
        인구통계 기반 그룹 추천
        """
        # 나이, 지역, 관심사로 유사 사용자 그룹 찾기
        print("새 사용자 콜드 스타트 전략:")
        print("  1. 온보딩 설문 (선호 장르, 인기도 취향)")
        print("  2. 인구통계 기반 그룹 추천")
        print("  3. 탐색-활용 균형 (Explore-Exploit)")
        print("  4. 암시적 피드백 빠른 수집")

    @staticmethod
    def explore_exploit_bandit(n_items, exploration_rate=0.1):
        """
        Epsilon-Greedy 탐색-활용 균형
        """
        if np.random.random() < exploration_rate:
            # 탐색: 랜덤 추천
            return np.random.randint(n_items)
        else:
            # 활용: 현재 최선 추천
            return 0  # 최고 점수 아이템

ColdStartSolver.demographic_for_new_users(None, None)

9. 실전 구현: Surprise 라이브러리

9.1 Surprise를 이용한 빠른 추천 시스템 구축

def demo_surprise():
    """
    Surprise 라이브러리를 이용한 협업 필터링

    설치: pip install scikit-surprise
    """
    usage_note = """
    from surprise import Dataset, Reader, SVD, KNNBasic, NMF
    from surprise.model_selection import cross_validate, train_test_split
    from surprise import accuracy

    # MovieLens 100K 데이터 로드
    data = Dataset.load_builtin('ml-100k')

    # 훈련/테스트 분리
    trainset, testset = train_test_split(data, test_size=0.2, random_state=42)

    # SVD 모델
    svd = SVD(n_factors=100, n_epochs=20, lr_all=0.005, reg_all=0.02)
    svd.fit(trainset)

    predictions = svd.test(testset)
    print(f"SVD RMSE: {accuracy.rmse(predictions):.4f}")
    print(f"SVD MAE:  {accuracy.mae(predictions):.4f}")

    # 교차 검증
    cv_results = cross_validate(SVD(), data, measures=['RMSE', 'MAE'], cv=5, verbose=True)
    print(f"평균 RMSE: {cv_results['test_rmse'].mean():.4f}")

    # KNN 기반 협업 필터링
    knn_user = KNNBasic(k=40, sim_options={'name': 'pearson', 'user_based': True})
    knn_item = KNNBasic(k=40, sim_options={'name': 'pearson', 'user_based': False})

    # 특정 사용자에 대한 상위 추천
    user_id = '196'
    inner_id = trainset.to_inner_uid(user_id)
    user_ratings = trainset.ur[inner_id]

    # 아직 평가하지 않은 아이템 추천
    all_items = set(trainset.all_items())
    rated_items = set(iid for iid, _ in user_ratings)
    unrated = all_items - rated_items

    predictions_unrated = [svd.predict(user_id, trainset.to_raw_iid(iid)) for iid in unrated]
    top10 = sorted(predictions_unrated, key=lambda x: x.est, reverse=True)[:10]

    print(f"사용자 {user_id}의 Top-10 추천:")
    for pred in top10:
        print(f"  아이템 {pred.iid}: 예측 평점 {pred.est:.2f}")
    """
    print("Surprise 라이브러리 주요 알고리즘:")
    print("  - SVD: 행렬 분해 (넷플릭스 프라이즈 우승 알고리즘 기반)")
    print("  - SVD++: 암시적 피드백 포함 SVD")
    print("  - NMF: 비음수 행렬 분해")
    print("  - KNNBasic/Means/Baseline: K-최근접 이웃")

demo_surprise()

9.2 LightFM을 이용한 하이브리드 추천

def demo_lightfm():
    """
    LightFM: 협업 필터링 + 콘텐츠 기반 하이브리드

    설치: pip install lightfm
    """
    usage_note = """
    from lightfm import LightFM
    from lightfm.data import Dataset
    from lightfm.evaluation import precision_at_k, auc_score
    from lightfm.datasets import fetch_movielens

    # MovieLens 데이터 로드
    movielens = fetch_movielens()

    train = movielens['train']
    test  = movielens['test']

    # BPR 손실 (순위 학습)
    model_bpr = LightFM(
        no_components=30,
        loss='bpr',
        learning_rate=0.05,
        item_alpha=1e-6,
        user_alpha=1e-6
    )
    model_bpr.fit(train, epochs=30, num_threads=4, verbose=True)

    # WARP 손실 (더 강력한 랭킹)
    model_warp = LightFM(
        no_components=30,
        loss='warp',
        learning_rate=0.05
    )
    model_warp.fit(train, epochs=30, num_threads=4)

    # 평가
    test_precision_bpr  = precision_at_k(model_bpr,  test, k=10).mean()
    test_precision_warp = precision_at_k(model_warp, test, k=10).mean()
    print(f"BPR  Precision@10: {test_precision_bpr:.4f}")
    print(f"WARP Precision@10: {test_precision_warp:.4f}")

    # 아이템 특성 추가 (하이브리드)
    dataset = Dataset()
    dataset.fit(
        users=range(n_users),
        items=range(n_items),
        item_features=['genre:action', 'genre:comedy', 'genre:drama']
    )

    model_hybrid = LightFM(no_components=30, loss='warp')
    model_hybrid.fit(
        interactions,
        item_features=item_feature_matrix,
        epochs=30
    )
    """
    print("LightFM 하이브리드 추천:")
    print("  - 협업 필터링 + 아이템/유저 특성 결합")
    print("  - BPR, WARP, logistic, warp-kos 손실 지원")
    print("  - 콜드 스타트 문제 완화 (콘텐츠 특성 활용)")

demo_lightfm()

10. 모델 성능 비교 및 선택 가이드

# 추천 시스템 모델 비교
comparison = pd.DataFrame({
    'Model': [
        'User-based KNN',
        'SVD (Matrix Factorization)',
        'BPR-MF',
        'Neural CF (NCF)',
        'Two-Tower',
        'SASRec',
        'LightGCN',
        'LLM-based'
    ],
    'Precision@10': [0.042, 0.061, 0.068, 0.075, 0.072, 0.089, 0.085, 0.078],
    'Recall@10':    [0.134, 0.198, 0.221, 0.244, 0.238, 0.289, 0.279, 0.261],
    'NDCG@10':      [0.089, 0.124, 0.138, 0.158, 0.154, 0.187, 0.179, 0.169],
    'Training Time':['1min', '5min', '3min', '20min', '30min', '25min', '40min', '60min+'],
    'Scale':        ['Small', 'Medium', 'Medium', 'Large', 'Very Large', 'Large', 'Large', 'Any'],
    'Cold Start':   ['Poor', 'Poor', 'Poor', 'Poor', 'Good', 'Fair', 'Fair', 'Excellent']
})

print("추천 시스템 모델 성능 비교 (MovieLens 1M 기준)")
print("=" * 90)
print(comparison.to_string(index=False))

print("\n모델 선택 가이드:")
print("  - 소규모 (~100K 상호작용): SVD, User-based KNN")
print("  - 중규모 (~1M 상호작용): NCF, BPR-MF")
print("  - 대규모 (10M+): Two-Tower + LightGCN + SASRec")
print("  - 콜드 스타트 중요: LLM 기반 인코딩 + Two-Tower")
print("  - 실시간 추천: Two-Tower (사전 계산된 임베딩) + Faiss")

마무리

이 가이드에서는 추천 시스템의 전체 스펙트럼을 다루었습니다.

핵심 요약:

  1. 기초 이해: 협업 필터링의 원리와 평가 지표 (Precision@K, NDCG)
  2. 행렬 분해: SVD, BPR — 강력한 기준선 모델
  3. NCF: 딥러닝으로 MF의 한계 극복
  4. Two-Tower: 대규모 서비스의 핵심 아키텍처
  5. 시퀀스 모델: SASRec, BERT4Rec — 사용자 행동 순서 활용
  6. 그래프 모델: LightGCN — 고차 연결성 학습
  7. LLM 추천: 텍스트 의미 이해로 콜드 스타트 해결

실전 조언:

  • 항상 BPR-MF로 강력한 기준선부터 구축하세요
  • 서비스 규모가 커지면 멀티 스테이지 아키텍처가 필수입니다
  • 시퀀스 정보가 있다면 SASRec이 뛰어납니다
  • 콜드 스타트가 중요하면 LLM 임베딩을 활용하세요

참고 자료:

Deep Learning Recommendation Systems Complete Guide: From Collaborative Filtering to LLM-based RecSys

Introduction

Recommendation systems power the modern digital experience — Netflix movie suggestions, Amazon product recommendations, Spotify music discovery. Netflix alone reportedly creates over $1 billion in annual value from its recommendation engine.

This guide takes you from classical collaborative filtering all the way through graph neural networks and LLM-based recommendation. Every section includes production-ready PyTorch code.


1. Recommendation System Fundamentals

1.1 Three Types of Recommendation Systems

Collaborative Filtering leverages patterns from similar users or items.

  • User-based: "People like you also enjoyed this."
  • Item-based: "Because you liked X, you may enjoy Y."

Content-Based Filtering analyzes item attributes (genre, director, description, etc.).

  • Can recommend new items (mitigates the cold-start problem)
  • Feature engineering quality matters

Hybrid Methods combine both approaches to compensate for their respective weaknesses.

1.2 Implicit vs. Explicit Feedback

Explicit feedback: ratings, likes, dislikes — clear intent but sparse. Implicit feedback: clicks, watch time, purchases — abundant but noisy.

Real-world systems rely heavily on implicit feedback because it is far more plentiful.

1.3 Evaluation Metrics

import numpy as np
from sklearn.metrics import ndcg_score

def precision_at_k(recommended, relevant, k):
    """Precision@K: fraction of top-K items that are relevant"""
    hits = len(set(recommended[:k]) & set(relevant))
    return hits / k

def recall_at_k(recommended, relevant, k):
    """Recall@K: fraction of relevant items found in top-K"""
    hits = len(set(recommended[:k]) & set(relevant))
    return hits / len(relevant) if relevant else 0

def average_precision_at_k(recommended, relevant, k):
    """AP@K: cumulative average of precision at each hit (rank-aware)"""
    if not relevant:
        return 0.0
    hits, sum_prec = 0, 0.0
    for i, item in enumerate(recommended[:k]):
        if item in relevant:
            hits += 1
            sum_prec += hits / (i + 1)
    return sum_prec / min(len(relevant), k)

def ndcg_at_k(recommended, relevant, k):
    """NDCG@K: Normalized Discounted Cumulative Gain"""
    relevance = [1 if item in relevant else 0 for item in recommended[:k]]
    if not any(relevance):
        return 0.0
    dcg  = sum(rel / np.log2(i + 2) for i, rel in enumerate(relevance))
    idcg = sum(rel / np.log2(i + 2) for i, rel in enumerate(sorted(relevance, reverse=True)))
    return dcg / idcg if idcg > 0 else 0.0

# Example
recommended = [1, 4, 7, 2, 9, 3, 5, 6, 8, 10]
relevant    = {1, 2, 5, 7, 8}

print("=" * 40)
print("Recommendation Evaluation Metrics")
print("=" * 40)
for k in [5, 10]:
    print(f"\nk = {k}")
    print(f"  Precision@{k}: {precision_at_k(recommended, relevant, k):.4f}")
    print(f"  Recall@{k}:    {recall_at_k(recommended, relevant, k):.4f}")
    print(f"  AP@{k}:        {average_precision_at_k(recommended, relevant, k):.4f}")
    print(f"  NDCG@{k}:      {ndcg_at_k(recommended, relevant, k):.4f}")

2. Matrix Factorization

2.1 Core Concept

Matrix Factorization decomposes the user-item interaction matrix R into two lower-rank matrices:

R ≈ U × V^T

  • U: user embedding matrix (n_users × k)
  • V: item embedding matrix (n_items × k)
  • k: number of latent factors

2.2 Matrix Factorization in PyTorch

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split

def generate_synthetic_ratings(n_users=1000, n_items=500, n_ratings=50000):
    np.random.seed(42)
    k = 20
    user_f = np.random.randn(n_users, k) * 0.5
    item_f = np.random.randn(n_items, k) * 0.5

    true_r = user_f @ item_f.T
    true_r = (true_r - true_r.min()) / (true_r.max() - true_r.min()) * 4 + 1

    uid = np.random.choice(n_users, n_ratings)
    iid = np.random.choice(n_items, n_ratings)
    rat = np.clip(true_r[uid, iid] + np.random.randn(n_ratings) * 0.3, 1, 5)

    df = pd.DataFrame({'user_id': uid, 'item_id': iid, 'rating': rat})
    return df.drop_duplicates(subset=['user_id', 'item_id'])

ratings_df = generate_synthetic_ratings()
train_df, test_df = train_test_split(ratings_df, test_size=0.2, random_state=42)

n_users = ratings_df['user_id'].max() + 1
n_items = ratings_df['item_id'].max() + 1

print(f"Total ratings: {len(ratings_df)}")
print(f"Users: {n_users}  Items: {n_items}")


class RatingsDataset(Dataset):
    def __init__(self, df):
        self.users   = torch.LongTensor(df['user_id'].values)
        self.items   = torch.LongTensor(df['item_id'].values)
        self.ratings = torch.FloatTensor(df['rating'].values)

    def __len__(self):
        return len(self.ratings)

    def __getitem__(self, idx):
        return self.users[idx], self.items[idx], self.ratings[idx]


class MatrixFactorization(nn.Module):
    def __init__(self, n_users, n_items, n_factors=64):
        super().__init__()
        self.user_embedding = nn.Embedding(n_users, n_factors)
        self.item_embedding = nn.Embedding(n_items, n_factors)
        self.user_bias      = nn.Embedding(n_users, 1)
        self.item_bias      = nn.Embedding(n_items, 1)
        self.global_bias    = nn.Parameter(torch.zeros(1))

        nn.init.normal_(self.user_embedding.weight, 0, 0.01)
        nn.init.normal_(self.item_embedding.weight, 0, 0.01)
        nn.init.zeros_(self.user_bias.weight)
        nn.init.zeros_(self.item_bias.weight)

    def forward(self, user_ids, item_ids):
        u = self.user_embedding(user_ids)
        v = self.item_embedding(item_ids)
        dot   = (u * v).sum(dim=1)
        u_b   = self.user_bias(user_ids).squeeze()
        i_b   = self.item_bias(item_ids).squeeze()
        return dot + u_b + i_b + self.global_bias


device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
mf_model = MatrixFactorization(n_users, n_items, n_factors=64).to(device)

train_loader = DataLoader(RatingsDataset(train_df), batch_size=512, shuffle=True)
test_loader  = DataLoader(RatingsDataset(test_df),  batch_size=512, shuffle=False)

optimizer = optim.Adam(mf_model.parameters(), lr=1e-3, weight_decay=1e-5)
criterion = nn.MSELoss()


def train_mf(model, loader, optimizer, criterion, device):
    model.train()
    total = 0
    for u, i, r in loader:
        u, i, r = u.to(device), i.to(device), r.to(device)
        optimizer.zero_grad()
        loss = criterion(model(u, i), r)
        loss.backward()
        optimizer.step()
        total += loss.item() * len(r)
    return (total / len(loader.dataset)) ** 0.5


def eval_mf(model, loader, device):
    model.eval()
    preds, targets = [], []
    with torch.no_grad():
        for u, i, r in loader:
            preds.extend(model(u.to(device), i.to(device)).cpu().tolist())
            targets.extend(r.tolist())
    p, t = np.array(preds), np.array(targets)
    return np.sqrt(((p-t)**2).mean()), np.abs(p-t).mean()


for epoch in range(30):
    tr = train_mf(mf_model, train_loader, optimizer, criterion, device)
    if (epoch+1) % 10 == 0:
        rmse, mae = eval_mf(mf_model, test_loader, device)
        print(f"Epoch {epoch+1:2d} | Train RMSE: {tr:.4f} | Test RMSE: {rmse:.4f} | MAE: {mae:.4f}")

2.3 BPR (Bayesian Personalized Ranking)

BPR is a pairwise learning method for implicit feedback. It assumes users prefer items they interacted with over those they did not.

class BPRModel(nn.Module):
    def __init__(self, n_users, n_items, n_factors=64):
        super().__init__()
        self.user_emb = nn.Embedding(n_users, n_factors)
        self.item_emb = nn.Embedding(n_items, n_factors)
        nn.init.normal_(self.user_emb.weight, 0, 0.01)
        nn.init.normal_(self.item_emb.weight, 0, 0.01)

    def forward(self, u, pos, neg):
        ue  = self.user_emb(u)
        pe  = self.item_emb(pos)
        ne  = self.item_emb(neg)
        return (ue * pe).sum(-1), (ue * ne).sum(-1)

    def predict(self, u, i):
        return (self.user_emb(u) * self.item_emb(i)).sum(-1)


def bpr_loss(pos_score, neg_score, model=None, reg=1e-5):
    loss = -torch.log(torch.sigmoid(pos_score - neg_score)).mean()
    if model and reg:
        loss += reg * sum(p.norm(2) for p in model.parameters())
    return loss


class BPRDataset(Dataset):
    def __init__(self, df, n_items):
        self.users     = df['user_id'].values
        self.pos_items = df['item_id'].values
        self.n_items   = n_items
        self.user_items = df.groupby('user_id')['item_id'].apply(set).to_dict()

    def __len__(self):
        return len(self.users)

    def __getitem__(self, idx):
        u = self.users[idx]
        p = self.pos_items[idx]
        n = np.random.randint(self.n_items)
        while n in self.user_items.get(u, set()):
            n = np.random.randint(self.n_items)
        return torch.tensor(u), torch.tensor(p), torch.tensor(n)


bpr_model   = BPRModel(n_users, n_items).to(device)
bpr_loader  = DataLoader(BPRDataset(train_df, n_items), batch_size=512, shuffle=True)
bpr_opt     = optim.Adam(bpr_model.parameters(), lr=1e-3)

for epoch in range(20):
    bpr_model.train()
    total = 0
    for u, p, n in bpr_loader:
        u, p, n = u.to(device), p.to(device), n.to(device)
        bpr_opt.zero_grad()
        ps, ns = bpr_model(u, p, n)
        loss   = bpr_loss(ps, ns, bpr_model)
        loss.backward()
        bpr_opt.step()
        total += loss.item()
    if (epoch+1) % 5 == 0:
        print(f"BPR Epoch {epoch+1:2d} | Loss: {total/len(bpr_loader):.4f}")

3. Neural Collaborative Filtering (NCF)

3.1 Architecture Overview

NCF extends matrix factorization with deep learning by combining two complementary paths.

GMF (Generalized Matrix Factorization): Element-wise product of embeddings — a generalization of MF. MLP: Concatenated embeddings passed through non-linear layers.

The two outputs are fused into a final prediction.

class NCF(nn.Module):
    """
    Neural Collaborative Filtering
    He et al., 2017 — arxiv.org/abs/1708.05031
    """
    def __init__(self, n_users, n_items, n_factors=64, mlp_dims=None, dropout=0.2):
        super().__init__()
        if mlp_dims is None:
            mlp_dims = [256, 128, 64]

        # GMF embeddings
        self.gmf_user = nn.Embedding(n_users, n_factors)
        self.gmf_item = nn.Embedding(n_items, n_factors)

        # MLP embeddings (separate from GMF)
        self.mlp_user = nn.Embedding(n_users, n_factors)
        self.mlp_item = nn.Embedding(n_items, n_factors)

        # MLP layers
        mlp_layers, in_dim = [], n_factors * 2
        for dim in mlp_dims:
            mlp_layers += [nn.Linear(in_dim, dim), nn.BatchNorm1d(dim), nn.ReLU(), nn.Dropout(dropout)]
            in_dim = dim
        self.mlp = nn.Sequential(*mlp_layers)

        # Final prediction head
        self.output = nn.Linear(n_factors + mlp_dims[-1], 1)

        for emb in [self.gmf_user, self.gmf_item, self.mlp_user, self.mlp_item]:
            nn.init.normal_(emb.weight, 0, 0.01)

    def forward(self, user_ids, item_ids):
        gmf_out = self.gmf_user(user_ids) * self.gmf_item(item_ids)
        mlp_in  = torch.cat([self.mlp_user(user_ids), self.mlp_item(item_ids)], dim=1)
        mlp_out = self.mlp(mlp_in)
        return torch.sigmoid(self.output(torch.cat([gmf_out, mlp_out], dim=1))).squeeze()


class ImplicitDataset(Dataset):
    def __init__(self, pos_df, n_items, neg_ratio=4):
        self.users, self.items, self.labels = [], [], []
        ui = pos_df.groupby('user_id')['item_id'].apply(set).to_dict()
        for _, row in pos_df.iterrows():
            u, i = row['user_id'], row['item_id']
            self.users.append(u); self.items.append(i); self.labels.append(1.0)
            for _ in range(neg_ratio):
                n = np.random.randint(n_items)
                while n in ui.get(u, set()):
                    n = np.random.randint(n_items)
                self.users.append(u); self.items.append(n); self.labels.append(0.0)

    def __len__(self):
        return len(self.labels)

    def __getitem__(self, idx):
        return (torch.tensor(self.users[idx]), torch.tensor(self.items[idx]),
                torch.tensor(self.labels[idx]))


implicit_train = train_df[train_df['rating'] >= 3.5].copy()
impl_loader    = DataLoader(ImplicitDataset(implicit_train, n_items), batch_size=1024, shuffle=True)

ncf = NCF(n_users, n_items, n_factors=64).to(device)
ncf_opt  = optim.Adam(ncf.parameters(), lr=1e-3, weight_decay=1e-5)
bce_loss = nn.BCELoss()

print(f"NCF parameters: {sum(p.numel() for p in ncf.parameters()):,}")

for epoch in range(20):
    ncf.train()
    total = 0
    for u, i, lbl in impl_loader:
        u, i, lbl = u.to(device), i.to(device), lbl.to(device)
        ncf_opt.zero_grad()
        loss = bce_loss(ncf(u, i), lbl)
        loss.backward()
        ncf_opt.step()
        total += loss.item()
    if (epoch+1) % 5 == 0:
        print(f"NCF Epoch {epoch+1:2d} | Loss: {total/len(impl_loader):.4f}")

4. Two-Tower Model

4.1 Architecture Overview

The Two-Tower model (Dual Encoder / Bi-Encoder) learns user and item representations independently. Recommendation scores are dot products between the two towers' embeddings.

Advantages at scale:

  • Item embeddings can be pre-computed offline.
  • Fast Approximate Nearest Neighbor (ANN) search at inference time.
  • Scales to billions of items.

Used at YouTube, Google, Spotify, Pinterest, and more.

class UserTower(nn.Module):
    def __init__(self, n_users, feat_dim, embed_dim=128, hidden_dims=None):
        super().__init__()
        if hidden_dims is None:
            hidden_dims = [256, 128]
        self.id_emb = nn.Embedding(n_users, embed_dim)
        layers, in_d = [], embed_dim + feat_dim
        for h in hidden_dims:
            layers += [nn.Linear(in_d, h), nn.LayerNorm(h), nn.ReLU(), nn.Dropout(0.1)]
            in_d = h
        layers.append(nn.Linear(in_d, embed_dim))
        self.net = nn.Sequential(*layers)

    def forward(self, user_ids, user_features):
        x = torch.cat([self.id_emb(user_ids), user_features], dim=1)
        return nn.functional.normalize(self.net(x), dim=-1)


class ItemTower(nn.Module):
    def __init__(self, n_items, feat_dim, embed_dim=128, hidden_dims=None):
        super().__init__()
        if hidden_dims is None:
            hidden_dims = [256, 128]
        self.id_emb = nn.Embedding(n_items, embed_dim)
        layers, in_d = [], embed_dim + feat_dim
        for h in hidden_dims:
            layers += [nn.Linear(in_d, h), nn.LayerNorm(h), nn.ReLU(), nn.Dropout(0.1)]
            in_d = h
        layers.append(nn.Linear(in_d, embed_dim))
        self.net = nn.Sequential(*layers)

    def forward(self, item_ids, item_features):
        x = torch.cat([self.id_emb(item_ids), item_features], dim=1)
        return nn.functional.normalize(self.net(x), dim=-1)


class TwoTowerModel(nn.Module):
    def __init__(self, n_users, n_items, user_fd, item_fd, embed_dim=128):
        super().__init__()
        self.user_tower = UserTower(n_users, user_fd, embed_dim)
        self.item_tower = ItemTower(n_items, item_fd, embed_dim)
        self.temperature = nn.Parameter(torch.tensor(0.07))

    def forward(self, uid, uf, iid, if_):
        return self.user_tower(uid, uf), self.item_tower(iid, if_)

    def similarity(self, u_emb, i_emb):
        return torch.matmul(u_emb, i_emb.T) / self.temperature.exp()


def info_nce_loss(sim):
    n = sim.size(0)
    labels = torch.arange(n, device=sim.device)
    return nn.CrossEntropyLoss()(sim, labels)


user_feat_dim = 16
item_feat_dim = 32
user_feats = torch.FloatTensor(np.random.randn(n_users, user_feat_dim))
item_feats = torch.FloatTensor(np.random.randn(n_items, item_feat_dim))

two_tower = TwoTowerModel(n_users, n_items, user_feat_dim, item_feat_dim).to(device)
print(f"Two-Tower parameters: {sum(p.numel() for p in two_tower.parameters()):,}")


def precompute_item_embeddings(model, n_items, item_feats, batch_size=256, device='cpu'):
    model.eval()
    all_embs = []
    with torch.no_grad():
        for s in range(0, n_items, batch_size):
            e   = min(s + batch_size, n_items)
            ids = torch.arange(s, e, device=device)
            emb = model.item_tower(ids, item_feats[s:e].to(device))
            all_embs.append(emb.cpu())
    return torch.cat(all_embs)

item_cache = precompute_item_embeddings(two_tower, n_items, item_feats, device=device)
print(f"Pre-computed item embeddings: {item_cache.shape}")

4.2 ANN Search with Faiss

def demo_faiss():
    """
    Approximate Nearest Neighbor search with Faiss.
    Install: pip install faiss-cpu  (or faiss-gpu)
    """
    usage = """
    import faiss

    embed_dim = 128
    item_embs = item_cache.numpy().astype('float32')
    faiss.normalize_L2(item_embs)

    index = faiss.IndexFlatIP(embed_dim)    # exact inner-product
    # For large scale: IndexIVFFlat or IndexHNSWFlat
    index.add(item_embs)

    user_q = user_emb.numpy().astype('float32')
    faiss.normalize_L2(user_q)

    scores, indices = index.search(user_q, k=100)
    print("Top-100 candidate items:", indices[0])
    """
    print("Faiss index types:")
    print("  IndexFlatIP   — exact inner-product (small scale)")
    print("  IndexIVFFlat  — inverted file index (medium scale)")
    print("  IndexHNSWFlat — hierarchical graph (large scale, fast)")
    print("  IndexPQ       — product quantization (memory-efficient)")

demo_faiss()

5. Sequential Recommendation

5.1 SASRec (Self-Attentive Sequential Recommendation)

SASRec uses Transformer Self-Attention to identify important items in a user's interaction sequence.

class SASRecBlock(nn.Module):
    def __init__(self, d_model, n_heads, dropout=0.1):
        super().__init__()
        self.attn = nn.MultiheadAttention(d_model, n_heads, dropout=dropout, batch_first=True)
        self.ff   = nn.Sequential(
            nn.Linear(d_model, d_model*4), nn.GELU(), nn.Dropout(dropout),
            nn.Linear(d_model*4, d_model), nn.Dropout(dropout)
        )
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)

    def forward(self, x):
        L = x.size(1)
        causal = torch.triu(torch.ones(L, L, device=x.device), diagonal=1).bool()
        a, _   = self.attn(x, x, x, attn_mask=causal)
        x = self.norm1(x + a)
        return self.norm2(x + self.ff(x))


class SASRec(nn.Module):
    """
    Self-Attentive Sequential Recommendation
    Kang and McAuley, 2018 — arxiv.org/abs/1808.09781
    """
    def __init__(self, n_items, max_seq_len, d_model=128, n_heads=4, num_layers=2, dropout=0.1):
        super().__init__()
        self.item_emb = nn.Embedding(n_items + 1, d_model, padding_idx=0)
        self.pos_emb  = nn.Embedding(max_seq_len, d_model)
        self.blocks   = nn.ModuleList([SASRecBlock(d_model, n_heads, dropout) for _ in range(num_layers)])
        self.norm     = nn.LayerNorm(d_model)
        self.dropout  = nn.Dropout(dropout)

    def forward(self, seq):
        L   = seq.size(1)
        pos = torch.arange(L, device=seq.device).unsqueeze(0)
        x   = self.dropout(self.item_emb(seq) + self.pos_emb(pos))
        for blk in self.blocks:
            x = blk(x)
        return self.norm(x)

    def predict(self, seq, candidates):
        repr_ = self.forward(seq)[:, -1, :]              # (batch, d_model)
        c_emb = self.item_emb(candidates)                # (batch, n_cand, d_model)
        return (repr_.unsqueeze(1) * c_emb).sum(-1)      # (batch, n_cand)


class SequentialDataset(Dataset):
    def __init__(self, df, max_seq_len=50, min_seq_len=5):
        self.max_seq_len = max_seq_len
        self.sequences   = []

        for user_id, grp in df.sort_values('user_id').groupby('user_id'):
            items = grp['item_id'].tolist()
            if len(items) < min_seq_len:
                continue
            for i in range(min_seq_len, len(items) + 1):
                seq    = items[max(0, i-max_seq_len-1):i-1]
                target = items[i-1] + 1
                padded = ([0] * (max_seq_len - len(seq)) + seq)[-max_seq_len:]
                self.sequences.append((padded, target))

    def __len__(self):
        return len(self.sequences)

    def __getitem__(self, idx):
        seq, tgt = self.sequences[idx]
        return torch.LongTensor(seq), torch.tensor(tgt)


sasrec = SASRec(n_items, max_seq_len=50, d_model=128, n_heads=4, num_layers=2).to(device)
print(f"SASRec parameters: {sum(p.numel() for p in sasrec.parameters()):,}")

5.2 BERT4Rec

BERT4Rec applies BERT's Masked Language Modeling (MLM) to sequential recommendation. Random items are masked and predicted, enabling the model to learn from bidirectional context.

class BERT4Rec(nn.Module):
    """
    BERT4Rec: Sequential Recommendation with BERT
    Sun et al., 2019
    """
    def __init__(self, n_items, max_seq_len, d_model=256, n_heads=4,
                 num_layers=2, dropout=0.1, mask_prob=0.15):
        super().__init__()
        self.mask_id   = n_items + 1
        self.n_items   = n_items
        self.mask_prob = mask_prob

        self.item_emb = nn.Embedding(n_items + 2, d_model, padding_idx=0)
        self.pos_emb  = nn.Embedding(max_seq_len, d_model)

        enc_layer = nn.TransformerEncoderLayer(
            d_model=d_model, nhead=n_heads, dim_feedforward=d_model*4,
            dropout=dropout, batch_first=True
        )
        self.transformer = nn.TransformerEncoder(enc_layer, num_layers=num_layers)
        self.norm   = nn.LayerNorm(d_model)
        self.output = nn.Linear(d_model, n_items + 2)

    def forward(self, seq):
        L   = seq.size(1)
        pos = torch.arange(L, device=seq.device).unsqueeze(0)
        x   = self.item_emb(seq) + self.pos_emb(pos)
        return self.output(self.norm(self.transformer(x)))

    def mask_seq(self, seq):
        masked = seq.clone()
        mask   = (torch.rand_like(seq.float()) < self.mask_prob) & (seq != 0)
        masked[mask] = self.mask_id
        return masked, mask

6. Graph-Based Recommendation: LightGCN

6.1 LightGCN Architecture

LightGCN (Light Graph Convolution Network) learns high-order connectivity on the user-item bipartite graph via message passing. Removing transformation matrices and non-linear activations keeps it lightweight.

class LightGCN(nn.Module):
    """
    LightGCN: Simplifying and Powering GCN for Recommendation
    He et al., 2020 — arxiv.org/abs/2202.01151
    """
    def __init__(self, n_users, n_items, embed_dim=64, n_layers=3):
        super().__init__()
        self.n_users = n_users
        self.n_items = n_items
        self.n_layers = n_layers
        self.embed_dim = embed_dim

        self.user_emb = nn.Embedding(n_users, embed_dim)
        self.item_emb = nn.Embedding(n_items, embed_dim)
        nn.init.normal_(self.user_emb.weight, std=0.1)
        nn.init.normal_(self.item_emb.weight, std=0.1)

    def compute_adj(self, interactions, device):
        n = self.n_users + self.n_items
        uid = interactions[:, 0]
        iid = interactions[:, 1] + self.n_users
        row = torch.cat([uid, iid])
        col = torch.cat([iid, uid])
        edge_index = torch.stack([row, col]).to(device)

        deg = torch.zeros(n, device=device)
        deg.scatter_add_(0, row, torch.ones(len(row), device=device))
        d_inv_sqrt = deg.pow(-0.5)
        d_inv_sqrt[d_inv_sqrt == float('inf')] = 0

        return edge_index, d_inv_sqrt[row] * d_inv_sqrt[col], n

    def forward(self, interactions):
        dev = self.user_emb.weight.device
        edge_idx, edge_wt, n = self.compute_adj(interactions, dev)

        all_emb = torch.cat([self.user_emb.weight, self.item_emb.weight])
        layers  = [all_emb]

        for _ in range(self.n_layers):
            agg = torch.zeros_like(all_emb)
            agg.scatter_add_(
                0,
                edge_idx[1].unsqueeze(1).expand(-1, self.embed_dim),
                all_emb[edge_idx[0]] * edge_wt.unsqueeze(1)
            )
            all_emb = agg
            layers.append(all_emb)

        final = torch.stack(layers).mean(0)
        return final[:self.n_users], final[self.n_users:]

    def bpr_loss(self, u_emb, i_emb, users, pos, neg, lam=1e-4):
        ue = u_emb[users]; pe = i_emb[pos]; ne = i_emb[neg]
        ps = (ue * pe).sum(-1); ns = (ue * ne).sum(-1)
        loss = -torch.log(torch.sigmoid(ps - ns)).mean()
        reg  = (self.user_emb.weight[users].norm(2).pow(2) +
                self.item_emb.weight[pos].norm(2).pow(2) +
                self.item_emb.weight[neg].norm(2).pow(2)) / (2 * len(users))
        return loss + lam * reg


lightgcn = LightGCN(n_users, n_items, embed_dim=64, n_layers=3).to(device)
interactions_t = torch.LongTensor(train_df[['user_id', 'item_id']].values)
print(f"LightGCN parameters: {sum(p.numel() for p in lightgcn.parameters()):,}")

7. LLM-Based Recommendation

7.1 How to Use LLMs in RecSys

LLMs can enrich recommendation systems in several ways:

  1. Item feature encoding: Embed item descriptions using an LLM.
  2. Prompt-based recommendation: Ask an LLM to rank items directly.
  3. User profile text: Convert user behavior into natural language.
  4. Explanation generation: Generate natural-language reasons for recommendations.
import torch.nn.functional as F

def mean_pooling(model_output, attention_mask):
    """Mean-pool token embeddings"""
    tok = model_output[0]
    mask_exp = attention_mask.unsqueeze(-1).expand(tok.size()).float()
    return torch.sum(tok * mask_exp, 1) / torch.clamp(mask_exp.sum(1), min=1e-9)


class LLMItemEncoder:
    """Encode item descriptions into embeddings using an LLM"""

    def __init__(self, model_name='sentence-transformers/all-MiniLM-L6-v2'):
        self.model_name = model_name
        print(f"LLM encoder: {model_name}")

    def encode(self, texts, batch_size=32):
        """
        Real implementation:
        from transformers import AutoTokenizer, AutoModel

        tokenizer = AutoTokenizer.from_pretrained(self.model_name)
        model     = AutoModel.from_pretrained(self.model_name)

        embeddings = []
        for i in range(0, len(texts), batch_size):
            batch   = texts[i:i+batch_size]
            encoded = tokenizer(batch, padding=True, truncation=True,
                                max_length=128, return_tensors='pt')
            with torch.no_grad():
                out = model(**encoded)
            emb = F.normalize(mean_pooling(out, encoded['attention_mask']), dim=1)
            embeddings.append(emb)
        return torch.cat(embeddings)
        """
        return torch.randn(len(texts), 384)  # simulation


movie_descriptions = [
    "A thrilling sci-fi adventure set in space with stunning visual effects",
    "A heartwarming romantic comedy about finding love in unexpected places",
    "An intense psychological thriller with unexpected plot twists",
    "An animated fantasy film perfect for families and children",
    "A gripping crime drama based on true events"
]

encoder     = LLMItemEncoder()
item_llm_emb = encoder.encode(movie_descriptions)
sim_matrix   = torch.matmul(item_llm_emb, item_llm_emb.T)
print(f"LLM item embedding shape: {item_llm_emb.shape}")
print("\nItem-to-item similarity matrix:")
print(sim_matrix.numpy().round(3))

7.2 Prompt-Based Recommendation

def build_rec_prompt(user_history, candidates, user_profile=None):
    history_str   = "\n".join(f"  - {m}" for m in user_history)
    candidate_str = "\n".join(f"  {i+1}. {m}" for i, m in enumerate(candidates))
    profile_str   = f"\nUser profile: {user_profile}" if user_profile else ""

    return f"""You are an expert personalized movie recommender.{profile_str}

Movies the user has recently enjoyed:
{history_str}

From the following candidate movies, rank those the user would most likely enjoy.
Provide a one-sentence explanation for each recommendation.

Candidates:
{candidate_str}

Please respond in this format:
1. [Movie title] - [Reason]
2. [Movie title] - [Reason]
3. [Movie title] - [Reason]"""


user_history = ["Interstellar (2014)", "The Matrix (1999)", "Blade Runner 2049 (2017)"]
candidates   = ["Avatar: The Way of Water (2022)", "The Notebook (2004)",
                "Parasite (2019)", "Dune (2021)", "About Time (2013)"]

prompt = build_rec_prompt(
    user_history=user_history,
    candidates=candidates,
    user_profile="Prefers sci-fi and thrillers; values world-building and visual craft"
)
print("Generated prompt:")
print("=" * 60)
print(prompt)
print("=" * 60)

8. Industrial-Scale Recommendation Systems

8.1 Multi-Stage Architecture

Real-world large-scale systems run in multiple stages:

class IndustrialRecSystem:
    """
    Industrial RecSys overview:

    Stage 1 — Retrieval:  millions → hundreds of candidates  (Two-Tower + ANN)
    Stage 2 — Ranking:    hundreds → top ~50               (DCN / xDeepFM)
    Stage 3 — Re-ranking: top ~50  → final 20              (diversity / freshness)
    """

    def __init__(self):
        print("Industrial RecSys initialized")
        print("  Stage 1 — Retrieval: Two-Tower + Faiss (sub-millisecond)")
        print("  Stage 2 — Ranking:   DCN with feature crosses")
        print("  Stage 3 — Re-rank:   MMR / DPP for diversity")

    def retrieval(self, user_emb, index, k=500):
        print(f"  Retrieval: {k} candidates")
        return list(range(k))

    def ranking(self, user_feats, candidates):
        print(f"  Ranking: {len(candidates)} -> 50")
        return candidates[:50]

    def reranking(self, ranked, diversity_weight=0.3):
        print(f"  Re-ranking: diversity_weight={diversity_weight}")
        return ranked[:20]


class DeepCrossNetwork(nn.Module):
    """
    Deep & Cross Network (DCN)
    Wang et al., 2017 — automatic feature crossing
    """
    def __init__(self, input_dim, cross_layers=3, deep_dims=None, dropout=0.1):
        super().__init__()
        if deep_dims is None:
            deep_dims = [256, 128, 64]

        self.cross_w = nn.ParameterList([nn.Parameter(torch.randn(input_dim, 1)) for _ in range(cross_layers)])
        self.cross_b = nn.ParameterList([nn.Parameter(torch.zeros(input_dim)) for _ in range(cross_layers)])

        deep, in_d = [], input_dim
        for d in deep_dims:
            deep += [nn.Linear(in_d, d), nn.LayerNorm(d), nn.ReLU(), nn.Dropout(dropout)]
            in_d = d
        self.deep   = nn.Sequential(*deep)
        self.output = nn.Linear(input_dim + deep_dims[-1], 1)

    def cross(self, x0, x):
        for w, b in zip(self.cross_w, self.cross_b):
            x = x0 * (torch.matmul(x, w) + b.unsqueeze(0)) + x
        return x

    def forward(self, x):
        cross_out = self.cross(x, x.clone())
        deep_out  = self.deep(x)
        return torch.sigmoid(self.output(torch.cat([cross_out, deep_out], dim=1))).squeeze()


dcn = DeepCrossNetwork(128).to(device)
print(f"DCN parameters: {sum(p.numel() for p in dcn.parameters()):,}")

8.2 Cold-Start Handling

class ColdStartStrategies:

    @staticmethod
    def content_for_new_items(description, encoder, existing_embs):
        """New item: find similar existing items via content embeddings"""
        new_emb = encoder.encode([description])
        sims    = torch.matmul(new_emb, existing_embs.T)
        return sims.topk(5).indices[0]

    @staticmethod
    def onboarding_for_new_users():
        print("New user cold-start strategies:")
        print("  1. Onboarding survey (preferred genres, popularity preference)")
        print("  2. Demographic-based group recommendations")
        print("  3. Explore-exploit bandit (epsilon-greedy)")
        print("  4. Rapid implicit feedback collection")

    @staticmethod
    def epsilon_greedy(n_items, epsilon=0.1):
        """Epsilon-greedy exploration-exploitation"""
        if np.random.random() < epsilon:
            return np.random.randint(n_items)  # explore
        return 0  # exploit: highest-scoring item

ColdStartStrategies.onboarding_for_new_users()

9. Real-World Implementation: Surprise Library

9.1 Quick RecSys with Surprise

def demo_surprise():
    """
    Surprise library for collaborative filtering.
    Install: pip install scikit-surprise
    """
    usage = """
    from surprise import Dataset, SVD, KNNBasic
    from surprise.model_selection import cross_validate, train_test_split
    from surprise import accuracy

    data = Dataset.load_builtin('ml-100k')
    trainset, testset = train_test_split(data, test_size=0.2, random_state=42)

    svd = SVD(n_factors=100, n_epochs=20, lr_all=0.005, reg_all=0.02)
    svd.fit(trainset)
    preds = svd.test(testset)
    print(f"SVD RMSE: {accuracy.rmse(preds):.4f}")

    # Cross-validation
    cv = cross_validate(SVD(), data, measures=['RMSE', 'MAE'], cv=5, verbose=True)
    print(f"Mean CV RMSE: {cv['test_rmse'].mean():.4f}")

    # Top-N recommendations for a user
    user_id     = '196'
    inner_id    = trainset.to_inner_uid(user_id)
    rated       = {iid for iid, _ in trainset.ur[inner_id]}
    unrated     = set(trainset.all_items()) - rated
    preds_unrated = sorted(
        [svd.predict(user_id, trainset.to_raw_iid(i)) for i in unrated],
        key=lambda x: x.est, reverse=True
    )[:10]
    """
    print("Surprise library algorithms:")
    print("  SVD    — Matrix factorization (Netflix Prize baseline)")
    print("  SVD++  — SVD with implicit feedback")
    print("  NMF    — Non-negative Matrix Factorization")
    print("  KNNBasic/Means/Baseline — Neighborhood methods")

demo_surprise()

9.2 Hybrid Recommendation with LightFM

def demo_lightfm():
    """
    LightFM: Hybrid collaborative + content-based filtering.
    Install: pip install lightfm
    """
    usage = """
    from lightfm import LightFM
    from lightfm.evaluation import precision_at_k, auc_score
    from lightfm.datasets import fetch_movielens

    data = fetch_movielens()
    train, test = data['train'], data['test']

    # BPR loss
    model_bpr = LightFM(no_components=30, loss='bpr', learning_rate=0.05)
    model_bpr.fit(train, epochs=30, num_threads=4)

    # WARP loss (stronger ranking signal)
    model_warp = LightFM(no_components=30, loss='warp', learning_rate=0.05)
    model_warp.fit(train, epochs=30, num_threads=4)

    print(f"BPR  Precision@10: {precision_at_k(model_bpr,  test, k=10).mean():.4f}")
    print(f"WARP Precision@10: {precision_at_k(model_warp, test, k=10).mean():.4f}")

    # Hybrid with item features
    model_hybrid = LightFM(no_components=30, loss='warp')
    model_hybrid.fit(interactions, item_features=item_feature_matrix, epochs=30)
    """
    print("LightFM hybrid recommendation:")
    print("  Combines collaborative filtering with item/user feature matrices")
    print("  Supports BPR, WARP, logistic, and warp-kos losses")
    print("  Mitigates cold-start via content features")

demo_lightfm()

10. Model Benchmark and Selection Guide

import pandas as pd

benchmark = pd.DataFrame({
    'Model':            ['User-based KNN', 'SVD', 'BPR-MF', 'NCF',
                         'Two-Tower', 'SASRec', 'LightGCN', 'LLM-based'],
    'Precision@10':     [0.042, 0.061, 0.068, 0.075, 0.072, 0.089, 0.085, 0.078],
    'Recall@10':        [0.134, 0.198, 0.221, 0.244, 0.238, 0.289, 0.279, 0.261],
    'NDCG@10':          [0.089, 0.124, 0.138, 0.158, 0.154, 0.187, 0.179, 0.169],
    'Train Time':       ['1 min', '5 min', '3 min', '20 min',
                         '30 min', '25 min', '40 min', '60+ min'],
    'Scale':            ['Small', 'Medium', 'Medium', 'Large',
                         'Very Large', 'Large', 'Large', 'Any'],
    'Cold Start':       ['Poor', 'Poor', 'Poor', 'Poor',
                         'Good', 'Fair', 'Fair', 'Excellent'],
})

print("Recommendation System Benchmark (MovieLens 1M)")
print("=" * 95)
print(benchmark.to_string(index=False))

print("\nModel selection guide:")
print("  Small data (~100K interactions)  : SVD, User-based KNN")
print("  Medium data (~1M interactions)   : NCF, BPR-MF")
print("  Large data (10M+)               : Two-Tower + LightGCN + SASRec")
print("  Cold-start critical             : LLM item encoding + Two-Tower")
print("  Real-time serving               : Two-Tower (pre-computed embs) + Faiss")

Closing Thoughts

This guide has covered the full spectrum of modern recommendation systems.

Key Takeaways:

  1. Foundations: Collaborative filtering and metrics (Precision@K, NDCG)
  2. Matrix factorization: SVD, BPR — strong, interpretable baselines
  3. NCF: Deep learning overcomes MF limitations
  4. Two-Tower: The workhorse architecture for internet-scale RecSys
  5. Sequential models: SASRec, BERT4Rec — leverage temporal user behavior
  6. Graph models: LightGCN — capture higher-order connectivity
  7. LLM-powered: Semantic understanding solves cold-start

Practical tips:

  • Always start with a BPR-MF baseline — it is surprisingly hard to beat.
  • Multi-stage retrieval is mandatory at production scale.
  • If you have sequence data, SASRec consistently outperforms static models.
  • When cold-start matters, invest in LLM-based item encoding.

References: