Kubeflow ML Pipeline Setup and Configuration


Configuring Kubeflow Pipelines for ML Pipelines

Kubeflow Pipelines (KFP) is a Kubernetes-native platform for orchestrating ML pipelines. Each pipeline step runs in its own container, which keeps steps isolated and reproducible and lets steps that do not depend on each other run in parallel.

Installing Kubeflow

# Install on an existing Kubernetes cluster
export PIPELINE_VERSION=2.2.0
kubectl apply -k "github.com/kubeflow/pipelines/manifests/kustomize/cluster-scoped-resources?ref=$PIPELINE_VERSION"
kubectl wait --for condition=established --timeout=60s crd/applications.app.k8s.io
kubectl apply -k "github.com/kubeflow/pipelines/manifests/kustomize/env/platform-agnostic-pns?ref=$PIPELINE_VERSION"

# Verify the installation by opening the UI
kubectl port-forward -n kubeflow svc/ml-pipeline-ui 8080:80
# → http://localhost:8080

Creating an ML pipeline

import kfp
from kfp import dsl
from kfp.dsl import component, pipeline, Input, Output, Dataset, Model, Metrics

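@component(
    base_image="python:3.11-slim",
    # pyarrow backs read_parquet/to_parquet; s3fs lets pandas read s3:// paths
    packages_to_install=["pandas", "pyarrow", "s3fs", "scikit-learn"]
)
def prepare_data(
    data_path: str,
    output_dataset: Output[Dataset],
    test_size: float = 0.2
):
    import os
    import pandas as pd
    from sklearn.model_selection import train_test_split

    df = pd.read_parquet(data_path)
    train, test = train_test_split(df, test_size=test_size, random_state=42)

    # The artifact path is used as a directory here, so create it before writing
    os.makedirs(output_dataset.path, exist_ok=True)
    train.to_parquet(os.path.join(output_dataset.path, "train.parquet"))
    test.to_parquet(os.path.join(output_dataset.path, "test.parquet"))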

@component(
    base_image="python:3.11-slim",
    # pyarrow is needed to read the parquet files written by prepare_data
    packages_to_install=["lightgbm", "pandas", "pyarrow", "scikit-learn"]
)
def train_model(
    dataset: Input[Dataset],
    model_output: Output[Model],
    metrics_output: Output[Metrics],
    learning_rate: float = 0.05,
    n_estimators: int = 500
):
    import pandas as pd
    from lightgbm import LGBMClassifier
    from sklearn.metrics import f1_score, roc_auc_score

    train = pd.read_parquet(dataset.path + "/train.parquet")
    test = pd.read_parquet(dataset.path + "/test.parquet")

    X_train, y_train = train.drop("target", axis=1), train["target"]
    X_test, y_test = test.drop("target", axis=1), test["target"]

    model = LGBMClassifier(learning_rate=learning_rate, n_estimators=n_estimators)
    model.fit(X_train, y_train)

    y_pred = model.predict(X_test)
    f1 = f1_score(y_test, y_pred)
    auc = roc_auc_score(y_test, model.predict_proba(X_test)[:, 1])

    metrics_output.log_metric("f1", f1)
    metrics_output.log_metric("auc", auc)

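    import os
    import joblib

    # The artifact path is used as a directory here, so create it before writing
    os.makedirs(model_output.path, exist_ok=True)
    joblib.dump(model, os.path.join(model_output.path, "model.pkl"))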

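@component(base_image="python:3.11-slim",
           packages_to_install=["lightgbm", "scikit-learn", "mlflow", "boto3"])
def register_model(
    model: Input[Model],
    metrics: Input[Metrics],
    model_name: str,
    min_f1: float = 0.90
) -> bool:
    # Values written with log_metric() are stored in the artifact metadata
    f1 = metrics.metadata.get("f1", 0)
    if f1 < min_f1:
        print(f"Model F1={f1:.3f} below threshold {min_f1}, skipping registration")
        return False

    import os
    import joblib  # was missing in the original; needed to load the pickled model
    import mlflow
    import mlflow.sklearn

    mlflow.set_tracking_uri("http://mlflow.mlops.svc.cluster.local:5000")
    # Log inside an explicit run so the registered version is tied to it
    with mlflow.start_run():
        mlflow.sklearn.log_model(
            joblib.load(os.path.join(model.path, "model.pkl")),
            artifact_path="model",
            registered_model_name=model_name
        )
    return True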

@pipeline(name="fraud-detection-training", description="Full training pipeline")
def fraud_detection_pipeline(
    data_path: str = "s3://bucket/fraud-data/v2.3/",
    model_name: str = "fraud-detector",
    learning_rate: float = 0.05,
    n_estimators: int = 500,
    min_f1: float = 0.90
):
    # Steps are ordered automatically from their data dependencies
    data_task = prepare_data(data_path=data_path)

    train_task = train_model(
        dataset=data_task.outputs["output_dataset"],
        learning_rate=learning_rate,
        n_estimators=n_estimators
    )
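    # Request one GPU for training. On a Kubernetes backend the accelerator
    # type is the device resource name rather than a cloud-specific alias.
    train_task.set_accelerator_type("nvidia.com/gpu").set_accelerator_limit(1)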

    register_model(
        model=train_task.outputs["model_output"],
        metrics=train_task.outputs["metrics_output"],
        model_name=model_name,
        min_f1=min_f1
    )

# Compile to YAML
from kfp import compiler
compiler.Compiler().compile(fraud_detection_pipeline, "pipeline.yaml")
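The pipeline above never states an execution order; KFP works it out from which task consumes which output. The sketch below reproduces that idea with Python's standard `graphlib` module, not with KFP itself. The step names mirror the components above, except `validate_data`, which is a hypothetical extra step included only to show two tasks becoming runnable at the same time.

```python
from graphlib import TopologicalSorter

# Map each step to the steps whose outputs it consumes.
# "validate_data" is hypothetical; the other names mirror the pipeline above.
dependencies = {
    "prepare_data": set(),
    "validate_data": {"prepare_data"},
    "train_model": {"prepare_data"},
    "register_model": {"train_model", "validate_data"},
}

def execution_waves(deps):
    """Group steps into waves; steps in one wave have no unmet dependencies."""
    sorter = TopologicalSorter(deps)
    sorter.prepare()
    waves = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())  # everything runnable right now
        waves.append(ready)
        sorter.done(*ready)
    return waves

waves = execution_waves(dependencies)
for number, steps in enumerate(waves, start=1):
    print(f"wave {number}: {', '.join(steps)}")
```

Steps that land in the same wave are the ones an orchestrator could schedule in parallel; a step with no shared upstream output would need an explicit ordering if one is required.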

Launching the pipeline

client = kfp.Client(host="http://ml-pipeline-ui.kubeflow.svc.cluster.local:80")

# One-off run
run = client.create_run_from_pipeline_func(
    fraud_detection_pipeline,
    arguments={"learning_rate": 0.03, "n_estimators": 1000},
    run_name="experiment-47"
)

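# Recurring run on a schedule, using the compiled pipeline.yaml
experiment = client.create_experiment(name="fraud-detection")
client.create_recurring_run(
    experiment_id=experiment.experiment_id,
    job_name="weekly-retrain",
    # Every Monday at 02:00. KFP cron expressions have six fields, seconds first.
    cron_expression="0 0 2 * * 1",
    pipeline_package_path="pipeline.yaml",
    params={"data_path": "s3://bucket/fraud-data/latest/"}
)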
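One detail of scheduled runs is easy to get wrong: as far as the KFP SDK documentation describes it, `cron_expression` takes six space-separated fields with seconds first, while a standard crontab entry has five. The helper below is a minimal sketch of that conversion; `to_kfp_cron` is a hypothetical name and not part of the KFP SDK.

```python
def to_kfp_cron(expression: str) -> str:
    """Return a six-field cron string, adding a leading seconds field if needed."""
    fields = expression.split()
    if len(fields) == 6:
        return " ".join(fields)          # already has a seconds field
    if len(fields) == 5:
        return " ".join(["0"] + fields)  # fire at second 0 of the matching minute
    raise ValueError(
        f"expected 5 or 6 cron fields, got {len(fields)}: {expression!r}"
    )

# Standard crontab form for "every Monday at 02:00"
print(to_kfp_cron("0 2 * * 1"))
```

Passing the converted string to the scheduler keeps the familiar five-field form in configuration files while still sending the six-field form.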

Caching steps

KFP caches step results by default: if a step's inputs and its component definition (container image, command, and embedded code) are unchanged, the step is skipped and the outputs of the previous execution are reused. This saves time when repeating experiments on the same data.
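To make the caching rule concrete, here is a conceptual sketch in plain Python. It is not KFP's implementation: the fingerprint fields, the in-memory `cache` dictionary, and the `run_step` helper are all assumptions chosen to illustrate why an unchanged step can be skipped and a changed one cannot.

```python
import hashlib
import json

def cache_key(image: str, command: list, inputs: dict) -> str:
    """Fingerprint a step from its container image, command, and input values."""
    payload = json.dumps(
        {"image": image, "command": command, "inputs": inputs},
        sort_keys=True,  # make the key independent of dict ordering
    )
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()

cache = {}  # maps fingerprint -> stored step outputs

def run_step(image, command, inputs, execute):
    """Run `execute` only when no result is cached for this exact step."""
    key = cache_key(image, command, inputs)
    if key in cache:
        return cache[key], True   # cache hit: step skipped
    result = execute(inputs)
    cache[key] = result
    return result, False          # cache miss: step executed

step = ("python:3.11-slim", ["python", "train.py"])
train = lambda inputs: f"model(lr={inputs['learning_rate']})"

print(run_step(*step, {"learning_rate": 0.05}, train))  # first run executes
print(run_step(*step, {"learning_rate": 0.05}, train))  # identical inputs: reused
print(run_step(*step, {"learning_rate": 0.03}, train))  # changed input: executes again
```

In the KFP v2 SDK, caching can be switched off for a single task with `set_caching_options(False)` or for a whole run with the `enable_caching` argument, which is useful when a step reads data that changes behind a fixed path.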

Pipeline monitoring

The Kubeflow UI displays the pipeline execution graph with the status of each step, the logs of each container, and the artifacts and metrics each step produced. For Prometheus-based monitoring, the metrics to track are kubeflow_pipeline_run_duration_seconds and kubeflow_pipeline_step_count.

Setup timeline

Week 1: Installing Kubeflow, first simple pipeline, checking GPU access

Week 2: Integration with MLflow, S3 for artifacts, parameterization

Week 3–4: Caching, scheduled runs, component unit tests

Month 2: Multi-GPU training, hyperparameter sweeps, production deployment