Реализация AI-обработки потоков данных для ML-пайплайнов
Потоковая обработка для ML — это инференс моделей на непрерывном потоке событий с задержкой менее 100 мс. Fraud detection, real-time рекомендации, динамическое ценообразование — все они требуют онлайн-вычисления признаков и инференса в одном пайплайне без батчевых задержек.
Архитектура потокового ML-пайплайна
[Kafka / Kinesis / Pulsar]
↓
[Feature Computation] ← Flink / Spark Streaming / Kafka Streams
(агрегации, окна, joins)
↓
[Feature Store Online] ← Redis / DynamoDB (< 5ms lookup)
↓
[Model Inference] ← Triton / TorchServe / ONNX Runtime
(< 20ms)
↓
[Decision Engine] ← бизнес-правила + ML score
↓
[Action / Output Kafka] ← downstream системы
Kafka (confluent-kafka) + онлайн-признаки в Redis
from confluent_kafka import Consumer, Producer
import json
import redis
import numpy as np
import time
from collections import deque, defaultdict
import threading
class StreamFeatureComputer:
    """Compute online (real-time) features for streaming events.

    Per-user event history and velocity counters are kept in Redis, so the
    state is shared by every worker that points at the same Redis instance.
    """

    def __init__(self, kafka_config: dict, redis_url: str):
        self.consumer = Consumer(kafka_config)
        # Only the broker list is forwarded to the producer, presumably to keep
        # consumer-only settings (e.g. group.id) out of the producer config.
        self.producer = Producer({'bootstrap.servers': kafka_config['bootstrap.servers']})
        self.redis = redis.from_url(redis_url)
        # In-process per-key buffers (max 1000 items each); not read by any
        # method of this class.
        self.window_store = defaultdict(lambda: deque(maxlen=1000))

    def compute_user_features(self, user_id: str, event: dict) -> dict:
        """Append *event* to the user's Redis history and return online features.

        The event is LPUSHed to ``user:{user_id}:events`` (newest first, capped
        at 1000 entries, 24 h TTL). 1 h / 24 h aggregates are then computed over
        the stored history, *including* the current event.

        Args:
            user_id: Entity key used in the Redis list name.
            event: Must contain ``timestamp`` (on the same clock and in the same
                units as previously stored events; windows assume seconds).
                ``amount``, ``type`` and ``merchant_id`` are optional.

        Returns:
            Dict with ``user_id`` plus count/sum/avg/max/std aggregates,
            ``unique_merchants_1h`` and ``time_since_last_tx`` (9999 when the
            user has no earlier event).
        """
        key_prefix = f"user:{user_id}"
        now = event['timestamp']
        event_key = f"{key_prefix}:events"

        # Write and read back in a single pipeline. This is one round trip
        # instead of two, and with redis-py's default transactional pipeline no
        # other writer can interleave between LPUSH and LRANGE.
        pipe = self.redis.pipeline()
        pipe.lpush(event_key, json.dumps({
            'amount': event.get('amount', 0),
            'ts': now,
            'type': event.get('type', 'unknown'),
            # Stored so that unique_merchants_1h below has data to count.
            'merchant_id': event.get('merchant_id'),
        }))
        pipe.ltrim(event_key, 0, 999)  # keep the latest 1000 events
        pipe.expire(event_key, 86400)  # 24 h TTL
        pipe.lrange(event_key, 0, -1)
        raw_events = pipe.execute()[-1]

        events = [json.loads(e) for e in raw_events]
        # Index 0 is the event just pushed; everything after it is prior history.
        history = events[1:]

        # Only events at or before `now` belong to the window. Entries stamped
        # later (out-of-order arrivals) must not be counted.
        window_1h = [e for e in events if 0 <= now - e['ts'] <= 3600]
        window_24h = [e for e in events if 0 <= now - e['ts'] <= 86400]
        amounts_1h = [e['amount'] for e in window_1h]
        amounts_24h = [e['amount'] for e in window_24h]

        merchants_1h = {e.get('merchant_id') for e in window_1h} - {None}
        previous_ts = [e['ts'] for e in history if e['ts'] <= now]

        return {
            'user_id': user_id,
            'tx_count_1h': len(window_1h),
            'tx_count_24h': len(window_24h),
            'tx_amount_sum_1h': sum(amounts_1h),
            'tx_amount_sum_24h': sum(amounts_24h),
            'tx_amount_avg_1h': float(np.mean(amounts_1h)) if amounts_1h else 0,
            'tx_amount_max_1h': max(amounts_1h) if amounts_1h else 0,
            'tx_amount_std_1h': float(np.std(amounts_1h)) if len(amounts_1h) > 1 else 0,
            'unique_merchants_1h': len(merchants_1h),
            # Gap to the most recent *earlier* event, not to the current one.
            'time_since_last_tx': now - max(previous_ts) if previous_ts else 9999,
        }

    def compute_velocity_features(self, entity_id: str,
                                  event_type: str,
                                  windows: tuple[int, ...] = (60, 300, 3600)) -> dict:
        """Count this event in fixed, clock-aligned windows and return the counts.

        For each window length ``w`` (seconds), the counter for the current
        bucket ``int(time.time()) // w`` is incremented. ``count_{w}s`` is
        therefore the number of events seen in the current ``w``-second bucket,
        including this one.

        Args:
            entity_id: Entity the velocity is tracked for; part of the key.
            event_type: Event kind; part of the key.
            windows: Window lengths in seconds (any iterable of positive ints).

        Returns:
            ``{"count_{w}s": count}`` for every ``w`` in *windows*, in order.
        """
        windows = list(windows)  # iterated twice below
        now = int(time.time())
        pipe = self.redis.pipeline()
        for window in windows:
            # One key per bucket. Re-EXPIRE-ing a single shared key on every
            # INCR would push its TTL forward forever under steady traffic, so
            # the count would never reset.
            key = f"velocity:{entity_id}:{event_type}:{window}:{now // window}"
            pipe.incr(key)
            # A bucket key stops receiving writes once its bucket ends, so this
            # TTL removes it at most one window later.
            pipe.expire(key, window)
        results = pipe.execute()
        # Results alternate [incr, expire, incr, expire, ...].
        return {f"count_{w}s": count for w, count in zip(windows, results[::2])}
