Google наскоро обяви Vertex AI, основно управлявана платформа за машинно обучение, която е заредена с лесни за работа компоненти за извършване на MLOps функционалност от край до край.

Vertex AI не е единична услуга, а комбинация от много различни услуги, свързани с AI за етикетиране на набор от данни, предварителна обработка, съхраняване на функции, AutoML, бележник, тръбопровод, модел, крайна точка, метаданни и функции за наблюдение на модели като цяло.

В този блог ще научите как да създадете свой собствен конвейер с помощта на Vertex AI AutoML, оценка на производителността на модела и как можем да планираме изпълнението на конвейера на определен интервал от време.

Да започваме

Първо, нека да разгледаме какво представлява тръбопроводът за машинно обучение за новодошлите. На практика това е капсулиран работен процес като цяло, където всички малки стъпки, необходими за извършване на ML операция, са записани като компонент, т.е. от разработването до внедряването на ML модели. Полезно е поради няколко причини, като изкривяване при обслужване на обучение, валидиране на схема, отклонение на данни или концептуално отклонение, непрекъснато обучение, наблюдение на модела и др.

Добре, сега нека напишем малко код. Ще ви покажа прост пример за код на набор от данни на Iris, така че да получите пълна представа за това как работят MLO в Vertex AI. След това можете да напишете свой собствен код и да изпълнявате най-сложните задачи според изискванията.

Настройване

Първо, нека импортираме необходимите библиотеки:

from typing import NamedTuple
from google.cloud import storage
from google.cloud import aiplatform
from google_cloud_pipeline_components import aiplatform as gcc_aip
from kfp.v2 import dsl
from kfp.v2.dsl import (Artifact,
                        Dataset,
                        Input,
                        Model,
                        Output,
                        Metrics,
                        ClassificationMetrics,
                        component)

from kfp.v2 import compiler
from kfp.v2.google import experimental
from kfp.v2.google.client import AIPlatformClient

Тук импортираме артефакт, набор от данни, вход, модел и т.н. от версия 2 на Kubeflow, за да можем да предаваме тези артефакти между нашите компоненти.

Сега следващата стъпка е да настроим GCP среда за същото, за да се отървем от главоболието, което можем да имаме по-късно.

PROJECT_ID = "pipelinetest-321606"
REGION = "us-central1" #though us-central is cheaper
PIPELINE_ROOT = "gs://pipelinetest_testing/pipeline_root"
!gcloud auth login

Подготовка на набор от данни

Първата стъпка от всеки проблем с машинното обучение е получаването на данните. И така, тук използваме набора от данни за ириса, който качих в кофата на GCP, и допълнително създаваме табличен набор от данни за изграждането на нашия модел.

create_dataset = gcc_aip.TabularDatasetCreateOp(
    project=project,
    display_name=display_name,
    gcs_source="gs://pipelinetest_testing/iris.csv"
    )

Тук можете да видите, че даваме gcp_source като път на набор от данни на ириса, т.е. местоположение на GCP кофа.

Vertex AI също помага при етикетирането на набора от данни, което може да се направи в рамките на таблото за управление, индивидуално или в случай на голям набор от данни можете също да възложите тази задача на група хора срещу известна цена.

Модел за обучение

Тъй като това е проблем с класификацията, можем да използваме Vertex AutoML, за да разрешим този проблем. Но можем също така да използваме нашия собствен персонализиран модел в процес на подготовка за по-нататъшна оценка на модела. По-долу можете да видите как използваме функцията AutoMLTabularTrainingJobRunOp на Vertex AI, за да обучим нашия набор от данни.

training_op = gcc_aip.AutoMLTabularTrainingJobRunOp(
        project=project,
        display_name=display_name,
        optimization_prediction_type="classification",
        budget_milli_node_hours=1,
        dataset=create_dataset.outputs["dataset"],
        target_column="target",
    )

Отделно от това, можем също да предаваме параметри като optimization_objective, където казваме на нашия процес на обучение да оптимизира нашия модел за минимизиране на средноквадратична грешка (RMSE). „minimize-mae“ — Минимизиране на средната абсолютна грешка (MAE). „minimize-rmsle“ — минимизиране на средноквадратична логаритмична грешка (RMSLE) също.

