Эта статья представляет собой дневник моего недавнего путешествия (которое закончилось только вчера) по использованию KServe (ранее известного как KFserving) и нового GCP Vertex AI.

Отказ от ответственности: я никоим образом не являюсь экспертом в Kubenetes, сетях или облачных платформах. Возможно неправильное использование терминологии, пожалуйста, поправьте меня 😉

Фон

Позвольте мне сначала рассказать немного о моем прошлом опыте. Последние 3 года я работал инженером по глубокому обучению, работая над медицинскими изображениями. И я использовал Kubeflow для обучения и KServe для обслуживания нашей модели в качестве API. Все это было разработано до появления Vertex AI, и в то время GCP предлагала платформу AI только для рабочих процессов ML (что, на мой взгляд, было неудобно использовать, поскольку у нее МНОГО ограничений). Вот почему я выбрал Kubeflow и KServe.

Зачем переходить на Vertex AI

До недавнего времени произошел неприятный инцидент, и мне пришлось переразвернуть все наши API. Я мог бы просто повторно запустить все сценарии развертывания и положить этому конец, однако с тех пор, как GCP представил Vertex AI, я был заинтригован внесенными ими изменениями, поэтому решил попробовать.

Причина перехода на Vertex AI заключается в следующем:

  1. Он находится в консоли GCP, поэтому четкая визуализация GCP по различным показателям доступна в кликах (например, запросов в секунду, продолжительность задержки модели, журналы). Это значительно упрощает для нашей команды доступ к информации об API, такой как контейнер, который использует этот API, показатели трафика и т. д.
  2. API, развернутые в конечной точке Vertex AI, унифицированы и управляются в рамках aiplatform.googleapis.com. Меньше проблем с разрешениями.

Зачем использовать KServe для обслуживания?

Что ж, вы всегда можете создать собственное приложение Flask для своей модели машинного обучения.

  1. Это часть Kubeflow.
  2. Кроме того, я просто ленив, и KServe обрабатывает множество вещей, таких как обработка HTTP, тогда как во Flask я считаю, что вы должны написать это самостоятельно.
  3. KServe предоставил специальные шаблонные методы машинного обучения, такие как предварительная обработка, прогнозирование, постобработка, где после их правильного наследования вы можете беспрепятственно развернуть свою модель.

Поэтапное развертывание

Для простоты я покажу, как развернуть простую модель сегментации изображений с минимальной предварительной и последующей обработкой.

В моем коде используется KFServing версии 0.5, поэтому с этого момента я буду использовать KFServing вместо Kserve для согласованности.

1. Создайте свой собственный контейнер

Я использую модель Tensorflow Keras .h5. Для всех форков Pytorch не волнуйтесь, KServe способен обслуживать почти все типы моделей машинного обучения.

Kfserver и KFModel созданы для упрощения развертывания машинного обучения. Они обрабатывают все основные компоненты обслуживания модели, такие как создание веб-приложения, обработчик работоспособности и живучести и т. д. Вам просто нужно сосредоточиться на своей модели.

Все, что вам нужно сделать, это создать подкласс KFModel, переопределить методы предварительной обработки, прогнозирования и постобработки на свои собственные. Затем, наконец, обслуживайте свою модель с помощью KFServe.

Следующий фрагмент кода — это буквально весь и единственный файл .py в моем контейнере. (и, конечно, это потому, что я делаю это очень просто.)

class KFServingSampleModel(kfserving.KFModel):
    
    def __init__(self, name: str):
        super().__init__(name)
        self.name = name
        self.ready = False
    
    def load(self,model_fp):
        self.model = tf.keras.models.load_model(model_fp,compile=False)
        self.ready = True
        
    def preprocess_inputs(self, img_byte):
        img_raw_byte = tf.io.decode_base64(img_byte)
        img_tensor = tf.io.decode_raw(img_raw_byte,tf.uint8)
        img = tf.reshape(img_tensor,[500,500,3])  
        img = tf.image.resize(img,[320,320])
        img = tf.cast(img,tf.float32)
        img /= 255
        return img
def predict(self, request: Dict):
"""Performs custom prediction.
Preprocesses inputs, then performs prediction using the          trained Keras model.
Args:
instances: A list of prediction input instances
**kwargs: A dictionary of keyword args provided as additional
fields on the predict request body.
Returns:
A list of outputs containing the prediction results.
"""
        inputs = request["instances"]
        # Input follows the Tensorflow V1 HTTP API for binary values
        # https://www.tensorflow.org/tfx/serving/api_rest#encoding_binary_values
        data = inputs[0]["image"]["b64"]
        
        image = self.preprocess_inputs(data)
        
        ds = tf.data.Dataset.from_tensors(image).batch(1)
        
        y_pred = self.model.predict(ds)
        
        y_pred_byte = base64.urlsafe_b64encode(y_pred.ravel()).decode('ascii')
        return json.dumps({"predictions":[y_pred_byte]})
