Zum Hauptinhalt springen

Nachdem wir in den ersten Artikeln einen Ausflug in die Welt von Ray gemacht haben, wollen wir uns jetzt Vertex AI – dem Schlüsselbereich aller Machine Learning Services in GCP – widmen. Pipelines sollen das Leben in der Machine-Learning-Welt einfacher machen. Sie versprechen, durch ein hohes Maß an Automatisierung Entwicklungszyklen zu verkürzen. Außerdem soll das Team durch eine Abstraktion der Infrastruktur keine Expertise mit Microservices etc. benötigen und kann sich stattdessen auf seine Kernkompetenzen fokussieren.

In diesem Blogbeitrag wollen wir uns an einem einfachen Beispiel ansehen, wie eine Machine Learning Pipeline in Vertex AI aufgesetzt werden kann.

Was sind Vertex AI Pipelines?

Im Gegensatz zu anderen Cloud-Anbietern hat Google die Entscheidung getroffen, keine produkteigenen Pipelines (wie AWS Sagemaker) zu entwickeln, und setzt stattdessen auf die Open-Source-Technologien Kubeflow und TensorFlow Extended (TFX), deren Pipeline-Formate unterstützt werden.

Wir werden uns hier auf Kubeflow-Pipelines fokussieren. Diese bestehen aus individuellen Komponenten, die durch Python-Code verknüpft werden. Diese Komponenten können entweder fertige Container oder einfach nur Python-Funktionen sein – Kubeflow führt Letztere automatisch in einer Containerumgebung aus, ohne dass wir ein Dockerfile schreiben müssen.

Hands-on – Erstellen und Ausführen einer Pipeline

Der genaue Ablauf der Implementierung lässt sich in meinen Augen am besten an einem Beispiel beschreiben. Der folgende Code wird eine Pipeline erzeugen, die

  • Daten aus einer Quelle einliest (der Einfachheit halber laden wir hier nur einen SklearnBeispieldatensatz).
  • ein Modell trainiert und die Ergebnisse validiert.
     

Einlesen der Daten

Die Beispielimplementierung für den ersten Schritt sieht wie folgt aus:


import kfp.v2.dsl, kfp.v2.compiler
from kfp.v2.dsl import Artifact, Dataset, Input, Metrics, Model, Output

