У меня есть носик kafka, скажем, KafkaSpout, который читает сообщение, скажем, msg из темы kafka, и отправляет болт, скажем, Bolt1. Bolt1 разделяет это сообщение на несколько сообщений msg1,msg2,..,msgN и отправляет другому Bolt, скажем < strong>Болт2. Теперь, когда все сообщения msg1,msg2,..,msgN от Bolt1 отправляются на Bolt2 , мне нужно немного обработать сообщение msg. Есть ли способ определить, достигли ли все разделенные сообщения Bolt2?
как определить, все ли выбросы достигли болта?
Ответы (1)
На странице документации Apache Storm Гарантия обработки сообщений говорится о том, что правильно, будет сгенерировано дерево кортежей, которое Storm будет воспроизводить из Spout, если сообщения будут потеряны.
Это происходит с помощью механизма, который Storm вызывает привязку, скажем, у вас есть следующий метод execute.
public void execute(Tuple tuple) {
String sentence = tuple.getString(0);
for(String word: sentence.split(" ")) {
_collector.emit(tuple, new Values(word)); //anchoring happening
}
_collector.ack(tuple);
}
Каждый кортеж слов привязывается путем указания входного кортежа в качестве первого аргумента для генерации. Поскольку кортеж слов привязан, кортеж вывода в корне дерева будет воспроизводиться позже, если кортеж слов не удалось обработать ниже по течению. ссылка
Принимая во внимание, что если вы выдали новое слово без включения исходного кортежа,
_collector.emit(new Values(word)); //no anchoring
Испускание кортежа слов таким образом приводит к тому, что он не привязывается. Если кортеж не может быть обработан нижестоящим потоком, корневой кортеж не будет воспроизведен. ссылка