if __name__ == "__main__":
   
    model = KFServingSampleModel("sample-model")
    model_uri = {YOUR MODEL ARTIFACT IN GCS}
    local_model_dir = '/tmp/model'
    os.makedirs(local_model_dir,exist_ok=True)
    # copy model.h5 from GCSto local
    tf.io.gfile.copy(model_uri,local_model_dir,overwrite=True)
    
    local_model_fp = tf.io.gfile.glob(f'{local_model_dir}/*.h5')[0]
    model.load(local_model_fp)
    
    kfserving.KFServer(workers=1).start([model])

Давайте пройдемся по кусочкам,

я. Инициализация класса KFModel

def __init__(self, name: str):
        super().__init__(name)
        self.name = name # name your model
        self.ready = False # the state of the model

Здесь есть два важных атрибута:

Сначала название вашей модели. Вы должны назвать свою модель, чтобы при обслуживании у модели был собственный URL-адрес API. Это очень важно, потому что KFServing поддерживает обслуживание нескольких моделей, это гарантирует, что правильные данные будут предсказаны в правильной модели.

Во-вторых, установите состояние модели в false, потому что мы еще не загрузили нашу модель, это сообщает серверу https, что модель не готова.

ii. Загрузите свою модель

def load(self,model_fp):
    self.model = tf.keras.models.load_model(model_fp,compile=False)
    self.ready = True

Я считаю, что этот фрагмент не требует пояснений, как и ваша обычная функция модели загрузки. После загрузки модели установите для self.ready значение true, чтобы указать, что модель готова получать запросы.

iii. Определите свои функции до/после обработки

def preprocess_inputs(self, img_byte):
    img_raw_byte = tf.io.decode_base64(img_byte)
    img_tensor = tf.io.decode_raw(img_raw_byte,tf.uint8)
    img = tf.reshape(img_tensor,[500,500,3])  
    img = tf.image.resize(img,[320,320])
    img = tf.cast(img,tf.float32)
    img /= 255
    return img

Опять же, это почти идентично вашей обычной функции предварительной обработки, за исключением того, что ввод представляет собой строку байтов. Из-за ограничений Vertex каждый запрос прогнозирования должен иметь размер 1,5 МБ или меньше, а REST API имеет низкую производительность при отправке массивов в качестве запроса. Поэтому ожидается, что функция предварительной обработки получит изображение в кодировке base64, декодирует его и предварительно обработает изображение, как обычно.

iv. Функция прогнозирования

def predict(self, request: Dict):
    """Performs custom prediction.
    Preprocesses inputs, then predict using the trained Keras model.
    Args:
    instances: A list of prediction input instances
    
    Returns:
    A list of outputs containing the prediction results.
    """
    inputs = request["instances"]
    data = inputs[0]["image"]["b64"]
        
    image = self.preprocess_inputs(data)
        
    ds = tf.data.Dataset.from_tensors(image).batch(1)
        
    y_pred = self.model.predict(ds)
    
    # encode the predicted image (a binary mask) to byte string
    y_pred_byte = base64.urlsafe_b64encode(y_pred.ravel()).decode('ascii')
    
   # return a JSON Array as per Vertex AI required. No matter how many instances the model is predicting, it has to be a LIST of predicted values to return.
   return json.dumps({"predictions":[y_pred_byte]})

Функция прогнозирования принимает входной запрос в формате, определяемом Vertex AI. Формат не может измениться, потому что именно так Vertex AI обрабатывает вызовы API. Следовательно, наш код должен адаптировать этот формат запроса.

Ниже приведен один из возможных форматов запроса, который я использовал.

