Виявлення аномалій трафіку в реальному часі
Аномалія трафіку — відхилення від історично нормального паттерну: раптовий всплеск запитів, неочікуване зростання помилок, нетиповий розподіл по endpoint'ам. Автоматичне виявлення дозволяє реагувати на DDoS, спроби скрейпингу та інфраструктурні збої за секунди, а не години.
Що вважається аномалією
Обсягові аномалії: RPS, пропускна здатність, кількість унікальних IP раптово зростають.
Структурні аномалії: сторінка HTTP-методів змінюється (раптовий зростання GET при нормі 70/30 GET/POST), зростає частка запитів до конкретних endpoint'ів.
Якісні аномалії: частка помилок 4xx/5xx зростає, p99 latency пробиває історичну норму, збільшується частка 404 (сканування).
Статистичні методи виявлення
import numpy as np
from collections import deque
import time
class TrafficAnomalyDetector:
def __init__(self, window_size=60, sensitivity=3.0):
"""
window_size: розмір ковзного вікна в точках (секунди/хвилини)
sensitivity: поріг у сигмах (z-score)
"""
self.window_size = window_size
self.sensitivity = sensitivity
self.metrics = {} # {metric_name: deque of values}
def _get_window(self, metric: str) -> deque:
if metric not in self.metrics:
self.metrics[metric] = deque(maxlen=self.window_size)
return self.metrics[metric]
def add_point(self, metric: str, value: float):
"""Додати нову точку даних"""
self.metrics.setdefault(metric, deque(maxlen=self.window_size)).append(value)
def check(self, metric: str, current_value: float) -> dict:
"""Перевірити чи є поточне значення аномалією"""
window = self._get_window(metric)
if len(window) < 10:
# Недостатньо даних для аналізу
return {'anomaly': False, 'reason': 'insufficient_data'}
values = list(window)
mean = np.mean(values)
std = np.std(values)
if std == 0:
z_score = 0 if current_value == mean else float('inf')
else:
z_score = abs(current_value - mean) / std
is_anomaly = z_score > self.sensitivity
direction = 'spike' if current_value > mean else 'drop'
return {
'anomaly': is_anomaly,
'z_score': round(z_score, 2),
'direction': direction if is_anomaly else None,
'current': current_value,
'baseline_mean': round(mean, 2),
'baseline_std': round(std, 2),
'threshold': round(mean + self.sensitivity * std, 2)
}
Експоненціальне згладжування (EWMA)
Краще реагує на тренди, не чутлива до одиничних викидів:
class EWMADetector:
def __init__(self, alpha=0.1, k=3.0):
"""
alpha: коефіцієнт згладжування (0.05–0.2)
k: кількість стандартних відхилень для порога
"""
self.alpha = alpha
self.k = k
self.ewma = {} # {metric: {'mean': float, 'variance': float}}
def update_and_check(self, metric: str, value: float) -> dict:
if metric not in self.ewma:
self.ewma[metric] = {'mean': value, 'variance': 0}
return {'anomaly': False}
state = self.ewma[metric]
mean = state['mean']
variance = state['variance']
# Оновити EWMA mean та variance
new_mean = self.alpha * value + (1 - self.alpha) * mean
new_variance = (1 - self.alpha) * (variance + self.alpha * (value - mean) ** 2)
state['mean'] = new_mean
state['variance'] = new_variance
std = np.sqrt(new_variance) if new_variance > 0 else 0
threshold_high = new_mean + self.k * std
threshold_low = max(0, new_mean - self.k * std)
is_anomaly = value > threshold_high or value < threshold_low
return {
'anomaly': is_anomaly,
'direction': 'spike' if value > threshold_high else 'drop',
'current': value,
'expected': round(new_mean, 2),
'threshold_high': round(threshold_high, 2),
'deviation_pct': round(abs(value - new_mean) / max(new_mean, 1) * 100, 1)
}







