FastAPI — это современная, быстрая (высокопроизводительная) веб-инфраструктура для создания API-интерфейсов с Python 3.6+ на основе стандартных подсказок типов Python. Он быстро завоевывает популярность в индустрии разработки программного обеспечения благодаря своей эффективности, простоте и множеству уникальных функций, которые позволяют разработчикам создавать надежные API в рекордно короткие сроки. В этой статье мы сосредоточимся на одной из самых мощных функций FastAPI: фоновых задачах. В частности, мы покажем, как использовать эту функцию для асинхронного выполнения вычислений, в то время как API продолжает реагировать на другие входящие запросы.
Обзор кода
В этом примере рассмотрим приложение FastAPI, которое обрабатывает запросы на вывод. Наш API будет принимать входную полезную нагрузку, планировать выполнение задачи обработки в фоновом режиме и немедленно отвечать смещением для этой задачи. Результат задачи обработки можно получить позже, используя это смещение. Этот тип настройки распространен в сценариях, где вычислительные задачи занимают значительное время, и мы не хотим, чтобы клиент ждал завершения задачи.
Разберем ключевые компоненты кода:
import logging import time from typing import List import uvicorn from fastapi import BackgroundTasks, FastAPI, HTTPException from pydantic import BaseModel, Field app = FastAPI() logging.basicConfig(level=logging.INFO) class InferenceRequest(BaseModel): deployment: str = Field(..., example="Docker") framework: str = Field(..., example="FastAPI") fn: str = Field(..., example="default_fn") class InferenceResponse(BaseModel): offset: int stack = []
Сначала мы импортируем необходимые пакеты и настраиваем базовое приложение FastAPI. Мы определяем две модели Pydantic, InferenceRequest
и InferenceResponse
, чтобы указать схему запроса и ответа для нашего API. Мы также объявляем глобальный список stack
для отслеживания результатов наших фоновых задач.
def process(offset) -> list[str]: for m in ["result1", "result2", "result3"]: time.sleep(10) stack[offset].append(m) stack[offset].append("done")
Затем мы определяем функцию process
, которая заменяет нашу трудоемкую вычислительную задачу. Он просто ждет некоторое время, а затем отправляет набор результатов в stack
с заданным offset
.
@app.post("/schedule", response_model=InferenceResponse) async def inference(request: InferenceRequest, background_tasks: BackgroundTasks): start_time = time.time() stack.append([]) offset = len(stack) - 1 background_tasks.add_task(lambda: process(offset)) return InferenceResponse(offset=offset) @app.get("/messages/{offset}", response_model=List[str]) async def get_messages(offset: int): if offset < 0 or offset >= len(stack): raise HTTPException(status_code=404, detail="Offset not found") return stack[offset]
Наконец, мы определяем две конечные точки: /schedule
и /messages/{offset}
. Конечная точка /schedule
принимает InferenceRequest
, добавляет новую задачу в фоновый режим с помощью background_tasks.add_task()
и немедленно отвечает InferenceResponse
, содержащим offset
. Конечная точка /messages/{offset}
извлекает результаты из stack
для заданного offset
.
Заключение
В этом посте мы узнали, как использовать BackgroundTasks
FastAPI для планирования выполнения трудоемких задач в фоновом режиме, сохраняя при этом отзывчивость API. Эта функция позволяет создавать эффективные и высокопроизводительные API-интерфейсы, способные выполнять задачи различной продолжительности, не блокируя основное приложение. Удачного кодирования!