Розробка AI-системи детекції аномалій у телеком-мережі

Проектуємо та впроваджуємо системи штучного інтелекту: від прототипу до production-ready рішення. Наша команда поєднує експертизу в машинному навчанні, дата-інжинірингу та MLOps, щоб AI працював не в лабораторії, а в реальному бізнесі.
Показано 1 з 1Усі 1566 послуг
Розробка AI-системи детекції аномалій у телеком-мережі
Середній
~1-2 тижні
Часті запитання

Напрямки AI-розробки

Етапи розробки AI-рішення

Останні роботи

  • image_website-b2b-advance_0.webp
    Розробка сайту компанії B2B ADVANCE
    1288
  • image_web-applications_feedme_466_0.webp
    Розробка веб-додатків для компанії FEEDME
    1198
  • image_websites_belfingroup_462_0.webp
    Розробка веб-сайту для компанії БЕЛФІНГРУП
    902
  • image_ecommerce_furnoro_435_0.webp
    Розробка інтернет магазину для компанії FURNORO
    1122
  • image_logo-advance_0.webp
    Розробка логотипу компанії B2B Advance
    589
  • image_crm_enviok_479_0.webp
    Розробка веб-додатків для компанії Enviok
    859

Розробка AI-системи детекції аномалій у телеком-мережі

Телеком-мережа генерує мільйони метрик за хвилину. Традиційні статичні пороги – 80% CPU, packet loss > 1% – не вловлюють тонкі аномалії: повільний дрейф, кореловані деградації за кількома KPI, нетиповий патерн трафіку. ML-детекція працює без заданих порогів, адаптуючись до нормальної поведінки кожного елемента.

Багатовимірна аномалія на рівні мережі

