Разработка AI-системы анализа логов и автоматического обнаружения аномалий
Логи — богатейший источник информации о состоянии системы, но их объём (миллиарды строк в сутки) делает ручной анализ невозможным. ML-система автоматически структурирует неструктурированные логи, обнаруживает аномальные паттерны и приоритизирует инциденты.
Проблема масштаба
Объёмы: крупная микросервисная система генерирует 1-10 Гб логов в минуту. Традиционный grep и Kibana dashboard — реактивные инструменты. ML-система работает проактивно.
Структура лог-данных:
# Неструктурированная строка лога
raw_log = "2024-01-15 14:23:45 ERROR ServiceA [req-id-123] Failed to connect to DB: Connection timeout after 30000ms, retry 3/5"
# После ML-парсинга:
parsed_log = {
'timestamp': '2024-01-15T14:23:45Z',
'level': 'ERROR',
'service': 'ServiceA',
'request_id': 'req-id-123',
'event_template': 'Failed to connect to <*>: Connection timeout after <*>ms, retry <*/5>',
'template_id': 'template_db_connection_timeout',
'parameters': {
'target': 'DB',
'timeout_ms': 30000,
'retry_attempt': 3
}
}
Парсинг и шаблонизация
Drain3 — онлайн парсинг:
from drain3 import TemplateMiner
from drain3.template_miner_config import TemplateMinerConfig
config = TemplateMinerConfig()
config.drain_sim_th = 0.4 # порог схожести шаблонов
config.drain_depth = 4 # глубина дерева
miner = TemplateMiner(config=config)
def process_log_stream(log_stream):
template_stats = {}
for log_line in log_stream:
result = miner.add_log_message(log_line.message)
cluster_id = result['cluster_id']
if cluster_id not in template_stats:
template_stats[cluster_id] = {
'template': result['template_mined'],
'count': 0,
'timestamps': [],
'log_level_dist': {}
}
template_stats[cluster_id]['count'] += 1
template_stats[cluster_id]['timestamps'].append(log_line.timestamp)
return template_stats
Альтернативы Drain3:
- Spell: Streaming Parser для Execution Logs
- IPLoM: Iterative Partitioning Log Mining
- LLM-based: GPT/Claude для семантического парсинга сложных, нестандартных форматов
Детекция аномальных паттернов
Частотная аномалия (count-based):
import pandas as pd
from collections import deque
import numpy as np
class TemplateFrequencyMonitor:
"""
Мониторинг частоты появления каждого template-шаблона
Аномалия: резкий рост ERROR шаблона
"""
def __init__(self, window_minutes=10, baseline_minutes=60):
self.baseline_window = deque(maxlen=baseline_minutes)
self.current_window = deque(maxlen=window_minutes)
def update(self, template_counts_per_minute):
self.baseline_window.append(template_counts_per_minute)
self.current_window.append(template_counts_per_minute)
if len(self.current_window) < self.current_window.maxlen:
return {} # недостаточно данных
anomalies = {}
current = pd.DataFrame(list(self.current_window)).mean()
baseline = pd.DataFrame(list(self.baseline_window)).mean()
for template_id in current.index:
base_rate = baseline.get(template_id, 1)
curr_rate = current[template_id]
spike_ratio = curr_rate / (base_rate + 0.1)
if spike_ratio > 5 and curr_rate > 10: # 5x spike и минимальный объём
anomalies[template_id] = {
'spike_ratio': spike_ratio,
'current_rate': curr_rate,
'baseline_rate': base_rate
}
return anomalies
Семантическая аномалия (embedding-based):
from sentence_transformers import SentenceTransformer
from sklearn.ensemble import IsolationForest
class SemanticLogAnomalyDetector:
"""
Embedding логов → Isolation Forest
Обнаруживает семантически необычные сообщения
даже если их частота нормальная
"""
def __init__(self):
self.encoder = SentenceTransformer('all-MiniLM-L6-v2')
self.detector = IsolationForest(contamination=0.01)
def fit(self, normal_log_templates):
embeddings = self.encoder.encode(normal_log_templates)
self.detector.fit(embeddings)
def detect(self, new_log_line):
embedding = self.encoder.encode([new_log_line])
score = self.detector.score_samples(embedding)[0]
is_anomaly = score < self.detector.threshold_
return is_anomaly, score
Последовательная аномалия (sequence-based):
def detect_sequence_anomalies(log_sequence, expected_sequences, n_gram=3):
"""
Некоторые последовательности событий нормальны
Нетипичная последовательность = аномалия в workflow системы
"""
# N-gram модель нормальных последовательностей
from collections import Counter
normal_ngrams = Counter()
for seq in expected_sequences:
for i in range(len(seq) - n_gram + 1):
ngram = tuple(seq[i:i+n_gram])
normal_ngrams[ngram] += 1
# Обнаружение необычных переходов в новой последовательности
anomalous_transitions = []
for i in range(len(log_sequence) - n_gram + 1):
ngram = tuple(log_sequence[i:i+n_gram])
if ngram not in normal_ngrams:
anomalous_transitions.append({
'position': i,
'ngram': ngram,
'context': log_sequence[max(0, i-2):i+n_gram+2]
})
return anomalous_transitions
ML-классификация критичности
Severity Classification:
from transformers import pipeline
class LogSeverityClassifier:
def __init__(self):
# Fine-tuned BERT на размеченных логах
self.classifier = pipeline(
'text-classification',
model='path/to/finetuned-log-severity-bert'
)
def classify(self, log_message):
"""
Классы: informational / warning / error / critical
Не просто по уровню логирования (ERROR != critical impact),
а по семантике сообщения
"""
result = self.classifier(log_message[:512])[0]
return {
'severity': result['label'],
'confidence': result['score'],
'requires_immediate_action': result['label'] == 'critical' and result['score'] > 0.85
}
Лог-агрегация и correlation
Cross-service Log Correlation:
def correlate_across_services(logs_by_service, request_id_field='request_id'):
"""
Один request_id может проходить через 10+ сервисов
Агрегация всех логов одного запроса → полная картина ошибки
"""
request_logs = {}
for service, logs in logs_by_service.items():
for log in logs:
req_id = log.get(request_id_field)
if req_id:
if req_id not in request_logs:
request_logs[req_id] = []
request_logs[req_id].append({
'service': service,
'timestamp': log['timestamp'],
'level': log['level'],
'message': log['message']
})
# Сортировка по времени для каждого request
for req_id in request_logs:
request_logs[req_id].sort(key=lambda x: x['timestamp'])
# Находим запросы с ERROR в цепочке
failed_requests = {
req_id: logs for req_id, logs in request_logs.items()
if any(l['level'] == 'ERROR' for l in logs)
}
return failed_requests
Практическая реализация
ELK Stack + ML Layer:
- Elasticsearch: хранение и поиск логов
- Logstash/Fluent Bit: collection и форматирование
- Kibana: базовая визуализация
- ML Layer (Python FastAPI): Drain3 + anomaly detection + severity classifier
Kafka для streaming:
from kafka import KafkaConsumer
import json
consumer = KafkaConsumer(
'logs-topic',
bootstrap_servers=['kafka:9092'],
value_deserializer=lambda m: json.loads(m.decode('utf-8'))
)
template_monitor = TemplateFrequencyMonitor()
anomaly_detector = SemanticLogAnomalyDetector()
for message in consumer:
log_entry = message.value
# Парсинг
parsed = miner.add_log_message(log_entry['message'])
# Частотная аномалия
freq_anomalies = template_monitor.update_single(parsed['template_mined'])
# Семантическая аномалия
is_anomaly, score = anomaly_detector.detect(log_entry['message'])
if freq_anomalies or is_anomaly:
send_to_alert_manager(log_entry, freq_anomalies, score)
Сроки: Drain3 парсинг + частотная аномалия + Elasticsearch интеграция — 3-4 недели. Семантическая аномалия (BERT), sequence detection, cross-service correlation, severity classification — 2-3 месяца.







