- Authors

- Name
- Youngju Kim
- @fjvbn20031
はじめに
推薦システムは現代のデジタル体験を支えています。Netflixの映画提案、Amazonの商品推薦、Spotifyの音楽発見。Netflixだけで推薦エンジンから年間10億ドル以上の価値が生まれると報告されています。
このガイドは古典的な協調フィルタリングからグラフニューラルネットワーク、LLMベースの推薦まで幅広くカバーします。各セクションにはプロダクション対応のPyTorchコードが含まれています。
1. 推薦システムの基礎
1.1 推薦システムの3つのタイプ
協調フィルタリングは類似したユーザーやアイテムのパターンを活用します。
- ユーザーベース: 「あなたに似た人はこれも楽しんでいます。」
- アイテムベース: 「Xが好きなら、Yも気に入るかもしれません。」
コンテンツベースフィルタリングはアイテムの属性(ジャンル、監督、説明など)を分析します。
- 新しいアイテムの推薦が可能(コールドスタート問題を軽減)
- 特徴量エンジニアリングの品質が重要
ハイブリッド手法は両方のアプローチを組み合わせて互いの弱点を補います。
1.2 暗黙的フィードバックと明示的フィードバック
明示的フィードバック: 評価、いいね、嫌いなど — 意図は明確だがスパース。 暗黙的フィードバック: クリック、視聴時間、購入など — 豊富だがノイジー。
実世界のシステムは、量がはるかに多い暗黙的フィードバックに大きく依存します。
1.3 評価指標
import numpy as np
from sklearn.metrics import ndcg_score
def precision_at_k(recommended, relevant, k):
"""Precision@K: 上位K件のアイテムのうち関連するものの割合"""
hits = len(set(recommended[:k]) & set(relevant))
return hits / k
def recall_at_k(recommended, relevant, k):
"""Recall@K: 上位K件に含まれる関連アイテムの割合"""
hits = len(set(recommended[:k]) & set(relevant))
return hits / len(relevant) if relevant else 0
def average_precision_at_k(recommended, relevant, k):
"""AP@K: 各ヒット位置でのPrecisionの累積平均(ランク考慮)"""
if not relevant:
return 0.0
hits, sum_prec = 0, 0.0
for i, item in enumerate(recommended[:k]):
if item in relevant:
hits += 1
sum_prec += hits / (i + 1)
return sum_prec / min(len(relevant), k)
def ndcg_at_k(recommended, relevant, k):
"""NDCG@K: 正規化割引累積ゲイン"""
relevance = [1 if item in relevant else 0 for item in recommended[:k]]
if not any(relevance):
return 0.0
dcg = sum(rel / np.log2(i + 2) for i, rel in enumerate(relevance))
idcg = sum(rel / np.log2(i + 2) for i, rel in enumerate(sorted(relevance, reverse=True)))
return dcg / idcg if idcg > 0 else 0.0
# 例
recommended = [1, 4, 7, 2, 9, 3, 5, 6, 8, 10]
relevant = {1, 2, 5, 7, 8}
print("=" * 40)
print("推薦システム評価指標")
print("=" * 40)
for k in [5, 10]:
print(f"\nk = {k}")
print(f" Precision@{k}: {precision_at_k(recommended, relevant, k):.4f}")
print(f" Recall@{k}: {recall_at_k(recommended, relevant, k):.4f}")
print(f" AP@{k}: {average_precision_at_k(recommended, relevant, k):.4f}")
print(f" NDCG@{k}: {ndcg_at_k(recommended, relevant, k):.4f}")
2. 行列分解
2.1 コアコンセプト
行列分解はユーザー-アイテムインタラクション行列Rを2つの低ランク行列に分解します:
R ≈ U × V^T
- U: ユーザー埋め込み行列(n_users × k)
- V: アイテム埋め込み行列(n_items × k)
- k: 潜在因子の数
2.2 PyTorchによる行列分解
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
def generate_synthetic_ratings(n_users=1000, n_items=500, n_ratings=50000):
np.random.seed(42)
k = 20
user_f = np.random.randn(n_users, k) * 0.5
item_f = np.random.randn(n_items, k) * 0.5
true_r = user_f @ item_f.T
true_r = (true_r - true_r.min()) / (true_r.max() - true_r.min()) * 4 + 1
uid = np.random.choice(n_users, n_ratings)
iid = np.random.choice(n_items, n_ratings)
rat = np.clip(true_r[uid, iid] + np.random.randn(n_ratings) * 0.3, 1, 5)
df = pd.DataFrame({'user_id': uid, 'item_id': iid, 'rating': rat})
return df.drop_duplicates(subset=['user_id', 'item_id'])
ratings_df = generate_synthetic_ratings()
train_df, test_df = train_test_split(ratings_df, test_size=0.2, random_state=42)
n_users = ratings_df['user_id'].max() + 1
n_items = ratings_df['item_id'].max() + 1
print(f"総評価数: {len(ratings_df)}")
print(f"ユーザー数: {n_users} アイテム数: {n_items}")
class RatingsDataset(Dataset):
def __init__(self, df):
self.users = torch.LongTensor(df['user_id'].values)
self.items = torch.LongTensor(df['item_id'].values)
self.ratings = torch.FloatTensor(df['rating'].values)
def __len__(self):
return len(self.ratings)
def __getitem__(self, idx):
return self.users[idx], self.items[idx], self.ratings[idx]
class MatrixFactorization(nn.Module):
def __init__(self, n_users, n_items, n_factors=64):
super().__init__()
self.user_embedding = nn.Embedding(n_users, n_factors)
self.item_embedding = nn.Embedding(n_items, n_factors)
self.user_bias = nn.Embedding(n_users, 1)
self.item_bias = nn.Embedding(n_items, 1)
self.global_bias = nn.Parameter(torch.zeros(1))
nn.init.normal_(self.user_embedding.weight, 0, 0.01)
nn.init.normal_(self.item_embedding.weight, 0, 0.01)
nn.init.zeros_(self.user_bias.weight)
nn.init.zeros_(self.item_bias.weight)
def forward(self, user_ids, item_ids):
u = self.user_embedding(user_ids)
v = self.item_embedding(item_ids)
dot = (u * v).sum(dim=1)
u_b = self.user_bias(user_ids).squeeze()
i_b = self.item_bias(item_ids).squeeze()
return dot + u_b + i_b + self.global_bias
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
mf_model = MatrixFactorization(n_users, n_items, n_factors=64).to(device)
train_loader = DataLoader(RatingsDataset(train_df), batch_size=512, shuffle=True)
test_loader = DataLoader(RatingsDataset(test_df), batch_size=512, shuffle=False)
optimizer = optim.Adam(mf_model.parameters(), lr=1e-3, weight_decay=1e-5)
criterion = nn.MSELoss()
def train_mf(model, loader, optimizer, criterion, device):
model.train()
total = 0
for u, i, r in loader:
u, i, r = u.to(device), i.to(device), r.to(device)
optimizer.zero_grad()
loss = criterion(model(u, i), r)
loss.backward()
optimizer.step()
total += loss.item() * len(r)
return (total / len(loader.dataset)) ** 0.5
def eval_mf(model, loader, device):
model.eval()
preds, targets = [], []
with torch.no_grad():
for u, i, r in loader:
preds.extend(model(u.to(device), i.to(device)).cpu().tolist())
targets.extend(r.tolist())
p, t = np.array(preds), np.array(targets)
return np.sqrt(((p-t)**2).mean()), np.abs(p-t).mean()
for epoch in range(30):
tr = train_mf(mf_model, train_loader, optimizer, criterion, device)
if (epoch+1) % 10 == 0:
rmse, mae = eval_mf(mf_model, test_loader, device)
print(f"Epoch {epoch+1:2d} | Train RMSE: {tr:.4f} | Test RMSE: {rmse:.4f} | MAE: {mae:.4f}")
2.3 BPR(ベイジアン個人化ランキング)
BPRは暗黙的フィードバックに対するペアワイズ学習手法です。ユーザーはインタラクションのないアイテムよりもインタラクションしたアイテムを好むと仮定します。
class BPRModel(nn.Module):
def __init__(self, n_users, n_items, n_factors=64):
super().__init__()
self.user_emb = nn.Embedding(n_users, n_factors)
self.item_emb = nn.Embedding(n_items, n_factors)
nn.init.normal_(self.user_emb.weight, 0, 0.01)
nn.init.normal_(self.item_emb.weight, 0, 0.01)
def forward(self, u, pos, neg):
ue = self.user_emb(u)
pe = self.item_emb(pos)
ne = self.item_emb(neg)
return (ue * pe).sum(-1), (ue * ne).sum(-1)
def predict(self, u, i):
return (self.user_emb(u) * self.item_emb(i)).sum(-1)
def bpr_loss(pos_score, neg_score, model=None, reg=1e-5):
loss = -torch.log(torch.sigmoid(pos_score - neg_score)).mean()
if model and reg:
loss += reg * sum(p.norm(2) for p in model.parameters())
return loss
class BPRDataset(Dataset):
def __init__(self, df, n_items):
self.users = df['user_id'].values
self.pos_items = df['item_id'].values
self.n_items = n_items
self.user_items = df.groupby('user_id')['item_id'].apply(set).to_dict()
def __len__(self):
return len(self.users)
def __getitem__(self, idx):
u = self.users[idx]
p = self.pos_items[idx]
n = np.random.randint(self.n_items)
while n in self.user_items.get(u, set()):
n = np.random.randint(self.n_items)
return torch.tensor(u), torch.tensor(p), torch.tensor(n)
bpr_model = BPRModel(n_users, n_items).to(device)
bpr_loader = DataLoader(BPRDataset(train_df, n_items), batch_size=512, shuffle=True)
bpr_opt = optim.Adam(bpr_model.parameters(), lr=1e-3)
for epoch in range(20):
bpr_model.train()
total = 0
for u, p, n in bpr_loader:
u, p, n = u.to(device), p.to(device), n.to(device)
bpr_opt.zero_grad()
ps, ns = bpr_model(u, p, n)
loss = bpr_loss(ps, ns, bpr_model)
loss.backward()
bpr_opt.step()
total += loss.item()
if (epoch+1) % 5 == 0:
print(f"BPR Epoch {epoch+1:2d} | Loss: {total/len(bpr_loader):.4f}")
3. ニューラル協調フィルタリング(NCF)
3.1 アーキテクチャ概要
NCFは2つの相補的なパスを組み合わせて行列分解を深層学習で拡張します。
GMF(一般化行列分解): 埋め込みの要素積 — MFの一般化。 MLP: 非線形層を通過する連結埋め込み。
2つの出力を最終予測に融合します。
class NCF(nn.Module):
"""
Neural Collaborative Filtering
He et al., 2017 — arxiv.org/abs/1708.05031
"""
def __init__(self, n_users, n_items, n_factors=64, mlp_dims=None, dropout=0.2):
super().__init__()
if mlp_dims is None:
mlp_dims = [256, 128, 64]
# GMF埋め込み
self.gmf_user = nn.Embedding(n_users, n_factors)
self.gmf_item = nn.Embedding(n_items, n_factors)
# MLP埋め込み(GMFとは分離)
self.mlp_user = nn.Embedding(n_users, n_factors)
self.mlp_item = nn.Embedding(n_items, n_factors)
# MLP層
mlp_layers, in_dim = [], n_factors * 2
for dim in mlp_dims:
mlp_layers += [nn.Linear(in_dim, dim), nn.BatchNorm1d(dim), nn.ReLU(), nn.Dropout(dropout)]
in_dim = dim
self.mlp = nn.Sequential(*mlp_layers)
# 最終予測ヘッド
self.output = nn.Linear(n_factors + mlp_dims[-1], 1)
for emb in [self.gmf_user, self.gmf_item, self.mlp_user, self.mlp_item]:
nn.init.normal_(emb.weight, 0, 0.01)
def forward(self, user_ids, item_ids):
gmf_out = self.gmf_user(user_ids) * self.gmf_item(item_ids)
mlp_in = torch.cat([self.mlp_user(user_ids), self.mlp_item(item_ids)], dim=1)
mlp_out = self.mlp(mlp_in)
return torch.sigmoid(self.output(torch.cat([gmf_out, mlp_out], dim=1))).squeeze()
class ImplicitDataset(Dataset):
def __init__(self, pos_df, n_items, neg_ratio=4):
self.users, self.items, self.labels = [], [], []
ui = pos_df.groupby('user_id')['item_id'].apply(set).to_dict()
for _, row in pos_df.iterrows():
u, i = row['user_id'], row['item_id']
self.users.append(u); self.items.append(i); self.labels.append(1.0)
for _ in range(neg_ratio):
n = np.random.randint(n_items)
while n in ui.get(u, set()):
n = np.random.randint(n_items)
self.users.append(u); self.items.append(n); self.labels.append(0.0)
def __len__(self):
return len(self.labels)
def __getitem__(self, idx):
return (torch.tensor(self.users[idx]), torch.tensor(self.items[idx]),
torch.tensor(self.labels[idx]))
implicit_train = train_df[train_df['rating'] >= 3.5].copy()
impl_loader = DataLoader(ImplicitDataset(implicit_train, n_items), batch_size=1024, shuffle=True)
ncf = NCF(n_users, n_items, n_factors=64).to(device)
ncf_opt = optim.Adam(ncf.parameters(), lr=1e-3, weight_decay=1e-5)
bce_loss = nn.BCELoss()
print(f"NCFパラメータ数: {sum(p.numel() for p in ncf.parameters()):,}")
for epoch in range(20):
ncf.train()
total = 0
for u, i, lbl in impl_loader:
u, i, lbl = u.to(device), i.to(device), lbl.to(device)
ncf_opt.zero_grad()
loss = bce_loss(ncf(u, i), lbl)
loss.backward()
ncf_opt.step()
total += loss.item()
if (epoch+1) % 5 == 0:
print(f"NCF Epoch {epoch+1:2d} | Loss: {total/len(impl_loader):.4f}")
4. Two-Towerモデル
4.1 アーキテクチャ概要
Two-Towerモデル(Dual Encoder / Bi-Encoder)はユーザーとアイテムの表現を独立して学習します。推薦スコアは2つのタワーの埋め込みのドット積です。
スケールの利点:
- アイテム埋め込みをオフラインで事前計算可能。
- 推論時の高速な近似最近傍(ANN)検索。
- 数十億のアイテムにスケール可能。
YouTube、Google、Spotify、Pinterestなどで使用されています。
class UserTower(nn.Module):
def __init__(self, n_users, feat_dim, embed_dim=128, hidden_dims=None):
super().__init__()
if hidden_dims is None:
hidden_dims = [256, 128]
self.id_emb = nn.Embedding(n_users, embed_dim)
layers, in_d = [], embed_dim + feat_dim
for h in hidden_dims:
layers += [nn.Linear(in_d, h), nn.LayerNorm(h), nn.ReLU(), nn.Dropout(0.1)]
in_d = h
layers.append(nn.Linear(in_d, embed_dim))
self.net = nn.Sequential(*layers)
def forward(self, user_ids, user_features):
x = torch.cat([self.id_emb(user_ids), user_features], dim=1)
return nn.functional.normalize(self.net(x), dim=-1)
class ItemTower(nn.Module):
def __init__(self, n_items, feat_dim, embed_dim=128, hidden_dims=None):
super().__init__()
if hidden_dims is None:
hidden_dims = [256, 128]
self.id_emb = nn.Embedding(n_items, embed_dim)
layers, in_d = [], embed_dim + feat_dim
for h in hidden_dims:
layers += [nn.Linear(in_d, h), nn.LayerNorm(h), nn.ReLU(), nn.Dropout(0.1)]
in_d = h
layers.append(nn.Linear(in_d, embed_dim))
self.net = nn.Sequential(*layers)
def forward(self, item_ids, item_features):
x = torch.cat([self.id_emb(item_ids), item_features], dim=1)
return nn.functional.normalize(self.net(x), dim=-1)
class TwoTowerModel(nn.Module):
def __init__(self, n_users, n_items, user_fd, item_fd, embed_dim=128):
super().__init__()
self.user_tower = UserTower(n_users, user_fd, embed_dim)
self.item_tower = ItemTower(n_items, item_fd, embed_dim)
self.temperature = nn.Parameter(torch.tensor(0.07))
def forward(self, uid, uf, iid, if_):
return self.user_tower(uid, uf), self.item_tower(iid, if_)
def similarity(self, u_emb, i_emb):
return torch.matmul(u_emb, i_emb.T) / self.temperature.exp()
def info_nce_loss(sim):
n = sim.size(0)
labels = torch.arange(n, device=sim.device)
return nn.CrossEntropyLoss()(sim, labels)
user_feat_dim = 16
item_feat_dim = 32
user_feats = torch.FloatTensor(np.random.randn(n_users, user_feat_dim))
item_feats = torch.FloatTensor(np.random.randn(n_items, item_feat_dim))
two_tower = TwoTowerModel(n_users, n_items, user_feat_dim, item_feat_dim).to(device)
print(f"Two-Towerパラメータ数: {sum(p.numel() for p in two_tower.parameters()):,}")
def precompute_item_embeddings(model, n_items, item_feats, batch_size=256, device='cpu'):
model.eval()
all_embs = []
with torch.no_grad():
for s in range(0, n_items, batch_size):
e = min(s + batch_size, n_items)
ids = torch.arange(s, e, device=device)
emb = model.item_tower(ids, item_feats[s:e].to(device))
all_embs.append(emb.cpu())
return torch.cat(all_embs)
item_cache = precompute_item_embeddings(two_tower, n_items, item_feats, device=device)
print(f"事前計算済みアイテム埋め込み: {item_cache.shape}")
4.2 FaissによるANN検索
def demo_faiss():
"""
FaissによるApproximate Nearest Neighbor検索。
インストール: pip install faiss-cpu (またはfaiss-gpu)
"""
usage = """
import faiss
embed_dim = 128
item_embs = item_cache.numpy().astype('float32')
faiss.normalize_L2(item_embs)
index = faiss.IndexFlatIP(embed_dim) # 正確な内積
# 大規模向け: IndexIVFFlatまたはIndexHNSWFlat
index.add(item_embs)
user_q = user_emb.numpy().astype('float32')
faiss.normalize_L2(user_q)
scores, indices = index.search(user_q, k=100)
print("上位100件候補アイテム:", indices[0])
"""
print("Faissインデックスタイプ:")
print(" IndexFlatIP — 正確な内積(小規模)")
print(" IndexIVFFlat — 転置ファイルインデックス(中規模)")
print(" IndexHNSWFlat — 階層グラフ(大規模、高速)")
print(" IndexPQ — 積量子化(メモリ効率)")
demo_faiss()
5. 逐次推薦
5.1 SASRec(自己注意逐次推薦)
SASRecはTransformerの自己注意を使ってユーザーのインタラクション系列の重要なアイテムを特定します。
class SASRecBlock(nn.Module):
def __init__(self, d_model, n_heads, dropout=0.1):
super().__init__()
self.attn = nn.MultiheadAttention(d_model, n_heads, dropout=dropout, batch_first=True)
self.ff = nn.Sequential(
nn.Linear(d_model, d_model*4), nn.GELU(), nn.Dropout(dropout),
nn.Linear(d_model*4, d_model), nn.Dropout(dropout)
)
self.norm1 = nn.LayerNorm(d_model)
self.norm2 = nn.LayerNorm(d_model)
def forward(self, x):
L = x.size(1)
causal = torch.triu(torch.ones(L, L, device=x.device), diagonal=1).bool()
a, _ = self.attn(x, x, x, attn_mask=causal)
x = self.norm1(x + a)
return self.norm2(x + self.ff(x))
class SASRec(nn.Module):
"""
Self-Attentive Sequential Recommendation
Kang and McAuley, 2018 — arxiv.org/abs/1808.09781
"""
def __init__(self, n_items, max_seq_len, d_model=128, n_heads=4, num_layers=2, dropout=0.1):
super().__init__()
self.item_emb = nn.Embedding(n_items + 1, d_model, padding_idx=0)
self.pos_emb = nn.Embedding(max_seq_len, d_model)
self.blocks = nn.ModuleList([SASRecBlock(d_model, n_heads, dropout) for _ in range(num_layers)])
self.norm = nn.LayerNorm(d_model)
self.dropout = nn.Dropout(dropout)
def forward(self, seq):
L = seq.size(1)
pos = torch.arange(L, device=seq.device).unsqueeze(0)
x = self.dropout(self.item_emb(seq) + self.pos_emb(pos))
for blk in self.blocks:
x = blk(x)
return self.norm(x)
def predict(self, seq, candidates):
repr_ = self.forward(seq)[:, -1, :] # (batch, d_model)
c_emb = self.item_emb(candidates) # (batch, n_cand, d_model)
return (repr_.unsqueeze(1) * c_emb).sum(-1) # (batch, n_cand)
sasrec = SASRec(n_items, max_seq_len=50, d_model=128, n_heads=4, num_layers=2).to(device)
print(f"SASRecパラメータ数: {sum(p.numel() for p in sasrec.parameters()):,}")
5.2 BERT4Rec
BERT4RecはBERTのMasked Language Modeling(MLM)を逐次推薦に適用します。ランダムなアイテムをマスクして予測することで、双方向コンテキストからモデルが学習できます。
class BERT4Rec(nn.Module):
"""
BERT4Rec: BERTによる逐次推薦
Sun et al., 2019
"""
def __init__(self, n_items, max_seq_len, d_model=256, n_heads=4,
num_layers=2, dropout=0.1, mask_prob=0.15):
super().__init__()
self.mask_id = n_items + 1
self.n_items = n_items
self.mask_prob = mask_prob
self.item_emb = nn.Embedding(n_items + 2, d_model, padding_idx=0)
self.pos_emb = nn.Embedding(max_seq_len, d_model)
enc_layer = nn.TransformerEncoderLayer(
d_model=d_model, nhead=n_heads, dim_feedforward=d_model*4,
dropout=dropout, batch_first=True
)
self.transformer = nn.TransformerEncoder(enc_layer, num_layers=num_layers)
self.norm = nn.LayerNorm(d_model)
self.output = nn.Linear(d_model, n_items + 2)
def forward(self, seq):
L = seq.size(1)
pos = torch.arange(L, device=seq.device).unsqueeze(0)
x = self.item_emb(seq) + self.pos_emb(pos)
return self.output(self.norm(self.transformer(x)))
def mask_seq(self, seq):
masked = seq.clone()
mask = (torch.rand_like(seq.float()) < self.mask_prob) & (seq != 0)
masked[mask] = self.mask_id
return masked, mask
6. グラフベース推薦: LightGCN
6.1 LightGCNアーキテクチャ
LightGCN(Light Graph Convolution Network)はメッセージパッシングによってユーザー-アイテム二部グラフの高次接続性を学習します。変換行列と非線形活性化を除去することで軽量になっています。
class LightGCN(nn.Module):
"""
LightGCN: GCNを推薦向けに簡略化・強化
He et al., 2020 — arxiv.org/abs/2202.01151
"""
def __init__(self, n_users, n_items, embed_dim=64, n_layers=3):
super().__init__()
self.n_users = n_users
self.n_items = n_items
self.n_layers = n_layers
self.embed_dim = embed_dim
self.user_emb = nn.Embedding(n_users, embed_dim)
self.item_emb = nn.Embedding(n_items, embed_dim)
nn.init.normal_(self.user_emb.weight, std=0.1)
nn.init.normal_(self.item_emb.weight, std=0.1)
def compute_adj(self, interactions, device):
n = self.n_users + self.n_items
uid = interactions[:, 0]
iid = interactions[:, 1] + self.n_users
row = torch.cat([uid, iid])
col = torch.cat([iid, uid])
edge_index = torch.stack([row, col]).to(device)
deg = torch.zeros(n, device=device)
deg.scatter_add_(0, row, torch.ones(len(row), device=device))
d_inv_sqrt = deg.pow(-0.5)
d_inv_sqrt[d_inv_sqrt == float('inf')] = 0
return edge_index, d_inv_sqrt[row] * d_inv_sqrt[col], n
def forward(self, interactions):
dev = self.user_emb.weight.device
edge_idx, edge_wt, n = self.compute_adj(interactions, dev)
all_emb = torch.cat([self.user_emb.weight, self.item_emb.weight])
layers = [all_emb]
for _ in range(self.n_layers):
agg = torch.zeros_like(all_emb)
agg.scatter_add_(
0,
edge_idx[1].unsqueeze(1).expand(-1, self.embed_dim),
all_emb[edge_idx[0]] * edge_wt.unsqueeze(1)
)
all_emb = agg
layers.append(all_emb)
final = torch.stack(layers).mean(0)
return final[:self.n_users], final[self.n_users:]
def bpr_loss(self, u_emb, i_emb, users, pos, neg, lam=1e-4):
ue = u_emb[users]; pe = i_emb[pos]; ne = i_emb[neg]
ps = (ue * pe).sum(-1); ns = (ue * ne).sum(-1)
loss = -torch.log(torch.sigmoid(ps - ns)).mean()
reg = (self.user_emb.weight[users].norm(2).pow(2) +
self.item_emb.weight[pos].norm(2).pow(2) +
self.item_emb.weight[neg].norm(2).pow(2)) / (2 * len(users))
return loss + lam * reg
lightgcn = LightGCN(n_users, n_items, embed_dim=64, n_layers=3).to(device)
interactions_t = torch.LongTensor(train_df[['user_id', 'item_id']].values)
print(f"LightGCNパラメータ数: {sum(p.numel() for p in lightgcn.parameters()):,}")
7. LLMベース推薦
7.1 推薦システムでのLLM活用方法
LLMは推薦システムを様々な方法で強化できます:
- アイテム特徴エンコーディング: LLMを使ってアイテム説明を埋め込む。
- プロンプトベース推薦: LLMにアイテムを直接ランク付けするよう依頼。
- ユーザープロファイルテキスト: ユーザーの行動を自然言語に変換。
- 説明生成: 推薦の自然言語による理由を生成。
import torch.nn.functional as F
def mean_pooling(model_output, attention_mask):
"""トークン埋め込みの平均プーリング"""
tok = model_output[0]
mask_exp = attention_mask.unsqueeze(-1).expand(tok.size()).float()
return torch.sum(tok * mask_exp, 1) / torch.clamp(mask_exp.sum(1), min=1e-9)
class LLMItemEncoder:
"""LLMを使ってアイテム説明を埋め込みにエンコード"""
def __init__(self, model_name='sentence-transformers/all-MiniLM-L6-v2'):
self.model_name = model_name
print(f"LLMエンコーダー: {model_name}")
def encode(self, texts, batch_size=32):
"""
実際の実装:
from transformers import AutoTokenizer, AutoModel
tokenizer = AutoTokenizer.from_pretrained(self.model_name)
model = AutoModel.from_pretrained(self.model_name)
embeddings = []
for i in range(0, len(texts), batch_size):
batch = texts[i:i+batch_size]
encoded = tokenizer(batch, padding=True, truncation=True,
max_length=128, return_tensors='pt')
with torch.no_grad():
out = model(**encoded)
emb = F.normalize(mean_pooling(out, encoded['attention_mask']), dim=1)
embeddings.append(emb)
return torch.cat(embeddings)
"""
return torch.randn(len(texts), 384) # シミュレーション
movie_descriptions = [
"壮大な視覚効果を持つ宇宙が舞台のスリリングなSFアドベンチャー",
"予期せぬ場所で愛を見つける心温まるロマンティックコメディ",
"予想外の展開を持つ緊張感溢れる心理スリラー",
"家族と子供に最適なアニメーションファンタジー映画",
"実際の出来事に基づいた引き込まれる犯罪ドラマ"
]
encoder = LLMItemEncoder()
item_llm_emb = encoder.encode(movie_descriptions)
sim_matrix = torch.matmul(item_llm_emb, item_llm_emb.T)
print(f"LLMアイテム埋め込み形状: {item_llm_emb.shape}")
print("\nアイテム間類似度行列:")
print(sim_matrix.numpy().round(3))
7.2 プロンプトベース推薦
def build_rec_prompt(user_history, candidates, user_profile=None):
history_str = "\n".join(f" - {m}" for m in user_history)
candidate_str = "\n".join(f" {i+1}. {m}" for i, m in enumerate(candidates))
profile_str = f"\nユーザープロファイル: {user_profile}" if user_profile else ""
return f"""あなたは映画の個人化推薦の専門家です。{profile_str}
ユーザーが最近楽しんだ映画:
{history_str}
以下の候補映画の中から、ユーザーが最も楽しみそうなものをランク付けしてください。
各推薦に一文の説明を付けてください。
候補:
{candidate_str}
以下の形式で回答してください:
1. [映画タイトル] - [理由]
2. [映画タイトル] - [理由]
3. [映画タイトル] - [理由]"""
user_history = ["インターステラー (2014)", "マトリックス (1999)", "ブレードランナー 2049 (2017)"]
candidates = ["アバター: ウェイ・オブ・ウォーター (2022)", "ノートブック (2004)",
"パラサイト (2019)", "デューン (2021)", "アバウト・タイム (2013)"]
prompt = build_rec_prompt(
user_history=user_history,
candidates=candidates,
user_profile="SFとスリラーを好み、世界観構築と視覚的なクラフトを重視"
)
print("生成されたプロンプト:")
print("=" * 60)
print(prompt)
print("=" * 60)
8. 産業規模の推薦システム
8.1 多段階アーキテクチャ
実世界の大規模システムは複数のステージで動作します:
class IndustrialRecSystem:
"""
産業向け推薦システムの概要:
ステージ1 — 検索: 数百万 → 数百の候補 (Two-Tower + ANN)
ステージ2 — ランキング: 数百 → 上位50件 (DCN / xDeepFM)
ステージ3 — 再ランキング: 上位50件 → 最終20件 (多様性/鮮度)
"""
def __init__(self):
print("産業向け推薦システムを初期化")
print(" ステージ1 — 検索: Two-Tower + Faiss(ミリ秒以下)")
print(" ステージ2 — ランキング: DCN(特徴交差あり)")
print(" ステージ3 — 再ランキング: MMR/DPP(多様性確保)")
def retrieval(self, user_emb, index, k=500):
print(f" 検索: {k}件の候補")
return list(range(k))
def ranking(self, user_feats, candidates):
print(f" ランキング: {len(candidates)} -> 50")
return candidates[:50]
def reranking(self, ranked, diversity_weight=0.3):
print(f" 再ランキング: diversity_weight={diversity_weight}")
return ranked[:20]
class DeepCrossNetwork(nn.Module):
"""
Deep & Cross Network (DCN)
Wang et al., 2017 — 自動特徴交差
"""
def __init__(self, input_dim, cross_layers=3, deep_dims=None, dropout=0.1):
super().__init__()
if deep_dims is None:
deep_dims = [256, 128, 64]
self.cross_w = nn.ParameterList([nn.Parameter(torch.randn(input_dim, 1)) for _ in range(cross_layers)])
self.cross_b = nn.ParameterList([nn.Parameter(torch.zeros(input_dim)) for _ in range(cross_layers)])
deep, in_d = [], input_dim
for d in deep_dims:
deep += [nn.Linear(in_d, d), nn.LayerNorm(d), nn.ReLU(), nn.Dropout(dropout)]
in_d = d
self.deep = nn.Sequential(*deep)
self.output = nn.Linear(input_dim + deep_dims[-1], 1)
def cross(self, x0, x):
for w, b in zip(self.cross_w, self.cross_b):
x = x0 * (torch.matmul(x, w) + b.unsqueeze(0)) + x
return x
def forward(self, x):
cross_out = self.cross(x, x.clone())
deep_out = self.deep(x)
return torch.sigmoid(self.output(torch.cat([cross_out, deep_out], dim=1))).squeeze()
dcn = DeepCrossNetwork(128).to(device)
print(f"DCNパラメータ数: {sum(p.numel() for p in dcn.parameters()):,}")
8.2 コールドスタート対策
class ColdStartStrategies:
@staticmethod
def content_for_new_items(description, encoder, existing_embs):
"""新アイテム: コンテンツ埋め込みで既存の類似アイテムを検索"""
new_emb = encoder.encode([description])
sims = torch.matmul(new_emb, existing_embs.T)
return sims.topk(5).indices[0]
@staticmethod
def onboarding_for_new_users():
print("新規ユーザーのコールドスタート戦略:")
print(" 1. オンボーディング調査(好きなジャンル、人気度の好み)")
print(" 2. デモグラフィックベースのグループ推薦")
print(" 3. 探索-活用バンディット(イプシロン-greedy)")
print(" 4. 迅速な暗黙的フィードバック収集")
@staticmethod
def epsilon_greedy(n_items, epsilon=0.1):
"""イプシロン-greedyの探索と活用"""
if np.random.random() < epsilon:
return np.random.randint(n_items) # 探索
return 0 # 活用: 最高スコアのアイテム
ColdStartStrategies.onboarding_for_new_users()
9. 実世界の実装: Surpriseライブラリ
9.1 Surpriseで素早い推薦システム
def demo_surprise():
"""
Surpriseライブラリ(協調フィルタリング向け)。
インストール: pip install scikit-surprise
"""
usage = """
from surprise import Dataset, SVD, KNNBasic
from surprise.model_selection import cross_validate, train_test_split
from surprise import accuracy
data = Dataset.load_builtin('ml-100k')
trainset, testset = train_test_split(data, test_size=0.2, random_state=42)
svd = SVD(n_factors=100, n_epochs=20, lr_all=0.005, reg_all=0.02)
svd.fit(trainset)
preds = svd.test(testset)
print(f"SVD RMSE: {accuracy.rmse(preds):.4f}")
# クロスバリデーション
cv = cross_validate(SVD(), data, measures=['RMSE', 'MAE'], cv=5, verbose=True)
print(f"平均CV RMSE: {cv['test_rmse'].mean():.4f}")
"""
print("Surpriseライブラリのアルゴリズム:")
print(" SVD — 行列分解(Netflixプライズのベースライン)")
print(" SVD++ — 暗黙的フィードバック付きSVD")
print(" NMF — 非負値行列分解")
print(" KNNBasic/Means/Baseline — 近傍法")
demo_surprise()
9.2 LightFMによるハイブリッド推薦
def demo_lightfm():
"""
LightFM: ハイブリッド協調フィルタリング + コンテンツベース。
インストール: pip install lightfm
"""
usage = """
from lightfm import LightFM
from lightfm.evaluation import precision_at_k, auc_score
from lightfm.datasets import fetch_movielens
data = fetch_movielens()
train, test = data['train'], data['test']
# BPR損失
model_bpr = LightFM(no_components=30, loss='bpr', learning_rate=0.05)
model_bpr.fit(train, epochs=30, num_threads=4)
# WARP損失(より強いランキング信号)
model_warp = LightFM(no_components=30, loss='warp', learning_rate=0.05)
model_warp.fit(train, epochs=30, num_threads=4)
print(f"BPR Precision@10: {precision_at_k(model_bpr, test, k=10).mean():.4f}")
print(f"WARP Precision@10: {precision_at_k(model_warp, test, k=10).mean():.4f}")
"""
print("LightFMハイブリッド推薦:")
print(" 協調フィルタリングとアイテム/ユーザー特徴行列を組み合わせ")
print(" BPR、WARP、ロジスティック、warp-kos損失をサポート")
print(" コンテンツ特徴でコールドスタートを軽減")
demo_lightfm()
10. モデルベンチマークと選択ガイド
import pandas as pd
benchmark = pd.DataFrame({
'モデル': ['User-based KNN', 'SVD', 'BPR-MF', 'NCF',
'Two-Tower', 'SASRec', 'LightGCN', 'LLMベース'],
'Precision@10': [0.042, 0.061, 0.068, 0.075, 0.072, 0.089, 0.085, 0.078],
'Recall@10': [0.134, 0.198, 0.221, 0.244, 0.238, 0.289, 0.279, 0.261],
'NDCG@10': [0.089, 0.124, 0.138, 0.158, 0.154, 0.187, 0.179, 0.169],
'学習時間': ['1分', '5分', '3分', '20分',
'30分', '25分', '40分', '60分以上'],
'スケール': ['小規模', '中規模', '中規模', '大規模',
'超大規模', '大規模', '大規模', 'あらゆる規模'],
'コールドスタート': ['不良', '不良', '不良', '不良',
'良好', '普通', '普通', '優秀'],
})
print("推薦システムベンチマーク(MovieLens 1M)")
print("=" * 95)
print(benchmark.to_string(index=False))
print("\nモデル選択ガイド:")
print(" 小規模データ(~10万インタラクション): SVD、User-based KNN")
print(" 中規模データ(~100万インタラクション): NCF、BPR-MF")
print(" 大規模データ(1000万以上): Two-Tower + LightGCN + SASRec")
print(" コールドスタートが重要: LLMアイテムエンコーディング + Two-Tower")
print(" リアルタイムサービング: Two-Tower(事前計算済み埋め込み)+ Faiss")
おわりに
このガイドは現代の推薦システムの全スペクトラムをカバーしました。
重要なポイント:
- 基礎: 協調フィルタリングと評価指標(Precision@K、NDCG)
- 行列分解: SVD、BPR — 強力で解釈可能なベースライン
- NCF: 深層学習でMFの限界を超える
- Two-Tower: インターネット規模の推薦システムの主要アーキテクチャ
- 逐次モデル: SASRec、BERT4Rec — 時系列ユーザー行動を活用
- グラフモデル: LightGCN — 高次接続性をキャプチャ
- LLMパワード: セマンティック理解でコールドスタートを解決
実践的なヒント:
- 常にBPR-MFベースラインから始める — 打ち破るのは驚くほど難しい。
- プロダクション規模では多段階検索が必須。
- 系列データがある場合、SASRecは静的モデルを一貫して上回る。
- コールドスタートが重要な場合、LLMベースのアイテムエンコーディングに投資する。
参考文献: