To build an end-to-end machine learning workflow, we will harness the power and flexibility of Rafay's MLOps offering for Kubernetes, which seamlessly integrates a number of world-class open source technologies such as Kubeflow, MLflow, Feast, and KServe. Data scientists and ML engineers can use it to go from data to deployment with an integrated MLOps pipeline:
Train effective models with Kubeflow pipelines
Register them with MLflow for versioning and lineage, and
Serve predictions robustly with KServe.
This unified environment gives us an extensible platform to develop, deploy, and monitor our models with tools purpose-built for machine learning and Kubernetes. In this getting-started guide, we will use the Iris dataset for our MLOps pipeline.
Our goal is to build a machine learning model that can LEARN (i.e. be trained on) the measurements of iris flowers whose species are known. We will then use this model to PREDICT (i.e. perform inference on) the species of a new iris flower.
Note
Although the Iris dataset is useful for introductory purposes, it is not representative of advanced machine learning tasks. It is a very small and well-structured dataset and lacks the complexity and variety found in real-world datasets.
Kubeflow pipelines consist of components, each representing a step in the pipeline. Each component runs in its own container, ensuring that every step can encapsulate its own set of dependencies without affecting other components.
For every component we build, we will either create a new container image or use an existing one. These images take inputs, execute specific operations, and produce outputs. We will also maintain a distinct Python script that will transform each container image into a pipeline component and assemble the components into a complete, end-to-end MLOps pipeline.
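To make this concrete, here is a minimal sketch of how a component is defined and assembled into a pipeline with the KFP SDK. The component and pipeline names here are placeholders for illustration, not part of our Iris pipeline:

from kfp import dsl, compiler
from kfp.dsl import component

@component(base_image="python:3.9")
def say_hello(name: str) -> str:
    return f"Hello, {name}!"

@dsl.pipeline(name="hello-pipeline")
def hello_pipeline(name: str = "Iris"):
    # Each component call inside a pipeline becomes a task
    # that runs in its own container
    say_hello(name=name)

# Compile the pipeline to a YAML spec that can be uploaded to Kubeflow
compiler.Compiler().compile(hello_pipeline, "hello_pipeline.yaml")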
Our MLOps pipeline will comprise the following steps/stages:
Data Preparation
Train-Test-Split
Model Training
Model Registration
Model Evaluation
Model Serving
The image below shows how the pipeline and the various steps are visualized in Kubeflow.
The Kubeflow Pipelines SDK for Python supports multiple versions of Python, and you can specify the Python version in the base image for each component.
For this component of our pipeline, we will use the Python 3.9 image because it is compatible with KFP. We will also specify the libraries we require for data preparation.
We will first load the standard Iris dataset from scikit-learn's built-in datasets.
Next, we will clean the dataset by dropping any rows that contain NaN ("not a number") values.
Note
Kubeflow will automatically pull the Python 3.9 image from DockerHub and install the required libraries.
@component(
    packages_to_install=["pandas", "numpy", "scikit-learn"],
    base_image="python:3.9",
)
def prepare_data(data_path: str):
    import pandas as pd
    import os
    from sklearn import datasets

    # Load dataset
    iris = datasets.load_iris()
    df = pd.DataFrame(iris.data, columns=iris.feature_names)
    df['species'] = iris.target

    # Clean the dataset by dropping rows or columns that contain NaN values.
    df = df.dropna()

    df.to_csv(f'{data_path}/final_df.csv', index=False)
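The Train-Test-Split stage sits between data preparation and model training; its code is not shown in this guide. A minimal sketch of such a component, assuming scikit-learn's train_test_split and .npy file names matching the X_test.npy that later stages read (the component name, train-split file names, test size, and random seed are illustrative assumptions):

@component(
    packages_to_install=["pandas", "numpy", "scikit-learn"],
    base_image="python:3.9",
)
def train_test_split_data(data_path: str):
    import pandas as pd
    import numpy as np
    from sklearn.model_selection import train_test_split

    df = pd.read_csv(f'{data_path}/final_df.csv')

    # Separate the four feature columns from the target label
    X = df.drop('species', axis=1)
    y = df['species']

    # Hold out a portion of the rows for evaluation
    # (test_size and random_state are illustrative assumptions)
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=0.2, stratify=y, random_state=47
    )

    np.save(f'{data_path}/X_train.npy', X_train.to_numpy())
    np.save(f'{data_path}/X_test.npy', X_test.to_numpy())
    np.save(f'{data_path}/y_train.npy', y_train.to_numpy())
    np.save(f'{data_path}/y_test.npy', y_test.to_numpy())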
Despite its name, logistic regression is actually a model for classification, not regression. Our Iris dataset gives us a total of four features (sepal length, sepal width, petal length, and petal width) that we can use to train the model.
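The training code itself is not shown in this guide. A minimal sketch of the training component, assuming the split step saved X_train.npy and y_train.npy, and that the fitted model is pickled to the model.pkl path read by the registration step below:

@component(
    packages_to_install=["pandas", "numpy", "scikit-learn"],
    base_image="python:3.9",
)
def train_model(data_path: str):
    import numpy as np
    import pickle
    from sklearn.linear_model import LogisticRegression

    # Assumed file names; the later stages only confirm X_test.npy and model.pkl
    X_train = np.load(f'{data_path}/X_train.npy', allow_pickle=True)
    y_train = np.load(f'{data_path}/y_train.npy', allow_pickle=True)

    # max_iter=500 matches the hyperparameter logged during model registration
    classifier = LogisticRegression(max_iter=500)
    classifier.fit(X_train, y_train)

    # Persist the trained model for the downstream registration component
    with open(f'{data_path}/model.pkl', 'wb') as f:
        pickle.dump(classifier, f)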
The Rafay MLOps Platform comes with a pre-integrated, world-class model registry (i.e. MLflow). In this component, we will register and log the model in our model registry. In the integrated MLflow registry, accessible by clicking MLflow in the dashboard, users can observe details about their models organized by the following:
Experiments
Registered Models
Note
Model versions are how a model's lineage can be traced.
@component(
    packages_to_install=["pandas", "numpy==1.21.0", "scikit-learn", "mlflow", "boto3"],
    base_image="python:3.9",
)
def register_model(data_path: str) -> dict:
    import pandas as pd
    import numpy as np
    import pickle
    import os
    import mlflow
    from mlflow.models import infer_signature
    from sklearn import datasets

    with open(f'{data_path}/model.pkl', 'rb') as f:
        logistic_reg_model = pickle.load(f)

    # Infer the model signature
    X_test = np.load(f'{data_path}/X_test.npy', allow_pickle=True)
    y_pred = logistic_reg_model.predict(X_test)
    signature = infer_signature(X_test, y_pred)

    # Log and register the model using the MLflow scikit-learn API
    mlflow.set_tracking_uri("http://mlflow-tracking.mlflow.svc.cluster.local:80")
    reg_model_name = "SklearnLogisticRegression"

    # Change the name of the experiment if required
    mlflow.set_experiment(experiment_name="new-exp")

    with mlflow.start_run() as run:
        mlflow.log_param('max_iter', 500)

        # Log the model artifact to our registry
        artifact_path = "sklearn-model"
        mlflow.log_artifact(local_path=f'{data_path}/model.pkl', artifact_path=artifact_path)
        model_info = mlflow.sklearn.log_model(
            sk_model=logistic_reg_model,
            artifact_path="sklearn-model",
            signature=signature,
            registered_model_name=reg_model_name,
        )

        # Capture the version from register_model and pass the run details
        # to the next stage in the pipeline for inference etc.
        model_uri = f"runs:/{run.info.run_id}/sklearn-model"
        mlflow.register_model(model_uri, reg_model_name)

    return {
        "artifact_path": artifact_path,
        "artifact_uri": run.info.artifact_uri,
        "run_id": run.info.run_id,
        "experiment_id": run.info.experiment_id,
    }
In this component, we will load the model from the model registry and score the test data (i.e. generate predictions). Model evaluation:
Is the process of using different evaluation metrics to understand a machine learning model's performance, as well as its strengths and weaknesses.
Is important to assess the efficacy of a model during initial research phases.
@component(
    packages_to_install=["pandas", "numpy==1.21.0", "scikit-learn", "mlflow", "boto3"],
    base_image="python:3.9",
)
def predict_on_test_data(data_path: str, model_info: dict) -> str:
    import pandas as pd
    import numpy as np
    import pickle
    import os
    import mlflow

    artifact_path = model_info["artifact_path"]
    artifact_uri = model_info["artifact_uri"]

    mlflow.set_tracking_uri("http://mlflow-tracking.mlflow.svc.cluster.local:80")

    # Update the default name of the experiment if required
    mlflow.set_experiment(experiment_name="experiment-2006")

    # Load the model back from the registry
    model_uri = f"{artifact_uri}/{artifact_path}"
    logistic_reg_model = mlflow.sklearn.load_model(model_uri)

    # Score the test data
    X_test = np.load(f'{data_path}/X_test.npy', allow_pickle=True)
    y_pred = logistic_reg_model.predict(X_test)
    np.save(f'{data_path}/y_pred.npy', y_pred)

    # Also save the class probabilities for each prediction
    y_pred_prob = logistic_reg_model.predict_proba(X_test)
    np.save(f'{data_path}/y_pred_prob.npy', y_pred_prob)

    return model_uri
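The predictions saved above can then be compared against the held-out labels. A short sketch of such a metrics step, assuming a y_test.npy file saved by the train-test-split stage (the component name and choice of metrics are illustrative):

@component(
    packages_to_install=["pandas", "numpy==1.21.0", "scikit-learn"],
    base_image="python:3.9",
)
def evaluate_model(data_path: str):
    import numpy as np
    from sklearn.metrics import accuracy_score, classification_report

    # y_test.npy is an assumed artifact of the train-test-split stage
    y_test = np.load(f'{data_path}/y_test.npy', allow_pickle=True)
    y_pred = np.load(f'{data_path}/y_pred.npy', allow_pickle=True)

    # Summarize the model's strengths and weaknesses with standard
    # classification metrics (per-class precision, recall, and F1)
    print("Accuracy:", accuracy_score(y_test, y_pred))
    print(classification_report(y_test, y_pred))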