Налаштування Kubeflow для ML-пайплайнів

Проектуємо та впроваджуємо системи штучного інтелекту: від прототипу до production-ready рішення. Наша команда поєднує експертизу в машинному навчанні, дата-інжинірингу та MLOps, щоб AI працював не в лабораторії, а в реальному бізнесі.
Показано 1 з 1Усі 1566 послуг
Налаштування Kubeflow для ML-пайплайнів
Складний
~5 днів
Часті запитання

Напрямки AI-розробки

Етапи розробки AI-рішення

Останні роботи

  • image_website-b2b-advance_0.webp
    Розробка сайту компанії B2B ADVANCE
    1288
  • image_web-applications_feedme_466_0.webp
    Розробка веб-додатків для компанії FEEDME
    1198
  • image_websites_belfingroup_462_0.webp
    Розробка веб-сайту для компанії БЕЛФІНГРУП
    902
  • image_ecommerce_furnoro_435_0.webp
    Розробка інтернет магазину для компанії FURNORO
    1123
  • image_logo-advance_0.webp
    Розробка логотипу компанії B2B Advance
    590
  • image_crm_enviok_479_0.webp
    Розробка веб-додатків для компанії Enviok
    860

Налаштування Kubeflow Pipelines для ML-пайплайнів

Kubeflow Pipelines (KFP) - Kubernetes-нативна платформа для оркестрації ML-пайплайнів. Кожен крок пайплайна - окремий контейнер, що забезпечує репродуктивність, ізоляцію та паралельне виконання кроків.

Установка Kubeflow

# Установка на существующий K8s кластер
export PIPELINE_VERSION=2.2.0
kubectl apply -k "github.com/kubeflow/pipelines/manifests/kustomize/cluster-scoped-resources?ref=$PIPELINE_VERSION"
kubectl wait --for condition=established --timeout=60s crd/applications.app.k8s.io
kubectl apply -k "github.com/kubeflow/pipelines/manifests/kustomize/env/platform-agnostic-pns?ref=$PIPELINE_VERSION"

# Проверка
kubectl port-forward -n kubeflow svc/ml-pipeline-ui 8080:80
# → http://localhost:8080

Створення ML-пайплайну

import kfp
from kfp import dsl
from kfp.dsl import component, pipeline, Input, Output, Dataset, Model, Metrics

@component(
    base_image="python:3.11-slim",
    packages_to_install=["pandas", "scikit-learn", "boto3"]
)
def prepare_data(
    data_path: str,
    output_dataset: Output[Dataset],
    test_size: float = 0.2
):
    import pandas as pd
    from sklearn.model_selection import train_test_split

    df = pd.read_parquet(data_path)
    train, test = train_test_split(df, test_size=test_size, random_state=42)

    train.to_parquet(output_dataset.path + "/train.parquet")
    test.to_parquet(output_dataset.path + "/test.parquet")

@component(
    base_image="python:3.11-slim",
    packages_to_install=["lightgbm", "pandas", "scikit-learn", "mlflow"]
)
def train_model(
    dataset: Input[Dataset],
    model_output: Output[Model],
    metrics_output: Output[Metrics],
    learning_rate: float = 0.05,
    n_estimators: int = 500
):
    import pandas as pd
    from lightgbm import LGBMClassifier
    from sklearn.metrics import f1_score, roc_auc_score

    train = pd.read_parquet(dataset.path + "/train.parquet")
    test = pd.read_parquet(dataset.path + "/test.parquet")

    X_train, y_train = train.drop("target", axis=1), train["target"]
    X_test, y_test = test.drop("target", axis=1), test["target"]

    model = LGBMClassifier(learning_rate=learning_rate, n_estimators=n_estimators)
    model.fit(X_train, y_train)

    y_pred = model.predict(X_test)
    f1 = f1_score(y_test, y_pred)
    auc = roc_auc_score(y_test, model.predict_proba(X_test)[:, 1])

    metrics_output.log_metric("f1", f1)
    metrics_output.log_metric("auc", auc)

    import joblib
    joblib.dump(model, model_output.path + "/model.pkl")

@component(base_image="python:3.11-slim",
           packages_to_install=["lightgbm", "mlflow", "boto3"])
def register_model(
    model: Input[Model],
    metrics: Input[Metrics],
    model_name: str,
    min_f1: float = 0.90
) -> bool:
    f1 = metrics.metadata.get("f1", 0)
    if f1 < min_f1:
        print(f"Model F1={f1:.3f} below threshold {min_f1}, skipping registration")
        return False

    import mlflow
    mlflow.set_tracking_uri("http://mlflow.mlops.svc.cluster.local:5000")
    mlflow.sklearn.log_model(
        joblib.load(model.path + "/model.pkl"),
        artifact_path="model",
        registered_model_name=model_name
    )
    return True

@pipeline(name="fraud-detection-training", description="Full training pipeline")
def fraud_detection_pipeline(
    data_path: str = "s3://bucket/fraud-data/v2.3/",
    model_name: str = "fraud-detector",
    learning_rate: float = 0.05,
    n_estimators: int = 500,
    min_f1: float = 0.90
):
    # Шаги выполняются последовательно (автоматически по зависимостям данных)
    data_task = prepare_data(data_path=data_path)

    train_task = train_model(
        dataset=data_task.outputs["output_dataset"],
        learning_rate=learning_rate,
        n_estimators=n_estimators
    )
    # GPU для обучения
    train_task.set_accelerator_type("NVIDIA_GPU").set_accelerator_limit(1)

    register_model(
        model=train_task.outputs["model_output"],
        metrics=train_task.outputs["metrics_output"],
        model_name=model_name,
        min_f1=min_f1
    )

# Компиляция в YAML
kfp.compiler.Compiler().compile(fraud_detection_pipeline, "pipeline.yaml")

Запуск пайплайну

client = kfp.Client(host="http://ml-pipeline-ui.kubeflow.svc.cluster.local:80")

# Разовый запуск
run = client.create_run_from_pipeline_func(
    fraud_detection_pipeline,
    arguments={"learning_rate": 0.03, "n_estimators": 1000},
    run_name="experiment-47"
)

# Регулярный запуск по расписанию
client.create_recurring_run(
    experiment_id=experiment.id,
    job_name="weekly-retrain",
    cron_expression="0 2 * * 1",  # каждый понедельник в 2:00
    pipeline_func=fraud_detection_pipeline,
    arguments={"data_path": "s3://bucket/fraud-data/latest/"}
)

Кешування кроків

KFP автоматично кешує результати кроків: якщо вхідні дані та код не змінилися – step пропускається, використовуються попередні результати. Економія при повторних експериментах із однаковими даними.

Моніторинг пайплайнів

Kubeflow UI показує: граф виконання пайплайну зі статусами кожного кроку, логи кожного контейнера, артефакти, метрики. Prometheus метрики: kubeflow_pipeline_run_duration_seconds, kubeflow_pipeline_step_count.

Терміни налаштування

Тиждень 1: Установка Kubeflow, перший простий пайплайн, перевірка GPU доступу

Тиждень 2: Інтеграція з MLflow, S3 для артефактів, параметризація

Тиждень 3–4: Caching, scheduled runs, unit тести компонентів

Місяць 2: Multi-GPU training, hyperparameter sweeps, production deployment