ML Pipeline Architecture Design

We design and deploy artificial intelligence systems, from prototype to production-ready solution. Our team combines expertise in machine learning, data engineering, and MLOps so that AI works not just in the lab but in real business.

Designing the ML pipeline architecture

An ML pipeline is a reproducible sequence of steps from raw data to a working model in production. Without a formal architecture, an ML project turns into notebooks on a data scientist's laptop that "work for me." A proper architecture makes models reproducible, versionable, and scalable.

Production ML Pipeline Components

Each production ML pipeline consists of several independently deployable stages:

Raw Data → [Data Ingestion] → [Feature Engineering] → [Training] → [Evaluation] → [Registry] → [Serving]
                                        ↑                                                           ↓
                              [Feature Store]                                          [Monitoring]

Data Ingestion Layer: pulls data from sources (S3, databases, streaming). Idempotent: restarting a run doesn't create duplicates. Partitioned by date to enable incremental recalculation.

Feature Engineering Layer: transformations are reproducible and tested. Separation between online features (computed in real time) and batch features (precomputed). The feature store is a single source of truth.

Training Layer: hyperparameter search, cross-validation, experiment logging. Checkpoints for long training runs.

Evaluation Layer: automatic comparison against the baseline and champion models. Registration is rejected if metrics degrade.

Model Registry: model versioning, metadata, statuses (staging/production/archived).
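The staging/production/archived lifecycle can be modeled as a small state machine. This is a simplified illustration of the idea, not the API of any particular registry:

```python
from dataclasses import dataclass, field

# Legal status transitions: a model is never promoted backwards.
ALLOWED_TRANSITIONS = {
    "staging": {"production", "archived"},
    "production": {"archived"},
    "archived": set(),
}

@dataclass
class ModelVersion:
    name: str
    version: int
    metadata: dict = field(default_factory=dict)
    status: str = "staging"

    def promote(self, new_status: str) -> None:
        """Move the model version along the lifecycle, rejecting
        illegal transitions such as production -> staging."""
        if new_status not in ALLOWED_TRANSITIONS[self.status]:
            raise ValueError(
                f"illegal transition {self.status} -> {new_status}")
        self.status = new_status
```

Enforcing transitions in code (rather than by convention) is what prevents, for example, an archived model from silently re-entering serving.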

Serving Layer: inference service with latency, throughput, drift monitoring.
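Latency monitoring in the serving layer can start as small as a wrapper around the model's predict call. A minimal sketch (the wrapper and its method names are assumptions, not a specific serving framework's API):

```python
import statistics
import time

class MonitoredModel:
    """Wraps any predict-style callable and records per-request latency,
    so the serving layer can export p50/p95 to the monitoring stack."""

    def __init__(self, predict_fn):
        self.predict_fn = predict_fn
        self.latencies_ms: list[float] = []

    def predict(self, x):
        start = time.perf_counter()
        result = self.predict_fn(x)
        self.latencies_ms.append((time.perf_counter() - start) * 1000)
        return result

    def percentile(self, p: int) -> float:
        """p-th latency percentile in milliseconds (needs >= 2 samples)."""
        qs = statistics.quantiles(self.latencies_ms, n=100)
        return qs[max(0, min(p - 1, 98))]
```

In a real service the latency list would be a bounded window or a histogram exported to the monitoring system, but the measurement point is the same.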

Choosing an orchestrator

Task                      Recommended tool
ML pipelines (simple)     Apache Airflow
ML pipelines (ML-native)  Kubeflow Pipelines, ZenML
Data engineering          Prefect, Dagster
Experiment tracking       MLflow, W&B
Feature store             Feast, Hopsworks

Typical implementation with ZenML

from typing import Annotated, Any, Tuple

import pandas as pd
from xgboost import XGBClassifier
from zenml import pipeline, step

# load_from_s3, FeatureTransformer, split_features_target, evaluate_model,
# load_baseline_metrics, and push_to_registry are project-specific helpers.

@step
def data_ingestion(
    source_path: str,
    start_date: str,
    end_date: str,
) -> pd.DataFrame:
    """Load data for the given period. Idempotent."""
    return load_from_s3(source_path, start_date, end_date)

@step
def feature_engineering(
    data: pd.DataFrame,
) -> Tuple[Annotated[pd.DataFrame, "features"], Annotated[dict, "feature_metadata"]]:
    """Transformations. The same transformations are applied at inference time."""
    transformer = FeatureTransformer()
    features = transformer.fit_transform(data)
    # Persist the transformer metadata as an artifact for serving
    return features, transformer.get_metadata()

@step
def model_training(
    features: pd.DataFrame,
    hyperparams: dict,
) -> Tuple[Annotated[Any, "model"], Annotated[dict, "metrics"]]:
    model = XGBClassifier(**hyperparams)
    X, y = split_features_target(features)
    model.fit(X, y)
    metrics = evaluate_model(model, X, y)
    return model, metrics

@step
def model_evaluation(metrics: dict, baseline_metrics: dict) -> bool:
    """Don't let the model into the registry if it is worse than the baseline."""
    return metrics["f1"] > baseline_metrics["f1"] * 0.99

@step
def register_model(model: Any, metrics: dict, feature_metadata: dict, passed: bool) -> None:
    """The gate lives inside the step: a plain `if` on a step output in the
    pipeline body would be evaluated at DAG-build time, not at run time."""
    if passed:
        push_to_registry(model, metrics, feature_metadata)

@pipeline
def training_pipeline(source: str, start_date: str, end_date: str, hyperparams: dict):
    data = data_ingestion(source, start_date, end_date)
    features, feature_metadata = feature_engineering(data)
    model, metrics = model_training(features, hyperparams)
    passed = model_evaluation(metrics, load_baseline_metrics())
    register_model(model, metrics, feature_metadata, passed)

Feature Store - Preventing Training-Serving Skew

Training-serving skew is the main source of degradation for ML models in production. The cause: features are computed differently at training time than at inference time. A feature store solves this: a feature is defined once, in a single codebase, and that definition is used for both training and serving.

from datetime import timedelta

from feast import Entity, FeatureStore, FeatureView, Field
from feast.types import Float64, Int64

customer = Entity(name="customer_id", join_keys=["customer_id"])

# Defined once
customer_stats = FeatureView(
    name="customer_stats",
    entities=[customer],
    ttl=timedelta(days=1),
    schema=[
        Field(name="total_purchases_7d", dtype=Float64),
        Field(name="avg_order_value", dtype=Float64),
        Field(name="days_since_last_purchase", dtype=Int64),
    ],
    source=customer_stats_batch_source,  # e.g. a FileSource or BigQuerySource
)

store = FeatureStore(repo_path=".")

# At training time
training_df = store.get_historical_features(
    entity_df=entity_df,  # DataFrame with customer_id and event timestamps
    features=["customer_stats:total_purchases_7d", "customer_stats:avg_order_value"],
).to_df()

# At inference time: the same features, the same computations
online_features = store.get_online_features(
    features=["customer_stats:total_purchases_7d"],
    entity_rows=[{"customer_id": "12345"}],
).to_dict()

Data Versioning and Reproducibility

Every pipeline run must remain reproducible for at least six months. Requirements:

  • Data versioning: DVC or Delta Lake (time travel)
  • Code versioning: git commit hash in model metadata
  • Environment versioning: Docker image digest
  • Configuration versioning: parameters in YAML, not in code
# experiment_config.yaml: all parameters in one place
data:
  source: s3://bucket/data/
  start_date: "2024-01-01"
  end_date: "2024-12-31"
  version: "v2.3"

features:
  categorical_encoding: "ordinal"
  numerical_scaling: "standard"
  handle_missing: "median"

model:
  type: "lgbm"
  n_estimators: 500
  learning_rate: 0.05
  max_depth: 6
  num_leaves: 31
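The four versioning requirements can be collected into one metadata record stored alongside the model in the registry. A sketch under stated assumptions: the `git rev-parse` call assumes the pipeline runs inside a git checkout, and the record shape is illustrative.

```python
import hashlib
import json
import subprocess

def run_metadata(config: dict) -> dict:
    """Build a reproducibility record: git commit of the code, a hash of
    the full experiment config, and the declared data version."""
    try:
        commit = subprocess.check_output(
            ["git", "rev-parse", "HEAD"], text=True).strip()
    except Exception:
        commit = "unknown"  # e.g. running outside a git checkout
    # Sorted-keys JSON makes the hash independent of dict ordering
    config_hash = hashlib.sha256(
        json.dumps(config, sort_keys=True).encode()).hexdigest()[:12]
    return {
        "git_commit": commit,
        "config_sha256": config_hash,
        "data_version": config.get("data", {}).get("version", "unknown"),
    }
```

Because the config hash is deterministic, two runs with identical YAML produce the same record, and any silent parameter change becomes visible in the registry.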

CI/CD for ML pipelines

# .github/workflows/ml-pipeline.yml
on:
  push:
    paths:
      - 'pipelines/**'
      - 'features/**'

jobs:
  test-and-train:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4

      - name: Unit tests for feature engineering
        run: pytest tests/features/ -v

      - name: Integration test on a data subset
        run: python run_pipeline.py --mode=test --data-fraction=0.01

      - name: Full training run
        if: github.ref == 'refs/heads/main'
        run: python run_pipeline.py --mode=full

      - name: Model evaluation gate
        run: python evaluate_model.py --fail-on-degradation

Pipeline monitoring

Metrics to monitor:

  • execution time of each step
  • volume of processed data
  • input data distribution (data drift)
  • model metrics on the validation set

Alerts fire on: step failure, an abnormal change in data metrics, model metrics degrading below a threshold.

Design timeline

Week 1-2: Audit of the existing ML stack, tool selection, architecture design

Week 3-4: Implementing a basic pipeline for one model

Month 2: Feature store, model registry, evaluation gate

Month 3: CI/CD, monitoring, documentation. Migrating the second model to the new architecture