Налаштування Kubeflow Pipelines для ML-пайплайнів
Kubeflow Pipelines (KFP) - Kubernetes-нативна платформа для оркестрації ML-пайплайнів. Кожен крок пайплайна - окремий контейнер, що забезпечує репродуктивність, ізоляцію та паралельне виконання кроків.
Установка Kubeflow
# Установка на существующий K8s кластер
export PIPELINE_VERSION=2.2.0
kubectl apply -k "github.com/kubeflow/pipelines/manifests/kustomize/cluster-scoped-resources?ref=$PIPELINE_VERSION"
kubectl wait --for condition=established --timeout=60s crd/applications.app.k8s.io
kubectl apply -k "github.com/kubeflow/pipelines/manifests/kustomize/env/platform-agnostic-pns?ref=$PIPELINE_VERSION"
# Проверка
kubectl port-forward -n kubeflow svc/ml-pipeline-ui 8080:80
# → http://localhost:8080
Створення ML-пайплайну
import kfp
from kfp import dsl
from kfp.dsl import component, pipeline, Input, Output, Dataset, Model, Metrics
@component(
base_image="python:3.11-slim",
packages_to_install=["pandas", "scikit-learn", "boto3"]
)
def prepare_data(
data_path: str,
output_dataset: Output[Dataset],
test_size: float = 0.2
):
import pandas as pd
from sklearn.model_selection import train_test_split
df = pd.read_parquet(data_path)
train, test = train_test_split(df, test_size=test_size, random_state=42)
train.to_parquet(output_dataset.path + "/train.parquet")
test.to_parquet(output_dataset.path + "/test.parquet")
@component(
base_image="python:3.11-slim",
packages_to_install=["lightgbm", "pandas", "scikit-learn", "mlflow"]
)
def train_model(
dataset: Input[Dataset],
model_output: Output[Model],
metrics_output: Output[Metrics],
learning_rate: float = 0.05,
n_estimators: int = 500
):
import pandas as pd
from lightgbm import LGBMClassifier
from sklearn.metrics import f1_score, roc_auc_score
train = pd.read_parquet(dataset.path + "/train.parquet")
test = pd.read_parquet(dataset.path + "/test.parquet")
X_train, y_train = train.drop("target", axis=1), train["target"]
X_test, y_test = test.drop("target", axis=1), test["target"]
model = LGBMClassifier(learning_rate=learning_rate, n_estimators=n_estimators)
model.fit(X_train, y_train)
y_pred = model.predict(X_test)
f1 = f1_score(y_test, y_pred)
auc = roc_auc_score(y_test, model.predict_proba(X_test)[:, 1])
metrics_output.log_metric("f1", f1)
metrics_output.log_metric("auc", auc)
import joblib
joblib.dump(model, model_output.path + "/model.pkl")
@component(base_image="python:3.11-slim",
packages_to_install=["lightgbm", "mlflow", "boto3"])
def register_model(
model: Input[Model],
metrics: Input[Metrics],
model_name: str,
min_f1: float = 0.90
) -> bool:
f1 = metrics.metadata.get("f1", 0)
if f1 < min_f1:
print(f"Model F1={f1:.3f} below threshold {min_f1}, skipping registration")
return False
import mlflow
mlflow.set_tracking_uri("http://mlflow.mlops.svc.cluster.local:5000")
mlflow.sklearn.log_model(
joblib.load(model.path + "/model.pkl"),
artifact_path="model",
registered_model_name=model_name
)
return True
@pipeline(name="fraud-detection-training", description="Full training pipeline")
def fraud_detection_pipeline(
data_path: str = "s3://bucket/fraud-data/v2.3/",
model_name: str = "fraud-detector",
learning_rate: float = 0.05,
n_estimators: int = 500,
min_f1: float = 0.90
):
# Шаги выполняются последовательно (автоматически по зависимостям данных)
data_task = prepare_data(data_path=data_path)
train_task = train_model(
dataset=data_task.outputs["output_dataset"],
learning_rate=learning_rate,
n_estimators=n_estimators
)
# GPU для обучения
train_task.set_accelerator_type("NVIDIA_GPU").set_accelerator_limit(1)
register_model(
model=train_task.outputs["model_output"],
metrics=train_task.outputs["metrics_output"],
model_name=model_name,
min_f1=min_f1
)
# Компиляция в YAML
kfp.compiler.Compiler().compile(fraud_detection_pipeline, "pipeline.yaml")
Запуск пайплайну
client = kfp.Client(host="http://ml-pipeline-ui.kubeflow.svc.cluster.local:80")
# Разовый запуск
run = client.create_run_from_pipeline_func(
fraud_detection_pipeline,
arguments={"learning_rate": 0.03, "n_estimators": 1000},
run_name="experiment-47"
)
# Регулярный запуск по расписанию
client.create_recurring_run(
experiment_id=experiment.id,
job_name="weekly-retrain",
cron_expression="0 2 * * 1", # каждый понедельник в 2:00
pipeline_func=fraud_detection_pipeline,
arguments={"data_path": "s3://bucket/fraud-data/latest/"}
)
Кешування кроків
KFP автоматично кешує результати кроків: якщо вхідні дані та код не змінилися – step пропускається, використовуються попередні результати. Економія при повторних експериментах із однаковими даними.
Моніторинг пайплайнів
Kubeflow UI показує: граф виконання пайплайну зі статусами кожного кроку, логи кожного контейнера, артефакти, метрики. Prometheus метрики: kubeflow_pipeline_run_duration_seconds, kubeflow_pipeline_step_count.
Терміни налаштування
Тиждень 1: Установка Kubeflow, перший простий пайплайн, перевірка GPU доступу
Тиждень 2: Інтеграція з MLflow, S3 для артефактів, параметризація
Тиждень 3–4: Caching, scheduled runs, unit тести компонентів
Місяць 2: Multi-GPU training, hyperparameter sweeps, production deployment







