Потоковая передача большого отклика в контроллер Micronaut без потери памяти

Мы используем Micronaut с Mongo для предоставления данных через некоторые контроллеры. Поскольку размер объектов ответа растет, нашим приложениям иногда не хватает памяти. Поэтому мы исследуем переход на драйвер async mongo и использование реактивных ответов для потоковой передачи данных клиентам. К сожалению, мы не можем изменить ни структуры ответов API, ни типы контента (все application/json)

Один из наших API вернул сущности со следующей структурой:

[
  { "field": "value" },
  { "field": "value" },
  ...
  { "field": "value" }
]

Мы работали с этим контроллером, где dataStore возвращает Publisher<Example>:

    @Get("all")
    Flowable<Example> getAllExamples() {
        return Flowable.fromPublisher(dataStore.find()).map(SomeMapper::toPublic);
    }

Это прекрасно работает, огромный список примеров не нужно полностью загружать в память перед потоковой передачей его клиенту.

Другие API возвращают (я думаю, более разумную) структуру:

{
  "list": [
    { "field": "value" },
    { "field": "value" },
    ...
    { "field": "value" }
  ],
  "meta": {
    ...
  }
}

Можем ли мы применить аналогичный шаблон «издатель / текучий» для таких сущностей, или мы застряли при загрузке данных для таких ответов в память перед их отправкой?

Мы пробовали подписи вроде:

    @Get("all/dev")
    Single<ExamplesWrapper> getAllDev() {
        Publisher<Example> dev = dataStore.find();
        return Flowable.fromPublisher(dev)
                .map(mapper::map)
                .collect((Callable<ArrayList<Example>>) ArrayList::new, ArrayList::add)
                .map(ExampleWrapper::new);
    }

Где оболочка добавляла бы некоторые метаданные. Но это снова загружает все это в память перед отправкой, что приводит к сбою приложения.

Добавление Flowable в оболочку ответа:


public class ExamplesWrapper {

    private final Flowable<Example> examples;

    @ConstructorProperties({"examples"})
    public ExamplesWrapper(Flowable<Example> examples) {
        this.examples = examples;
    }

    public Flowable<Example> getExamples() {
        return examples;
    }
}

Также не работает с некоторым приятным исключением сопоставления Джексона.

Метаданные не зависят от фактических данных примера (они добавляют некоторую статическую информацию о компании). Можем ли мы каким-то образом реализовать такую ​​конечную точку без необходимости загружать все данные в память?


person drvdijk    schedule 06.03.2021    source источник


Ответы (1)


Из документации:

6.20 Запись данных ответа

Реактивная запись данных ответа

HTTP-сервер Micronaut поддерживает запись блоков данных ответа, возвращая издателя, который излучает объекты, которые могут быть закодированы в HTTP-ответ.

В следующей таблице приведены примеры сигнатур типа возвращаемого значения и поведение сервера при их обработке: Тип возвращаемого значения Описание

  • Flowable ‹byte []›: Flowable, который генерирует каждый фрагмент контента как byte [] без блокировки
  • Flux ‹ByteBuf›: поток реактора, который испускает каждый блок как Netty ByteBuf.
  • Publisher ‹String›: издатель, который передает каждый фрагмент контента в виде строки.
  • Flowable ‹Book› При отправке POJO каждый отправляемый объект по умолчанию кодируется как JSON без блокировки.

При возврате реактивного типа сервер использует кодирование передачи для фрагментов и продолжает записывать данные до тех пор, пока не будет вызван метод Publisher onComplete.

Я понимаю это так, что если вы хотите, чтобы механизм Micronaut транслировал ваши материалы, вам потребуется подпись типа Flowable<item>, Flux<item> или Publisher<item>, где элемент - это часть вашего ответа, а не полный элемент. Затем Micronaut ответит кусками по мере их поступления от Flowable или аналогичного.

В этом случае я подумал о том, что вы можете самостоятельно выполнить разделение на подходящие куски. Таким образом, потоковая передача больших ответов без их буферизации в памяти должна работать.

Так что-то вроде этого:

@Get("all")
public Flowable<String> getAllExamples() {
    ObjectMapper objectMapper = new ObjectMapper();
    Publisher<Example> dev = dataStore.find();
    return Flowable.fromPublisher(dev)
            .map(mapper::map)
            .concatMap(item -> Flowable.just(objectMapper.writeValueAsString(item), ","))
            .startWith("{\"list\": [")
            .concatWith(Flowable.just("],\"meta\":\"whatever\"}"));
}

это взломано, но, похоже, в таком случае работает.


Некоторые подходы, которые не сработали:

Я выполнил тестовую запись непосредственно в JsonGenerator в настраиваемом картографе Джексона, сбрасывая объекты по мере их поступления, как описано в jackson потоковый API, но micronaut RoutingInboundHandler, похоже, не сбрасывает ответ обратно конечному пользователю, а буферизует его, что приводит к нехватке памяти. Подход работает с Spring Boot, поэтому в Micronaut, возможно, этого не хватает.

Такая же буферизация произошла и при использовании Micronaut Writeable (блокировка) ответов и попытка сбросить данные по мере их написания. Я открыл вопрос об этом для ядра micronaut.

person eis    schedule 08.03.2021
comment
Спасибо за это, хотя он действительно хакерский и требует некоторой корректировки (в списке есть завершающий ,), в данном случае он решил нашу проблему :) - person drvdijk; 19.03.2021