Skip to content

Overview

To build an end-to-end machine learning workflow, we will harness the power and flexibility of Rafay's MLOps Offering for Kubernetes. This seamlessly integrates a number of world class Open Source technologies such as Kubeflow, MLflow, Feast. K-Serve etc. Data Scientists and ML engineers can use this to go from data to deployment with an integrated MLOps pipeline:

  • Train effective models with Kubeflow pipelines
  • Register them with MLflow for versioning and lineage, and
  • Serve predictions robustly with KServe.

This unified environment gives us an extensible platform to develop, deploy, and monitor our models with the tools purpose-built for machine learning and Kubernetes. In this basic getting started guide, we will use the Iris data set for our MLOps pipeline.

Our goal is to build a machine learning model that can LEARN (i.e. Train) the measurements of these iris flowers, whose species are known. We will then use this model to PREDICT (i.e. Inference) the species for a new iris flower.

End-to-End MLOps Pipeline

Note

Although the Iris dataset is useful for introductory purposes, it is not representative of advanced machine learning tasks. It is a very small and well-structured dataset and lacks the complexity and variety found in real-world datasets.


Pipelines

Kubeflow pipelines consist of components, each representing a step in the pipeline. Each component runs in its container ensuring that each step can encapsulate its set of dependencies without affecting other components.

For every component we build, we will either create a new container image or use an existing one. These images take inputs, execute specific operations, and produce outputs. We will also maintain a distinct Python script that will transform each container image into a pipeline component and assemble the components into a complete, end-to-end MLOps pipeline.

Our MLOps pipeline will comprise the following steps/stages:

  • Data Preparation
  • Train-Test-Split
  • Model Training
  • Model Registration
  • Model Evaluation
  • Model Serving

The image below shows how the pipeline and the various steps are visualized in Kubeflow.

Our Pipeline


Components

Our pipeline is developed using the Kubeflow SDK v2. Let us review and understand the various steps in our pipeline in detail.

Imports

We will import the Kubeflow Pipelines Module (i.e. kfp.dsl). This contains domain-specific language objects used to compose pipelines.

from kfp.dsl import component, pipeline
import kfp
from kfp import kubernetes

Note

The Kubeflow Pipelines SDK for Python supports multiple versions of Python. You can specify the version of Python in the base image for each component.


Data Preparation

In this component in our pipeline, we will use the Python 3.9 image because it is compatible with KFP. We will specify a number of libraries that we will require for data preparation.

  • We will first download the standard Iris data set from the SKLearn repository.
  • Next we will clean the dataset by dropping rows or columns that are "not numbers"

Note

Kubeflow will automatically pull the Python 3.9 image from DockerHub and install the required libraries.

@component(
    packages_to_install=["pandas", "numpy", "scikit-learn"],
    base_image="python:3.9",
)
def prepare_data(data_path: str):
    import pandas as pd
    import os
    from sklearn import datasets

    # Load dataset
    iris = datasets.load_iris()
    df = pd.DataFrame(iris.data, columns=iris.feature_names)
    df['species'] = iris.target

    # Clean the dataset by dropping rows or columns that contain NaN values.
    df = df.dropna()
    df.to_csv(f'{data_path}/final_df.csv', index=False)

Split Data

In this component, we will use the train_test_split() method to split our data into train and test sets.

  • We need to divide our data into features (X) and labels (y).
  • The dataframe gets divided into X_train, X_test, y_train, and y_test.
  • X_train and y_train sets are used for training and fitting the model.
  • The X_test and y_test sets are used for testing the model if it’s predicting the right outputs/labels.

Note

We can explicitly test the size of the train and test sets. It is recommended to keep our train sets larger than the test sets.

@component(
    packages_to_install=["pandas", "numpy", "scikit-learn"],
    base_image="python:3.9",
)
def train_test_split(data_path: str):    
    import pandas as pd
    import numpy as np
    import os
    from sklearn.model_selection import train_test_split

    final_data = pd.read_csv(f'{data_path}/final_df.csv')

    target_column = 'species'
    X = final_data.loc[:, final_data.columns != target_column]
    y = final_data.loc[:, final_data.columns == target_column]

    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3,stratify = y, random_state=47)

    np.save(f'{data_path}/X_train.npy', X_train)
    np.save(f'{data_path}/X_test.npy', X_test)
    np.save(f'{data_path}/y_train.npy', y_train)
    np.save(f'{data_path}/y_test.npy', y_test)

Model Training

In this component, we will train a logistic regression model using our cleaned Iris dataset.

Note

Despite its name, logistic regression can actually be used as a model for classification. In our Iris dataset, we have a total of four features that we can use.

@component(
    packages_to_install=["pandas", "numpy==1.21.0", "scikit-learn"],
    base_image="python:3.9",
)
def training_basic_classifier(data_path: str):
    import pandas as pd
    import numpy as np
    import os
    from sklearn.linear_model import LogisticRegression

    X_train = np.load(f'{data_path}/X_train.npy',allow_pickle=True)
    y_train = np.load(f'{data_path}/y_train.npy',allow_pickle=True)

    classifier = LogisticRegression(max_iter=500)
    classifier.fit(X_train,y_train)
    import pickle
    with open(f'{data_path}/model.pkl', 'wb') as f:
        pickle.dump(classifier, f)

Register Model

The Rafay MLOps Platform comes with a pre-integrated, world class model registry (i.e. MLflow). In this component, we will register and log the model into our model registry. Users can observe details about the models organized by the following in the integrated MLFlow registry. They just need to click on MLflow in their dashboard to access it.

  1. Experiments
  2. Registered Models

Note

The Model versions are how a model lineage can be traced.

