Разработка AI-рекомендательной системы
Рекомендательная система — это ML-модель, которая предсказывает, что пользователь захочет посмотреть, купить или прочитать дальше. Разница между хорошей и плохой реализацией: 15-20% выручки в первом случае против 2-3% lift во втором. Архитектурных подходов несколько — выбор зависит от объёма данных, холодного старта и бизнес-контекста.
Выбор архитектуры по данным
| Объём транзакций | Рекомендованный подход | Recall@10 | Latency |
|---|---|---|---|
| < 10K | Content-based + правила | 15-25% | < 5ms |
| 10K – 500K | Matrix Factorization (ALS) | 25-40% | < 20ms |
| 500K – 5M | Two-tower neural + MF ensemble | 35-50% | < 50ms |
| > 5M | Two-tower + GNN + re-ranking | 45-65% | < 100ms |
Two-Tower нейронная архитектура
import torch
import torch.nn as nn
import numpy as np
from torch.utils.data import DataLoader, Dataset
class UserTower(nn.Module):
"""Энкодер пользователя"""
def __init__(self, n_users: int, n_categories: int,
embedding_dim: int = 64, hidden_dim: int = 128):
super().__init__()
self.user_emb = nn.Embedding(n_users + 1, embedding_dim, padding_idx=0)
self.category_emb = nn.Embedding(n_categories + 1, 16, padding_idx=0)
self.mlp = nn.Sequential(
nn.Linear(embedding_dim + 16 * 5 + 10, hidden_dim),
nn.ReLU(),
nn.Dropout(0.3),
nn.Linear(hidden_dim, 64),
nn.LayerNorm(64)
)
def forward(self, user_id, top_categories, behavior_features):
user_vec = self.user_emb(user_id)
cat_vecs = self.category_emb(top_categories).view(top_categories.shape[0], -1)
combined = torch.cat([user_vec, cat_vecs, behavior_features], dim=1)
return self.mlp(combined)
class ItemTower(nn.Module):
"""Энкодер предмета (товара/контента)"""
def __init__(self, n_items: int, n_categories: int,
embedding_dim: int = 64, text_dim: int = 128):
super().__init__()
self.item_emb = nn.Embedding(n_items + 1, embedding_dim, padding_idx=0)
self.category_emb = nn.Embedding(n_categories + 1, 16, padding_idx=0)
self.mlp = nn.Sequential(
nn.Linear(embedding_dim + 16 + text_dim + 5, 128),
nn.ReLU(),
nn.Dropout(0.3),
nn.Linear(128, 64),
nn.LayerNorm(64)
)
def forward(self, item_id, category_id, text_features, item_features):
item_vec = self.item_emb(item_id)
cat_vec = self.category_emb(category_id)
combined = torch.cat([item_vec, cat_vec, text_features, item_features], dim=1)
return self.mlp(combined)
class TwoTowerModel(nn.Module):
def __init__(self, n_users, n_items, n_categories, text_dim=128):
super().__init__()
self.user_tower = UserTower(n_users, n_categories)
self.item_tower = ItemTower(n_items, n_categories, text_dim=text_dim)
self.temperature = nn.Parameter(torch.ones(1) * 0.05)
def forward(self, user_inputs, item_inputs):
user_emb = self.user_tower(**user_inputs)
item_emb = self.item_tower(**item_inputs)
# Cosine similarity
user_norm = nn.functional.normalize(user_emb, dim=1)
item_norm = nn.functional.normalize(item_emb, dim=1)
scores = torch.sum(user_norm * item_norm, dim=1) / self.temperature
return scores
def get_user_embedding(self, user_inputs) -> torch.Tensor:
with torch.no_grad():
return nn.functional.normalize(self.user_tower(**user_inputs), dim=1)
def get_item_embedding(self, item_inputs) -> torch.Tensor:
with torch.no_grad():
return nn.functional.normalize(self.item_tower(**item_inputs), dim=1)
In-Batch Negative Sampling обучение
class RecommendationTrainer:
def __init__(self, model: TwoTowerModel, lr: float = 1e-3):
self.model = model
self.optimizer = torch.optim.Adam(model.parameters(), lr=lr)
self.scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(
self.optimizer, T_max=10
)
def train_epoch(self, dataloader: DataLoader) -> float:
self.model.train()
total_loss = 0
for batch in dataloader:
user_inputs, pos_item_inputs = batch['user'], batch['positive_item']
# User embeddings: [batch_size, dim]
user_embs = nn.functional.normalize(
self.model.user_tower(**user_inputs), dim=1
)
# Item embeddings: [batch_size, dim]
item_embs = nn.functional.normalize(
self.model.item_tower(**pos_item_inputs), dim=1
)
# In-batch negatives: матрица схожестей [batch_size x batch_size]
scores = torch.matmul(user_embs, item_embs.T) / self.model.temperature
# Диагональ = позитивные примеры
labels = torch.arange(len(user_embs)).to(scores.device)
loss = nn.functional.cross_entropy(scores, labels)
self.optimizer.zero_grad()
loss.backward()
torch.nn.utils.clip_grad_norm_(self.model.parameters(), 1.0)
self.optimizer.step()
total_loss += loss.item()
self.scheduler.step()
return total_loss / len(dataloader)
ANN-индекс для реального времени
import faiss
class RecommendationIndex:
"""Индекс для быстрого поиска top-K рекомендаций"""
def __init__(self, dim: int = 64, n_items: int = 100000):
# IVF-PQ индекс: компромисс скорость/точность
quantizer = faiss.IndexFlatIP(dim)
n_lists = min(int(np.sqrt(n_items)), 4096)
self.index = faiss.IndexIVFPQ(quantizer, dim, n_lists, 8, 8)
self.index.nprobe = 32 # Число clusters для поиска
self.item_ids = []
def build(self, item_embeddings: np.ndarray, item_ids: list):
"""Построение индекса из эмбеддингов"""
embeddings_norm = item_embeddings / np.linalg.norm(
item_embeddings, axis=1, keepdims=True
)
self.index.train(embeddings_norm.astype(np.float32))
self.index.add(embeddings_norm.astype(np.float32))
self.item_ids = item_ids
def recommend(self, user_embedding: np.ndarray,
k: int = 50,
exclude_ids: list = None) -> list[tuple]:
"""Top-K рекомендаций для пользователя"""
user_norm = user_embedding / np.linalg.norm(user_embedding)
scores, indices = self.index.search(
user_norm.reshape(1, -1).astype(np.float32),
k + (len(exclude_ids) if exclude_ids else 0) + 10
)
results = []
for score, idx in zip(scores[0], indices[0]):
if idx < 0:
continue
item_id = self.item_ids[idx]
if exclude_ids and item_id in exclude_ids:
continue
results.append((item_id, float(score)))
if len(results) >= k:
break
return results
Re-ranking и бизнес-правила
class RecommendationReranker:
"""Применение бизнес-логики к кандидатам от retrieval модели"""
def rerank(self, candidates: list[tuple],
user_context: dict,
business_rules: dict) -> list[tuple]:
"""
candidates: [(item_id, base_score)]
business_rules: {'boost_new_items': 1.2, 'boost_on_sale': 1.15, 'max_same_category': 3}
"""
scored_candidates = []
category_count = {}
for item_id, base_score in candidates:
item = self._get_item_metadata(item_id)
if item is None:
continue
# Применение буста
score = base_score
if business_rules.get('boost_new_items') and item.get('is_new'):
score *= business_rules['boost_new_items']
if business_rules.get('boost_on_sale') and item.get('on_sale'):
score *= business_rules['boost_on_sale']
# Лимит категорий
cat = item.get('category')
max_per_cat = business_rules.get('max_same_category', 5)
if category_count.get(cat, 0) >= max_per_cat:
continue
category_count[cat] = category_count.get(cat, 0) + 1
scored_candidates.append((item_id, score))
return sorted(scored_candidates, key=lambda x: x[1], reverse=True)
Типовые метрики после внедрения рекомендательной системы: CTR рекомендаций 8-15% (против 2-4% популярных товаров), конверсия из рекомендации 3-7%, дополнительная выручка 12-25% от общего оборота. Время холодного старта для новых пользователей без истории: 3-5 взаимодействий для получения персонализированных рекомендаций.







