# In [None]:
from kfp.dsl import component, pipeline
import kfp
from kfp import kubernetes

@component(
    # Only pandas is used in this step; installing tensorflow here would only
    # slow down container start-up.
    packages_to_install=["pandas", "numpy"],
    base_image="python:3.9"
)
def prepare_data(data_path: str):
    """Download the Titanic train/eval CSVs into ``data_path``.

    Writes ``train.csv`` (training data) and ``eval.csv`` (testing data),
    without the pandas index, for the downstream components to read.

    Args:
        data_path: Directory to write into. It is created if it does not
            exist, so a sub-directory of the mounted volume can be used.
    """
    import os

    import pandas as pd

    # to_csv does not create parent directories.
    os.makedirs(data_path, exist_ok=True)

    base_url = 'https://storage.googleapis.com/tf-datasets/titanic'
    for split in ('train', 'eval'):
        frame = pd.read_csv(f'{base_url}/{split}.csv')
        frame.to_csv(f'{data_path}/{split}.csv', index=False)

@component(
    # pandas (with its numpy dependency) is all this step needs.
    packages_to_install=["pandas", "numpy"],
    base_image="python:3.9",
)
def preprocess_data(data_path: str):
    """One-hot encode and standardize the Titanic CSVs in ``data_path``.

    Reads ``train.csv`` and ``eval.csv``, splits off the ``survived`` label
    and writes ``X_train.csv``, ``y_train.csv``, ``X_eval.csv`` and
    ``y_eval.csv`` back to ``data_path`` (without the pandas index).

    Args:
        data_path: Directory holding the raw CSVs; outputs go to the same place.
    """
    import pandas as pd

    categorical_features = ['sex', 'class', 'deck', 'embark_town', 'alone']
    numeric_features = ['age', 'fare', 'n_siblings_spouses', 'parch']

    dftrain = pd.read_csv(f'{data_path}/train.csv')
    dfeval = pd.read_csv(f'{data_path}/eval.csv')
    y_train = dftrain.pop('survived')
    y_eval = dfeval.pop('survived')
    n_train = len(dftrain)

    # Encode train and eval together so both splits get the same dummy
    # columns even when a category only occurs in one of them.
    # dtype=float keeps every column numeric: pandas >= 2 emits bool dummies,
    # and a mixed bool/float frame becomes an object array that Keras cannot
    # convert to a tensor.
    combined = pd.concat([dftrain, dfeval], axis=0, ignore_index=True)
    combined = pd.get_dummies(combined, columns=categorical_features, dtype=float)

    # Split back into train and eval; .copy() makes the assignments below
    # modify independent frames rather than slices of `combined`.
    dftrain = combined.iloc[:n_train].copy()
    dfeval = combined.iloc[n_train:].copy()

    # Standardize BOTH splits with the training statistics, so eval features
    # are on the scale the model was trained on (and eval data does not leak
    # into the transform).
    train_mean = dftrain[numeric_features].mean()
    train_std = dftrain[numeric_features].std()
    dftrain[numeric_features] = (dftrain[numeric_features] - train_mean) / train_std
    dfeval[numeric_features] = (dfeval[numeric_features] - train_mean) / train_std

    dftrain.to_csv(f'{data_path}/X_train.csv', index=False)
    y_train.to_csv(f'{data_path}/y_train.csv', index=False)
    dfeval.to_csv(f'{data_path}/X_eval.csv', index=False)
    y_eval.to_csv(f'{data_path}/y_eval.csv', index=False)

@component(
    packages_to_install=["pandas", "numpy", "tensorflow"],
    base_image="python:3.9",
)
def train_model(data_path: str, batch_size: int = 32, epochs: int = 25):
    """Train a single sigmoid-unit Keras model on the preprocessed data.

    Reads ``X_train.csv`` and ``y_train.csv`` from ``data_path``. It writes
    TensorBoard logs to ``<data_path>/titanic_training_logs/fit/<timestamp>``
    and saves the model to ``<data_path>/model.keras``.

    Args:
        data_path: Directory holding the preprocessed CSVs.
        batch_size: Mini-batch size passed to ``model.fit``.
        epochs: Number of training epochs (default 25, the value that was
            previously hard-coded).
    """
    import datetime
    import os

    import pandas as pd
    import tensorflow as tf

    # Cast to float32: one-hot columns may be read back as bool, and a mixed
    # bool/float frame becomes an object array that Keras cannot convert.
    dftrain = pd.read_csv(f'{data_path}/X_train.csv').astype('float32')
    y_train = pd.read_csv(f'{data_path}/y_train.csv').astype('float32')

    model = tf.keras.Sequential([
        tf.keras.layers.InputLayer(input_shape=(dftrain.shape[1],)),
        tf.keras.layers.Dense(1, activation='sigmoid'),  # logistic-regression unit
    ])

    # os.path.join instead of "/{data_path}/...". The old form produced
    # "//data/..." and turned relative data paths into absolute ones outside
    # the shared volume.
    log_dir = os.path.join(
        data_path,
        'titanic_training_logs',
        'fit',
        datetime.datetime.now().strftime('%Y%m%d-%H%M%S'),
    )
    tensorboard_callback = tf.keras.callbacks.TensorBoard(log_dir=log_dir, histogram_freq=1)

    model.compile(optimizer='adam',
                  loss='binary_crossentropy',
                  metrics=['accuracy'])

    model.fit(dftrain, y_train,
              epochs=epochs,
              batch_size=batch_size,
              callbacks=[tensorboard_callback])

    # Native Keras format; register_model loads it from this path.
    model.save(f'{data_path}/model.keras')

