как использовать globalKtable и StateStore в одной теме?

Просто чтобы уточнить, я новичок в Kafka, поэтому извините, если мои вопросы кажутся недокументированными, я читаю учебники, документы и все, что могу, чтобы понять это.

Я пытаюсь прочитать все значения из GlobalStore, чтобы обновить его значения, а затем использовать StateStore, который уже существует, чтобы поместить эти новые обновленные значения.

Я пытаюсь сделать это, потому что когда я это делаю:

this.stateStore.all();

У меня только 1/10 часть данных, если я правильно понял, это потому что у меня 10 разделов, а ss, читает только один (хотя я не совсем понимаю почему)

Это моя глобальная таблица:

    public StreamsBuilder declareTopology(StreamsBuilder builder) {

        logger.debug("Building topology : input topic ~ {} ; output topics ~ {}, {}",
                getInputTopic(),
                getDataTopic(),
                getToEsTopic());

        builder.globalTable(
                getDataTopic(),
                Consumed.with(Serdes.String(), fooSerdes)
                        .withOffsetResetPolicy(Topology.AutoOffsetReset.EARLIEST),
                Materialized.<String, Foo, KeyValueStore<Bytes, byte[]>>as(
                        "foosktable")
                        .withKeySerde(Serdes.String())
                        .withValueSerde(fooSerdes)
                        .withLoggingEnabled(new HashMap<>()));
    ...

А это addStateStore, который я не могу удалить, потому что он используется в другом месте кода:

       ...

       builder.addStateStore(
            Stores.keyValueStoreBuilder(
                    Stores.persistentKeyValueStore("foosktable"),
                    Serdes.String(),
                    fooSerdes));
    ...

    return builder;
}

Итак, теоретически я думал о том, чтобы удалить StateStore, который также использует ту же тему, и поместить мои данные, используя одну из моих тем data.process, проблема в том, что этот процессор делает другие вещи с этим StateStore, поэтому Я не могу взорвать его.

Я заблудился здесь, любой свет очень помог бы. Спасибо !


person Russo    schedule 04.11.2019    source источник
comment
Я прошу вас включить необходимые импорты для запуска вашего кода; А.К.А. минимальный, воспроизводимый пример Прямая цитата: Complete — предоставьте все части, которые нужны кому-то еще для воспроизведения вашей проблемы в сам вопрос   -  person FailingCoder    schedule 04.11.2019
comment
@FailingCoder спасибо за комментарий, я постараюсь это сделать, проблема в том, что в этих проектах используется множество библиотек моей компании и других зависимостей, но я постараюсь отредактировать его.   -  person Russo    schedule 04.11.2019
comment
Если вы не понимаете код, он в основном бесполезен для вас, пока вы не поймете. Это проект, в котором используется множество библиотек; Не жертвуйте ясностью ради краткости при создании минимального примера.   -  person FailingCoder    schedule 04.11.2019


Ответы (1)


Немного неясно, чего вы на самом деле пытаетесь достичь. Тем не менее, несколько объяснений высокого уровня:

GlobalKTable имеет только одну цель: читать данные без изменений из темы, чтобы позволить либо выполнить KStream-GlobalKTable-соединение, либо запросить хранилище с помощью «интерактивных запросов».

Следовательно, вы не можете делать то, что хотите, поскольку копирование данных из глобального хранилища в другое хранилище невозможно так, как вы этого хотите. Вам нужно будет продублировать входную тему и прочитать ее дважды: (1) как GlobalKTable и (2) как обычную KStream, чтобы изменить данные перед тем, как поместить их в хранилище. Для (2) вы можете использовать transform().

Надеюсь это поможет.

person Matthias J. Sax    schedule 05.11.2019
comment
Да, я собираюсь использовать этот подход, я думал об этом, но не был уверен, спасибо! - person Russo; 12.11.2019