Тази статия е дневник на скорошното ми пътуване (което току-що завърши вчера) по използването на KServe (известен преди като KFserving) и новия GCP Vertex AI.

Отказ от отговорност: Не съм експерт по Kubenetes или мрежи или облачна платформа по никакъв начин. Възможно е да има злоупотреба с терминологията, моля, не се колебайте да ме поправите 😉

Заден план

Нека първо да дам малко контекст на моя минал опит, прекарах последните си 3 години като инженер по задълбочено обучение, работейки върху медицински изображения. И използвах Kubeflow за обучение и KServe за обслужване на нашия модел като API. Всичко това е разработено преди съществуването на Vertex AI и по това време GCP предлага само AI платформа за работни потоци на ML (което по мое мнение беше трудно да се използва, тъй като има МНОГО ограничения). Ето защо избрах Kubeflow и KServe.

Защо да преминете към Vertex AI

Доскоро се случи нещастен инцидент и трябваше да преразпределя всички наши API. Можех просто да стартирам отново всички скриптове за внедряване и да го прекратя, но откакто GCP представи Vertex AI, бях заинтригуван от промените, които са направили, така че решавам да опитам.

Обосновката зад преминаването към Vertex AI е, че:

  1. Той е на конзолата на GCP и по този начин изрядната визуализация на различни показатели на GCP е достъпна в кликвания (напр.Заявки/секунда, продължителност на забавяне на модела, регистрационни файлове). Това прави много лесен за нашия екип достъп до информация за API, като например кой контейнер използва този API, показатели за трафик и т.н.
  2. Приложните програмни интерфейси (API), внедрени към крайната точка на Vertex AI, са унифицирани и управлявани в обхвата на aiplatform.googleapis.com. По-малко проблеми с разрешенията.

Защо да използвате KServe за обслужване?

Е, винаги можете да създадете свое собствено приложение Flask, което да обслужва вашия ML модел.

  1. Това е част от Kubeflow.
  2. Освен това просто съм мързелив и KServe се справя с много неща, като обработка на HTTP, докато във Flask вярвам, че трябва да го напишете сами.
  3. KServe предостави специфични за ML методи на шаблони, като предварителна обработка, прогнозиране, последваща обработка, където след като ги наследите правилно, можете безпроблемно да внедрите своя модел.

Внедрявания стъпка по стъпка

За да опростя нещата, ще демонстрирам как да разположа прост модел за сегментиране на изображение с минимална предварителна и последваща обработка.

Моят код използва KFServing версия 0.5, така че отсега нататък ще използвам KFServing вместо Kserve в името на последователността.

1. Създайте свой персонализиран контейнер

Моделът, който използвам, е модел Tensorflow Keras .h5. За всички разклонения на Pytorch, не се притеснявайте, KServe е способен да сървърира почти всеки тип ML модел.

Kfserver и KFModel са създадени, за да улеснят вашето ML внедряване. Те се справят с всички основни компоненти на обслужването на модела, като например създаване на уеб приложение, манипулатор на Health и Liveness и т.н. Просто трябва да се съсредоточите върху вашия модел.

Всичко, което трябва да направите, е да създадете подклас на KFModel, да замените метода за предварителна обработка, прогнозиране и последваща обработка към свой собствен. След това най-накрая сервирайте вашия модел с помощта на KFServe.

Следният кодов фрагмент е буквално целият и единственият .py файл в моя контейнер. (и разбира се, това е така, защото го поддържам наистина прост.)

class KFServingSampleModel(kfserving.KFModel):
    
    def __init__(self, name: str):
        super().__init__(name)
        self.name = name
        self.ready = False
    
    def load(self,model_fp):
        self.model = tf.keras.models.load_model(model_fp,compile=False)
        self.ready = True
        
    def preprocess_inputs(self, img_byte):
        img_raw_byte = tf.io.decode_base64(img_byte)
        img_tensor = tf.io.decode_raw(img_raw_byte,tf.uint8)
        img = tf.reshape(img_tensor,[500,500,3])  
        img = tf.image.resize(img,[320,320])
        img = tf.cast(img,tf.float32)
        img /= 255
        return img
def predict(self, request: Dict):
"""Performs custom prediction.
Preprocesses inputs, then performs prediction using the          trained Keras model.
Args:
instances: A list of prediction input instances
**kwargs: A dictionary of keyword args provided as additional
fields on the predict request body.
Returns:
A list of outputs containing the prediction results.
"""
        inputs = request["instances"]
        # Input follows the Tensorflow V1 HTTP API for binary values
        # https://www.tensorflow.org/tfx/serving/api_rest#encoding_binary_values
        data = inputs[0]["image"]["b64"]
        
        image = self.preprocess_inputs(data)
        
        ds = tf.data.Dataset.from_tensors(image).batch(1)
        
        y_pred = self.model.predict(ds)
        
        y_pred_byte = base64.urlsafe_b64encode(y_pred.ravel()).decode('ascii')
        return json.dumps({"predictions":[y_pred_byte]})
