Розробка AI-системи аналізу логів та автоматичного виявлення аномалій
Логи - найбагатше джерело інформації про стан системи, але їх обсяг (мільярди рядків на добу) робить ручний аналіз неможливим. ML-система автоматично структурує неструктуровані логи, виявляє аномальні патерни та пріоритизує інциденти.
Проблема масштабу
Обсяги: велика мікросервісна система генерує 1-10 Гб логів за хвилину. Традиційний grep та Kibana dashboard – реактивні інструменти. ML-система працює проактивно.
Структура лог-даних:
# Неструктурированная строка лога
raw_log = "2024-01-15 14:23:45 ERROR ServiceA [req-id-123] Failed to connect to DB: Connection timeout after 30000ms, retry 3/5"
# После ML-парсинга:
parsed_log = {
'timestamp': '2024-01-15T14:23:45Z',
'level': 'ERROR',
'service': 'ServiceA',
'request_id': 'req-id-123',
'event_template': 'Failed to connect to <*>: Connection timeout after <*>ms, retry <*/5>',
'template_id': 'template_db_connection_timeout',
'parameters': {
'target': 'DB',
'timeout_ms': 30000,
'retry_attempt': 3
}
}
Парсинг та шаблонізація
Drain3 - онлайн парсинг:
from drain3 import TemplateMiner
from drain3.template_miner_config import TemplateMinerConfig
config = TemplateMinerConfig()
config.drain_sim_th = 0.4 # порог схожести шаблонов
config.drain_depth = 4 # глубина дерева
miner = TemplateMiner(config=config)
def process_log_stream(log_stream):
template_stats = {}
for log_line in log_stream:
result = miner.add_log_message(log_line.message)
cluster_id = result['cluster_id']
if cluster_id not in template_stats:
template_stats[cluster_id] = {
'template': result['template_mined'],
'count': 0,
'timestamps': [],
'log_level_dist': {}
}
template_stats[cluster_id]['count'] += 1
template_stats[cluster_id]['timestamps'].append(log_line.timestamp)
return template_stats
Альтернативи Drain3:
- Spell: Streaming Parser для Execution Logs
- IPLoM: Iterative Partitioning Log Mining
- LLM-based: GPT/Claude для семантичного парсингу складних, нестандартних форматів
Детекція аномальних патернів
Частотна аномалія (count-based):
import pandas as pd
from collections import deque
import numpy as np
class TemplateFrequencyMonitor:
"""
Мониторинг частоты появления каждого template-шаблона
Аномалия: резкий рост ERROR шаблона
"""
def __init__(self, window_minutes=10, baseline_minutes=60):
self.baseline_window = deque(maxlen=baseline_minutes)
self.current_window = deque(maxlen=window_minutes)
def update(self, template_counts_per_minute):
self.baseline_window.append(template_counts_per_minute)
self.current_window.append(template_counts_per_minute)
if len(self.current_window) < self.current_window.maxlen:
return {} # недостаточно данных
anomalies = {}
current = pd.DataFrame(list(self.current_window)).mean()
baseline = pd.DataFrame(list(self.baseline_window)).mean()
for template_id in current.index:
base_rate = baseline.get(template_id, 1)
curr_rate = current[template_id]
spike_ratio = curr_rate / (base_rate + 0.1)
if spike_ratio > 5 and curr_rate > 10: # 5x spike и минимальный объём
anomalies[template_id] = {
'spike_ratio': spike_ratio,
'current_rate': curr_rate,
'baseline_rate': base_rate
}
return anomalies
Семантична аномалія (embedding-based):
from sentence_transformers import SentenceTransformer
from sklearn.ensemble import IsolationForest
class SemanticLogAnomalyDetector:
"""
Embedding логов → Isolation Forest
Обнаруживает семантически необычные сообщения
даже если их частота нормальная
"""
def __init__(self):
self.encoder = SentenceTransformer('all-MiniLM-L6-v2')
self.detector = IsolationForest(contamination=0.01)
def fit(self, normal_log_templates):
embeddings = self.encoder.encode(normal_log_templates)
self.detector.fit(embeddings)
def detect(self, new_log_line):
embedding = self.encoder.encode([new_log_line])
score = self.detector.score_samples(embedding)[0]
is_anomaly = score < self.detector.threshold_
return is_anomaly, score
Послідовна аномалія (sequence-based):
def detect_sequence_anomalies(log_sequence, expected_sequences, n_gram=3):
"""
Некоторые последовательности событий нормальны
Нетипичная последовательность = аномалия в workflow системы
"""
# N-gram модель нормальных последовательностей
from collections import Counter
normal_ngrams = Counter()
for seq in expected_sequences:
for i in range(len(seq) - n_gram + 1):
ngram = tuple(seq[i:i+n_gram])
normal_ngrams[ngram] += 1
# Обнаружение необычных переходов в новой последовательности
anomalous_transitions = []
for i in range(len(log_sequence) - n_gram + 1):
ngram = tuple(log_sequence[i:i+n_gram])
if ngram not in normal_ngrams:
anomalous_transitions.append({
'position': i,
'ngram': ngram,
'context': log_sequence[max(0, i-2):i+n_gram+2]
})
return anomalous_transitions
ML-класифікація критичності
Severity Classification:
from transformers import pipeline
class LogSeverityClassifier:
def __init__(self):
# Fine-tuned BERT на размеченных логах
self.classifier = pipeline(
'text-classification',
model='path/to/finetuned-log-severity-bert'
)
def classify(self, log_message):
"""
Классы: informational / warning / error / critical
Не просто по уровню логирования (ERROR != critical impact),
а по семантике сообщения
"""
result = self.classifier(log_message[:512])[0]
return {
'severity': result['label'],
'confidence': result['score'],
'requires_immediate_action': result['label'] == 'critical' and result['score'] > 0.85
}
Лог-агрегація та correlation
Cross-service Log Correlation:
def correlate_across_services(logs_by_service, request_id_field='request_id'):
"""
Один request_id может проходить через 10+ сервисов
Агрегация всех логов одного запроса → полная картина ошибки
"""
request_logs = {}
for service, logs in logs_by_service.items():
for log in logs:
req_id = log.get(request_id_field)
if req_id:
if req_id not in request_logs:
request_logs[req_id] = []
request_logs[req_id].append({
'service': service,
'timestamp': log['timestamp'],
'level': log['level'],
'message': log['message']
})
# Сортировка по времени для каждого request
for req_id in request_logs:
request_logs[req_id].sort(key=lambda x: x['timestamp'])
# Находим запросы с ERROR в цепочке
failed_requests = {
req_id: logs for req_id, logs in request_logs.items()
if any(l['level'] == 'ERROR' for l in logs)
}
return failed_requests
Практична реалізація
ELK Stack + ML Layer:
- Elasticsearch: зберігання та пошук логів
- Logstash/Fluent Bit: collection та форматування
- Kibana: базова візуалізація
- ML Layer (Python FastAPI): Drain3 + anomaly detection + severity classifier
Kafka для streaming:
from kafka import KafkaConsumer
import json
consumer = KafkaConsumer(
'logs-topic',
bootstrap_servers=['kafka:9092'],
value_deserializer=lambda m: json.loads(m.decode('utf-8'))
)
template_monitor = TemplateFrequencyMonitor()
anomaly_detector = SemanticLogAnomalyDetector()
for message in consumer:
log_entry = message.value
# Парсинг
parsed = miner.add_log_message(log_entry['message'])
# Частотная аномалия
freq_anomalies = template_monitor.update_single(parsed['template_mined'])
# Семантическая аномалия
is_anomaly, score = anomaly_detector.detect(log_entry['message'])
if freq_anomalies or is_anomaly:
send_to_alert_manager(log_entry, freq_anomalies, score)
Терміни: Drain3 парсинг + частотна аномалія + Elasticsearch інтеграція - 3-4 тижні. Семантична аномалія (BERT), sequence detection, cross-service correlation, severity classification - 2-3 місяці.







