Реалізація детекції аномалій у тимчасових рядах
Детекція аномалій у часових рядах — завдання з безліччю підходів та жодним універсальним рішенням. Вибір алгоритму визначається типом аномалії (точкова, контекстуальна, колективна), доступністю міток та обчислювальними обмеженнями. Практична система поєднує кілька методів для мінімізації перепусток та хибних спрацьовувань.
Типологія аномалій
Точкові аномалії (Point Anomalies): Одиничне значення різко вибивається із низки. Приклад: прочитання датчика температури 200°C при нормі 50°C.
Контекстуальні аномалії: Значення нормально саме собою, але аномально у цьому контексті. Приклад: температура 35 ° C у січні (норма влітку, аномалія взимку).
Колективні аномалії: Послідовність значень нормальна окремо, але аномальна разом. Приклад: кілька стандартних транзакцій, що утворюють патерн шахрайства.
Статистичні методи
Z-Score та MAD:
import numpy as np
from scipy.stats import median_abs_deviation
def zscore_anomalies(series, threshold=3.0):
"""
Z-score: хорошо для нормально распределённых данных
"""
z_scores = np.abs((series - series.mean()) / series.std())
return z_scores > threshold
def mad_anomalies(series, threshold=3.5):
"""
MAD (Median Absolute Deviation): устойчив к выбросам в обучающих данных
Предпочтительнее z-score для данных с артефактами
"""
median = np.median(series)
mad = median_abs_deviation(series)
modified_z = 0.6745 * (series - median) / mad
return np.abs(modified_z) > threshold
CUSUM для поступових змін:
def cusum_detector(series, k=0.5, h=5.0):
"""
CUSUM: накапливает отклонения → детектирует shift в среднем
k: reference value (чувствительность)
h: threshold (порог срабатывания)
"""
mean = series[:50].mean() # baseline на начале ряда
std = series[:50].std()
S_pos = np.zeros(len(series))
S_neg = np.zeros(len(series))
for t in range(1, len(series)):
xi = (series[t] - mean) / std
S_pos[t] = max(0, S_pos[t-1] + xi - k)
S_neg[t] = max(0, S_neg[t-1] - xi - k)
return (S_pos > h) | (S_neg > h)
STL-декомпозиція + residual detection:
from statsmodels.tsa.seasonal import STL
def stl_anomaly_detection(series, period=24, threshold=3.5):
"""
Разложение на тренд + сезонность + остаток
Аномалия = большой остаток
"""
stl = STL(series, period=period, robust=True)
result = stl.fit()
residuals = result.resid
# MAD на остатках
mad = median_abs_deviation(residuals)
modified_z = np.abs(0.6745 * (residuals - np.median(residuals)) / mad)
return modified_z > threshold, result
ML-методи
Isolation Forest:
from sklearn.ensemble import IsolationForest
def isolation_forest_detector(series, contamination=0.05, window=10):
"""
Isolation Forest: эффективен на многомерных данных
contamination: ожидаемая доля аномалий
window: размер скользящего окна для создания фич
"""
# Создание оконных фич
features = []
for i in range(window, len(series)):
window_data = series[i-window:i]
features.append([
window_data.mean(),
window_data.std(),
window_data.max() - window_data.min(),
window_data[-1] - window_data.mean(), # current deviation
np.corrcoef(np.arange(window), window_data)[0,1] # trend
])
features = np.array(features)
iso_forest = IsolationForest(contamination=contamination, random_state=42)
predictions = iso_forest.fit_predict(features)
# -1 = аномалия, 1 = нормально
return predictions == -1
LSTM Autoencoder:
import torch
import torch.nn as nn
class LSTMAutoencoder(nn.Module):
def __init__(self, input_size, hidden_size=64, num_layers=2):
super().__init__()
# Encoder
self.encoder = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True)
# Decoder
self.decoder = nn.LSTM(hidden_size, input_size, num_layers, batch_first=True)
def forward(self, x):
# Encode
_, (h_n, c_n) = self.encoder(x)
# Decode: repeat hidden state for decoder input
decoder_input = h_n[-1].unsqueeze(1).repeat(1, x.size(1), 1)
reconstruction, _ = self.decoder(decoder_input)
return reconstruction
def detect_autoencoder_anomalies(model, series, threshold_quantile=0.95):
"""
Reconstruction error как мера аномальности
Высокий RE = модель не может восстановить паттерн = аномалия
"""
with torch.no_grad():
reconstruction = model(series)
re = torch.mean((series - reconstruction)**2, dim=[1, 2])
threshold = torch.quantile(re, threshold_quantile)
return re > threshold
Online (потокова) детекція
Streaming Anomaly Detection:
from collections import deque
import numpy as np
class OnlineAnomalyDetector:
"""
Полностью онлайн: работает без накопления истории в памяти
Обновляет статистику при каждой новой точке
"""
def __init__(self, window_size=200, threshold=3.5):
self.window = deque(maxlen=window_size)
self.threshold = threshold
self.n = 0
self.mean = 0
self.M2 = 0 # Welford's algorithm для online variance
def update(self, value):
self.window.append(value)
self.n += 1
# Welford's online mean and variance
delta = value - self.mean
self.mean += delta / self.n
delta2 = value - self.mean
self.M2 += delta * delta2
variance = self.M2 / (self.n - 1) if self.n > 1 else 0
std = np.sqrt(variance)
if std > 0 and self.n > 30: # warmup period
z_score = abs(value - self.mean) / std
return z_score > self.threshold
return False
Оцінка якості детектора
Метрики за наявності міток:
from sklearn.metrics import precision_score, recall_score, f1_score, average_precision_score
def evaluate_detector(y_true, y_pred, y_scores=None):
metrics = {
'precision': precision_score(y_true, y_pred),
'recall': recall_score(y_true, y_pred),
'f1': f1_score(y_true, y_pred),
}
if y_scores is not None:
metrics['average_precision'] = average_precision_score(y_true, y_scores)
return metrics
Без тегів — відносна оцінка:
- False Positive Rate: частка часу, коли система в "аномалії" за нормального режиму
- Alert fatigue: якщо > 5% часу генеруються алерти – система надто чутлива
- Operational feedback: інженери позначають алерти як true/false positive → continuously improved model
Практичні сценарії
Метрики інфраструктури: Prometheus метрики → STL декомпозиція + Isolation Forest. Основна проблема: деплої створюють хибні аномалії. Рішення: suppress detection window ±10 хвилин від деплою.
Фінансові транзакції: Висока class imbalance (аномалії <0.1%). LSTM Autoencoder або Isolation Forest краще supervised методів.
Промислові датчики: Часто мають фізично обумовлені обмеження → threshold + статистичний hybrid.
Терміни: STL + Isolation Forest + online Z-score + базовий дашборд - 3-4 тижні. LSTM Autoencoder, streaming detection, feedback loop для перенавчання, multi-sensor fusion - 2-3 місяці.