{“instance”: {“image”:{“b64”:base64.b64encode(jpeg_data).decode()}}

Наконец, функция прогнозирования должна возвращать массив JSON, будь то изображение, целое число или строки. Независимо от того, сколько экземпляров прогнозирует модель, она должна возвращать СПИСОК прогнозируемых значений. (эта единственная ошибка стоила мне целого дня 🤦‍♂️)

KFServer автоматически примет все, что было возвращено функцией прогнозирования, и отправит ответ обратно в Vertex AI.

v. Запустите KFServer

Теперь все готово, и вы можете запустить сервер с помощью kfserving.KFServer. Но сначала вам нужно скопировать файл вашей модели .h5 в локальный каталог контейнера, загрузить модель и, наконец, начать обслуживать.

if __name__ == "__main__":
   
    model = KFServingSampleModel("sample-model")
    model_uri = {YOUR MODEL ARTIFACT IN GCS}
    local_model_dir = '/tmp/model'
    os.makedirs(local_model_dir,exist_ok=True)
# copy model.h5 from GCS to local
    tf.io.gfile.copy(model_uri,local_model_dir,overwrite=True)
    
    local_model_fp = tf.io.gfile.glob(f'{local_model_dir}/*.h5')[0]
    
    # load model
    model.load(local_model_fp)
    
    kfserving.KFServer(http_port = 8082, workers=1).start([model])

Если ничего не пойдет не так, вы сможете увидеть это при запуске model_serving.py.

Теперь вы можете отправить запрос на локальное тестирование вашего контейнера.

http://localhost:8082/v1/models/sample-model:predict

2. Загрузить в реестр контейнеров GCP

Чтобы обслуживать модель в Vertex AI, нам нужно создать контейнер и отправить его в реестр контейнеров GCP. Не забудьте открыть порт и определить точку входа в контейнер.

# set base image (host OS)
FROM gcr.io/deeplearning-platform-release/tf2-gpu.2-4:latest
# # install dependencies
RUN pip install --upgrade pip && pip install -U scikit-image kfserving==0.5 h5py==2.10.0
# copy the content of the local directory to the working directory
COPY ./prediction_model_ihc.py ./
EXPOSE 8082
ENTRYPOINT ["python", "prediction_model_ihc.py"]

Следующий фрагмент — это мой скрипт для создания контейнера и отправки его в реестр контейнеров.

#!/bin/sh
image_name=asia.gcr.io/sample/sample_serving
image_tag=latest
full_image_name=${image_name}:${image_tag}
base_image_tag=tf2.4-py3
cd "$(dirname "$0")"
docker build — build-arg BASE_IMAGE_TAG=${base_image_tag} -t “${full_image_name}” 
docker push “$full_image_name”

3. Импорт модели в Vertex AI

Перейдите в Vertex -> Model -> Import Model.

я. Укажите название вашей модели

ii. В настройках модели выберите Импортировать существующий пользовательский контейнер и выберите свой контейнер.

III. Для маршрута прогнозирования введите /v1/models/sample-model:predict. Vertex AI будет перенаправлять запросы на маршрут прогнозирования.

iv. Для маршрута Health введите/v1/models/. Vertex AI время от времени отправляет запросы GET на маршрут Heath, чтобы проверить работоспособность контейнера.

в. В поле Порт введите порт, используемый в вашем коде. В этом примере используется порт 8082.

vi. Импорт.

Импорт модели займет несколько минут.

Вы также можете импортировать свою модель через SDK python платформы ai.

from google.cloud import aiplatform
model = aiplatform.Model.upload(
    project={YOUR PROJECT ID},
    location={YOUR PROJECT LOCATION},
    display_name={MODEL NAME},
    serving_container_image_uri={YOUR CONTAINER IMAGE URI}
    serving_container_predict_route = '/v1/models/sample-model:predict',
    serving_container_health_route = '/v1/models/sample-model')

4. Развертывание модели в конечной точке Vertex AI

После импорта модели вы можете развернуть ее в Endpoint. Перейдите к трем точкам справа от импортированной модели и выберите «Добавить в конечную точку».

я. Создайте новое имя конечной точки.

ii. Определите разделение трафика, автоматическое масштабирование, тип машины в соответствии с вашими потребностями. Обязательно выберите сервисный аккаунт, который имеет доступ к GCS, поскольку файл нашей модели .h5 находится в GCS.

III. Развернуть

Конечная точка должна стать активной через несколько минут.

5.Вуаля! Вы можете идти

Вы можете проверить свою конечную точку, используя этот фрагмент

def endpoint_predict_sample(
                     project: str, 
                     location: str,   
                     instances: list, 
                     endpoint: str):
    aiplatform.init(project=project, location=location)
    endpoint = aiplatform.Endpoint(endpoint)
    prediction = endpoint.predict(instances=instances)
    print(prediction)
    return prediction
test_data = Image.open({PATH TO IMAGE})
test_data = np.array(test_data)[:,:,:3]
img = base64.urlsafe_b64encode(test_data.ravel()).decode()
data = [{'image':{'b64':img}}]
pred = endpoint_predict_sample(project={YOUR PROJECT ID},
                               location={YOUR PROJECT LOCATION}",
                               endpoint = {YOUR ENDPOINT ID},
                               instances=data)

Я очень надеюсь, что эта статья поможет читателю сделать путешествие с KServe и Vertex AI более плавным.