Deep Learning Recommendation Systems Complete Guide: From Collaborative Filtering to LLM-based RecSys

Author: Youngju Kim (@fjvbn20031)
Introduction
Recommendation systems power the modern digital experience — Netflix movie suggestions, Amazon product recommendations, Spotify music discovery. Netflix alone reportedly creates over $1 billion in annual value from its recommendation engine.
This guide takes you from classical collaborative filtering all the way through graph neural networks and LLM-based recommendation. Every section includes working PyTorch code.
1. Recommendation System Fundamentals
1.1 Three Types of Recommendation Systems
Collaborative Filtering leverages patterns from similar users or items.
- User-based: "People like you also enjoyed this."
- Item-based: "Because you liked X, you may enjoy Y."
Content-Based Filtering analyzes item attributes (genre, director, description, etc.).
- Can recommend new items (mitigates the cold-start problem)
- Feature engineering quality matters
Hybrid Methods combine both approaches to compensate for their respective weaknesses.
1.2 Implicit vs. Explicit Feedback
Explicit feedback: ratings, likes, dislikes — clear intent but sparse. Implicit feedback: clicks, watch time, purchases — abundant but noisy.
Real-world systems rely heavily on implicit feedback because it is far more plentiful.
1.3 Evaluation Metrics
```python
import numpy as np

def precision_at_k(recommended, relevant, k):
    """Precision@K: fraction of top-K items that are relevant"""
    hits = len(set(recommended[:k]) & set(relevant))
    return hits / k

def recall_at_k(recommended, relevant, k):
    """Recall@K: fraction of relevant items found in top-K"""
    hits = len(set(recommended[:k]) & set(relevant))
    return hits / len(relevant) if relevant else 0.0

def average_precision_at_k(recommended, relevant, k):
    """AP@K: cumulative average of precision at each hit (rank-aware)"""
    if not relevant:
        return 0.0
    hits, sum_prec = 0, 0.0
    for i, item in enumerate(recommended[:k]):
        if item in relevant:
            hits += 1
            sum_prec += hits / (i + 1)
    return sum_prec / min(len(relevant), k)

def ndcg_at_k(recommended, relevant, k):
    """NDCG@K: Normalized Discounted Cumulative Gain (binary relevance)"""
    relevance = [1 if item in relevant else 0 for item in recommended[:k]]
    if not any(relevance):
        return 0.0
    dcg = sum(rel / np.log2(i + 2) for i, rel in enumerate(relevance))
    idcg = sum(rel / np.log2(i + 2) for i, rel in enumerate(sorted(relevance, reverse=True)))
    return dcg / idcg

# Example
recommended = [1, 4, 7, 2, 9, 3, 5, 6, 8, 10]
relevant = {1, 2, 5, 7, 8}

print("=" * 40)
print("Recommendation Evaluation Metrics")
print("=" * 40)
for k in [5, 10]:
    print(f"\nk = {k}")
    print(f"  Precision@{k}: {precision_at_k(recommended, relevant, k):.4f}")
    print(f"  Recall@{k}:    {recall_at_k(recommended, relevant, k):.4f}")
    print(f"  AP@{k}:        {average_precision_at_k(recommended, relevant, k):.4f}")
    print(f"  NDCG@{k}:      {ndcg_at_k(recommended, relevant, k):.4f}")
```
2. Matrix Factorization
2.1 Core Concept
Matrix Factorization decomposes the user-item interaction matrix R into two lower-rank matrices:
R ≈ U × V^T
- U: user embedding matrix (n_users × k)
- V: item embedding matrix (n_items × k)
- k: number of latent factors
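Concretely, with toy sizes and random (rather than learned) factors, the predicted rating for a user-item pair is just a dot product of two k-dimensional latent vectors:

```python
import numpy as np

rng = np.random.default_rng(0)
k = 4                        # number of latent factors
U = rng.normal(size=(6, k))  # 6 users
V = rng.normal(size=(5, k))  # 5 items
R = U @ V.T                  # full predicted rating matrix (6 x 5)

# The predicted score for user 2 on item 3 is the dot product of their factors
score = U[2] @ V[3]
assert np.isclose(score, R[2, 3])
print(R.shape)  # (6, 5)
```

Learning the factorization means fitting U and V so that R matches the observed entries, which is exactly what the PyTorch model below does.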
2.2 Matrix Factorization in PyTorch
```python
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split

def generate_synthetic_ratings(n_users=1000, n_items=500, n_ratings=50000):
    np.random.seed(42)
    k = 20
    user_f = np.random.randn(n_users, k) * 0.5
    item_f = np.random.randn(n_items, k) * 0.5
    true_r = user_f @ item_f.T
    true_r = (true_r - true_r.min()) / (true_r.max() - true_r.min()) * 4 + 1
    uid = np.random.choice(n_users, n_ratings)
    iid = np.random.choice(n_items, n_ratings)
    rat = np.clip(true_r[uid, iid] + np.random.randn(n_ratings) * 0.3, 1, 5)
    df = pd.DataFrame({'user_id': uid, 'item_id': iid, 'rating': rat})
    return df.drop_duplicates(subset=['user_id', 'item_id'])

ratings_df = generate_synthetic_ratings()
train_df, test_df = train_test_split(ratings_df, test_size=0.2, random_state=42)
n_users = ratings_df['user_id'].max() + 1
n_items = ratings_df['item_id'].max() + 1
print(f"Total ratings: {len(ratings_df)}")
print(f"Users: {n_users}  Items: {n_items}")

class RatingsDataset(Dataset):
    def __init__(self, df):
        self.users = torch.LongTensor(df['user_id'].values)
        self.items = torch.LongTensor(df['item_id'].values)
        self.ratings = torch.FloatTensor(df['rating'].values)

    def __len__(self):
        return len(self.ratings)

    def __getitem__(self, idx):
        return self.users[idx], self.items[idx], self.ratings[idx]

class MatrixFactorization(nn.Module):
    def __init__(self, n_users, n_items, n_factors=64):
        super().__init__()
        self.user_embedding = nn.Embedding(n_users, n_factors)
        self.item_embedding = nn.Embedding(n_items, n_factors)
        self.user_bias = nn.Embedding(n_users, 1)
        self.item_bias = nn.Embedding(n_items, 1)
        self.global_bias = nn.Parameter(torch.zeros(1))
        nn.init.normal_(self.user_embedding.weight, 0, 0.01)
        nn.init.normal_(self.item_embedding.weight, 0, 0.01)
        nn.init.zeros_(self.user_bias.weight)
        nn.init.zeros_(self.item_bias.weight)

    def forward(self, user_ids, item_ids):
        u = self.user_embedding(user_ids)
        v = self.item_embedding(item_ids)
        dot = (u * v).sum(dim=1)
        # squeeze(-1), not squeeze(): a bare squeeze() also collapses batches of size 1
        u_b = self.user_bias(user_ids).squeeze(-1)
        i_b = self.item_bias(item_ids).squeeze(-1)
        return dot + u_b + i_b + self.global_bias

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
mf_model = MatrixFactorization(n_users, n_items, n_factors=64).to(device)
train_loader = DataLoader(RatingsDataset(train_df), batch_size=512, shuffle=True)
test_loader = DataLoader(RatingsDataset(test_df), batch_size=512, shuffle=False)
optimizer = optim.Adam(mf_model.parameters(), lr=1e-3, weight_decay=1e-5)
criterion = nn.MSELoss()

def train_mf(model, loader, optimizer, criterion, device):
    model.train()
    total = 0
    for u, i, r in loader:
        u, i, r = u.to(device), i.to(device), r.to(device)
        optimizer.zero_grad()
        loss = criterion(model(u, i), r)
        loss.backward()
        optimizer.step()
        total += loss.item() * len(r)
    return (total / len(loader.dataset)) ** 0.5

def eval_mf(model, loader, device):
    model.eval()
    preds, targets = [], []
    with torch.no_grad():
        for u, i, r in loader:
            preds.extend(model(u.to(device), i.to(device)).cpu().tolist())
            targets.extend(r.tolist())
    p, t = np.array(preds), np.array(targets)
    return np.sqrt(((p - t) ** 2).mean()), np.abs(p - t).mean()

for epoch in range(30):
    tr = train_mf(mf_model, train_loader, optimizer, criterion, device)
    if (epoch + 1) % 10 == 0:
        rmse, mae = eval_mf(mf_model, test_loader, device)
        print(f"Epoch {epoch+1:2d} | Train RMSE: {tr:.4f} | Test RMSE: {rmse:.4f} | MAE: {mae:.4f}")
```
2.3 BPR (Bayesian Personalized Ranking)
BPR is a pairwise learning method for implicit feedback. It assumes users prefer items they interacted with over those they did not.
```python
class BPRModel(nn.Module):
    def __init__(self, n_users, n_items, n_factors=64):
        super().__init__()
        self.user_emb = nn.Embedding(n_users, n_factors)
        self.item_emb = nn.Embedding(n_items, n_factors)
        nn.init.normal_(self.user_emb.weight, 0, 0.01)
        nn.init.normal_(self.item_emb.weight, 0, 0.01)

    def forward(self, u, pos, neg):
        ue = self.user_emb(u)
        pe = self.item_emb(pos)
        ne = self.item_emb(neg)
        return (ue * pe).sum(-1), (ue * ne).sum(-1)

    def predict(self, u, i):
        return (self.user_emb(u) * self.item_emb(i)).sum(-1)

def bpr_loss(pos_score, neg_score, model=None, reg=1e-5):
    # logsigmoid is numerically stabler than log(sigmoid(x))
    loss = -nn.functional.logsigmoid(pos_score - neg_score).mean()
    if model and reg:
        loss += reg * sum(p.norm(2) for p in model.parameters())
    return loss

class BPRDataset(Dataset):
    def __init__(self, df, n_items):
        self.users = df['user_id'].values
        self.pos_items = df['item_id'].values
        self.n_items = n_items
        self.user_items = df.groupby('user_id')['item_id'].apply(set).to_dict()

    def __len__(self):
        return len(self.users)

    def __getitem__(self, idx):
        u = self.users[idx]
        p = self.pos_items[idx]
        # rejection-sample a negative item the user has not interacted with
        n = np.random.randint(self.n_items)
        while n in self.user_items.get(u, set()):
            n = np.random.randint(self.n_items)
        return torch.tensor(u), torch.tensor(p), torch.tensor(n)

bpr_model = BPRModel(n_users, n_items).to(device)
bpr_loader = DataLoader(BPRDataset(train_df, n_items), batch_size=512, shuffle=True)
bpr_opt = optim.Adam(bpr_model.parameters(), lr=1e-3)

for epoch in range(20):
    bpr_model.train()
    total = 0
    for u, p, n in bpr_loader:
        u, p, n = u.to(device), p.to(device), n.to(device)
        bpr_opt.zero_grad()
        ps, ns = bpr_model(u, p, n)
        loss = bpr_loss(ps, ns, bpr_model)
        loss.backward()
        bpr_opt.step()
        total += loss.item()
    if (epoch + 1) % 5 == 0:
        print(f"BPR Epoch {epoch+1:2d} | Loss: {total/len(bpr_loader):.4f}")
```
3. Neural Collaborative Filtering (NCF)
3.1 Architecture Overview
NCF extends matrix factorization with deep learning by combining two complementary paths.
- GMF (Generalized Matrix Factorization): element-wise product of the embeddings, a generalization of MF.
- MLP: the concatenated embeddings passed through non-linear layers.
The two outputs are fused into a final prediction.
```python
class NCF(nn.Module):
    """
    Neural Collaborative Filtering
    He et al., 2017 — arxiv.org/abs/1708.05031
    """
    def __init__(self, n_users, n_items, n_factors=64, mlp_dims=None, dropout=0.2):
        super().__init__()
        if mlp_dims is None:
            mlp_dims = [256, 128, 64]
        # GMF embeddings
        self.gmf_user = nn.Embedding(n_users, n_factors)
        self.gmf_item = nn.Embedding(n_items, n_factors)
        # MLP embeddings (separate from GMF)
        self.mlp_user = nn.Embedding(n_users, n_factors)
        self.mlp_item = nn.Embedding(n_items, n_factors)
        # MLP layers
        mlp_layers, in_dim = [], n_factors * 2
        for dim in mlp_dims:
            mlp_layers += [nn.Linear(in_dim, dim), nn.BatchNorm1d(dim), nn.ReLU(), nn.Dropout(dropout)]
            in_dim = dim
        self.mlp = nn.Sequential(*mlp_layers)
        # Final prediction head
        self.output = nn.Linear(n_factors + mlp_dims[-1], 1)
        for emb in [self.gmf_user, self.gmf_item, self.mlp_user, self.mlp_item]:
            nn.init.normal_(emb.weight, 0, 0.01)

    def forward(self, user_ids, item_ids):
        gmf_out = self.gmf_user(user_ids) * self.gmf_item(item_ids)
        mlp_in = torch.cat([self.mlp_user(user_ids), self.mlp_item(item_ids)], dim=1)
        mlp_out = self.mlp(mlp_in)
        return torch.sigmoid(self.output(torch.cat([gmf_out, mlp_out], dim=1))).squeeze(-1)

class ImplicitDataset(Dataset):
    def __init__(self, pos_df, n_items, neg_ratio=4):
        self.users, self.items, self.labels = [], [], []
        ui = pos_df.groupby('user_id')['item_id'].apply(set).to_dict()
        for _, row in pos_df.iterrows():
            u, i = row['user_id'], row['item_id']
            self.users.append(u); self.items.append(i); self.labels.append(1.0)
            # sample neg_ratio negatives per positive
            for _ in range(neg_ratio):
                n = np.random.randint(n_items)
                while n in ui.get(u, set()):
                    n = np.random.randint(n_items)
                self.users.append(u); self.items.append(n); self.labels.append(0.0)

    def __len__(self):
        return len(self.labels)

    def __getitem__(self, idx):
        return (torch.tensor(self.users[idx]), torch.tensor(self.items[idx]),
                torch.tensor(self.labels[idx]))

implicit_train = train_df[train_df['rating'] >= 3.5].copy()
impl_loader = DataLoader(ImplicitDataset(implicit_train, n_items), batch_size=1024, shuffle=True)
ncf = NCF(n_users, n_items, n_factors=64).to(device)
ncf_opt = optim.Adam(ncf.parameters(), lr=1e-3, weight_decay=1e-5)
bce_loss = nn.BCELoss()
print(f"NCF parameters: {sum(p.numel() for p in ncf.parameters()):,}")

for epoch in range(20):
    ncf.train()
    total = 0
    for u, i, lbl in impl_loader:
        u, i, lbl = u.to(device), i.to(device), lbl.to(device)
        ncf_opt.zero_grad()
        loss = bce_loss(ncf(u, i), lbl)
        loss.backward()
        ncf_opt.step()
        total += loss.item()
    if (epoch + 1) % 5 == 0:
        print(f"NCF Epoch {epoch+1:2d} | Loss: {total/len(impl_loader):.4f}")
```
4. Two-Tower Model
4.1 Architecture Overview
The Two-Tower model (Dual Encoder / Bi-Encoder) learns user and item representations independently. Recommendation scores are dot products between the two towers' embeddings.
Advantages at scale:
- Item embeddings can be pre-computed offline.
- Fast Approximate Nearest Neighbor (ANN) search at inference time.
- Scales to billions of items.
Used at YouTube, Google, Spotify, Pinterest, and more.
```python
class UserTower(nn.Module):
    def __init__(self, n_users, feat_dim, embed_dim=128, hidden_dims=None):
        super().__init__()
        if hidden_dims is None:
            hidden_dims = [256, 128]
        self.id_emb = nn.Embedding(n_users, embed_dim)
        layers, in_d = [], embed_dim + feat_dim
        for h in hidden_dims:
            layers += [nn.Linear(in_d, h), nn.LayerNorm(h), nn.ReLU(), nn.Dropout(0.1)]
            in_d = h
        layers.append(nn.Linear(in_d, embed_dim))
        self.net = nn.Sequential(*layers)

    def forward(self, user_ids, user_features):
        x = torch.cat([self.id_emb(user_ids), user_features], dim=1)
        return nn.functional.normalize(self.net(x), dim=-1)

class ItemTower(nn.Module):
    def __init__(self, n_items, feat_dim, embed_dim=128, hidden_dims=None):
        super().__init__()
        if hidden_dims is None:
            hidden_dims = [256, 128]
        self.id_emb = nn.Embedding(n_items, embed_dim)
        layers, in_d = [], embed_dim + feat_dim
        for h in hidden_dims:
            layers += [nn.Linear(in_d, h), nn.LayerNorm(h), nn.ReLU(), nn.Dropout(0.1)]
            in_d = h
        layers.append(nn.Linear(in_d, embed_dim))
        self.net = nn.Sequential(*layers)

    def forward(self, item_ids, item_features):
        x = torch.cat([self.id_emb(item_ids), item_features], dim=1)
        return nn.functional.normalize(self.net(x), dim=-1)

class TwoTowerModel(nn.Module):
    def __init__(self, n_users, n_items, user_fd, item_fd, embed_dim=128):
        super().__init__()
        self.user_tower = UserTower(n_users, user_fd, embed_dim)
        self.item_tower = ItemTower(n_items, item_fd, embed_dim)
        # learnable temperature, stored in log-space and initialized to 0.07
        self.log_temp = nn.Parameter(torch.tensor(0.07).log())

    def forward(self, uid, uf, iid, if_):
        return self.user_tower(uid, uf), self.item_tower(iid, if_)

    def similarity(self, u_emb, i_emb):
        return torch.matmul(u_emb, i_emb.T) / self.log_temp.exp()

def info_nce_loss(sim):
    # in-batch negatives: the diagonal holds each user's positive item
    n = sim.size(0)
    labels = torch.arange(n, device=sim.device)
    return nn.CrossEntropyLoss()(sim, labels)

user_feat_dim = 16
item_feat_dim = 32
user_feats = torch.FloatTensor(np.random.randn(n_users, user_feat_dim))
item_feats = torch.FloatTensor(np.random.randn(n_items, item_feat_dim))
two_tower = TwoTowerModel(n_users, n_items, user_feat_dim, item_feat_dim).to(device)
print(f"Two-Tower parameters: {sum(p.numel() for p in two_tower.parameters()):,}")
```
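Training pairs the InfoNCE loss with in-batch negatives: in a batch of (user, positive item) pairs, row i's positive is column i, and every other column serves as a negative. Below is a self-contained sketch of one such training step, with simple linear encoders standing in for the towers (sizes and data are illustrative):

```python
import torch
import torch.nn as nn
import torch.nn.functional as F

# Minimal two-tower sketch: linear layers stand in for the full MLP towers.
torch.manual_seed(0)
feat_dim, embed_dim, batch = 16, 32, 64
user_enc = nn.Linear(feat_dim, embed_dim)
item_enc = nn.Linear(feat_dim, embed_dim)
opt = torch.optim.Adam(list(user_enc.parameters()) + list(item_enc.parameters()), lr=1e-3)

# One step: embed a batch of users and their positive items, score all pairs,
# and treat the diagonal as the correct "class" for cross-entropy.
u = F.normalize(user_enc(torch.randn(batch, feat_dim)), dim=-1)
v = F.normalize(item_enc(torch.randn(batch, feat_dim)), dim=-1)
sim = u @ v.T / 0.07                                 # (batch, batch) logits / temperature
loss = F.cross_entropy(sim, torch.arange(batch))     # diagonal = positives
opt.zero_grad(); loss.backward(); opt.step()
print(f"InfoNCE loss: {loss.item():.4f}")
```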
```python
def precompute_item_embeddings(model, n_items, item_feats, batch_size=256, device='cpu'):
    model.eval()
    all_embs = []
    with torch.no_grad():
        for s in range(0, n_items, batch_size):
            e = min(s + batch_size, n_items)
            ids = torch.arange(s, e, device=device)
            emb = model.item_tower(ids, item_feats[s:e].to(device))
            all_embs.append(emb.cpu())
    return torch.cat(all_embs)

item_cache = precompute_item_embeddings(two_tower, n_items, item_feats, device=device)
print(f"Pre-computed item embeddings: {item_cache.shape}")
```
4.2 ANN Search with Faiss
```python
def demo_faiss():
    """
    Approximate Nearest Neighbor search with Faiss.
    Install: pip install faiss-cpu (or faiss-gpu)
    """
    usage = """
    import faiss

    embed_dim = 128
    item_embs = item_cache.numpy().astype('float32')
    faiss.normalize_L2(item_embs)
    index = faiss.IndexFlatIP(embed_dim)  # exact inner-product
    # For large scale: IndexIVFFlat or IndexHNSWFlat
    index.add(item_embs)

    user_q = user_emb.numpy().astype('float32')
    faiss.normalize_L2(user_q)
    scores, indices = index.search(user_q, 100)
    print("Top-100 candidate items:", indices[0])
    """
    print("Faiss index types:")
    print("  IndexFlatIP   — exact inner-product (small scale)")
    print("  IndexIVFFlat  — inverted file index (medium scale)")
    print("  IndexHNSWFlat — hierarchical graph (large scale, fast)")
    print("  IndexPQ       — product quantization (memory-efficient)")

demo_faiss()
```
5. Sequential Recommendation
5.1 SASRec (Self-Attentive Sequential Recommendation)
SASRec uses Transformer Self-Attention to identify important items in a user's interaction sequence.
```python
class SASRecBlock(nn.Module):
    def __init__(self, d_model, n_heads, dropout=0.1):
        super().__init__()
        self.attn = nn.MultiheadAttention(d_model, n_heads, dropout=dropout, batch_first=True)
        self.ff = nn.Sequential(
            nn.Linear(d_model, d_model * 4), nn.GELU(), nn.Dropout(dropout),
            nn.Linear(d_model * 4, d_model), nn.Dropout(dropout)
        )
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)

    def forward(self, x):
        L = x.size(1)
        # causal mask: position i may only attend to positions <= i
        causal = torch.triu(torch.ones(L, L, device=x.device), diagonal=1).bool()
        a, _ = self.attn(x, x, x, attn_mask=causal)
        x = self.norm1(x + a)
        return self.norm2(x + self.ff(x))

class SASRec(nn.Module):
    """
    Self-Attentive Sequential Recommendation
    Kang and McAuley, 2018 — arxiv.org/abs/1808.09781
    """
    def __init__(self, n_items, max_seq_len, d_model=128, n_heads=4, num_layers=2, dropout=0.1):
        super().__init__()
        self.item_emb = nn.Embedding(n_items + 1, d_model, padding_idx=0)
        self.pos_emb = nn.Embedding(max_seq_len, d_model)
        self.blocks = nn.ModuleList([SASRecBlock(d_model, n_heads, dropout) for _ in range(num_layers)])
        self.norm = nn.LayerNorm(d_model)
        self.dropout = nn.Dropout(dropout)

    def forward(self, seq):
        L = seq.size(1)
        pos = torch.arange(L, device=seq.device).unsqueeze(0)
        x = self.dropout(self.item_emb(seq) + self.pos_emb(pos))
        for blk in self.blocks:
            x = blk(x)
        return self.norm(x)

    def predict(self, seq, candidates):
        repr_ = self.forward(seq)[:, -1, :]          # (batch, d_model)
        c_emb = self.item_emb(candidates)            # (batch, n_cand, d_model)
        return (repr_.unsqueeze(1) * c_emb).sum(-1)  # (batch, n_cand)

class SequentialDataset(Dataset):
    def __init__(self, df, max_seq_len=50, min_seq_len=5):
        self.max_seq_len = max_seq_len
        self.sequences = []
        # NOTE: with real data, sort by timestamp so items follow interaction order
        for user_id, grp in df.sort_values('user_id').groupby('user_id'):
            # shift all item ids by +1 so 0 stays reserved for padding
            items = [i + 1 for i in grp['item_id'].tolist()]
            if len(items) < min_seq_len:
                continue
            for i in range(min_seq_len, len(items) + 1):
                seq = items[max(0, i - max_seq_len - 1):i - 1]
                target = items[i - 1]
                padded = ([0] * (max_seq_len - len(seq)) + seq)[-max_seq_len:]
                self.sequences.append((padded, target))

    def __len__(self):
        return len(self.sequences)

    def __getitem__(self, idx):
        seq, tgt = self.sequences[idx]
        return torch.LongTensor(seq), torch.tensor(tgt)

sasrec = SASRec(n_items, max_seq_len=50, d_model=128, n_heads=4, num_layers=2).to(device)
print(f"SASRec parameters: {sum(p.numel() for p in sasrec.parameters()):,}")
```
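For training, each sequence is paired with its next item, and the last-position representation is scored against the item embedding table. The following self-contained sketch shows one such training step in the spirit of SASRec, using random toy data and a stock causal TransformerEncoder in place of the full model above:

```python
import torch
import torch.nn as nn

# Minimal next-item training step (toy sizes, random data). Scores come from
# dot products with the embedding table, so input/output weights are tied.
torch.manual_seed(0)
n_items, max_len, d = 50, 10, 32
item_emb = nn.Embedding(n_items + 1, d, padding_idx=0)   # 0 = padding
layer = nn.TransformerEncoderLayer(d, nhead=4, dim_feedforward=4 * d, batch_first=True)
enc = nn.TransformerEncoder(layer, num_layers=1)
opt = torch.optim.Adam(list(item_emb.parameters()) + list(enc.parameters()), lr=1e-3)

seqs = torch.randint(1, n_items + 1, (8, max_len))       # batch of item-id sequences
targets = torch.randint(1, n_items + 1, (8,))            # next item for each sequence
causal = torch.triu(torch.ones(max_len, max_len), diagonal=1).bool()

h = enc(item_emb(seqs), mask=causal)                     # (8, max_len, d)
logits = h[:, -1, :] @ item_emb.weight.T                 # (8, n_items + 1)
loss = nn.functional.cross_entropy(logits, targets)
opt.zero_grad(); loss.backward(); opt.step()
print(f"next-item loss: {loss.item():.4f}")
```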
5.2 BERT4Rec
BERT4Rec applies BERT's Masked Language Modeling (MLM) to sequential recommendation. Random items are masked and predicted, enabling the model to learn from bidirectional context.
```python
class BERT4Rec(nn.Module):
    """
    BERT4Rec: Sequential Recommendation with BERT
    Sun et al., 2019 — arxiv.org/abs/1904.06690
    """
    def __init__(self, n_items, max_seq_len, d_model=256, n_heads=4,
                 num_layers=2, dropout=0.1, mask_prob=0.15):
        super().__init__()
        self.mask_id = n_items + 1
        self.n_items = n_items
        self.mask_prob = mask_prob
        self.item_emb = nn.Embedding(n_items + 2, d_model, padding_idx=0)  # +2: padding and [MASK]
        self.pos_emb = nn.Embedding(max_seq_len, d_model)
        enc_layer = nn.TransformerEncoderLayer(
            d_model=d_model, nhead=n_heads, dim_feedforward=d_model * 4,
            dropout=dropout, batch_first=True
        )
        self.transformer = nn.TransformerEncoder(enc_layer, num_layers=num_layers)
        self.norm = nn.LayerNorm(d_model)
        self.output = nn.Linear(d_model, n_items + 2)

    def forward(self, seq):
        L = seq.size(1)
        pos = torch.arange(L, device=seq.device).unsqueeze(0)
        x = self.item_emb(seq) + self.pos_emb(pos)
        return self.output(self.norm(self.transformer(x)))

    def mask_seq(self, seq):
        masked = seq.clone()
        # mask random non-padding positions
        mask = (torch.rand_like(seq.float()) < self.mask_prob) & (seq != 0)
        masked[mask] = self.mask_id
        return masked, mask
```
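Training then reduces to cross-entropy on the masked positions only: the encoder runs without a causal mask, so both past and future context inform each prediction. A self-contained sketch of one masked-item step (toy sizes, random data; the full model above adds positional embeddings):

```python
import torch
import torch.nn as nn

# Minimal masked-item (MLM-style) training step in the spirit of BERT4Rec.
torch.manual_seed(0)
n_items, max_len, d = 50, 10, 32
mask_id = n_items + 1
item_emb = nn.Embedding(n_items + 2, d, padding_idx=0)   # 0 = pad, n_items+1 = [MASK]
layer = nn.TransformerEncoderLayer(d, nhead=4, dim_feedforward=4 * d, batch_first=True)
enc = nn.TransformerEncoder(layer, num_layers=1)         # bidirectional: no causal mask
head = nn.Linear(d, n_items + 2)

seq = torch.randint(1, n_items + 1, (8, max_len))
mask = (torch.rand(seq.shape) < 0.15) & (seq != 0)       # which positions to hide
if mask.sum() == 0:                                      # guarantee at least one mask
    mask[0, 0] = True
masked = seq.clone()
masked[mask] = mask_id

logits = head(enc(item_emb(masked)))                     # (8, max_len, n_items + 2)
loss = nn.functional.cross_entropy(logits[mask], seq[mask])  # masked positions only
print(f"masked positions: {mask.sum().item()}, MLM loss: {loss.item():.4f}")
```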
6. Graph-Based Recommendation: LightGCN
6.1 LightGCN Architecture
LightGCN (Light Graph Convolution Network) learns high-order connectivity on the user-item bipartite graph via message passing. Removing transformation matrices and non-linear activations keeps it lightweight.
```python
class LightGCN(nn.Module):
    """
    LightGCN: Simplifying and Powering Graph Convolution Network for Recommendation
    He et al., 2020 — arxiv.org/abs/2002.02126
    """
    def __init__(self, n_users, n_items, embed_dim=64, n_layers=3):
        super().__init__()
        self.n_users = n_users
        self.n_items = n_items
        self.n_layers = n_layers
        self.embed_dim = embed_dim
        self.user_emb = nn.Embedding(n_users, embed_dim)
        self.item_emb = nn.Embedding(n_items, embed_dim)
        nn.init.normal_(self.user_emb.weight, std=0.1)
        nn.init.normal_(self.item_emb.weight, std=0.1)

    def compute_adj(self, interactions, device):
        # symmetric normalized adjacency of the user-item bipartite graph
        n = self.n_users + self.n_items
        uid = interactions[:, 0]
        iid = interactions[:, 1] + self.n_users
        row = torch.cat([uid, iid]).to(device)
        col = torch.cat([iid, uid]).to(device)
        edge_index = torch.stack([row, col])
        deg = torch.zeros(n, device=device)
        deg.scatter_add_(0, row, torch.ones(len(row), device=device))
        d_inv_sqrt = deg.pow(-0.5)
        d_inv_sqrt[d_inv_sqrt == float('inf')] = 0
        return edge_index, d_inv_sqrt[row] * d_inv_sqrt[col], n

    def forward(self, interactions):
        dev = self.user_emb.weight.device
        edge_idx, edge_wt, n = self.compute_adj(interactions, dev)
        all_emb = torch.cat([self.user_emb.weight, self.item_emb.weight])
        layers = [all_emb]
        for _ in range(self.n_layers):
            # propagate: each node sums normalized neighbor embeddings
            agg = torch.zeros_like(all_emb)
            agg.scatter_add_(
                0,
                edge_idx[1].unsqueeze(1).expand(-1, self.embed_dim),
                all_emb[edge_idx[0]] * edge_wt.unsqueeze(1)
            )
            all_emb = agg
            layers.append(all_emb)
        final = torch.stack(layers).mean(0)  # average over all layers, incl. layer 0
        return final[:self.n_users], final[self.n_users:]

    def bpr_loss(self, u_emb, i_emb, users, pos, neg, lam=1e-4):
        ue = u_emb[users]; pe = i_emb[pos]; ne = i_emb[neg]
        ps = (ue * pe).sum(-1); ns = (ue * ne).sum(-1)
        loss = -nn.functional.logsigmoid(ps - ns).mean()
        reg = (self.user_emb.weight[users].norm(2).pow(2) +
               self.item_emb.weight[pos].norm(2).pow(2) +
               self.item_emb.weight[neg].norm(2).pow(2)) / (2 * len(users))
        return loss + lam * reg

lightgcn = LightGCN(n_users, n_items, embed_dim=64, n_layers=3).to(device)
interactions_t = torch.LongTensor(train_df[['user_id', 'item_id']].values)
print(f"LightGCN parameters: {sum(p.numel() for p in lightgcn.parameters()):,}")
```
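The propagation rule is easiest to see on a toy graph: build the symmetric normalized adjacency D^(-1/2) A D^(-1/2) of the bipartite graph, multiply it into the embeddings a few times, and average the layer outputs. A small self-contained sketch (random embeddings and hand-picked interactions, dense adjacency for clarity):

```python
import torch

# The "light" convolution: no weight matrices, no non-linearity -- just
# normalized neighborhood aggregation, layer by layer.
torch.manual_seed(0)
n_users, n_items, d = 3, 4, 8
inter = torch.tensor([[0, 0], [0, 1], [1, 1], [2, 3]])   # (user, item) pairs
n = n_users + n_items
row = torch.cat([inter[:, 0], inter[:, 1] + n_users])    # edges in both directions
col = torch.cat([inter[:, 1] + n_users, inter[:, 0]])

A = torch.zeros(n, n)
A[row, col] = 1.0
deg = A.sum(1)
d_inv = deg.pow(-0.5)
d_inv[torch.isinf(d_inv)] = 0                            # isolated nodes get weight 0
A_norm = d_inv.unsqueeze(1) * A * d_inv.unsqueeze(0)     # D^(-1/2) A D^(-1/2)

emb = torch.randn(n, d)
layers = [emb]
for _ in range(3):
    emb = A_norm @ emb                                   # one LightGCN layer
    layers.append(emb)
final = torch.stack(layers).mean(0)                      # layer combination
user_final, item_final = final[:n_users], final[n_users:]
print(user_final.shape, item_final.shape)
```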
7. LLM-Based Recommendation
7.1 How to Use LLMs in RecSys
LLMs can enrich recommendation systems in several ways:
- Item feature encoding: Embed item descriptions using an LLM.
- Prompt-based recommendation: Ask an LLM to rank items directly.
- User profile text: Convert user behavior into natural language.
- Explanation generation: Generate natural-language reasons for recommendations.
```python
import torch.nn.functional as F

def mean_pooling(model_output, attention_mask):
    """Mean-pool token embeddings, ignoring padding tokens"""
    tok = model_output[0]
    mask_exp = attention_mask.unsqueeze(-1).expand(tok.size()).float()
    return torch.sum(tok * mask_exp, 1) / torch.clamp(mask_exp.sum(1), min=1e-9)

class LLMItemEncoder:
    """Encode item descriptions into embeddings using an LLM"""
    def __init__(self, model_name='sentence-transformers/all-MiniLM-L6-v2'):
        self.model_name = model_name
        print(f"LLM encoder: {model_name}")

    def encode(self, texts, batch_size=32):
        """
        Real implementation:

        from transformers import AutoTokenizer, AutoModel
        tokenizer = AutoTokenizer.from_pretrained(self.model_name)
        model = AutoModel.from_pretrained(self.model_name)
        embeddings = []
        for i in range(0, len(texts), batch_size):
            batch = texts[i:i+batch_size]
            encoded = tokenizer(batch, padding=True, truncation=True,
                                max_length=128, return_tensors='pt')
            with torch.no_grad():
                out = model(**encoded)
            emb = F.normalize(mean_pooling(out, encoded['attention_mask']), dim=1)
            embeddings.append(emb)
        return torch.cat(embeddings)
        """
        # simulation: random unit vectors, so dot products behave like cosine similarity
        return F.normalize(torch.randn(len(texts), 384), dim=1)

movie_descriptions = [
    "A thrilling sci-fi adventure set in space with stunning visual effects",
    "A heartwarming romantic comedy about finding love in unexpected places",
    "An intense psychological thriller with unexpected plot twists",
    "An animated fantasy film perfect for families and children",
    "A gripping crime drama based on true events"
]

encoder = LLMItemEncoder()
item_llm_emb = encoder.encode(movie_descriptions)
sim_matrix = torch.matmul(item_llm_emb, item_llm_emb.T)
print(f"LLM item embedding shape: {item_llm_emb.shape}")
print("\nItem-to-item similarity matrix:")
print(sim_matrix.numpy().round(3))
```
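One simple way to put such embeddings to work is to blend content similarity with an existing collaborative-filtering score. The sketch below is hypothetical throughout: `cf_scores`, the liked item, and the weight `alpha` are all illustrative placeholders, not a prescribed recipe:

```python
import torch
import torch.nn.functional as F

# Hypothetical blend for one user: a CF score per item plus content similarity
# to an item the user liked, derived from (simulated) LLM item embeddings.
torch.manual_seed(0)
n_items = 5
cf_scores = torch.randn(n_items)                       # stand-in for MF/NCF scores
llm_emb = F.normalize(torch.randn(n_items, 384), dim=1)
liked = llm_emb[0]                                     # an item the user liked
content_scores = llm_emb @ liked                       # cosine similarity to it

alpha = 0.7                                            # CF weight (a tuning knob)
blended = alpha * cf_scores + (1 - alpha) * content_scores
print("ranking:", blended.argsort(descending=True).tolist())
```

Because the content signal does not depend on interaction history, the same blend keeps working for brand-new items where `cf_scores` would be unavailable or unreliable.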
7.2 Prompt-Based Recommendation
```python
def build_rec_prompt(user_history, candidates, user_profile=None):
    history_str = "\n".join(f"  - {m}" for m in user_history)
    candidate_str = "\n".join(f"  {i+1}. {m}" for i, m in enumerate(candidates))
    profile_str = f"\nUser profile: {user_profile}" if user_profile else ""
    return f"""You are an expert personalized movie recommender.{profile_str}

Movies the user has recently enjoyed:
{history_str}

From the following candidate movies, rank those the user would most likely enjoy.
Provide a one-sentence explanation for each recommendation.

Candidates:
{candidate_str}

Please respond in this format:
1. [Movie title] - [Reason]
2. [Movie title] - [Reason]
3. [Movie title] - [Reason]"""

user_history = ["Interstellar (2014)", "The Matrix (1999)", "Blade Runner 2049 (2017)"]
candidates = ["Avatar: The Way of Water (2022)", "The Notebook (2004)",
              "Parasite (2019)", "Dune (2021)", "About Time (2013)"]

prompt = build_rec_prompt(
    user_history=user_history,
    candidates=candidates,
    user_profile="Prefers sci-fi and thrillers; values world-building and visual craft"
)
print("Generated prompt:")
print("=" * 60)
print(prompt)
print("=" * 60)
```
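Because the prompt pins down the response format, the model's answer can be recovered with a small regex. A sketch, with a hand-written `sample_response` standing in for a real LLM reply:

```python
import re

# Hypothetical parser for the "N. [Title] - [Reason]" format requested above.
sample_response = """1. Dune (2021) - Epic sci-fi world-building in line with your history.
2. Avatar: The Way of Water (2022) - Visually stunning sci-fi spectacle.
3. Parasite (2019) - A gripping thriller with sharp twists."""

def parse_llm_ranking(text):
    # one match per line: leading rank number, title, " - ", reason
    pattern = re.compile(r"^\s*\d+\.\s*(.+?)\s+-\s+(.+)$", re.MULTILINE)
    return pattern.findall(text)

ranked = parse_llm_ranking(sample_response)
for title, reason in ranked:
    print(f"{title}: {reason}")
```

In practice, LLM output drifts from requested formats often enough that a fallback (retry, or a more permissive parse) is worth having.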
8. Industrial-Scale Recommendation Systems
8.1 Multi-Stage Architecture
Real-world large-scale systems run in multiple stages:
```python
class IndustrialRecSystem:
    """
    Industrial RecSys overview:
      Stage 1 — Retrieval:  millions → hundreds of candidates (Two-Tower + ANN)
      Stage 2 — Ranking:    hundreds → top ~50 (DCN / xDeepFM)
      Stage 3 — Re-ranking: top ~50 → final 20 (diversity / freshness)
    """
    def __init__(self):
        print("Industrial RecSys initialized")
        print("  Stage 1 — Retrieval: Two-Tower + Faiss (sub-millisecond)")
        print("  Stage 2 — Ranking: DCN with feature crosses")
        print("  Stage 3 — Re-rank: MMR / DPP for diversity")

    def retrieval(self, user_emb, index, k=500):
        print(f"  Retrieval: {k} candidates")
        return list(range(k))

    def ranking(self, user_feats, candidates):
        print(f"  Ranking: {len(candidates)} -> 50")
        return candidates[:50]

    def reranking(self, ranked, diversity_weight=0.3):
        print(f"  Re-ranking: diversity_weight={diversity_weight}")
        return ranked[:20]
```
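The MMR re-ranking mentioned in Stage 3 can be sketched in a few lines: greedily pick the item with the best trade-off between its relevance score and its maximum similarity to items already chosen. A minimal version (random scores and embeddings for illustration; `lam` is the trade-off knob):

```python
import numpy as np

def mmr_rerank(scores, item_embs, k=3, lam=0.7):
    """Maximal Marginal Relevance: lam=1.0 -> pure relevance, lam=0.0 -> pure diversity."""
    sims = item_embs @ item_embs.T                      # pairwise item similarities
    selected, remaining = [], list(range(len(scores)))
    while remaining and len(selected) < k:
        def mmr(i):
            # penalty: similarity to the closest already-selected item
            div = max(sims[i][j] for j in selected) if selected else 0.0
            return lam * scores[i] - (1 - lam) * div
        best = max(remaining, key=mmr)
        selected.append(best)
        remaining.remove(best)
    return selected

rng = np.random.default_rng(0)
embs = rng.normal(size=(6, 8))
embs /= np.linalg.norm(embs, axis=1, keepdims=True)     # unit vectors -> cosine sims
scores = rng.random(6)
print("MMR order:", mmr_rerank(scores, embs, k=3))
```

The first pick is always the top-scoring item; subsequent picks trade relevance against redundancy, which is why MMR tends to break up near-duplicate results.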
```python
class DeepCrossNetwork(nn.Module):
    """
    Deep & Cross Network (DCN)
    Wang et al., 2017 — automatic feature crossing
    """
    def __init__(self, input_dim, cross_layers=3, deep_dims=None, dropout=0.1):
        super().__init__()
        if deep_dims is None:
            deep_dims = [256, 128, 64]
        self.cross_w = nn.ParameterList([nn.Parameter(torch.randn(input_dim, 1)) for _ in range(cross_layers)])
        self.cross_b = nn.ParameterList([nn.Parameter(torch.zeros(input_dim)) for _ in range(cross_layers)])
        deep, in_d = [], input_dim
        for d in deep_dims:
            deep += [nn.Linear(in_d, d), nn.LayerNorm(d), nn.ReLU(), nn.Dropout(dropout)]
            in_d = d
        self.deep = nn.Sequential(*deep)
        self.output = nn.Linear(input_dim + deep_dims[-1], 1)

    def cross(self, x0, x):
        # DCN cross layer: x_{l+1} = x0 * (x_l^T w_l) + b_l + x_l
        for w, b in zip(self.cross_w, self.cross_b):
            x = x0 * torch.matmul(x, w) + b.unsqueeze(0) + x
        return x

    def forward(self, x):
        cross_out = self.cross(x, x)
        deep_out = self.deep(x)
        return torch.sigmoid(self.output(torch.cat([cross_out, deep_out], dim=1))).squeeze(-1)

dcn = DeepCrossNetwork(128).to(device)
print(f"DCN parameters: {sum(p.numel() for p in dcn.parameters()):,}")
```
8.2 Cold-Start Handling
```python
class ColdStartStrategies:
    @staticmethod
    def content_for_new_items(description, encoder, existing_embs):
        """New item: find similar existing items via content embeddings"""
        new_emb = encoder.encode([description])
        sims = torch.matmul(new_emb, existing_embs.T)
        return sims.topk(5).indices[0]

    @staticmethod
    def onboarding_for_new_users():
        print("New user cold-start strategies:")
        print("  1. Onboarding survey (preferred genres, popularity preference)")
        print("  2. Demographic-based group recommendations")
        print("  3. Explore-exploit bandit (epsilon-greedy)")
        print("  4. Rapid implicit feedback collection")

    @staticmethod
    def epsilon_greedy(n_items, epsilon=0.1):
        """Epsilon-greedy exploration-exploitation"""
        if np.random.random() < epsilon:
            return np.random.randint(n_items)  # explore
        return 0  # exploit: highest-scoring item

ColdStartStrategies.onboarding_for_new_users()
```
9. Real-World Implementation: Surprise Library
9.1 Quick RecSys with Surprise
```python
def demo_surprise():
    """
    Surprise library for collaborative filtering.
    Install: pip install scikit-surprise
    """
    usage = """
    from surprise import Dataset, SVD, KNNBasic
    from surprise.model_selection import cross_validate, train_test_split
    from surprise import accuracy

    data = Dataset.load_builtin('ml-100k')
    trainset, testset = train_test_split(data, test_size=0.2, random_state=42)

    svd = SVD(n_factors=100, n_epochs=20, lr_all=0.005, reg_all=0.02)
    svd.fit(trainset)
    preds = svd.test(testset)
    print(f"SVD RMSE: {accuracy.rmse(preds):.4f}")

    # Cross-validation
    cv = cross_validate(SVD(), data, measures=['RMSE', 'MAE'], cv=5, verbose=True)
    print(f"Mean CV RMSE: {cv['test_rmse'].mean():.4f}")

    # Top-N recommendations for a user
    user_id = '196'
    inner_id = trainset.to_inner_uid(user_id)
    rated = {iid for iid, _ in trainset.ur[inner_id]}
    unrated = set(trainset.all_items()) - rated
    preds_unrated = sorted(
        [svd.predict(user_id, trainset.to_raw_iid(i)) for i in unrated],
        key=lambda x: x.est, reverse=True
    )[:10]
    """
    print("Surprise library algorithms:")
    print("  SVD — Matrix factorization (Netflix Prize baseline)")
    print("  SVD++ — SVD with implicit feedback")
    print("  NMF — Non-negative Matrix Factorization")
    print("  KNNBasic/Means/Baseline — Neighborhood methods")

demo_surprise()
```
9.2 Hybrid Recommendation with LightFM
```python
def demo_lightfm():
    """
    LightFM: Hybrid collaborative + content-based filtering.
    Install: pip install lightfm
    """
    usage = """
    from lightfm import LightFM
    from lightfm.evaluation import precision_at_k, auc_score
    from lightfm.datasets import fetch_movielens

    data = fetch_movielens()
    train, test = data['train'], data['test']

    # BPR loss
    model_bpr = LightFM(no_components=30, loss='bpr', learning_rate=0.05)
    model_bpr.fit(train, epochs=30, num_threads=4)

    # WARP loss (stronger ranking signal)
    model_warp = LightFM(no_components=30, loss='warp', learning_rate=0.05)
    model_warp.fit(train, epochs=30, num_threads=4)

    print(f"BPR Precision@10:  {precision_at_k(model_bpr, test, k=10).mean():.4f}")
    print(f"WARP Precision@10: {precision_at_k(model_warp, test, k=10).mean():.4f}")

    # Hybrid with item features
    model_hybrid = LightFM(no_components=30, loss='warp')
    model_hybrid.fit(interactions, item_features=item_feature_matrix, epochs=30)
    """
    print("LightFM hybrid recommendation:")
    print("  Combines collaborative filtering with item/user feature matrices")
    print("  Supports BPR, WARP, logistic, and warp-kos losses")
    print("  Mitigates cold-start via content features")

demo_lightfm()
```
10. Model Benchmark and Selection Guide
```python
import pandas as pd

benchmark = pd.DataFrame({
    'Model': ['User-based KNN', 'SVD', 'BPR-MF', 'NCF',
              'Two-Tower', 'SASRec', 'LightGCN', 'LLM-based'],
    'Precision@10': [0.042, 0.061, 0.068, 0.075, 0.072, 0.089, 0.085, 0.078],
    'Recall@10': [0.134, 0.198, 0.221, 0.244, 0.238, 0.289, 0.279, 0.261],
    'NDCG@10': [0.089, 0.124, 0.138, 0.158, 0.154, 0.187, 0.179, 0.169],
    'Train Time': ['1 min', '5 min', '3 min', '20 min',
                   '30 min', '25 min', '40 min', '60+ min'],
    'Scale': ['Small', 'Medium', 'Medium', 'Large',
              'Very Large', 'Large', 'Large', 'Any'],
    'Cold Start': ['Poor', 'Poor', 'Poor', 'Poor',
                   'Good', 'Fair', 'Fair', 'Excellent'],
})

print("Recommendation System Benchmark (MovieLens 1M)")
print("=" * 95)
print(benchmark.to_string(index=False))

print("\nModel selection guide:")
print("  Small data (~100K interactions) : SVD, User-based KNN")
print("  Medium data (~1M interactions)  : NCF, BPR-MF")
print("  Large data (10M+)               : Two-Tower + LightGCN + SASRec")
print("  Cold-start critical             : LLM item encoding + Two-Tower")
print("  Real-time serving               : Two-Tower (pre-computed embs) + Faiss")
```
Closing Thoughts
This guide has covered the full spectrum of modern recommendation systems.
Key Takeaways:
- Foundations: Collaborative filtering and metrics (Precision@K, NDCG)
- Matrix factorization: SVD, BPR — strong, interpretable baselines
- NCF: Deep learning overcomes MF limitations
- Two-Tower: The workhorse architecture for internet-scale RecSys
- Sequential models: SASRec, BERT4Rec — leverage temporal user behavior
- Graph models: LightGCN — capture higher-order connectivity
- LLM-powered: Semantic understanding solves cold-start
Practical tips:
- Always start with a BPR-MF baseline — it is surprisingly hard to beat.
- Multi-stage retrieval is mandatory at production scale.
- If you have sequence data, SASRec consistently outperforms static models.
- When cold-start matters, invest in LLM-based item encoding.