Потоковый инференс
import onnxruntime as ort
import asyncio
from aiohttp import ClientSession
class StreamMLInference:
    """Low-latency ONNX Runtime scoring of feature dicts in a stream."""

    # Decision thresholds shared by predict() and batch_predict():
    # score > block_threshold -> 'block', score > review_threshold -> 'review'.
    block_threshold = 0.8
    review_threshold = 0.5

    def __init__(self, model_path: str, feature_store: "redis.Redis",
                 feature_names: "list[str] | None" = None):
        """Load the ONNX model on CPU with full graph optimisation.

        Args:
            model_path: Path to the ``.onnx`` file.
            feature_store: Redis client, stored as ``self.feature_store``.
            feature_names: Column order of the model's input matrix (the
                training-time feature order). ONNX input names are *tensor*
                names, often a single one, not feature names. When omitted,
                the input names are used, which reproduces the earlier
                behaviour.
        """
        opts = ort.SessionOptions()
        opts.inter_op_num_threads = 2
        opts.intra_op_num_threads = 2
        opts.graph_optimization_level = ort.GraphOptimizationLevel.ORT_ENABLE_ALL
        self.session = ort.InferenceSession(
            model_path,
            sess_options=opts,
            providers=['CPUExecutionProvider']
        )
        self.feature_store = feature_store
        self.input_names = [inp.name for inp in self.session.get_inputs()]
        self.feature_names = (
            list(feature_names) if feature_names is not None else list(self.input_names)
        )

    def _to_matrix(self, features_list: list[dict]) -> np.ndarray:
        """Stack feature dicts into a float32 (n_rows, n_features) matrix; missing keys -> 0.0."""
        return np.array([
            [f.get(name, 0.0) for name in self.feature_names]
            for f in features_list
        ], dtype=np.float32)

    def _decide(self, score: float) -> str:
        """Map a score to 'block' / 'review' / 'allow' using the class thresholds."""
        if score > self.block_threshold:
            return 'block'
        if score > self.review_threshold:
            return 'review'
        return 'allow'

    def predict(self, features: dict) -> dict:
        """Score one feature dict.

        Returns:
            ``score`` (column 1 of the first model output), ``decision`` and
            ``latency_ms`` (wall time of ``session.run`` only).
        """
        feature_vector = self._to_matrix([features])
        start = time.perf_counter()
        # Feeds only the first model input.
        outputs = self.session.run(None, {self.input_names[0]: feature_vector})
        latency_ms = (time.perf_counter() - start) * 1000
        # NOTE(review): assumes outputs[0] is an (n, 2) class-probability
        # matrix; some exporters emit labels first. Confirm for the model.
        score = float(outputs[0][0][1])
        return {
            'score': score,
            'decision': self._decide(score),
            'latency_ms': latency_ms
        }

    def batch_predict(self, features_list: list[dict]) -> list[dict]:
        """Score a micro-batch in one ``session.run``; returns [] for empty input.

        Results have ``score`` and ``decision`` only (no per-item latency).
        """
        if not features_list:
            return []
        feature_matrix = self._to_matrix(features_list)
        outputs = self.session.run(None, {self.input_names[0]: feature_matrix})
        scores = outputs[0][:, 1].tolist()
        return [{'score': s, 'decision': self._decide(s)} for s in scores]
