Я пытаюсь использовать addGlobalStore в Kafka DSL, где необходимо хранить несколько значений, к которым мне понадобится глобальный доступ для всех моих потоков / экземпляров.
Моя проблема в том, что мне нужно периодически обновлять эти значения внутри моей топологии и информировать все запущенные потоки о новых значениях.
Я инициализировал глобальное хранилище через builder.addGlobalStore
и использовал функцию init()
процессора, который использовался в качестве последнего аргумента этой функции, но я не могу найти способ обновить значения внутри глобального хранилища.
Следующим шагом в моей топологии является преобразователь, где я могу получить перехват через `` init () '' в глобальном хранилище и прочитать сохраненные значения, но, к сожалению, я не могу обновить их глобально. Я имею в виду, что могу обновить локальную копию для работающего потока, но другие потоки / экземпляры не видят изменения.
Я где-то читал, что это невозможно сделать на Transformer, но даже я использую процессор вместо этого, проблема остается
Итак, есть ли способ обновить globalStateStore в топологии Kafka DSL, и если да, то как это возможно? Или для использования глобального хранилища мне нужно использовать API низкоуровневого процессора?