Обучение модели кластеризации данных
Кластеризация — обучение без учителя, которое выявляет скрытую структуру данных: сегменты клиентов, тематические кластеры документов, аномальные группы транзакций. Выбор алгоритма и правильное определение числа кластеров критически влияют на бизнес-интерпретацию результатов.
Выбор алгоритма кластеризации
| Алгоритм | Кол-во кластеров | Форма кластеров | Масштаб | Применение |
|---|---|---|---|---|
| K-Means | Нужно задать | Сферические | >100K | Сегментация клиентов |
| DBSCAN | Авто | Любая | ~50K | Аномалии, геоданные |
| HDBSCAN | Авто | Любая | >100K | Тексты, изображения |
| Agglomerative | Нужно задать | Любая | ~10K | Иерархия документов |
| GMM | Нужно задать | Эллипсоидные | ~50K | Мягкие вероятности |
K-Means с оптимальным числом кластеров
from sklearn.cluster import KMeans, MiniBatchKMeans
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import silhouette_score, calinski_harabasz_score
import numpy as np
import matplotlib.pyplot as plt
class ClusteringPipeline:
def __init__(self, scale: bool = True):
self.scaler = StandardScaler() if scale else None
self.model = None
def find_optimal_k(self, X: np.ndarray,
k_range: range = range(2, 20)) -> int:
"""Метод локтя + силуэт для определения K"""
if self.scaler:
X = self.scaler.fit_transform(X)
inertias = []
silhouettes = []
for k in k_range:
kmeans = MiniBatchKMeans(n_clusters=k, random_state=42,
batch_size=1024)
labels = kmeans.fit_predict(X)
inertias.append(kmeans.inertia_)
if len(X) > 50000:
sample_idx = np.random.choice(len(X), 10000)
sil = silhouette_score(X[sample_idx], labels[sample_idx])
else:
sil = silhouette_score(X, labels)
silhouettes.append(sil)
# Метод локтя — точка перегиба
diffs = np.diff(inertias)
diffs2 = np.diff(diffs)
elbow_k = k_range[np.argmax(diffs2) + 2]
# Лучший силуэт
best_sil_k = k_range[np.argmax(silhouettes)]
# Консенсус: ближайшие k из двух методов
optimal_k = (elbow_k + best_sil_k) // 2
print(f"Elbow method: k={elbow_k}, Silhouette: k={best_sil_k}, Chosen: k={optimal_k}")
return optimal_k
def fit(self, X: np.ndarray, k: int = None):
if self.scaler:
X_scaled = self.scaler.fit_transform(X)
else:
X_scaled = X
if k is None:
k = self.find_optimal_k(X_scaled)
self.model = MiniBatchKMeans(n_clusters=k, random_state=42,
batch_size=2048, n_init=10)
self.labels = self.model.fit_predict(X_scaled)
return self
def evaluate(self, X: np.ndarray) -> dict:
X_scaled = self.scaler.transform(X) if self.scaler else X
return {
'silhouette': silhouette_score(X_scaled, self.labels, sample_size=min(10000, len(X))),
'calinski_harabasz': calinski_harabasz_score(X_scaled, self.labels),
'n_clusters': len(np.unique(self.labels)),
'cluster_sizes': dict(zip(*np.unique(self.labels, return_counts=True)))
}
HDBSCAN для текстовых данных
import hdbscan
from sentence_transformers import SentenceTransformer
def cluster_documents(texts: list[str], min_cluster_size: int = 10) -> list[int]:
# Эмбеддинги
model = SentenceTransformer('all-MiniLM-L6-v2')
embeddings = model.encode(texts, batch_size=256, show_progress_bar=True)
# Снижение размерности перед кластеризацией
from umap import UMAP
umap_model = UMAP(n_components=10, random_state=42, metric='cosine')
reduced = umap_model.fit_transform(embeddings)
# HDBSCAN
clusterer = hdbscan.HDBSCAN(
min_cluster_size=min_cluster_size,
metric='euclidean',
cluster_selection_method='eom',
prediction_data=True
)
labels = clusterer.fit_predict(reduced)
# -1 = noise/outlier
print(f"Found {len(np.unique(labels[labels >= 0]))} clusters")
print(f"Noise points: {(labels == -1).sum()}")
return labels
Интерпретация кластеров
def describe_clusters(X_df: pd.DataFrame, labels: np.ndarray) -> dict:
"""Автоматическое описание каждого кластера"""
cluster_descriptions = {}
for cluster_id in np.unique(labels):
if cluster_id == -1:
continue
mask = labels == cluster_id
cluster_df = X_df[mask]
# Центроид кластера в признаковом пространстве
centroid = cluster_df.mean()
# Наиболее отличительные признаки (выше/ниже среднего)
overall_mean = X_df.mean()
diff = (centroid - overall_mean) / X_df.std()
top_features = diff.abs().nlargest(5).index.tolist()
cluster_descriptions[cluster_id] = {
'size': mask.sum(),
'size_pct': mask.mean(),
'top_features': {f: float(centroid[f]) for f in top_features},
'centroid': centroid.to_dict()
}
return cluster_descriptions
Хорошая кластеризация клиентов имеет силуэтный коэффициент > 0.3, бизнес-интерпретируемые кластеры и стабильность при повторных запусках (Jaccard similarity > 0.8 между запусками).







