Навчання моделі кластеризації даних
Кластеризація — навчання без учителя, яке виявляє приховану структуру даних: сегменти клієнтів, тематичні кластери документів, аномальні групи транзакцій. Вибір алгоритму та правильне визначення кількості кластерів критично впливають на бізнес-інтерпретацію результатів.
Вибір алгоритму кластеризації
| Алгоритм | Кільк. кластерів | Форма кластерів | Масштаб | Застосування |
|---|---|---|---|---|
| K-Means | Потрібно задати | Сферичні | >100K | Сегментація клієнтів |
| DBSCAN | Авто | Будь-яка | ~50K | Аномалії, геодані |
| HDBSCAN | Авто | Будь-яка | >100K | Тексти, зображення |
| Agglomerative | Потрібно задати | Будь-яка | ~10K | Ієрархія документів |
| GMM | Потрібно задати | Еліпсоїдні | ~50K | М'які ймовірності |
K-Means з оптимальною кількістю кластерів
from sklearn.cluster import KMeans, MiniBatchKMeans
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import silhouette_score, calinski_harabasz_score
import numpy as np
import matplotlib.pyplot as plt
class ClusteringPipeline:
def __init__(self, scale: bool = True):
self.scaler = StandardScaler() if scale else None
self.model = None
def find_optimal_k(self, X: np.ndarray,
k_range: range = range(2, 20)) -> int:
"""Метод локтя + силует для визначення K"""
if self.scaler:
X = self.scaler.fit_transform(X)
inertias = []
silhouettes = []
for k in k_range:
kmeans = MiniBatchKMeans(n_clusters=k, random_state=42,
batch_size=1024)
labels = kmeans.fit_predict(X)
inertias.append(kmeans.inertia_)
if len(X) > 50000:
sample_idx = np.random.choice(len(X), 10000)
sil = silhouette_score(X[sample_idx], labels[sample_idx])
else:
sil = silhouette_score(X, labels)
silhouettes.append(sil)
# Метод локтя — точка перегину
diffs = np.diff(inertias)
diffs2 = np.diff(diffs)
elbow_k = k_range[np.argmax(diffs2) + 2]
# Найкращий силует
best_sil_k = k_range[np.argmax(silhouettes)]
# Консенсус: найближчий k з двох методів
optimal_k = (elbow_k + best_sil_k) // 2
print(f"Elbow method: k={elbow_k}, Silhouette: k={best_sil_k}, Chosen: k={optimal_k}")
return optimal_k
def fit(self, X: np.ndarray, k: int = None):
if self.scaler:
X_scaled = self.scaler.fit_transform(X)
else:
X_scaled = X
if k is None:
k = self.find_optimal_k(X_scaled)
self.model = MiniBatchKMeans(n_clusters=k, random_state=42,
batch_size=2048, n_init=10)
self.labels = self.model.fit_predict(X_scaled)
return self
def evaluate(self, X: np.ndarray) -> dict:
X_scaled = self.scaler.transform(X) if self.scaler else X
return {
'silhouette': silhouette_score(X_scaled, self.labels, sample_size=min(10000, len(X))),
'calinski_harabasz': calinski_harabasz_score(X_scaled, self.labels),
'n_clusters': len(np.unique(self.labels)),
'cluster_sizes': dict(zip(*np.unique(self.labels, return_counts=True)))
}
HDBSCAN для текстових даних
import hdbscan
from sentence_transformers import SentenceTransformer
def cluster_documents(texts: list[str], min_cluster_size: int = 10) -> list[int]:
# Вбудовування
model = SentenceTransformer('all-MiniLM-L6-v2')
embeddings = model.encode(texts, batch_size=256, show_progress_bar=True)
# Зниження розмірності перед кластеризацією
from umap import UMAP
umap_model = UMAP(n_components=10, random_state=42, metric='cosine')
reduced = umap_model.fit_transform(embeddings)
# HDBSCAN
clusterer = hdbscan.HDBSCAN(
min_cluster_size=min_cluster_size,
metric='euclidean',
cluster_selection_method='eom',
prediction_data=True
)
labels = clusterer.fit_predict(reduced)
# -1 = шум/викид
print(f"Found {len(np.unique(labels[labels >= 0]))} clusters")
print(f"Noise points: {(labels == -1).sum()}")
return labels
Інтерпретація кластерів
def describe_clusters(X_df: pd.DataFrame, labels: np.ndarray) -> dict:
"""Автоматичний опис кожного кластеру"""
cluster_descriptions = {}
for cluster_id in np.unique(labels):
if cluster_id == -1:
continue
mask = labels == cluster_id
cluster_df = X_df[mask]
# Центроїд кластеру в просторі ознак
centroid = cluster_df.mean()
# Найбільш відмітні ознаки (вище/нижче середнього)
overall_mean = X_df.mean()
diff = (centroid - overall_mean) / X_df.std()
top_features = diff.abs().nlargest(5).index.tolist()
cluster_descriptions[cluster_id] = {
'size': mask.sum(),
'size_pct': mask.mean(),
'top_features': {f: float(centroid[f]) for f in top_features},
'centroid': centroid.to_dict()
}
return cluster_descriptions
Хороша кластеризація клієнтів має коефіцієнт силету > 0.3, бізнес-інтерпретовні кластери та стабільність при повторних запусках (подібність Жаккара > 0.8 між запусками).







