Designing the ML pipeline architecture
An ML pipeline is a reproducible sequence of steps from raw data to a working model in production. Without a formal architecture, an ML project degenerates into notebooks on a data scientist's laptop that "work on my machine." A proper architecture makes models reproducible, versionable, and scalable.
Production ML Pipeline Components
Each production ML pipeline consists of several independently deployable stages:
```
Raw Data → [Data Ingestion] → [Feature Engineering] → [Training] → [Evaluation] → [Registry] → [Serving]
                                       ↑                                                          ↓
                               [Feature Store]                                              [Monitoring]
```
Data Ingestion Layer: pulls data from sources (S3, databases, streaming). Must be idempotent: restarting a run does not create duplicates. Partition by date to enable incremental recomputation.
Feature Engineering Layer: transformations are reproducible and tested. Online features (computed in real time) are kept separate from batch features (precomputed). The feature store is the single source of truth.
Training Layer: hyperparameter search, cross-validation, experiment logging. Checkpoints for long training runs.
Evaluation Layer: automatic comparison against the baseline and the current champion model; registration is rejected if metrics degrade.
Model Registry: model versioning, metadata, statuses (staging/production/archived).
Serving Layer: the inference service, with monitoring of latency, throughput, and drift.
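The idempotency requirement in the ingestion layer can be illustrated with plain files: write each date partition to a deterministic path and replace it atomically, so a rerun overwrites the partition rather than appending to it. This is a minimal sketch, not tied to any specific tool; `ingest_partition` and the JSON layout are assumptions for illustration.

```python
import json
import tempfile
from pathlib import Path

def ingest_partition(records: list, base_dir: Path, ds: str) -> Path:
    """Write one date partition to a deterministic path.

    Rerunning for the same `ds` replaces the partition instead of
    appending to it, so replays never create duplicates (idempotency).
    """
    part_dir = base_dir / f"ds={ds}"
    part_dir.mkdir(parents=True, exist_ok=True)
    out = part_dir / "data.json"
    tmp = out.with_suffix(".tmp")
    tmp.write_text(json.dumps(records))
    tmp.replace(out)  # atomic rename: readers never observe a partial file
    return out

base = Path(tempfile.mkdtemp())
ingest_partition([{"id": 1}, {"id": 2}], base, "2024-01-01")
ingest_partition([{"id": 1}, {"id": 2}], base, "2024-01-01")  # replay of the same run
rows = json.loads((base / "ds=2024-01-01" / "data.json").read_text())
print(len(rows))  # 2: the replay replaced the partition, it did not append
```

The same pattern scales up directly: with S3 or a data lake, "partition overwrite" plays the role of the atomic rename.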
Choosing an orchestrator
| Task | Recommended tool |
|---|---|
| ML pipelines (general-purpose orchestration) | Apache Airflow |
| ML-native pipelines | Kubeflow Pipelines, ZenML |
| Data engineering | Prefect, Dagster |
| Experiment tracking | MLflow, W&B |
| Feature store | Feast, Hopsworks |
Typical implementation with ZenML
```python
import pandas as pd
from typing import Annotated, Any, Tuple
from xgboost import XGBClassifier
from zenml import step, pipeline

# load_from_s3, FeatureTransformer, split_features_target, evaluate_model,
# load_baseline_metrics and register_model are project helpers, not shown here.

@step
def data_ingestion(source_path: str, start_date: str, end_date: str) -> pd.DataFrame:
    """Load data for the given period. Idempotent."""
    return load_from_s3(source_path, start_date, end_date)

@step
def feature_engineering(
    data: pd.DataFrame,
) -> Tuple[Annotated[pd.DataFrame, "features"], Annotated[dict, "feature_metadata"]]:
    """Transformations. The same transformations are applied at inference time."""
    transformer = FeatureTransformer()
    features = transformer.fit_transform(data)
    # Persist the transformer metadata as an artifact for serving
    return features, transformer.get_metadata()

@step
def model_training(
    features: pd.DataFrame, hyperparams: dict
) -> Tuple[Annotated[Any, "model"], Annotated[dict, "metrics"]]:
    model = XGBClassifier(**hyperparams)
    X, y = split_features_target(features)
    model.fit(X, y)
    metrics = evaluate_model(model, X, y)
    return model, metrics

@step
def model_evaluation(model: Any, metrics: dict, baseline_metrics: dict) -> bool:
    """Do not let the model into the registry if it is worse than the baseline."""
    return metrics["f1"] > baseline_metrics["f1"] * 0.99

@step
def model_registration(model: Any, metrics: dict, feature_metadata: dict, passed: bool) -> None:
    """Register only models that passed the evaluation gate."""
    if passed:
        register_model(model, metrics, feature_metadata)

@pipeline
def training_pipeline(source: str, start_date: str, end_date: str, hyperparams: dict):
    data = data_ingestion(source, start_date, end_date)
    features, feature_metadata = feature_engineering(data)
    model, metrics = model_training(features, hyperparams)
    passed = model_evaluation(model, metrics, load_baseline_metrics())
    # At pipeline-definition time `passed` is a lazy artifact, not a bool,
    # so the branch on it must live inside a step, not in an `if` here.
    model_registration(model, metrics, feature_metadata, passed)
```
Feature Store: Preventing Training-Serving Skew
Training-serving skew is the main source of degradation for ML models in production. The cause: features are computed differently at training time and at inference time. A feature store solves this: a single codebase defines a feature, and that definition is used for both training and serving.
```python
from datetime import timedelta
from feast import Entity, FeatureStore, FeatureView, Field
from feast.types import Float64, Int64

# customer_stats_batch_source is the project's batch source definition (not shown)
customer = Entity(name="customer", join_keys=["customer_id"])

# Defined once
customer_stats = FeatureView(
    name="customer_stats",
    entities=[customer],
    ttl=timedelta(days=1),
    schema=[
        Field(name="total_purchases_7d", dtype=Float64),
        Field(name="avg_order_value", dtype=Float64),
        Field(name="days_since_last_purchase", dtype=Int64),
    ],
    source=customer_stats_batch_source,
)

store = FeatureStore(repo_path=".")

# At training time
training_df = store.get_historical_features(
    entity_df=entity_df,
    features=["customer_stats:total_purchases_7d", "customer_stats:avg_order_value"],
).to_df()

# At inference time: the same features, the same computations
online_features = store.get_online_features(
    features=["customer_stats:total_purchases_7d"],
    entity_rows=[{"customer_id": "12345"}],
).to_dict()
```
Data Versioning and Reproducibility
Every pipeline run must still be reproducible six months later. Requirements:
- Data versioning: DVC or Delta Lake (time travel)
- Code versioning: git commit hash in model metadata
- Environment versioning: Docker image digest
- Configuration versioning: parameters in YAML, not in code
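The four requirements above boil down to capturing a handful of identifiers at run time and storing them with the model. A minimal sketch; `capture_run_metadata` and the field names are illustrative assumptions, and in a real pipeline the result would be attached to the registry entry:

```python
import hashlib
import subprocess

def capture_run_metadata(config_text: str, data_version: str, image_digest: str) -> dict:
    """Collect the identifiers needed to reproduce this run months later."""
    try:
        # Code version: the exact commit the pipeline ran from
        commit = subprocess.run(
            ["git", "rev-parse", "HEAD"], capture_output=True, text=True
        ).stdout.strip()
    except OSError:
        commit = ""
    return {
        "git_commit": commit or "unknown",
        # Configuration version: a hash of the YAML actually used
        "config_sha256": hashlib.sha256(config_text.encode()).hexdigest(),
        # Data version: a DVC tag or Delta Lake snapshot id
        "data_version": data_version,
        # Environment version: the Docker image digest the run executed in
        "image_digest": image_digest,
    }

meta = capture_run_metadata("model:\n  type: lgbm\n", "v2.3", "sha256:abc123")
print(sorted(meta))
```

Storing the hash of the config (rather than the config itself) is enough only if the YAML is also committed to git; storing both is cheap and safer.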
```yaml
# experiment_config.yaml: all parameters in one place
data:
  source: s3://bucket/data/
  start_date: "2024-01-01"
  end_date: "2024-12-31"
  version: "v2.3"

features:
  categorical_encoding: "ordinal"
  numerical_scaling: "standard"
  handle_missing: "median"

model:
  type: "lgbm"
  n_estimators: 500
  learning_rate: 0.05
  max_depth: 6
  num_leaves: 31
```
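Keeping parameters in YAML only pays off if the pipeline actually reads them from there. A sketch of the wiring, assuming PyYAML; the config is inlined as a string here so the example is self-contained, and the split of `model` into constructor hyperparameters versus metadata is an assumed convention:

```python
import yaml  # PyYAML, assumed available in the pipeline environment

config_text = """
data:
  source: s3://bucket/data/
  start_date: "2024-01-01"
  end_date: "2024-12-31"
  version: "v2.3"
model:
  type: "lgbm"
  n_estimators: 500
  learning_rate: 0.05
"""

config = yaml.safe_load(config_text)

# Everything under `model` except `type` goes to the model constructor
hyperparams = {k: v for k, v in config["model"].items() if k != "type"}
print(hyperparams["n_estimators"])  # 500

# The training pipeline is then invoked with values from the config only:
# training_pipeline(
#     source=config["data"]["source"],
#     start_date=config["data"]["start_date"],
#     end_date=config["data"]["end_date"],
#     hyperparams=hyperparams,
# )
```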
CI/CD for ML pipelines
```yaml
# .github/workflows/ml-pipeline.yml
on:
  push:
    paths:
      - 'pipelines/**'
      - 'features/**'

jobs:
  test-and-train:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - name: Unit tests for feature engineering
        run: pytest tests/features/ -v
      - name: Integration test on a data subset
        run: python run_pipeline.py --mode=test --data-fraction=0.01
      - name: Full training run
        if: github.ref == 'refs/heads/main'
        run: python run_pipeline.py --mode=full
      - name: Model evaluation gate
        run: python evaluate_model.py --fail-on-degradation
```
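The gate in the last CI step is just a comparison plus an exit code. A sketch of what `evaluate_model.py` could contain; the 1% tolerance and the F1 metric mirror the evaluation step earlier in this document, while the function names and metric-file layout are assumptions:

```python
def gate(candidate: dict, baseline: dict, tolerance: float = 0.01) -> bool:
    """True if the candidate's F1 stays within `tolerance` of the baseline."""
    return candidate["f1"] >= baseline["f1"] * (1 - tolerance)

def main(candidate: dict, baseline: dict, fail_on_degradation: bool) -> int:
    """Return the process exit code: non-zero fails the CI job."""
    if fail_on_degradation and not gate(candidate, baseline):
        return 1
    return 0

print(main({"f1": 0.801}, {"f1": 0.80}, True))  # 0: no degradation
print(main({"f1": 0.78}, {"f1": 0.80}, True))   # 1: a 2.5% drop fails the gate
# The real script would load candidate/baseline metrics from files or the
# model registry and finish with sys.exit(main(...)).
```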
Pipeline monitoring
Metrics to monitor: execution time of each step, volume of data processed, input data distribution (data drift), and model metrics on the validation set. Alert on: step failures, abnormal changes in data metrics, and model metrics dropping below a threshold.
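For the data-drift check, one simple and widely used statistic is the Population Stability Index (PSI) over binned feature values. A self-contained sketch; the 0.2 alert threshold is a common rule of thumb, not something prescribed here, and should be tuned per feature:

```python
import numpy as np

def psi(expected: np.ndarray, actual: np.ndarray, bins: int = 10) -> float:
    """Population Stability Index between training-time and live values.

    Rule of thumb: < 0.1 stable, 0.1 to 0.2 worth watching, > 0.2 alert.
    """
    # Bin edges come from the training distribution's quantiles
    edges = np.quantile(expected, np.linspace(0, 1, bins + 1))
    edges[0], edges[-1] = -np.inf, np.inf  # cover out-of-range live values
    e_pct = np.histogram(expected, edges)[0] / len(expected) + 1e-6
    a_pct = np.histogram(actual, edges)[0] / len(actual) + 1e-6
    return float(np.sum((a_pct - e_pct) * np.log(a_pct / e_pct)))

rng = np.random.default_rng(0)
train = rng.normal(0.0, 1.0, 10_000)
psi_same = psi(train, rng.normal(0.0, 1.0, 10_000))   # same distribution
psi_shift = psi(train, rng.normal(1.0, 1.0, 10_000))  # mean shifted by 1 sigma
print(psi_same < 0.1, psi_shift > 0.2)  # True True
```

In the pipeline this would run on a schedule against the serving logs, with one PSI value per feature feeding the alerting rules above.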
Implementation timeline
- Weeks 1–2: audit of the existing ML stack, tool selection, architecture design
- Weeks 3–4: implementation of a basic pipeline for one model
- Month 2: feature store, model registry, evaluation gate
- Month 3: CI/CD, monitoring, documentation; migration of the second model to the new architecture