Писане на първия компонент на Vertex AI pipeline

Сега ще напишем нашия първи компонент, като използваме едно от базираните на google задълбочено обучение изображения и ще дефинираме някои допълнителни пакети, които трябва да бъдат допълнително инсталирани, за да работи нашият изолиран код.

@component(
    base_image="gcr.io/deeplearning-platform-release/tf2-cpu.2-3:latest",
    output_component_file="tables_eval_component.yaml", # Optional: you can use this to load the component later
    packages_to_install=["google-cloud-aiplatform"],
)
def classif_model_eval_metrics(
    project: str,
    location: str,  # "us-central1",
    api_endpoint: str,  # "us-central1-aiplatform.googleapis.com",
    thresholds_dict_str: str,
    model: Input[Model],
    metrics: Output[Metrics],
    metricsc: Output[ClassificationMetrics],
) -> NamedTuple("Outputs", [("dep_decision", str)]):  # Return parameter.

По-горе сме дефинирали входни артефакти на модел и изходни показатели, които нашият върхов конвейер може да използва в допълнителни компоненти за напълно свързан конвейер.

Модел за оценка

Сега ще пишем прост код на Python за нашите показатели за оценка и ще вземем изхода от дневника, за да покажем нашия резултат в таблото за управление на Vertex AI след обучение.

    import json
    import logging

    from google.cloud import aiplatform

    # Fetch model eval info
    def get_eval_info(client, model_name):
        from google.protobuf.json_format import MessageToDict

        response = client.list_model_evaluations(parent=model_name)
        metrics_list = []
        metrics_string_list = []
        for evaluation in response:
            print("model_evaluation")
            print(" name:", evaluation.name)
            print(" metrics_schema_uri:", evaluation.metrics_schema_uri)
            metrics = MessageToDict(evaluation._pb.metrics)
            for metric in metrics.keys():
                logging.info("metric: %s, value: %s", metric, metrics[metric])
            metrics_str = json.dumps(metrics)
            metrics_list.append(metrics)
            metrics_string_list.append(metrics_str)

        return (
            evaluation.name,
            metrics_list,
            metrics_string_list,
        )

    # Use the given metrics threshold(s) to determine whether the model is 
    # accurate enough to deploy.
    def classification_thresholds_check(metrics_dict, thresholds_dict):
        for k, v in thresholds_dict.items():
            logging.info("k {}, v {}".format(k, v))
            if k in ["auRoc", "auPrc"]:  # higher is better
                if metrics_dict[k] < v:  # if under threshold, don't deploy
                    logging.info(
                        "{} < {}; returning False".format(metrics_dict[k], v)
                    )
                    return False
        logging.info("threshold checks passed.")
        return True

    def log_metrics(metrics_list, metricsc):
        test_confusion_matrix = metrics_list[0]["confusionMatrix"]
        logging.info("rows: %s", test_confusion_matrix["rows"])

        # log the ROC curve
        fpr = []
        tpr = []
        thresholds = []
        for item in metrics_list[0]["confidenceMetrics"]:
            fpr.append(item.get("falsePositiveRate", 0.0))
            tpr.append(item.get("recall", 0.0))
            thresholds.append(item.get("confidenceThreshold", 0.0))
        print(f"fpr: {fpr}")
        print(f"tpr: {tpr}")
        print(f"thresholds: {thresholds}")
        metricsc.log_roc_curve(fpr, tpr, thresholds)

        # log the confusion matrix
        annotations = []
        for item in test_confusion_matrix["annotationSpecs"]:
            annotations.append(item["displayName"])
        logging.info("confusion matrix annotations: %s", annotations)
        metricsc.log_confusion_matrix(
            annotations,
            test_confusion_matrix["rows"],
        )

        # log textual metrics info as well
        for metric in metrics_list[0].keys():
            if metric != "confidenceMetrics":
                val_string = json.dumps(metrics_list[0][metric])
                metrics.log_metric(metric, val_string)
        # metrics.metadata["model_type"] = "AutoML Tabular classification"

    logging.getLogger().setLevel(logging.INFO)
    aiplatform.init(project=project)
    # extract the model resource name from the input Model Artifact
    model_resource_path = model.uri.replace("aiplatform://v1/", "")
    logging.info("model path: %s", model_resource_path)

    client_options = {"api_endpoint": api_endpoint}
    # Initialize client that will be used to create and send requests.
    client = aiplatform.gapic.ModelServiceClient(client_options=client_options)
    eval_name, metrics_list, metrics_str_list = get_eval_info(
        client, model_resource_path
    )
    logging.info("got evaluation name: %s", eval_name)
    logging.info("got metrics list: %s", metrics_list)
    log_metrics(metrics_list, metricsc)

    thresholds_dict = json.loads(thresholds_dict_str)
    deploy = classification_thresholds_check(metrics_list[0], thresholds_dict)
    if deploy:
        dep_decision = "true"
    else:
        dep_decision = "false"
    logging.info("deployment decision is %s", dep_decision)

    return (dep_decision,)

