Розробка AI-системи аналізу логів і автоматичного виявлення аномалій

Проектуємо та впроваджуємо системи штучного інтелекту: від прототипу до production-ready рішення. Наша команда поєднує експертизу в машинному навчанні, дата-інжинірингу та MLOps, щоб AI працював не в лабораторії, а в реальному бізнесі.
Показано 1 з 1Усі 1566 послуг
Розробка AI-системи аналізу логів і автоматичного виявлення аномалій
Середній
~1-2 тижні
Часті запитання

Напрямки AI-розробки

Етапи розробки AI-рішення

Останні роботи

  • image_website-b2b-advance_0.webp
    Розробка сайту компанії B2B ADVANCE
    1284
  • image_web-applications_feedme_466_0.webp
    Розробка веб-додатків для компанії FEEDME
    1196
  • image_websites_belfingroup_462_0.webp
    Розробка веб-сайту для компанії БЕЛФІНГРУП
    901
  • image_ecommerce_furnoro_435_0.webp
    Розробка інтернет магазину для компанії FURNORO
    1119
  • image_logo-advance_0.webp
    Розробка логотипу компанії B2B Advance
    586
  • image_crm_enviok_479_0.webp
    Розробка веб-додатків для компанії Enviok
    853

Розробка AI-системи аналізу логів та автоматичного виявлення аномалій

Логи - найбагатше джерело інформації про стан системи, але їх обсяг (мільярди рядків на добу) робить ручний аналіз неможливим. ML-система автоматично структурує неструктуровані логи, виявляє аномальні патерни та пріоритизує інциденти.

Проблема масштабу

Обсяги: велика мікросервісна система генерує 1-10 Гб логів за хвилину. Традиційний grep та Kibana dashboard – реактивні інструменти. ML-система працює проактивно.

Структура лог-даних:

# Неструктурированная строка лога
raw_log = "2024-01-15 14:23:45 ERROR ServiceA [req-id-123] Failed to connect to DB: Connection timeout after 30000ms, retry 3/5"

# После ML-парсинга:
parsed_log = {
    'timestamp': '2024-01-15T14:23:45Z',
    'level': 'ERROR',
    'service': 'ServiceA',
    'request_id': 'req-id-123',
    'event_template': 'Failed to connect to <*>: Connection timeout after <*>ms, retry <*/5>',
    'template_id': 'template_db_connection_timeout',
    'parameters': {
        'target': 'DB',
        'timeout_ms': 30000,
        'retry_attempt': 3
    }
}

Парсинг та шаблонізація

Drain3 - онлайн парсинг:

from drain3 import TemplateMiner
from drain3.template_miner_config import TemplateMinerConfig

config = TemplateMinerConfig()
config.drain_sim_th = 0.4   # порог схожести шаблонов
config.drain_depth = 4       # глубина дерева

miner = TemplateMiner(config=config)

def process_log_stream(log_stream):
    template_stats = {}

    for log_line in log_stream:
        result = miner.add_log_message(log_line.message)

        cluster_id = result['cluster_id']
        if cluster_id not in template_stats:
            template_stats[cluster_id] = {
                'template': result['template_mined'],
                'count': 0,
                'timestamps': [],
                'log_level_dist': {}
            }

        template_stats[cluster_id]['count'] += 1
        template_stats[cluster_id]['timestamps'].append(log_line.timestamp)

    return template_stats

Альтернативи Drain3:

  • Spell: Streaming Parser для Execution Logs
  • IPLoM: Iterative Partitioning Log Mining
  • LLM-based: GPT/Claude для семантичного парсингу складних, нестандартних форматів

Детекція аномальних патернів

Частотна аномалія (count-based):

import pandas as pd
from collections import deque
import numpy as np

class TemplateFrequencyMonitor:
    """
    Мониторинг частоты появления каждого template-шаблона
    Аномалия: резкий рост ERROR шаблона
    """
    def __init__(self, window_minutes=10, baseline_minutes=60):
        self.baseline_window = deque(maxlen=baseline_minutes)
        self.current_window = deque(maxlen=window_minutes)

    def update(self, template_counts_per_minute):
        self.baseline_window.append(template_counts_per_minute)
        self.current_window.append(template_counts_per_minute)

        if len(self.current_window) < self.current_window.maxlen:
            return {}  # недостаточно данных

        anomalies = {}
        current = pd.DataFrame(list(self.current_window)).mean()
        baseline = pd.DataFrame(list(self.baseline_window)).mean()

        for template_id in current.index:
            base_rate = baseline.get(template_id, 1)
            curr_rate = current[template_id]
            spike_ratio = curr_rate / (base_rate + 0.1)

            if spike_ratio > 5 and curr_rate > 10:  # 5x spike и минимальный объём
                anomalies[template_id] = {
                    'spike_ratio': spike_ratio,
                    'current_rate': curr_rate,
                    'baseline_rate': base_rate
                }

        return anomalies

