Kubeflow ML Pipelines

Overview

Kubeflow is an open-source machine learning platform designed to make deployments of ML workflows on Kubernetes simple, portable, and scalable. Originally developed at Google and announced as an open-source project in late 2017, Kubeflow enables data scientists and ML engineers to build, train, and deploy machine learning models using Kubernetes as the orchestration layer, abstracting away infrastructure complexity while providing enterprise-grade scalability.

The project's name combines "Kube" from Kubernetes and "Flow" from TensorFlow, reflecting its origins as a way to run TensorFlow jobs on Kubernetes. Today, Kubeflow has evolved into a comprehensive MLOps platform supporting multiple ML frameworks including PyTorch, scikit-learn, XGBoost, and more.

Background

History and Evolution

  • 2017: Kubeflow originated as an internal Google project to simplify running TensorFlow on Kubernetes and was announced as open source at KubeCon in December
  • 2018: First public release (v0.1) under the Apache 2.0 license
  • 2020: Kubeflow 1.0 released with stable APIs and production-ready components
  • 2021: KFServing evolved into KServe as a standalone project
  • 2022-2024: Continued maturation with improved multi-tenancy and security features; Kubeflow accepted as a CNCF incubating project in 2023

Design Philosophy

Kubeflow embraces several key principles:

  1. Composability: Each component can be used independently or together
  2. Portability: Deploy the same pipelines across any Kubernetes cluster (on-premises, cloud, hybrid)
  3. Scalability: Leverage Kubernetes autoscaling for training and serving workloads
  4. Reproducibility: Version control for data, code, and model artifacts

Key Concepts

Core Components

Kubeflow Pipelines

Kubeflow Pipelines is the heart of Kubeflow's MLOps tooling: it lets you author, schedule, and monitor multi-step ML workflows as directed acyclic graphs (DAGs).

from kfp import dsl
from kfp import compiler

@dsl.component(base_image="python:3.10", packages_to_install=["pandas"])
def preprocess_data(input_path: str, output_data: dsl.Output[dsl.Dataset]):
    """Preprocess raw data for training."""
    import pandas as pd
    df = pd.read_csv(input_path)
    # Preprocessing logic (e.g. clean and encode features)
    df.to_csv(output_data.path, index=False)

@dsl.component(base_image="tensorflow/tensorflow:2.13.0")
def train_model(data: dsl.Input[dsl.Dataset], model_out: dsl.Output[dsl.Model], epochs: int = 10):
    """Train ML model on preprocessed data."""
    import tensorflow as tf
    # Training logic: build, compile, and fit a model on data.path,
    # then save it to model_out.path so downstream steps can consume it
    # model.save(model_out.path)

@dsl.pipeline(name="ml-training-pipeline")
def training_pipeline(input_data: str):
    preprocess_task = preprocess_data(input_path=input_data)
    train_task = train_model(data=preprocess_task.outputs["output_data"])

compiler.Compiler().compile(training_pipeline, "pipeline.yaml")

Key features:

  • Visual pipeline editor and execution viewer
  • Artifact tracking and lineage
  • Experiment management and comparison
  • Recurring run scheduling (see the client sketch below)
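
These features are also exposed programmatically through the KFP SDK client. Below is a minimal sketch of creating an experiment and scheduling the compiled pipeline as a recurring run, assuming a Pipelines endpoint reachable at localhost:8080 and the pipeline.yaml produced above; exact Client arguments can vary between SDK releases.

from kfp.client import Client

# Connect to the Pipelines API (e.g. exposed via kubectl port-forward)
client = Client(host="http://localhost:8080")

# Group related runs under an experiment for comparison in the UI
experiment = client.create_experiment(name="nightly-training")

# Schedule the compiled pipeline to run every night at 02:00
# (KFP cron expressions use six fields, starting with seconds)
client.create_recurring_run(
    experiment_id=experiment.experiment_id,
    job_name="nightly-training-run",
    pipeline_package_path="pipeline.yaml",
    cron_expression="0 0 2 * * *",
)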

Kubeflow Notebooks

Jupyter notebook servers running natively on Kubernetes with:

  • Pre-configured images for TensorFlow, PyTorch, and other frameworks
  • GPU support for accelerated computing
  • Persistent volumes for data storage
  • Integration with the Kubeflow Pipelines SDK (see the sketch below)
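
From inside a notebook server, the Pipelines API is typically reachable without manual configuration. A minimal sketch, assuming an in-cluster multi-user Kubeflow installation where kfp.Client() can discover the endpoint and credentials automatically; the namespace name below is a placeholder.

from kfp.client import Client

# Inside a Kubeflow notebook, the client can usually pick up the
# in-cluster Pipelines endpoint and service-account credentials itself.
client = Client()

# List recent runs in the notebook's profile namespace (placeholder name)
response = client.list_runs(namespace="my-profile", page_size=5)
for run in response.runs or []:
    print(run.display_name, run.state)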

KServe (formerly KFServing)

Serverless inference platform for deploying ML models on Kubernetes:

apiVersion: serving.kserve.io/v1beta1
kind: InferenceService
metadata:
  name: sklearn-iris
spec:
  predictor:
    sklearn:
      storageUri: "gs://kfserving-examples/models/sklearn/iris"
      resources:
        requests:
          cpu: 100m
          memory: 256Mi
        limits:
          cpu: 500m
          memory: 512Mi

Features:

  • Autoscaling (including scale-to-zero)
  • Canary rollouts and A/B testing
  • Request batching for efficiency
  • Model explainability with Alibi
  • Support for TensorFlow, PyTorch, scikit-learn, XGBoost, ONNX, and custom models
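
Once deployed, an InferenceService exposes a standard HTTP prediction endpoint. Below is a minimal sketch of querying the sklearn-iris service above over the V1 inference protocol; the ingress address and Host header are placeholders that depend on how your gateway and DNS are configured.

import requests

# Placeholders: use your cluster's ingress address and the service's hostname
ingress = "http://<ingress-gateway-address>"
headers = {"Host": "sklearn-iris.default.example.com"}

# V1 protocol: POST /v1/models/<name>:predict with an "instances" payload
payload = {"instances": [[6.8, 2.8, 4.8, 1.4], [6.0, 3.4, 4.5, 1.6]]}
resp = requests.post(
    f"{ingress}/v1/models/sklearn-iris:predict",
    headers=headers,
    json=payload,
)
print(resp.json())  # e.g. {"predictions": [1, 1]}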

Katib

Automated machine learning (AutoML) system for hyperparameter tuning and neural architecture search:

apiVersion: kubeflow.org/v1beta1
kind: Experiment
metadata:
  name: hyperparameter-tuning
spec:
  objective:
    type: maximize
    goal: 0.99
    objectiveMetricName: accuracy
  algorithm:
    algorithmName: bayesianoptimization
  parallelTrialCount: 3
  maxTrialCount: 12
  parameters:
    - name: learning_rate
      parameterType: double
      feasibleSpace:
        min: "0.001"
        max: "0.1"
    - name: batch_size
      parameterType: int
      feasibleSpace:
        min: "16"
        max: "128"

Supported algorithms:

  • Grid search, random search
  • Bayesian optimization
  • Tree-structured Parzen Estimator (TPE)
  • Hyperband and ASHA
  • Neural Architecture Search (NAS)

Training Operators

Kubernetes operators for distributed training:

  • TFJob: Distributed TensorFlow training
  • PyTorchJob: Distributed PyTorch training with DDP
  • MXNetJob: Apache MXNet distributed training
  • XGBoostJob: Distributed XGBoost training
  • MPIJob: MPI-based distributed training (Horovod)

For example, a PyTorchJob that runs one master replica and four GPU workers:

apiVersion: kubeflow.org/v1
kind: PyTorchJob
metadata:
  name: pytorch-distributed
spec:
  pytorchReplicaSpecs:
    Master:
      replicas: 1
      template:
        spec:
          containers:
            - name: pytorch
              image: pytorch/pytorch:latest
              command: ["python", "train.py"]
    Worker:
      replicas: 4
      template:
        spec:
          containers:
            - name: pytorch
              image: pytorch/pytorch:latest
              resources:
                limits:
                  nvidia.com/gpu: 1
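
Because these training jobs are ordinary Kubernetes custom resources, they can also be created programmatically. A minimal sketch using the official kubernetes Python client, assuming the PyTorchJob manifest above has been saved as pytorch-job.yaml (a placeholder filename) and the training operator is installed in the cluster.

import yaml
from kubernetes import client, config

# Load cluster credentials (use load_incluster_config() from inside a pod)
config.load_kube_config()

with open("pytorch-job.yaml") as f:
    job_manifest = yaml.safe_load(f)

# PyTorchJob is a custom resource in the kubeflow.org/v1 API group
api = client.CustomObjectsApi()
api.create_namespaced_custom_object(
    group="kubeflow.org",
    version="v1",
    namespace="default",
    plural="pytorchjobs",
    body=job_manifest,
)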

Architecture

+------------------------------------------------------------------+
|                         Kubeflow Platform                         |
+------------------------------------------------------------------+
|  +-------------+  +-------------+  +-------------+  +-----------+ |
|  |  Notebooks  |  |  Pipelines  |  |   KServe    |  |   Katib   | |
|  +-------------+  +-------------+  +-------------+  +-----------+ |
|  +-------------+  +-------------+  +-------------+  +-----------+ |
|  | TF Operator |  | PT Operator |  | MPI Operator|  |  Volumes  | |
|  +-------------+  +-------------+  +-------------+  +-----------+ |
+------------------------------------------------------------------+
|                    Kubernetes Cluster                             |
|  +-------------+  +-------------+  +-------------+  +-----------+ |
|  |   Istio     |  |   Knative   |  |   Storage   |  |    GPU    | |
|  | (Networking)|  | (Serverless)|  |   (PV/PVC)  |  | Scheduler | |
|  +-------------+  +-------------+  +-------------+  +-----------+ |
+------------------------------------------------------------------+
|                     Infrastructure Layer                          |
|        (AWS EKS / GCP GKE / Azure AKS / On-Premises)             |
+------------------------------------------------------------------+

Implementation

Getting Started

Prerequisites

  • Kubernetes cluster (1.22+) with at least 4 CPUs and 16GB RAM
  • kubectl configured for your cluster
  • kustomize (v3.2.0+) for deployment customization

Installation Methods

1. Kubeflow Manifests (Full Platform)

# Clone the manifests repository
git clone https://github.com/kubeflow/manifests.git
cd manifests

# Deploy using kustomize
while ! kustomize build example | kubectl apply -f -; do
  echo "Retrying..."
  sleep 10
done

# Access the dashboard (default credentials: user@example.com / 12341234)
kubectl port-forward svc/istio-ingressgateway -n istio-system 8080:80

2. Standalone Kubeflow Pipelines

# Simpler deployment for pipelines-only use case
export PIPELINE_VERSION=2.0.5
kubectl apply -k "github.com/kubeflow/pipelines/manifests/kustomize/cluster-scoped-resources?ref=$PIPELINE_VERSION"
kubectl wait --for condition=established --timeout=60s crd/applications.app.k8s.io
kubectl apply -k "github.com/kubeflow/pipelines/manifests/kustomize/env/platform-agnostic-emissary?ref=$PIPELINE_VERSION"

3. Cloud Provider Distributions

Several vendors ship packaged or managed distributions, including Kubeflow on AWS, Kubeflow on Google Cloud (GKE), and Canonical's Charmed Kubeflow.

Pipeline SDK Usage

Python SDK (KFP v2)

from kfp import dsl, compiler
from kfp.client import Client

# Define components
# (For brevity these components pass plain string paths between steps;
#  in a real pipeline you would use dsl.Input/dsl.Output artifacts so
#  files are actually copied between containers.)
@dsl.component(base_image="python:3.10")
def load_data(dataset_name: str) -> str:
    import urllib.request
    url = f"https://data.example.com/{dataset_name}.csv"
    path = f"/tmp/{dataset_name}.csv"
    urllib.request.urlretrieve(url, path)
    return path

@dsl.component(base_image="tensorflow/tensorflow:2.13.0")
def train_tensorflow_model(
    data_path: str,
    epochs: int,
    learning_rate: float
) -> str:
    import tensorflow as tf
    # Load and prepare data
    # Build and train model
    model_path = "/tmp/model"
    # model.save(model_path)
    return model_path

@dsl.component(base_image="python:3.10")
def evaluate_model(model_path: str, test_data: str) -> float:
    # Load model and evaluate
    accuracy = 0.95  # placeholder
    return accuracy

# Define pipeline
@dsl.pipeline(
    name="TensorFlow Training Pipeline",
    description="End-to-end ML pipeline with TensorFlow"
)
def tf_training_pipeline(
    dataset: str = "mnist",
    epochs: int = 10,
    learning_rate: float = 0.001
):
    load_task = load_data(dataset_name=dataset)
    train_task = train_tensorflow_model(
        data_path=load_task.output,
        epochs=epochs,
        learning_rate=learning_rate
    )
    eval_task = evaluate_model(
        model_path=train_task.output,
        test_data=load_task.output
    )

# Compile and submit
compiler.Compiler().compile(tf_training_pipeline, "pipeline.yaml")

# Submit to Kubeflow
client = Client(host="http://localhost:8080")
run = client.create_run_from_pipeline_package(
    "pipeline.yaml",
    arguments={"dataset": "cifar10", "epochs": 20}
)
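
After submission, the run can also be tracked from the same client rather than only through the UI. A small sketch, assuming the run object returned above; exact method names and return fields can differ slightly between KFP SDK releases.

# Block until the run finishes (or the timeout, in seconds, expires),
# then read back its final state from the Pipelines API.
client.wait_for_run_completion(run_id=run.run_id, timeout=3600)
print(client.get_run(run.run_id).state)  # e.g. SUCCEEDED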

Integration with ML Frameworks

MLflow Integration

Use MLflow for experiment tracking alongside Kubeflow Pipelines:

@dsl.component(
    base_image="python:3.10",
    packages_to_install=["mlflow", "scikit-learn"]
)
def train_with_mlflow(
    data_path: str,
    mlflow_tracking_uri: str,
    experiment_name: str
) -> str:
    import mlflow
    from sklearn.ensemble import RandomForestClassifier

    mlflow.set_tracking_uri(mlflow_tracking_uri)
    mlflow.set_experiment(experiment_name)

    with mlflow.start_run():
        # Train model
        model = RandomForestClassifier(n_estimators=100)
        # model.fit(X_train, y_train)

        # Log parameters and metrics
        mlflow.log_param("n_estimators", 100)
        mlflow.log_metric("accuracy", 0.95)

        # Log model
        mlflow.sklearn.log_model(model, "model")

        return mlflow.active_run().info.run_id
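
Wiring this component into a pipeline just means passing the tracking server's address as a parameter. A small sketch; the in-cluster MLflow URL used as the default below is a placeholder for wherever your tracking server actually runs.

@dsl.pipeline(name="mlflow-tracking-pipeline")
def mlflow_tracking_pipeline(
    data_path: str,
    # Placeholder address for an MLflow tracking server reachable from the cluster
    mlflow_tracking_uri: str = "http://mlflow.mlflow.svc.cluster.local:5000",
):
    train_task = train_with_mlflow(
        data_path=data_path,
        mlflow_tracking_uri=mlflow_tracking_uri,
        experiment_name="kubeflow-demo",
    )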

PyTorch Distributed Training

@dsl.component(base_image="pytorch/pytorch:2.0.1-cuda11.7-cudnn8-runtime")
def distributed_pytorch_training(
    data_path: str,
    model_output: str,
    world_size: int = 4
):
    import os
    import torch
    import torch.distributed as dist
    from torch.nn.parallel import DistributedDataParallel as DDP

    # Initialize distributed training; RANK, LOCAL_RANK, and WORLD_SIZE are
    # expected to be injected by the launcher (torchrun or a PyTorchJob)
    dist.init_process_group(backend="nccl")
    local_rank = int(os.environ["LOCAL_RANK"])
    torch.cuda.set_device(local_rank)

    # Create model (placeholder architecture) and wrap with DDP
    model = torch.nn.Linear(128, 10).cuda(local_rank)
    model = DDP(model, device_ids=[local_rank])

    # Training loop
    # ...

    # Save model on rank 0
    if dist.get_rank() == 0:
        torch.save(model.module.state_dict(), model_output)

Production Deployment

Model Serving with KServe

apiVersion: serving.kserve.io/v1beta1
kind: InferenceService
metadata:
  name: production-model
  annotations:
    sidecar.istio.io/inject: "true"
spec:
  predictor:
    minReplicas: 2
    maxReplicas: 10
    scaleTarget: 80
    scaleMetric: concurrency
    tensorflow:
      storageUri: "s3://models/production/v1"
      runtimeVersion: "2.13.0"
      resources:
        requests:
          cpu: "1"
          memory: "2Gi"
          nvidia.com/gpu: "1"
        limits:
          cpu: "2"
          memory: "4Gi"
          nvidia.com/gpu: "1"
  transformer:
    containers:
      - name: feature-transformer
        image: myregistry/feature-transformer:v1
        resources:
          requests:
            cpu: "500m"
            memory: "512Mi"

Canary Deployment

apiVersion: serving.kserve.io/v1beta1
kind: InferenceService
metadata:
  name: model-canary
spec:
  predictor:
    canaryTrafficPercent: 20
    tensorflow:
      storageUri: "s3://models/production/v2"  # New version
  # Previous version gets remaining 80% traffic

References

Related Technologies

  • Argo Workflows: the workflow engine that executes Kubeflow Pipelines
  • Istio: service mesh providing ingress and multi-tenant isolation
  • Knative: serverless runtime underlying KServe
  • MLflow: experiment tracking, commonly used alongside Kubeflow (see above)

Books and Courses

  • "Kubeflow for Machine Learning" by Holden Karau et al. (O'Reilly)
  • "Building Machine Learning Pipelines" by Hannes Hapke (O'Reilly)
  • MLOps Specialization on Coursera

Notes

Comparison with Other MLOps Platforms

Feature              Kubeflow            MLflow           SageMaker
-------------------  ------------------  ---------------  -------------------
Infrastructure       Kubernetes          Agnostic         AWS-native
Experiment Tracking  Via MLflow/custom   Native           Native
Pipeline Authoring   Python SDK          Limited          Python SDK
Model Serving        KServe              MLflow Serving   SageMaker Endpoints
AutoML               Katib               Third-party      Autopilot
Cost                 Infrastructure      Open-source      Pay-per-use
Learning Curve       Steep               Moderate         Moderate

Best Practices

  1. Start Small: Begin with standalone Pipelines before full Kubeflow deployment
  2. Use Caching: Enable pipeline caching to avoid redundant computation (see the sketch after this list)
  3. Version Everything: Use artifact versioning for reproducibility
  4. Resource Limits: Always set resource requests and limits for components (also shown below)
  5. Multi-tenancy: Use Kubeflow profiles for team isolation
  6. Monitoring: Integrate Prometheus and Grafana for observability
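
For items 2 and 4, the KFP v2 SDK exposes caching and resource settings directly on pipeline tasks. A minimal sketch, assuming components like the preprocess_data and train_model defined earlier on this page; the accelerator lines are commented out because the accelerator type string depends on your cluster.

@dsl.pipeline(name="tuned-training-pipeline")
def tuned_training_pipeline(input_data: str):
    preprocess_task = preprocess_data(input_path=input_data)
    # Reuse cached results when inputs and component code are unchanged
    preprocess_task.set_caching_options(enable_caching=True)

    train_task = train_model(data=preprocess_task.outputs["output_data"])
    # Always declare requests and limits for resource-hungry steps
    train_task.set_cpu_request("2")
    train_task.set_cpu_limit("4")
    train_task.set_memory_request("4G")
    train_task.set_memory_limit("8G")
    # Optionally request an accelerator (type string depends on the cluster):
    # train_task.set_accelerator_type("nvidia.com/gpu")
    # train_task.set_accelerator_limit(1)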

Common Challenges

  • Complexity: Full Kubeflow deployment requires expertise in Kubernetes, Istio, and multiple components
  • Resource Intensive: Minimum viable deployment requires significant cluster resources
  • Upgrade Path: Major version upgrades can be challenging due to CRD changes
  • Debugging: Distributed pipeline failures require understanding of Kubernetes internals

Future Directions

  • Improved integration with LLM workflows and fine-tuning pipelines
  • Enhanced support for federated learning
  • Better multi-cloud and hybrid deployment options
  • Simplified installation and management

Author: Jason Walsh

j@wal.sh

Last Updated: 2026-01-11 11:00:26
