FastAPI — это современная, быстрая (высокопроизводительная) веб-инфраструктура для создания API-интерфейсов с Python 3.6+ на основе стандартных подсказок типов Python. Он быстро завоевывает популярность в индустрии разработки программного обеспечения благодаря своей эффективности, простоте и множеству уникальных функций, которые позволяют разработчикам создавать надежные API в рекордно короткие сроки. В этой статье мы сосредоточимся на одной из самых мощных функций FastAPI: фоновых задачах. В частности, мы покажем, как использовать эту функцию для асинхронного выполнения вычислений, в то время как API продолжает реагировать на другие входящие запросы.

Обзор кода

В этом примере рассмотрим приложение FastAPI, которое обрабатывает запросы на вывод. Наш API будет принимать входную полезную нагрузку, планировать выполнение задачи обработки в фоновом режиме и немедленно отвечать смещением для этой задачи. Результат задачи обработки можно получить позже, используя это смещение. Этот тип настройки распространен в сценариях, где вычислительные задачи занимают значительное время, и мы не хотим, чтобы клиент ждал завершения задачи.

Разберем ключевые компоненты кода:

import logging
import time
from typing import List

import uvicorn
from fastapi import BackgroundTasks, FastAPI, HTTPException
from pydantic import BaseModel, Field

app = FastAPI()

logging.basicConfig(level=logging.INFO)


class InferenceRequest(BaseModel):
    deployment: str = Field(..., example="Docker")
    framework: str = Field(..., example="FastAPI")
    fn: str = Field(..., example="default_fn")


class InferenceResponse(BaseModel):
    offset: int


stack = []

Сначала мы импортируем необходимые пакеты и настраиваем базовое приложение FastAPI. Мы определяем две модели Pydantic, InferenceRequest и InferenceResponse, чтобы указать схему запроса и ответа для нашего API. Мы также объявляем глобальный список stack для отслеживания результатов наших фоновых задач.

def process(offset) -> list[str]:
    for m in ["result1", "result2", "result3"]:
        time.sleep(10)
        stack[offset].append(m)

    stack[offset].append("done")

Затем мы определяем функцию process, которая заменяет нашу трудоемкую вычислительную задачу. Он просто ждет некоторое время, а затем отправляет набор результатов в stack с заданным offset.

@app.post("/schedule", response_model=InferenceResponse)
async def inference(request: InferenceRequest, background_tasks: BackgroundTasks):
    start_time = time.time()
    stack.append([])
    offset = len(stack) - 1
    background_tasks.add_task(lambda: process(offset))
    return InferenceResponse(offset=offset)


@app.get("/messages/{offset}", response_model=List[str])
async def get_messages(offset: int):
    if offset < 0 or offset >= len(stack):
        raise HTTPException(status_code=404, detail="Offset not found")
    return stack[offset]

Наконец, мы определяем две конечные точки: /schedule и /messages/{offset}. Конечная точка /schedule принимает InferenceRequest, добавляет новую задачу в фоновый режим с помощью background_tasks.add_task() и немедленно отвечает InferenceResponse, содержащим offset. Конечная точка /messages/{offset} извлекает результаты из stack для заданного offset.

Заключение

В этом посте мы узнали, как использовать BackgroundTasks FastAPI для планирования выполнения трудоемких задач в фоновом режиме, сохраняя при этом отзывчивость API. Эта функция позволяет создавать эффективные и высокопроизводительные API-интерфейсы, способные выполнять задачи различной продолжительности, не блокируя основное приложение. Удачного кодирования!