Configuring Kubeflow Pipelines for ML Pipelines
Kubeflow Pipelines (KFP) is a Kubernetes-native platform for orchestrating ML pipelines. Each pipeline step runs in its own container, which provides reproducibility and isolation and lets independent steps execute in parallel.
Installing Kubeflow
# Install on an existing Kubernetes cluster
export PIPELINE_VERSION=2.2.0
kubectl apply -k "github.com/kubeflow/pipelines/manifests/kustomize/cluster-scoped-resources?ref=$PIPELINE_VERSION"
kubectl wait --for condition=established --timeout=60s crd/applications.app.k8s.io
# Use the standard "platform-agnostic" overlay. The "-pns" variant targets the
# PNS executor, which was removed from Argo Workflows in v3.4.
kubectl apply -k "github.com/kubeflow/pipelines/manifests/kustomize/env/platform-agnostic?ref=$PIPELINE_VERSION"
# Wait for the UI deployment to become available; port-forwarding earlier fails
kubectl wait --for=condition=available --timeout=600s -n kubeflow deployment/ml-pipeline-ui
# Verify the installation by opening the UI
kubectl port-forward -n kubeflow svc/ml-pipeline-ui 8080:80
# → http://localhost:8080
Creating an ML pipeline
import kfp
from kfp import dsl
from kfp.dsl import component, pipeline, Input, Output, Dataset, Model, Metrics
@component(
    base_image="python:3.11-slim",
    # pyarrow is the parquet engine pandas needs; s3fs lets pandas open s3:// URLs.
    packages_to_install=["pandas", "scikit-learn", "boto3", "pyarrow", "s3fs"],
)
def prepare_data(
    data_path: str,
    output_dataset: Output[Dataset],
    test_size: float = 0.2,
):
    """Split a parquet dataset into train and test parts.

    Args:
        data_path: Location of the source parquet data (local path or URL
            readable by ``pandas.read_parquet``).
        output_dataset: Output artifact; ``train.parquet`` and ``test.parquet``
            are written inside the directory at ``output_dataset.path``.
        test_size: Fraction of rows placed in the test split.
    """
    import os

    import pandas as pd
    from sklearn.model_selection import train_test_split

    df = pd.read_parquet(data_path)
    # Fixed random_state keeps the split reproducible between runs.
    train, test = train_test_split(df, test_size=test_size, random_state=42)

    # The artifact path is used as a directory here, so it must exist before
    # the files are written into it.
    os.makedirs(output_dataset.path, exist_ok=True)
    train.to_parquet(os.path.join(output_dataset.path, "train.parquet"))
    test.to_parquet(os.path.join(output_dataset.path, "test.parquet"))
@component(
    base_image="python:3.11-slim",
    # pyarrow is required by pandas.read_parquet; joblib is listed explicitly
    # because the component imports it directly.
    packages_to_install=[
        "lightgbm",
        "pandas",
        "scikit-learn",
        "mlflow",
        "pyarrow",
        "joblib",
    ],
)
def train_model(
    dataset: Input[Dataset],
    model_output: Output[Model],
    metrics_output: Output[Metrics],
    learning_rate: float = 0.05,
    n_estimators: int = 500,
):
    """Train a LightGBM classifier and record its F1 and ROC AUC.

    Args:
        dataset: Input artifact directory containing ``train.parquet`` and
            ``test.parquet``, each with a ``target`` column.
        model_output: Output artifact; the fitted model is saved as
            ``model.pkl`` inside the directory at ``model_output.path``.
        metrics_output: Output artifact receiving the ``f1`` and ``auc`` metrics.
        learning_rate: Passed to ``LGBMClassifier``.
        n_estimators: Passed to ``LGBMClassifier``.
    """
    import os

    import joblib
    import pandas as pd
    from lightgbm import LGBMClassifier
    from sklearn.metrics import f1_score, roc_auc_score

    train = pd.read_parquet(os.path.join(dataset.path, "train.parquet"))
    test = pd.read_parquet(os.path.join(dataset.path, "test.parquet"))

    X_train, y_train = train.drop("target", axis=1), train["target"]
    X_test, y_test = test.drop("target", axis=1), test["target"]

    model = LGBMClassifier(learning_rate=learning_rate, n_estimators=n_estimators)
    model.fit(X_train, y_train)

    # NOTE(review): default f1_score averaging and the [:, 1] probability column
    # assume a binary target — confirm against the dataset.
    y_pred = model.predict(X_test)
    f1 = f1_score(y_test, y_pred)
    auc = roc_auc_score(y_test, model.predict_proba(X_test)[:, 1])

    metrics_output.log_metric("f1", f1)
    metrics_output.log_metric("auc", auc)

    # The artifact path is used as a directory, so create it before dumping.
    os.makedirs(model_output.path, exist_ok=True)
    joblib.dump(model, os.path.join(model_output.path, "model.pkl"))
@component(
    base_image="python:3.11-slim",
    # scikit-learn and joblib are needed to unpickle and log the model.
    packages_to_install=["lightgbm", "mlflow", "boto3", "scikit-learn", "joblib"],
)
def register_model(
    model: Input[Model],
    metrics: Input[Metrics],
    model_name: str,
    min_f1: float = 0.90,
) -> bool:
    """Register the trained model in MLflow if its F1 meets the threshold.

    Args:
        model: Input artifact directory containing ``model.pkl``.
        metrics: Input metrics artifact; its ``f1`` metadata entry is compared
            with ``min_f1`` (a missing entry is treated as 0).
        model_name: Name under which the model is registered in MLflow.
        min_f1: Minimum F1 required for registration.

    Returns:
        True if the model was logged and registered, False if it was skipped.
    """
    f1 = metrics.metadata.get("f1", 0)
    if f1 < min_f1:
        print(f"Model F1={f1:.3f} below threshold {min_f1}, skipping registration")
        return False

    # Each component runs in its own container, so every module used here must
    # be imported inside this function (joblib was previously missing).
    import os

    import joblib
    import mlflow
    import mlflow.sklearn

    mlflow.set_tracking_uri("http://mlflow.mlops.svc.cluster.local:5000")
    mlflow.sklearn.log_model(
        joblib.load(os.path.join(model.path, "model.pkl")),
        artifact_path="model",
        registered_model_name=model_name,
    )
    return True
@pipeline(name="fraud-detection-training", description="Full training pipeline")
def fraud_detection_pipeline(
    data_path: str = "s3://bucket/fraud-data/v2.3/",
    model_name: str = "fraud-detector",
    learning_rate: float = 0.05,
    n_estimators: int = 500,
    min_f1: float = 0.90,
):
    """Prepare data, train the model, and conditionally register it.

    Args:
        data_path: Source parquet location passed to ``prepare_data``.
        model_name: Registered model name passed to ``register_model``.
        learning_rate: Learning rate passed to ``train_model``.
        n_estimators: Number of estimators passed to ``train_model``.
        min_f1: Minimum F1 passed to ``register_model``.
    """
    # Steps run in sequence; the order is derived from their data dependencies.
    data_task = prepare_data(data_path=data_path)

    train_task = train_model(
        dataset=data_task.outputs["output_dataset"],
        learning_rate=learning_rate,
        n_estimators=n_estimators,
    )

    # Request one GPU for training. On the open-source Kubernetes backend the
    # accelerator type is the Kubernetes resource name, hence "nvidia.com/gpu".
    train_task.set_accelerator_type("nvidia.com/gpu").set_accelerator_limit(1)

    register_model(
        model=train_task.outputs["model_output"],
        metrics=train_task.outputs["metrics_output"],
        model_name=model_name,
        min_f1=min_f1,
    )
# Compile the pipeline definition into a YAML package
pipeline_compiler = kfp.compiler.Compiler()
pipeline_compiler.compile(
    pipeline_func=fraud_detection_pipeline,
    package_path="pipeline.yaml",
)
Launching the pipeline
client = kfp.Client(host="http://ml-pipeline-ui.kubeflow.svc.cluster.local:80")

# One-off run
run = client.create_run_from_pipeline_func(
    fraud_detection_pipeline,
    arguments={"learning_rate": 0.03, "n_estimators": 1000},
    run_name="experiment-47",
)

# Recurring runs must belong to an experiment, so create (or reuse) one first.
experiment = client.create_experiment(name="fraud-detection")

# Scheduled recurring run
client.create_recurring_run(
    experiment_id=experiment.experiment_id,
    job_name="weekly-retrain",
    # KFP cron expressions have 6 fields with seconds first:
    # every Monday at 02:00:00.
    cron_expression="0 0 2 * * 1",
    # create_recurring_run takes a compiled package, not a pipeline function.
    pipeline_package_path="pipeline.yaml",
    # Pipeline parameters are passed as `params` for recurring runs.
    params={"data_path": "s3://bucket/fraud-data/latest/"},
    # The data_path string is the same every week, so with caching enabled the
    # steps would be served from cache instead of reading the new data.
    enable_caching=False,
)
Caching steps
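KFP automatically caches step results: if a step's component definition (image and code) and its input values haven't changed, the step is skipped and the previous outputs are reused. This saves time when repeating experiments with the same data. Note that the cache key is built from the input values, not from the data they point to: if new files appear under an unchanged data_path string, a cached step will not pick them up, so disable caching for such steps or runs (for example, with task.set_caching_options(False)).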
Pipeline monitoring
The Kubeflow UI displays a pipeline execution graph with statuses for each step, logs for each container, artifacts, and metrics. Prometheus metrics: kubeflow_pipeline_run_duration_seconds, kubeflow_pipeline_step_count.
Implementation timeline
Week 1: Installing Kubeflow, first simple pipeline, checking GPU access
Week 2: Integration with MLflow, S3 for artifacts, parameterization
Week 3–4: Caching, scheduled runs, component unit tests
Month 2: Multi-GPU training, hyperparameter sweeps, production deployment