@kfp.v2.dsl.component(base_image="python:3.9-slim", packages_to_install=["pandas", "sklearn"])
def data_source(output: Output[Dataset]):
    import pandas as pd
    import numpy as np
    from sklearn.datasets import load_wine
    X, y = load_wine(return_X_y=True)
    data = pd.DataFrame(data=np.hstack([X, y.reshape([-1, 1])]))
    data.to_csv(output.path, index=False, header=False

 

Wir werden Schritt für Schritt durch alle Punkte dieser Funktion gehen:

  • Der Decorator: Der Decorator @component macht aus einer normalen PythonFunktion eine Kubeflow-Komponente. Sie definiert im Wesentlichen, wie die Funktion in einer containerisierten Umgebung ausgeführt werden soll. Wir haben hier als Beispiel ein Standard-Python-Basis-Image angegeben, in dem die Pakete Pandas und Sklearn installiert werden sollen.
  • Die Parameter: Das ist eine KubeflowBesonderheit, die etwas gewöhnungsbedürftig ist. Die Funktion bekommt ein OutputObjekt als Input. Anstatt dass die Funktion einen Return-Wert hat, wird der gewünschte Return-Wert an einem Pfad, der durch das Output-Objekt definiert ist, gespeichert. Der Type-Hint ist bei Kubeflow nicht optional – er wird benötigt, damit Kubeflow weiß, welche Art von Artifact (Dataset, Model, Metric) es als Input bereitstellen muss.
  • Die Funktion: Im Grunde eine ganz normale PythonFunktion, jedoch mit zwei Besonderheiten: Die erste ist, dass die Imports der Python-Pakete innerhalb der Funktion stattfinden müssen. Dadurch wird die Funktion autark und kann ohne weitere Infos auch später innerhalb eines Containers ausgeführt werden. Die zweite Besonderheit kommt nicht von Kubeflow, sondern ist Vertex-AI-spezifisch und nicht direkt ersichtlich: Die Codezeile data.to_csv(...) speichert den Datensatz im CSVFormat an einem Pfad, der durch output.path definiert wird. Dieser Pfad funktioniert wie ein Pfad im lokalen Filesystem, ist aber eigentlich ein Mount eines Cloud Storage Buckets.

 

Trainieren des Modells

In der zweiten Funktion wird mit dem Datensatz ein Modell trainiert und anschließend validiert:

@kfp.v2.dsl.component(base_image="python:3.9-slim", packages_to_install=["pandas", "sklearn"])
def train_model(input: Input[Dataset], model: Output[Model], metrics: Output[Metrics]):
    import pandas as pd
    import pickle
    from sklearn.model_selection import train_test_split
    from sklearn.tree import DecisionTreeRegressor

    data = pd.read_csv(input.path, header=False)
    features = list(data.columns)[0:-1]
    target = list(data.columns)[-1]

    x_train, x_test, y_train, y_test = train_test_split(data[features], data[target])

    ml_model = DecisionTreeRegressor()
    ml_model.fit(x_train, y_train)

    with open(model.path, "wb") as f:
        pickle.dump(ml_model, f)

    accuracy = ml_model.score(x_test, y_test)
    metrics.log_metric("accuracy", (accuracy * 100.0))
    metrics.log_metric("framework", "Scikit Learn")

Der Output der vorangehenden Funktion ist hier als Input definiert. Dadurch ist der Pfad bekannt, aus dem der Datensatz gelesen werden kann. Zusätzlich sind hier zwei Outputs definiert – das trainierte Modell und dessen Metriken zur Beschreibung des Modells. Metriken sind nach Belieben erstellbar. Sie funktionieren nach dem Key-Value-Prinzip: So loggen wir hier zum Beispiel die Metrik mit dem Namen framework und dem Wert Scikit Learn.

 

Zusammenfügen der Pipeline-Komponenten

Jetzt, nachdem wir beide Funktionen definiert haben, können wir sie in einer Pipeline verknüpfen:

@kfp.v2.dsl.pipeline(name="tree-pipeline")
def my_pipeline():
    source_op = data_source()
    train_model(source_op.outputs["output"])

Durch den Decorator wird beim Ausführen der Funktionen der Code nicht direkt ausgeführt. Stattdessen werden Operatoren erstellt, die sich verknüpfen lassen.

 

Kompilieren und Ausführen der Pipeline

Die Pipeline ist fertig. Jetzt können wir sie kompilieren und hochladen:

import os
from datetime import datetime
from pathlib import Path

from google.cloud import aiplatform

if __name__ == "__main__":
    filename = str(Path(__file__).parent.joinpath("pipeline.json"))
    kfp.v2.compiler.Compiler().compile(my_pipeline, filename)

    run = aiplatform.PipelineJob(
        project=os.environ['PROJECT_ID'],
        location=os.environ['REGION'],
        display_name="test-pipeline",
        template_path=filename,
        job_id=f"test-pipeline-{datetime.now().strftime('%Y-%m-%d-%H-%M-%S')}",
        enable_caching=True,
        pipeline_root=f"gs://{os.environ['BUCKET_ID']}",
        parameter_values={},
    )

    run.submit(service_account=os.environ["SERVICE_ACCOUNT"])

Der Kompiliervorgang konvertiert unseren Python-Code in ein JSON-File. Dieses File wird im Anschluss hochgeladen und ausgeführt.

In Vertex AI lässt sich die Pipeline und ihr aktueller Fortschritt betrachten. Das Ganze sieht dort in etwa so aus (die hier abgebildete Pipeline hat zwei zusätzliche Schritte, die ich hier zur Vereinfachung ausgelassen habe):

 

Fazit

Vertex AI ist noch ein sehr junges Produkt, das ständig weiterentwickelt wird. Umso mehr hat es mich erstaunt, wie leicht es ist, Pipelines zusammenzustellen, die unterschiedliche Schritte in klar separierten Umgebungen ausführen können. Es sind abgesehen von ein paar Docker-Grundlagen (zur Wahl und ggf. Erstellung eines Basis-Images) keinerlei Infrastrukturkenntnisse notwendig. Außerdem ist es sehr praktisch, dass Cloud Buckets wie herkömmliche Volumes benutzt werden können, ohne hierfür extra Code schreiben zu müssen.

Alles in allem haben wir festgestellt, dass es kein großer Aufwand ist, klassische Python-Funktionalität in Kubeflow-Komponenten zu wandeln. Im nächsten Artikel werde ich die Vorteile etwas genauer betrachten, die eine Vertex AI Pipeline mit sich bringt.

 

 

 

Im nächsten Artikel werde ich die Vorteile etwas genauer betrachten, die eine Vertex AI Pipeline mit sich bringt. 
 

Zu Teil 2

 

 

 

 

Laurenz Reitsam
Consultant
Laurenz ist Data Scientist der sich neben Machine Learning und Datenanalysen auch für DevOps und Infrastruktur begeistert. Er ist davon überzeugt, dass ein Modell nur dann ein gutes Modell sein kann, wenn es seinen Weg in die Produktion schafft.
#Pythonist #GCP #DataScience