@component(
    packages_to_install=["pandas", "numpy", "tensorflow", "mlflow", "boto3"],
    base_image="python:3.9",
)
def register_model(data_path: str, epochs: int = 25) -> dict:
    """Log the trained Keras model to MLflow and register it.

    Loads ``<data_path>/model.keras`` and logs it under the artifact path
    ``keras-model`` in experiment ``tb-titanic-exp``. It then registers the
    logged model as ``tensorboard_metrics``.

    Args:
        data_path: Directory containing ``model.keras``.
        epochs: Epoch count used for training, recorded as the ``epochs`` run
            parameter. The default of 25 matches ``train_model``'s default.

    Returns:
        dict with the run's ``artifact_path``, ``artifact_uri``, ``run_id``
        and ``experiment_id``, plus ``model_version`` (string) of the newly
        registered model version.
    """
    import mlflow
    import tensorflow as tf

    model_path = f'{data_path}/model.keras'
    model = tf.keras.models.load_model(model_path)

    mlflow.set_tracking_uri("http://mlflow-tracking.mlflow.svc.cluster.local:80")
    reg_model_name = "tensorboard_metrics"
    mlflow.set_experiment(experiment_name="tb-titanic-exp")

    artifact_path = "keras-model"
    with mlflow.start_run() as run:
        # Previously a hard-coded 10, which disagreed with the 25 epochs
        # actually used by train_model.
        mlflow.log_param('epochs', epochs)
        mlflow.keras.log_model(model=model, artifact_path=artifact_path)
        model_version = mlflow.register_model(
            model_uri=f"{run.info.artifact_uri}/{artifact_path}",
            name=reg_model_name,
        )

    return {
        "artifact_path": artifact_path,
        "artifact_uri": run.info.artifact_uri,
        "run_id": run.info.run_id,
        "experiment_id": run.info.experiment_id,
        "model_version": str(model_version.version),
    }

@component(
    packages_to_install=["pandas", "numpy", "tensorflow", "mlflow", "boto3"],
    base_image="python:3.9",
)
def predict_on_test_data(data_path: str, model_info: dict) -> str:
    """Load the registered model from MLflow and predict on the eval split.

    Writes the 0/1 class predictions to ``<data_path>/y_pred.csv`` and prints
    one example row, its label and the raw model output for comparison.

    Args:
        data_path: Directory holding ``X_eval.csv`` and ``y_eval.csv``.
        model_info: Dict returned by ``register_model``. Only
            ``artifact_path`` and ``artifact_uri`` are read.

    Returns:
        The MLflow model URI the predictions were made with.
    """
    import mlflow
    import pandas as pd

    artifact_path = model_info["artifact_path"]
    artifact_uri = model_info["artifact_uri"]

    mlflow.set_tracking_uri("http://mlflow-tracking.mlflow.svc.cluster.local:80")

    model_uri = f"{artifact_uri}/{artifact_path}"
    model = mlflow.keras.load_model(model_uri)

    # Cast to float32 for the same reason as in training: bool one-hot
    # columns would otherwise produce an object array Keras cannot convert.
    dfeval = pd.read_csv(f'{data_path}/X_eval.csv').astype('float32')
    y_eval = pd.read_csv(f'{data_path}/y_eval.csv')

    predictions = model.predict(dfeval)
    # One sigmoid output per row; threshold at 0.5 in a single vectorized step.
    predicted_classes = (predictions.ravel() > 0.5).astype(int)

    pd.DataFrame(predicted_classes).to_csv(f'{data_path}/y_pred.csv', index=False)

    # Print one prediction for a spot check (skipped if the split is tiny).
    if len(dfeval) > 1:
        print(dfeval.loc[1])
        print(y_eval.loc[1])
        print(predictions[1])

    return model_uri

from kubernetes import client, config
import base64

@pipeline(
    name="titanic-pipeline",
)
def titanic_pipeline(data_path: str = '/data'):
    """Prepare, preprocess, train, register and evaluate the Titanic model.

    Every step mounts the same PVC (created here, ReadWriteOnce, 1G) at
    ``/data`` and exchanges files through ``data_path``. Caching is disabled
    for all steps, and each step is ordered after the previous one.

    Args:
        data_path: Directory the components read from and write to. It must
            be ``/data`` or a path beneath it, because the PVC is mounted only
            there. Any other path would not be shared between the step pods.
            Defaults to the mount point.
    """
    mount_path = '/data'

    pvc1 = kubernetes.CreatePVC(
        pvc_name_suffix='-tb-example',
        access_modes=['ReadWriteOnce'],
        size='1G',
    )

    def _configure(task, after=None):
        """Disable caching, mount the shared PVC and order *task* after *after*."""
        task.set_caching_options(False)
        kubernetes.mount_pvc(task, pvc_name=pvc1.outputs['name'], mount_path=mount_path)
        if after is not None:
            task.after(after)
        return task

    prepare_data_task = _configure(prepare_data(data_path=data_path))

    preprocess_data_task = _configure(
        preprocess_data(data_path=data_path),
        after=prepare_data_task,
    )

    train_model_task = _configure(
        train_model(data_path=data_path),
        after=preprocess_data_task,
    )

    register_model_task = _configure(
        register_model(data_path=data_path),
        after=train_model_task,
    )

    _configure(
        predict_on_test_data(data_path=data_path, model_info=register_model_task.output),
        after=register_model_task,
    )

if __name__ == '__main__':
    # Compile the pipeline definition into tensorboard_metrics.yaml.
    from kfp import compiler

    pipeline_compiler = compiler.Compiler()
    pipeline_compiler.compile(
        pipeline_func=titanic_pipeline,
        package_path='tensorboard_metrics.yaml',
    )
