Напишите в GlobalStateStore на Kafka Streams

Я пытаюсь использовать addGlobalStore в Kafka DSL, где необходимо хранить несколько значений, к которым мне понадобится глобальный доступ для всех моих потоков / экземпляров.

Моя проблема в том, что мне нужно периодически обновлять эти значения внутри моей топологии и информировать все запущенные потоки о новых значениях.

Я инициализировал глобальное хранилище через builder.addGlobalStore и использовал функцию init() процессора, который использовался в качестве последнего аргумента этой функции, но я не могу найти способ обновить значения внутри глобального хранилища.

Следующим шагом в моей топологии является преобразователь, где я могу получить перехват через `` init () '' в глобальном хранилище и прочитать сохраненные значения, но, к сожалению, я не могу обновить их глобально. Я имею в виду, что могу обновить локальную копию для работающего потока, но другие потоки / экземпляры не видят изменения.

Я где-то читал, что это невозможно сделать на Transformer, но даже я использую процессор вместо этого, проблема остается

Итак, есть ли способ обновить globalStateStore в топологии Kafka DSL, и если да, то как это возможно? Или для использования глобального хранилища мне нужно использовать API низкоуровневого процессора?


person ypanag    schedule 21.05.2019    source источник


Ответы (2)


Я инициализировал глобальное хранилище через builder.addGlobalStore и использовал функцию init () процессора, который использовался в качестве последнего аргумента этой функции, но я не могу найти способ обновить значения внутри глобального хранилища.

Вы не можете напрямую обновить глобальный магазин. Вместо этого вы должны обновить (= написать сообщение) основную тему этого глобального хранилища.

person Michael G. Noll    schedule 22.05.2019

Если это соответствует вашим потребностям, вы, вероятно, могли бы использовать GlobalKTable вместо GlobalStore.

person dmkvl    schedule 21.05.2019
comment
Когда можно использовать GlobalKTable против GlobalStateStore? - person prixa; 15.06.2020