Семантична аномалія (embedding-based):

from sentence_transformers import SentenceTransformer
from sklearn.ensemble import IsolationForest

class SemanticLogAnomalyDetector:
    """
    Embedding логов → Isolation Forest
    Обнаруживает семантически необычные сообщения
    даже если их частота нормальная
    """
    def __init__(self):
        self.encoder = SentenceTransformer('all-MiniLM-L6-v2')
        self.detector = IsolationForest(contamination=0.01)

    def fit(self, normal_log_templates):
        embeddings = self.encoder.encode(normal_log_templates)
        self.detector.fit(embeddings)

    def detect(self, new_log_line):
        embedding = self.encoder.encode([new_log_line])
        score = self.detector.score_samples(embedding)[0]
        is_anomaly = score < self.detector.threshold_
        return is_anomaly, score

Послідовна аномалія (sequence-based):

def detect_sequence_anomalies(log_sequence, expected_sequences, n_gram=3):
    """
    Некоторые последовательности событий нормальны
    Нетипичная последовательность = аномалия в workflow системы
    """
    # N-gram модель нормальных последовательностей
    from collections import Counter
    normal_ngrams = Counter()

    for seq in expected_sequences:
        for i in range(len(seq) - n_gram + 1):
            ngram = tuple(seq[i:i+n_gram])
            normal_ngrams[ngram] += 1

    # Обнаружение необычных переходов в новой последовательности
    anomalous_transitions = []
    for i in range(len(log_sequence) - n_gram + 1):
        ngram = tuple(log_sequence[i:i+n_gram])
        if ngram not in normal_ngrams:
            anomalous_transitions.append({
                'position': i,
                'ngram': ngram,
                'context': log_sequence[max(0, i-2):i+n_gram+2]
            })

    return anomalous_transitions

ML-класифікація критичності

Severity Classification:

from transformers import pipeline

class LogSeverityClassifier:
    def __init__(self):
        # Fine-tuned BERT на размеченных логах
        self.classifier = pipeline(
            'text-classification',
            model='path/to/finetuned-log-severity-bert'
        )

    def classify(self, log_message):
        """
        Классы: informational / warning / error / critical
        Не просто по уровню логирования (ERROR != critical impact),
        а по семантике сообщения
        """
        result = self.classifier(log_message[:512])[0]
        return {
            'severity': result['label'],
            'confidence': result['score'],
            'requires_immediate_action': result['label'] == 'critical' and result['score'] > 0.85
        }

Лог-агрегація та correlation

Cross-service Log Correlation:

def correlate_across_services(logs_by_service, request_id_field='request_id'):
    """
    Один request_id может проходить через 10+ сервисов
    Агрегация всех логов одного запроса → полная картина ошибки
    """
    request_logs = {}

    for service, logs in logs_by_service.items():
        for log in logs:
            req_id = log.get(request_id_field)
            if req_id:
                if req_id not in request_logs:
                    request_logs[req_id] = []
                request_logs[req_id].append({
                    'service': service,
                    'timestamp': log['timestamp'],
                    'level': log['level'],
                    'message': log['message']
                })

    # Сортировка по времени для каждого request
    for req_id in request_logs:
        request_logs[req_id].sort(key=lambda x: x['timestamp'])

    # Находим запросы с ERROR в цепочке
    failed_requests = {
        req_id: logs for req_id, logs in request_logs.items()
        if any(l['level'] == 'ERROR' for l in logs)
    }

    return failed_requests

Практична реалізація

ELK Stack + ML Layer:

  • Elasticsearch: зберігання та пошук логів
  • Logstash/Fluent Bit: collection та форматування
  • Kibana: базова візуалізація
  • ML Layer (Python FastAPI): Drain3 + anomaly detection + severity classifier

Kafka для streaming:

from kafka import KafkaConsumer
import json

consumer = KafkaConsumer(
    'logs-topic',
    bootstrap_servers=['kafka:9092'],
    value_deserializer=lambda m: json.loads(m.decode('utf-8'))
)

template_monitor = TemplateFrequencyMonitor()
anomaly_detector = SemanticLogAnomalyDetector()

for message in consumer:
    log_entry = message.value
    # Парсинг
    parsed = miner.add_log_message(log_entry['message'])
    # Частотная аномалия
    freq_anomalies = template_monitor.update_single(parsed['template_mined'])
    # Семантическая аномалия
    is_anomaly, score = anomaly_detector.detect(log_entry['message'])

    if freq_anomalies or is_anomaly:
        send_to_alert_manager(log_entry, freq_anomalies, score)

Терміни: Drain3 парсинг + частотна аномалія + Elasticsearch інтеграція - 3-4 тижні. Семантична аномалія (BERT), sequence detection, cross-service correlation, severity classification - 2-3 місяці.