Apache Flink пайплайн (Python API)
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors.kafka import KafkaSource, KafkaSink
from pyflink.common import WatermarkStrategy, Types
from pyflink.datastream.window import TumblingEventTimeWindows, SlidingEventTimeWindows
from pyflink.common.time import Time
def build_flink_ml_pipeline(bootstrap_servers: str = "kafka:9092",
                            source_topic: str = "transactions",
                            sink_topic: str = "ml-decisions",
                            group_id: str = "ml-pipeline",
                            parallelism: int = 4):
    """Build and run the PyFlink job: Kafka -> windowed features -> scoring -> Kafka.

    Records are read as JSON strings, keyed by ``user_id`` and aggregated over
    a 5-minute event-time window that slides every 30 s. The aggregates are
    enriched, scored, and written back to Kafka as JSON strings.

    ``TransactionAggregator``, ``EnrichWithStaticFeatures`` and
    ``MLScoringFunction`` are not defined in this snippet and must come from
    elsewhere.

    Args:
        bootstrap_servers: Kafka broker list for both source and sink.
        source_topic: Topic with incoming transaction events (JSON).
        sink_topic: Topic that receives the scored decisions.
        group_id: Consumer group for the source.
        parallelism: Job parallelism.
    """
    import json

    from pyflink.common.serialization import SimpleStringSchema
    from pyflink.datastream.connectors.kafka import KafkaRecordSerializationSchema

    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_parallelism(parallelism)

    source = (
        KafkaSource.builder()
        .set_bootstrap_servers(bootstrap_servers)
        .set_topics(source_topic)
        .set_group_id(group_id)
        .set_value_only_deserializer(SimpleStringSchema())
        .build()
    )
    # NOTE(review): monotonous watermarks assume timestamps never go backwards
    # within a source split; records behind the watermark are late for the
    # event-time windows. If producers can write out of order,
    # for_bounded_out_of_orderness is probably the better choice.
    stream = env.from_source(
        source,
        WatermarkStrategy.for_monotonous_timestamps(),
        "Kafka Source"
    )

    # JSON string -> dict so the key selector below can index by field name.
    events = stream.map(json.loads)

    # Aggregates over a 5-minute window sliding every 30 seconds.
    windowed = (
        events
        .key_by(lambda event: event['user_id'])
        .window(SlidingEventTimeWindows.of(Time.minutes(5), Time.seconds(30)))
        .aggregate(TransactionAggregator())
    )

    # Join with static features from the database, then run ML scoring.
    enriched = windowed.map(EnrichWithStaticFeatures())
    scored = enriched.map(MLScoringFunction())

    # KafkaSink needs a record serializer that names the target topic.
    sink = (
        KafkaSink.builder()
        .set_bootstrap_servers(bootstrap_servers)
        .set_record_serializer(
            KafkaRecordSerializationSchema.builder()
            .set_topic(sink_topic)
            .set_value_serialization_schema(SimpleStringSchema())
            .build()
        )
        .build()
    )
    # NOTE(review): assumes MLScoringFunction emits JSON-serialisable dicts.
    scored.map(json.dumps, output_type=Types.STRING()).sink_to(sink)

    env.execute("ML Streaming Pipeline")