Чому статичні пороги недостатні:

  • Нормальний CPU маршрутизатор у піковий час = 75% (не аномалія)
  • CPU 50% о 3 ночі в суботу = аномалія (можлива атака або витік пам'яті)
  • Одночасна деградація 5 KPI на одному елементі = аномалія, хоча кожен окремо в нормі

Контекстно-залежні пороги:

import pandas as pd
import numpy as np
from prophet import Prophet

class ContextualAnomalyDetector:
    def __init__(self, kpi_name: str):
        self.kpi_name = kpi_name
        self.prophet_model = Prophet(
            daily_seasonality=True,
            weekly_seasonality=True,
            interval_width=0.99
        )
        self.fitted = False

    def fit(self, historical_data: pd.DataFrame):
        """
        historical_data: DataFrame с колонками ds (datetime), y (значение KPI)
        Минимум 4 недели истории для корректной сезонности.
        """
        self.prophet_model.fit(historical_data)
        self.fitted = True

    def detect(self, current_value: float, current_time: pd.Timestamp) -> dict:
        future = pd.DataFrame({'ds': [current_time]})
        forecast = self.prophet_model.predict(future)

        yhat = forecast['yhat'].values[0]
        yhat_lower = forecast['yhat_lower'].values[0]
        yhat_upper = forecast['yhat_upper'].values[0]

        is_anomaly = current_value < yhat_lower or current_value > yhat_upper
        deviation = (current_value - yhat) / (abs(yhat) + 1e-9)

        return {
            'kpi': self.kpi_name,
            'value': current_value,
            'expected': yhat,
            'bounds': (yhat_lower, yhat_upper),
            'anomaly': is_anomaly,
            'relative_deviation': deviation
        }

Multivariate Anomaly Detection

Isolation Forest на векторі метрик вузла:

from sklearn.ensemble import IsolationForest
from sklearn.preprocessing import StandardScaler

class NetworkElementAnomalyDetector:
    """
    Каждый сетевой элемент имеет свою модель Isolation Forest.
    Обучение: 30 дней нормальной работы.
    Инференс: каждые 5 минут на текущем векторе KPI.
    """
    def __init__(self, element_id: str, contamination=0.01):
        self.element_id = element_id
        self.scaler = StandardScaler()
        self.model = IsolationForest(
            contamination=contamination,
            n_estimators=100,
            random_state=42
        )

    def fit(self, normal_kpi_matrix: np.ndarray):
        """
        normal_kpi_matrix: (N_samples × N_kpis)
        """
        X = self.scaler.fit_transform(normal_kpi_matrix)
        self.model.fit(X)
        # Калибровка порога на нормальных данных
        scores = self.model.score_samples(X)
        self.threshold = np.percentile(scores, 1)  # 1% ложных срабатываний

    def score(self, kpi_vector: np.ndarray) -> dict:
        X = self.scaler.transform([kpi_vector])
        raw_score = self.model.score_samples(X)[0]
        anomaly_score = -raw_score  # выше = аномальнее

        return {
            'element_id': self.element_id,
            'anomaly_score': float(anomaly_score),
            'is_anomaly': raw_score < self.threshold,
            'severity': self._score_to_severity(anomaly_score)
        }

    def _score_to_severity(self, score):
        if score > 0.7: return 'critical'
        if score > 0.5: return 'major'
        if score > 0.3: return 'minor'
        return 'normal'

Traffic Pattern Anomaly

Детекція нетипового трафіку (DDoS, BGP hijack):

def detect_traffic_anomaly(traffic_matrix: pd.DataFrame,
                            baseline_stats: dict) -> list:
    """
    traffic_matrix: src_ip × dst_ip × bytes за 5 минут (NetFlow/IPFIX)
    Аномалии трафика: объёмные (DDoS), структурные (BGP hijack), протокольные
    """
    anomalies = []

    # 1. Объёмная аномалия: резкий рост входящего трафика
    current_total = traffic_matrix['bytes'].sum()
    baseline_total = baseline_stats['total_bytes_mean']
    baseline_std = baseline_stats['total_bytes_std']

    volume_z_score = (current_total - baseline_total) / (baseline_std + 1e-9)
    if volume_z_score > 5:
        anomalies.append({
            'type': 'volumetric_spike',
            'severity': 'critical',
            'z_score': volume_z_score,
            'possible_cause': 'DDoS attack or flash crowd'
        })

    # 2. Новые источники: IP, которых не было в baseline
    current_sources = set(traffic_matrix['src_ip'].unique())
    known_sources = baseline_stats.get('known_src_ips', set())
    new_sources = current_sources - known_sources
    if len(new_sources) > baseline_stats.get('new_ip_threshold', 1000):
        anomalies.append({
            'type': 'new_source_flood',
            'severity': 'major',
            'new_ips_count': len(new_sources)
        })

    # 3. Протокольная аномалия: рост ICMP или UDP flood
    protocol_ratios = traffic_matrix.groupby('protocol')['bytes'].sum() / current_total
    for proto in ['ICMP', 'UDP']:
        if protocol_ratios.get(proto, 0) > 0.5:
            anomalies.append({
                'type': f'{proto}_flood',
                'severity': 'major',
                'ratio': protocol_ratios[proto]
            })

    return anomalies

BGP та маршрутизація

Детекція BGP anomaly:

def analyze_bgp_events(bgp_updates: pd.DataFrame, baseline_prefix_count: int) -> dict:
    """
    BGP hijack: внезапное появление нового AS-path для известного префикса.
    BGP leak: маршруты от одного провайдера рекламируются другому.
    Route flap: частые обновления = нестабильность соединения.
    """
    # Route flapping
    prefix_update_counts = bgp_updates.groupby('prefix').size()
    flapping_prefixes = prefix_update_counts[prefix_update_counts > 10].index.tolist()

    # Новые AS-origin для известных префиксов
    known_origins = {}  # prefix → expected AS
    hijack_candidates = []
    for _, row in bgp_updates.iterrows():
        if row['prefix'] in known_origins:
            if row['origin_as'] != known_origins[row['prefix']]:
                hijack_candidates.append({
                    'prefix': row['prefix'],
                    'expected_as': known_origins[row['prefix']],
                    'detected_as': row['origin_as']
                })

    return {
        'flapping_prefixes': flapping_prefixes,
        'hijack_candidates': hijack_candidates,
        'route_instability': len(flapping_prefixes) > 5
    }

Alert Correlation та придушення шуму

Граф кореляцій аномалій: При збої аплінка маршрутизатора → сотні downstream аномалій. Алгоритм: будуємо граф залежностей з топології CMDB → визначаємо джерело upstream → групуємо в один інцидент.

Терміни: Prophet контекстуальна аномалія + Isolation Forest на вузлах + traffic anomaly - 3-4 тижні. BGP anomaly, alert correlation граф, автоматичний RCA, NOC інтеграція – 2-3 місяці.