if __name__ == "__main__":
   
    model = KFServingSampleModel("sample-model")
    model_uri = {YOUR MODEL ARTIFACT IN GCS}
    local_model_dir = '/tmp/model'
    os.makedirs(local_model_dir,exist_ok=True)
    # copy model.h5 from GCSto local
    tf.io.gfile.copy(model_uri,local_model_dir,overwrite=True)
    
    local_model_fp = tf.io.gfile.glob(f'{local_model_dir}/*.h5')[0]
    model.load(local_model_fp)
    
    kfserving.KFServer(workers=1).start([model])

Нека преминем през него малко по малко,

и. Инициализация на класа KFModel

def __init__(self, name: str):
        super().__init__(name)
        self.name = name # name your model
        self.ready = False # the state of the model

Тук има два важни атрибута:

Първо името на вашия модел. Трябва да наименувате модела си така, че при обслужване моделът да има собствен URL адрес на API. Това е от решаващо значение, тъй като KFServing поддържа обслужване на множество модели, което гарантира, че точните данни се предвиждат в правилния модел.

Второ, задайте състоянието на модела на false, тъй като все още не сме заредили нашия модел, това казва на https сървъра, че моделът не е готов.

ii. Заредете своя модел

def load(self,model_fp):
    self.model = tf.keras.models.load_model(model_fp,compile=False)
    self.ready = True

Вярвам, че този фрагмент е доста разбираем, точно като вашата нормална функция за модел на зареждане. След като моделът е зареден, задайте self.ready на true, за да посочите, че моделът е готов да получава заявки.

iii. Определете вашите функции преди/след процес

def preprocess_inputs(self, img_byte):
    img_raw_byte = tf.io.decode_base64(img_byte)
    img_tensor = tf.io.decode_raw(img_raw_byte,tf.uint8)
    img = tf.reshape(img_tensor,[500,500,3])  
    img = tf.image.resize(img,[320,320])
    img = tf.cast(img,tf.float32)
    img /= 255
    return img

Отново, това е почти идентично с вашата обичайна функция за предварителна обработка, с изключение на това, че входът е байтов низ. Тъй като Vertex е ограничен, всяка заявка за прогнозиране трябва да бъде 1,5 MB или по-малка, а REST API има ниска производителност при изпращане на масиви като заявка. Следователно се очаква функцията за предварителна обработка да получи кодирано base64 изображение, да го декодира и да го обработи предварително както обикновено.

iv. Функцията Predict

def predict(self, request: Dict):
    """Performs custom prediction.
    Preprocesses inputs, then predict using the trained Keras model.
    Args:
    instances: A list of prediction input instances
    
    Returns:
    A list of outputs containing the prediction results.
    """
    inputs = request["instances"]
    data = inputs[0]["image"]["b64"]
        
    image = self.preprocess_inputs(data)
        
    ds = tf.data.Dataset.from_tensors(image).batch(1)
        
    y_pred = self.model.predict(ds)
    
    # encode the predicted image (a binary mask) to byte string
    y_pred_byte = base64.urlsafe_b64encode(y_pred.ravel()).decode('ascii')
    
   # return a JSON Array as per Vertex AI required. No matter how many instances the model is predicting, it has to be a LIST of predicted values to return.
   return json.dumps({"predictions":[y_pred_byte]})

Функцията за прогнозиране приема входна заявка във формат, дефиниран от Vertex AI. Форматът не може да се промени, защото това е начинът, по който Vertex AI обработва API повикванията. Следователно нашият код трябва да адаптира този формат на заявка.

По-долу е един от възможните формати на заявка, които използвах.