Мониторинг потокового пайплайна
class StreamPipelineMonitor:
    """Prometheus metrics for the real-time ML pipeline."""

    def __init__(self, prometheus_port: int = 8000, registry=None):
        """Create the metrics and start the Prometheus HTTP endpoint.

        Args:
            prometheus_port: Port passed to ``start_http_server``.
            registry: Collector registry to register the metrics in and to
                serve. Defaults to ``prometheus_client.REGISTRY``. Pass a
                separate ``CollectorRegistry`` to run more than one monitor in
                a process, because prometheus_client rejects duplicate metric
                names within one registry.
        """
        from prometheus_client import REGISTRY, Counter, Gauge, Histogram, start_http_server

        if registry is None:
            registry = REGISTRY

        self.events_processed = Counter('ml_events_total', 'Total events processed',
                                        ['decision'], registry=registry)
        self.inference_latency = Histogram('ml_inference_latency_ms',
                                           'Inference latency in milliseconds',
                                           buckets=[1, 5, 10, 20, 50, 100, 500],
                                           registry=registry)
        self.feature_lag = Gauge('feature_store_lag_ms',
                                 'Time between event and feature availability',
                                 registry=registry)
        # round() keeps the bucket bounds at 0.0, 0.1, ..., 1.0 exactly; the
        # raw 0.1*i values carry float noise such as 0.30000000000000004.
        self.model_score_dist = Histogram('ml_model_score',
                                          'Distribution of model scores',
                                          buckets=[round(0.1 * i, 1) for i in range(11)],
                                          registry=registry)
        start_http_server(prometheus_port, registry=registry)

    def record_inference(self, result: dict):
        """Record one scoring result with ``decision``, ``score`` and optional ``latency_ms``."""
        self.events_processed.labels(decision=result['decision']).inc()
        # Results without a per-item latency (e.g. from a batch call) are not
        # observed. Recording 0 for them would pile fake samples into the
        # lowest latency bucket.
        latency_ms = result.get('latency_ms')
        if latency_ms is not None:
            self.inference_latency.observe(latency_ms)
        self.model_score_dist.observe(result['score'])
Производительность по сценариям
| Сценарий | Throughput | P99 Latency | Инфраструктура |
|---|---|---|---|
| Fraud detection | 50K events/sec | 45ms | 4 CPU pods |
| Real-time recsys | 10K events/sec | 80ms | 8 CPU pods |
| Dynamic pricing | 2K events/sec | 150ms | 2 CPU + Redis |
| Content moderation | 500 items/sec | 200ms | 2 GPU pods |
Разработка типового потокового ML-пайплайна (Kafka + Flink + Redis Feature Store + ONNX) занимает 3–4 недели. Бюджет задержки: 20 мс на вычисление признаков + 5 мс на инференс (табличная модель) + 10 мс на доставку решения = 35 мс end-to-end.







