Google недавно анонсировала Vertex AI, в основном управляемую платформу машинного обучения, которая легко загружается для работы компонентов для выполнения сквозных функций MLOps.

Vertex AI — это не отдельный сервис, а комбинация множества различных сервисов, связанных с ИИ, для маркировки наборов данных, предварительной обработки, хранилища функций, AutoML, записной книжки, конвейера, модели, конечной точки, метаданных и функций мониторинга модели в целом.

В этом блоге вы узнаете, как создать собственный конвейер с помощью Vertex AI AutoML, оценить производительность модели и как мы можем запланировать запуск конвейера через определенный интервал времени.

Начнем

Прежде всего, давайте посмотрим, что представляет собой конвейер машинного обучения для новичков. Практически это инкапсулированный рабочий процесс в целом, где все небольшие шаги, необходимые для выполнения операции ML, записаны как компонент, то есть от разработки до развертывания моделей ML. Это полезно по нескольким причинам, таким как перекос в обучении, проверка схемы, дрейф данных или дрейф концепции, непрерывное обучение, мониторинг моделей и т. д.

Хорошо, теперь давайте напишем код. Я покажу вам простой пример кода набора данных Iris, чтобы вы получили полное представление о том, как MLOps работают в Vertex AI. Затем вы можете написать свой собственный код и выполнять самые сложные задачи в соответствии с требованиями.

Настройка

Во-первых, давайте импортируем необходимые библиотеки:

from typing import NamedTuple
from google.cloud import storage
from google.cloud import aiplatform
from google_cloud_pipeline_components import aiplatform as gcc_aip
from kfp.v2 import dsl
from kfp.v2.dsl import (Artifact,
                        Dataset,
                        Input,
                        Model,
                        Output,
                        Metrics,
                        ClassificationMetrics,
                        component)

from kfp.v2 import compiler
from kfp.v2.google import experimental
from kfp.v2.google.client import AIPlatformClient

Здесь мы импортируем Artifact, Dataset, Input, Model и т. д. из версии 2 Kubeflow, чтобы мы могли передавать эти артефакты между нашими компонентами.

Теперь следующий шаг — настроить среду GCP для того же, чтобы избавиться от головной боли, которая может возникнуть позже.

PROJECT_ID = "pipelinetest-321606"
REGION = "us-central1" #though us-central is cheaper
PIPELINE_ROOT = "gs://pipelinetest_testing/pipeline_root"
!gcloud auth login

Подготовка набора данных

Первым шагом любой проблемы ML является получение данных. Итак, здесь мы используем набор данных радужной оболочки, который я загрузил в корзину GCP, и дополнительно создаем набор табличных данных для построения нашей модели.

create_dataset = gcc_aip.TabularDatasetCreateOp(
    project=project,
    display_name=display_name,
    gcs_source="gs://pipelinetest_testing/iris.csv"
    )

Здесь вы можете видеть, что мы указываем gcp_source как путь к набору данных iris, т. е. местоположение корзины GCP.

Vertex AI также помогает маркировать набор данных, который можно выполнить на панели инструментов, либо индивидуально, либо в случае большого набора данных, вы также можете назначить эту задачу группе людей за определенную плату.

Модель обучения

Поскольку это проблема классификации, мы можем использовать Vertex AutoML для решения этой проблемы. Но мы также можем использовать нашу собственную модель в конвейере для дальнейшей оценки модели. Ниже вы можете увидеть, как мы используем функцию AutoMLTabularTrainingJobRunOp Vertex AI для обучения нашего набора данных.

training_op = gcc_aip.AutoMLTabularTrainingJobRunOp(
        project=project,
        display_name=display_name,
        optimization_prediction_type="classification",
        budget_milli_node_hours=1,
        dataset=create_dataset.outputs["dataset"],
        target_column="target",
    )

Помимо этого, мы также можем передавать параметры, такие как оптимизация_цели, где мы сообщаем нашему процессу обучения, чтобы оптимизировать нашу модель для минимизации среднеквадратичной ошибки (RMSE). "minimize-mae" — Минимизируйте среднюю абсолютную ошибку (MAE). «minimize-rmsle» — также минимизировать среднеквадратичную ошибку журнала (RMSLE).