{“instance”: {“image”:{“b64”:base64.b64encode(jpeg_data).decode()}}

И накрая, функцията за прогнозиране трябва да върне JSON масив, независимо дали е изображение, цяло число или низове. Без значение колко екземпляра прогнозира моделът, той трябва да бъде СПИСЪК от прогнозирани стойности, които да върне. (тази единствена грешка ми коства цял следобед 🤦‍♂️)

KFServer автоматично ще вземе всичко, върнато от функцията за прогнозиране, и ще изпрати отговор обратно на Vertex AI

v. Стартирайте KFServer

Вече всичко е готово и можете да стартирате сървъра с помощта на kfserving.KFServer. Но първо трябва да копирате вашия модел .h5 файл в локалната директория на контейнера, да заредите модела и накрая да започнете да го обслужвате.

if __name__ == "__main__":
   
    model = KFServingSampleModel("sample-model")
    model_uri = {YOUR MODEL ARTIFACT IN GCS}
    local_model_dir = '/tmp/model'
    os.makedirs(local_model_dir,exist_ok=True)
# copy model.h5 from GCS to local
    tf.io.gfile.copy(model_uri,local_model_dir,overwrite=True)
    
    local_model_fp = tf.io.gfile.glob(f'{local_model_dir}/*.h5')[0]
    
    # load model
    model.load(local_model_fp)
    
    kfserving.KFServer(http_port = 8082, workers=1).start([model])

Ако нищо не се обърка, трябва да можете да видите това, когато стартирате model_serving.py.

Сега можете да изпратите заявка за тестване на вашия контейнер локално.

http://localhost:8082/v1/models/sample-model:predict

2. Качване в регистъра на контейнерите на GCP

За да обслужваме модела на Vertex AI, трябва да изградим контейнер и да го изпратим в регистъра на контейнерите на GCP. Не забравяйте да разкриете порта и да определите вашата входна точка на контейнера.

# set base image (host OS)
FROM gcr.io/deeplearning-platform-release/tf2-gpu.2-4:latest
# # install dependencies
RUN pip install --upgrade pip && pip install -U scikit-image kfserving==0.5 h5py==2.10.0
# copy the content of the local directory to the working directory
COPY ./prediction_model_ihc.py ./
EXPOSE 8082
ENTRYPOINT ["python", "prediction_model_ihc.py"]

Следващият фрагмент е моят скрипт за изграждане на контейнер и изпращането му в регистъра на контейнерите.

#!/bin/sh
image_name=asia.gcr.io/sample/sample_serving
image_tag=latest
full_image_name=${image_name}:${image_tag}
base_image_tag=tf2.4-py3
cd "$(dirname "$0")"
docker build — build-arg BASE_IMAGE_TAG=${base_image_tag} -t “${full_image_name}” 
docker push “$full_image_name”

3. Импортирайте модела във Vertex AI

Насочете се къмVertex -› Model -› Import Model.

аз. Определете името на вашия модел

ii. В настройките на модела изберете Импортиране на съществуващ персонализиран контейнер и изберете своя контейнер

iii. За маршрут на прогнозиране въведете /v1/models/sample-model:predict. Vertex AI ще препраща заявките към маршрута за прогнозиране.

iv. За Health route въведете/v1/models/. Vertex AI от време на време изпраща GET заявки към Heath route, за да провери дали контейнерът е здрав.

ср. За порт въведете порта, използван във вашия код. В този пример портът е 8082.

vi. Импортиране.

Импортирането на модела ще отнеме няколко минути.

Можете също да импортирате вашия модел чрез ai platform python SDK

from google.cloud import aiplatform
model = aiplatform.Model.upload(
    project={YOUR PROJECT ID},
    location={YOUR PROJECT LOCATION},
    display_name={MODEL NAME},
    serving_container_image_uri={YOUR CONTAINER IMAGE URI}
    serving_container_predict_route = '/v1/models/sample-model:predict',
    serving_container_health_route = '/v1/models/sample-model')

4. Разположете модела в крайна точка на Vertex AI

След като моделът бъде импортиран, можете да го внедрите в крайна точка. Отидете до трите точки отдясно на вашия импортиран модел и изберете добавяне към крайна точка.

аз. Създайте ново име на крайна точка.

ii. Определете разделяне на трафика, автоматично мащабиране, тип машина според вашите нужди. Уверете се, че сте избрали акаунт за услуга, който има достъп до GCS, тъй като нашият модел .h5 файл е в GCS.

iii. Разположи

Крайната точка трябва да е активна след няколко минути.

5. Ето! Готови сте да тръгватете

Можете да тествате вашата крайна точка с помощта на този фрагмент

def endpoint_predict_sample(
                     project: str, 
                     location: str,   
                     instances: list, 
                     endpoint: str):
    aiplatform.init(project=project, location=location)
    endpoint = aiplatform.Endpoint(endpoint)
    prediction = endpoint.predict(instances=instances)
    print(prediction)
    return prediction
test_data = Image.open({PATH TO IMAGE})
test_data = np.array(test_data)[:,:,:3]
img = base64.urlsafe_b64encode(test_data.ravel()).decode()
data = [{'image':{'b64':img}}]
pred = endpoint_predict_sample(project={YOUR PROJECT ID},
                               location={YOUR PROJECT LOCATION}",
                               endpoint = {YOUR ENDPOINT ID},
                               instances=data)

Наистина се надявам тази статия да помогне на читателя да има по-плавно пътуване с KServe и Vertex AI.