@component(
    packages_to_install=["pandas", "numpy==1.21.0", "scikit-learn", "mlflow", "boto3"],
    base_image="python:3.9",
)
def register_model(data_path: str) -> dict:
    import pandas as pd
    import numpy as np
    import pickle
    import os
    import mlflow
    from mlflow.models import infer_signature
    from sklearn import datasets

    with open(f'{data_path}/model.pkl','rb') as f:
        logistic_reg_model = pickle.load(f)

    # Infer the model signature
    X_test = np.load(f'{data_path}/X_test.npy', allow_pickle=True)
    y_pred = logistic_reg_model.predict(X_test)
    signature = infer_signature(X_test, y_pred)


    # Log and register the model using MLflow scikit-learn API
    mlflow.set_tracking_uri("http://mlflow-tracking.mlflow.svc.cluster.local:80")
    reg_model_name = "SklearnLogisticRegression"

    # Change the name of the experiment if required 
    mlflow.set_experiment(experiment_name="new-exp")

    with mlflow.start_run() as run:
        mlflow.log_param('max_iter', 500)

        # Log model artifact to our registry 
        artifact_path = "sklearn-model"      
        mlflow.log_artifact(local_path=f'{data_path}/model.pkl', artifact_path=artifact_path)

        model_info = mlflow.sklearn.log_model(
            sk_model=logistic_reg_model,
            artifact_path="sklearn-model",
            signature=signature,
            registered_model_name=reg_model_name,
        )

    model_uri = f"runs:/{run.info.run_id}/sklearn-model" #run details 


    # Capture version from register_model and pass it to next stage in the pipeline for inference etc. 
    mlflow.register_model(
        model_uri,
        reg_model_name
    )

    return {"artifact_path": artifact_path, "artifact_uri": run.info.artifact_uri, "run_id": run.info.run_id, "experiment_id": run.info.experiment_id}

Model Evaluation

In this component, we will load the model from the Model Registry and score the test data (i.e. predictions). Model evaluation

  • Is the process of using different evaluation metrics to understand a machine learning model's performance, as well as its strengths and weaknesses.
  • Is important to assess the efficacy of a model during initial research phases.
@component(
    packages_to_install=["pandas", "numpy==1.21.0", "scikit-learn", "mlflow", "boto3"],
    base_image="python:3.9",
)
def predict_on_test_data(data_path: str, model_info: dict) -> str:
    import pandas as pd
    import numpy as np
    import pickle
    import os
    import mlflow

    artifact_path = model_info["artifact_path"]
    artifact_uri = model_info["artifact_uri"]

    mlflow.set_tracking_uri("http://mlflow-tracking.mlflow.svc.cluster.local:80")

    # Update the default name of the experiment if required 
    mlflow.set_experiment(experiment_name="experiment-2006")
    model_uri = f"{artifact_uri}/{artifact_path}"
    logistic_reg_model = mlflow.sklearn.load_model(model_uri)

    X_test = np.load(f'{data_path}/X_test.npy',allow_pickle=True)
    y_pred = logistic_reg_model.predict(X_test)
    np.save(f'{data_path}/y_pred.npy', y_pred)

    X_test = np.load(f'{data_path}/X_test.npy',allow_pickle=True)
    y_pred_prob = logistic_reg_model.predict_proba(X_test)
    np.save(f'{data_path}/y_pred_prob.npy', y_pred_prob)

    return model_uri

Model Serving

In this component, we will utilize the KServe Python Client SDK that interacts with KServe control plane APIs for executing operations on KServe.

  • KServe is a standard Model Inference Platform on Kubernetes, built for highly scalable use cases.
  • KServe provides a highly performant, standardized inference protocol across ML frameworks.
  • KServe supports modern serverless inference workload with autoscaling including Scale to Zero on GPU.
  • KServe provides high scalability, density packing and intelligent routing
  • KServe provides pluggable production serving for production ML serving including prediction, pre/post processing, monitoring and explainability.
  • KServe provides advanced deployments with canary rollout, experiments, ensembles and transformers.
@component(
    packages_to_install=["kserve"],
    base_image="python:3.9",
)
def model_serving(model_info: dict):
    from kubernetes import client 
    from kserve import KServeClient
    from kserve import constants
    from kserve import utils
    from kserve import V1beta1InferenceService
    from kserve import V1beta1InferenceServiceSpec
    from kserve import V1beta1PredictorSpec
    from kserve import V1beta1SKLearnSpec
    from kserve import V1beta1ModelSpec
    from kserve import V1beta1ModelFormat
    import os

    artifact_root = "mlflow-artifacts:"

    build_model_uri = "{}/{}/{}/artifacts/{}".format(artifact_root,model_info['experiment_id'],
                                                     model_info['run_id'],model_info['artifact_path'])

    namespace = utils.get_default_target_namespace()

    name='sklearn-iris-v3-demo'

    predictor = V1beta1PredictorSpec(
        model=V1beta1ModelSpec(
            model_format=V1beta1ModelFormat(
                name="sklearn",
            ),
            protocol_version="v2",
            runtime="kserve-mlserver",
            storage_uri=build_model_uri,
        ),
    )

    isvc = V1beta1InferenceService(
        api_version=constants.KSERVE_V1BETA1,
        kind=constants.KSERVE_KIND,
        metadata=client.V1ObjectMeta(
            name=name, namespace=namespace,annotations={'sidecar.istio.io/inject':'false'}
        ),
        spec=V1beta1InferenceServiceSpec(predictor=predictor),
    )


    KServe = KServeClient()
    KServe.create(isvc)
    KServe.get(name, namespace=namespace, watch=True, timeout_seconds=200)