След като напишем компонента, следващата стъпка е да го свържем с главния тръбопроводен компонент, който записваме в @dsl.pipeline компонент. Тук ние всъщност пишем нашата AutoML или персонализирана моделна функция, като вземаме параметри от предварително дефинирани компоненти и дефинираме подходяща структура за нашия конвейер, който ще бъде изпълнен, след като тази функция бъде компилирана. Можем също така да предадем условни параметри, например ако точността на нашия модел е по-голяма от прага, така че крайната точка на модела да се изпълни само ако това условие е изпълнено чрез използване на функцията dsl.Condition.

with dsl.Condition(
        model_eval_task.outputs["dep_decision"] == "true",
        name="deploy_decision",
    ):

        deploy_op = gcc_aip.ModelDeployOp(  # noqa: F841
            model=training_op.outputs["model"],
            project=project,
            machine_type="n1-standard-4",
        )

След това най-накрая пишем нашия код за компилация на конвейер, както винаги правим, за да генерираме json файл. Този файл може да се използва многократно за създаване на тръбопровод без пренаписване на кода отново, тъй като в него се съхраняват всички необходими инструкции за метаданни за разполагане на тръбопровода върху сървъра.

compiler.Compiler().compile(pipeline_func=pipeline,
        package_path='cancer_pipe.json')
from kfp.v2.google.client import AIPlatformClient

api_client = AIPlatformClient(
                project_id=PROJECT_ID,
                region=REGION
                )

response = api_client.create_run_from_job_spec(
    'cancer_pipe.json',
    enable_caching= True
)

Сега можете да се насочите към таблото за управление на тръбопровода на Vertex AI и да проверите състоянието му, ако е успешно и в случай на неуспех, имате възможност да погледнете дневника и да отстраните грешки в кода си. Следователно можете лесно да забележите потенциални проблеми и лесно да ги разрешите.

Можете също така да планирате изпълнението на тръбопровода си на определен интервал от време всеки ден и това става, като напишете кода по-долу:

SCHEDULE = '30 5 * * *'
TIME_ZONE = "Europe/Istanbul"
api_client.create_schedule_from_job_spec(
    job_spec_path=COMPILED_PIPELINE_PATH,
    schedule=SCHEDULE,
    time_zone=TIME_ZONE,
    # parameter_values=PIPELINE_PARAMETERS
)

По-долу е дадена примерна архитектура на конвейера за изграждане.

Можете да проверите целия код в GitHub на Vertex AI Part 1.

Заключение

И така, в тази публикация видяхме как да извличаме данни, да обучаваме модел, да го оценяваме чрез предаване на параметри на Kubeflow и да планираме нашия конвейер да го изпълнява автоматично на определени интервали. В следващата публикация ще видите как можем да активираме функционалността за наблюдение на модела, за да проверим изкривяването и отклонението на данните и да добавим CI (непрекъснати интеграции) към нашия конвейер, така че да се задейства автоматично, след като кодът се актуализира в GCP група.

Очаквайте следващата публикация!!