Написание первого компонента конвейера Vertex AI

Теперь мы напишем наш первый компонент, используя один из изображений на основе глубокого обучения Google, и определим некоторые дополнительные пакеты, которые необходимо установить в дальнейшем, чтобы наш изолированный код работал.

@component(
    base_image="gcr.io/deeplearning-platform-release/tf2-cpu.2-3:latest",
    output_component_file="tables_eval_component.yaml", # Optional: you can use this to load the component later
    packages_to_install=["google-cloud-aiplatform"],
)
def classif_model_eval_metrics(
    project: str,
    location: str,  # "us-central1",
    api_endpoint: str,  # "us-central1-aiplatform.googleapis.com",
    thresholds_dict_str: str,
    model: Input[Model],
    metrics: Output[Metrics],
    metricsc: Output[ClassificationMetrics],
) -> NamedTuple("Outputs", [("dep_decision", str)]):  # Return parameter.

Выше мы определили артефакты ввода модели и вывода метрик, которые наш вершинный конвейер может использовать в дополнительных компонентах для полностью подключенного конвейера.

Оценка модели

Теперь мы напишем простой код на Python для наших показателей оценки и возьмем выходные данные журнала, чтобы показать наш результат на панели инструментов Vertex AI после обучения.

    import json
    import logging

    from google.cloud import aiplatform

    # Fetch model eval info
    def get_eval_info(client, model_name):
        from google.protobuf.json_format import MessageToDict

        response = client.list_model_evaluations(parent=model_name)
        metrics_list = []
        metrics_string_list = []
        for evaluation in response:
            print("model_evaluation")
            print(" name:", evaluation.name)
            print(" metrics_schema_uri:", evaluation.metrics_schema_uri)
            metrics = MessageToDict(evaluation._pb.metrics)
            for metric in metrics.keys():
                logging.info("metric: %s, value: %s", metric, metrics[metric])
            metrics_str = json.dumps(metrics)
            metrics_list.append(metrics)
            metrics_string_list.append(metrics_str)

        return (
            evaluation.name,
            metrics_list,
            metrics_string_list,
        )

    # Use the given metrics threshold(s) to determine whether the model is 
    # accurate enough to deploy.
    def classification_thresholds_check(metrics_dict, thresholds_dict):
        for k, v in thresholds_dict.items():
            logging.info("k {}, v {}".format(k, v))
            if k in ["auRoc", "auPrc"]:  # higher is better
                if metrics_dict[k] < v:  # if under threshold, don't deploy
                    logging.info(
                        "{} < {}; returning False".format(metrics_dict[k], v)
                    )
                    return False
        logging.info("threshold checks passed.")
        return True

    def log_metrics(metrics_list, metricsc):
        test_confusion_matrix = metrics_list[0]["confusionMatrix"]
        logging.info("rows: %s", test_confusion_matrix["rows"])

        # log the ROC curve
        fpr = []
        tpr = []
        thresholds = []
        for item in metrics_list[0]["confidenceMetrics"]:
            fpr.append(item.get("falsePositiveRate", 0.0))
            tpr.append(item.get("recall", 0.0))
            thresholds.append(item.get("confidenceThreshold", 0.0))
        print(f"fpr: {fpr}")
        print(f"tpr: {tpr}")
        print(f"thresholds: {thresholds}")
        metricsc.log_roc_curve(fpr, tpr, thresholds)

        # log the confusion matrix
        annotations = []
        for item in test_confusion_matrix["annotationSpecs"]:
            annotations.append(item["displayName"])
        logging.info("confusion matrix annotations: %s", annotations)
        metricsc.log_confusion_matrix(
            annotations,
            test_confusion_matrix["rows"],
        )

        # log textual metrics info as well
        for metric in metrics_list[0].keys():
            if metric != "confidenceMetrics":
                val_string = json.dumps(metrics_list[0][metric])
                metrics.log_metric(metric, val_string)
        # metrics.metadata["model_type"] = "AutoML Tabular classification"

    logging.getLogger().setLevel(logging.INFO)
    aiplatform.init(project=project)
    # extract the model resource name from the input Model Artifact
    model_resource_path = model.uri.replace("aiplatform://v1/", "")
    logging.info("model path: %s", model_resource_path)

    client_options = {"api_endpoint": api_endpoint}
    # Initialize client that will be used to create and send requests.
    client = aiplatform.gapic.ModelServiceClient(client_options=client_options)
    eval_name, metrics_list, metrics_str_list = get_eval_info(
        client, model_resource_path
    )
    logging.info("got evaluation name: %s", eval_name)
    logging.info("got metrics list: %s", metrics_list)
    log_metrics(metrics_list, metricsc)

    thresholds_dict = json.loads(thresholds_dict_str)
    deploy = classification_thresholds_check(metrics_list[0], thresholds_dict)
    if deploy:
        dep_decision = "true"
    else:
        dep_decision = "false"
    logging.info("deployment decision is %s", dep_decision)

    return (dep_decision,)

