Просто чтобы уточнить, я новичок в Kafka, поэтому извините, если мои вопросы кажутся недокументированными, я читаю учебники, документы и все, что могу, чтобы понять это.
Я пытаюсь прочитать все значения из GlobalStore, чтобы обновить его значения, а затем использовать StateStore, который уже существует, чтобы поместить эти новые обновленные значения.
Я пытаюсь сделать это, потому что когда я это делаю:
this.stateStore.all();
У меня только 1/10 часть данных, если я правильно понял, это потому что у меня 10 разделов, а ss, читает только один (хотя я не совсем понимаю почему)
Это моя глобальная таблица:
public StreamsBuilder declareTopology(StreamsBuilder builder) {
logger.debug("Building topology : input topic ~ {} ; output topics ~ {}, {}",
getInputTopic(),
getDataTopic(),
getToEsTopic());
builder.globalTable(
getDataTopic(),
Consumed.with(Serdes.String(), fooSerdes)
.withOffsetResetPolicy(Topology.AutoOffsetReset.EARLIEST),
Materialized.<String, Foo, KeyValueStore<Bytes, byte[]>>as(
"foosktable")
.withKeySerde(Serdes.String())
.withValueSerde(fooSerdes)
.withLoggingEnabled(new HashMap<>()));
...
А это addStateStore, который я не могу удалить, потому что он используется в другом месте кода:
...
builder.addStateStore(
Stores.keyValueStoreBuilder(
Stores.persistentKeyValueStore("foosktable"),
Serdes.String(),
fooSerdes));
...
return builder;
}
Итак, теоретически я думал о том, чтобы удалить StateStore, который также использует ту же тему, и поместить мои данные, используя одну из моих тем data.process, проблема в том, что этот процессор делает другие вещи с этим StateStore, поэтому Я не могу взорвать его.
Я заблудился здесь, любой свет очень помог бы. Спасибо !