После написания компонента следующий шаг — соединить его с основным компонентом конвейера, который мы пишем в компоненте @dsl.pipeline. Здесь мы на самом деле пишем нашу функцию модели AutoML или Custom, беря параметры из ранее определенных компонентов и определяя правильную структуру для нашего конвейера, которая будет выполняться после того, как эта функция будет скомпилирована. Мы также можем передавать условные параметры, например, если точность нашей модели превышает пороговое значение, чтобы конечная точка модели выполнялась только в том случае, если это условие выполняется с помощью функции dsl.Condition.

with dsl.Condition(
        model_eval_task.outputs["dep_decision"] == "true",
        name="deploy_decision",
    ):

        deploy_op = gcc_aip.ModelDeployOp(  # noqa: F841
            model=training_op.outputs["model"],
            project=project,
            machine_type="n1-standard-4",
        )

Затем мы, наконец, пишем наш код компиляции конвейера, как мы всегда делаем для создания файла json. Этот файл можно повторно использовать для создания конвейера без повторного написания кода, так как в нем хранятся все необходимые инструкции метаданных для развертывания конвейера на сервере.

compiler.Compiler().compile(pipeline_func=pipeline,
        package_path='cancer_pipe.json')
from kfp.v2.google.client import AIPlatformClient

api_client = AIPlatformClient(
                project_id=PROJECT_ID,
                region=REGION
                )

response = api_client.create_run_from_job_spec(
    'cancer_pipe.json',
    enable_caching= True
)

Теперь вы можете перейти к панели мониторинга конвейера Vertex AI и проверить его статус, если он успешен, а в случае сбоя у вас есть возможность просмотреть журнал и отладить свой код. Таким образом, вы можете легко обнаружить потенциальные проблемы и легко их решить.

Вы также можете запланировать запуск вашего конвейера через определенный интервал времени каждый день, и это делается с помощью следующего кода:

SCHEDULE = '30 5 * * *'
TIME_ZONE = "Europe/Istanbul"
api_client.create_schedule_from_job_spec(
    job_spec_path=COMPILED_PIPELINE_PATH,
    schedule=SCHEDULE,
    time_zone=TIME_ZONE,
    # parameter_values=PIPELINE_PARAMETERS
)

Ниже приведен пример архитектуры конвейера сборки.

Вы можете проверить весь код на GitHub в Vertex AI Part 1.

Заключение

Итак, в этом посте мы увидели, как извлекать данные, обучать модель, оценивать ее, передавая параметры Kubeflow, и планировать наш конвейер для автоматического запуска через определенные промежутки времени. В следующем посте вы увидите, как мы можем включить функцию мониторинга модели для проверки асимметрии и дрейфа данных, а также добавить CI (непрерывную интеграцию) в наш конвейер, чтобы он автоматически запускался после обновления кода в корзине GCP.

Ждите следующий пост!!