Количество слов Kafka не обновляется

Я начинаю экспериментировать с Kafka Streams. Я подписан на https://kafka.apache.org/0110/documentation/streams/quickstart< /а>.

Моя песочница — это ящик с Ubuntu 16.04.2 LTS, Kafka 0.11.0.0 и Scala 2.11.11.

Как объясняется в руководстве по быстрому запуску Kafka Streams, я выполнил следующие шаги:

echo -e "all streams lead to kafka\nhello kafka streams\njoin kafka summit" > file-input.txt

bin/kafka-topics.sh --create \
    --zookeeper localhost:2181 \
    --replication-factor 1 \
    --partitions 1 \
    --topic streams-file-input

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic streams-file-input < file-input.txt

bin/kafka-run-class.sh org.apache.kafka.streams.examples.wordcount.WordCountDemo

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
    --topic streams-wordcount-output \
    --from-beginning \
    --formatter kafka.tools.DefaultMessageFormatter \
    --property print.key=true \
    --property print.value=true \
    --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
    --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer

При просмотре вывода streams-wordcount с помощью последней команды мой стандартный вывод показывает следующее:

all 1
streams 1
lead    1
to  1
kafka   1
hello   1
kafka   2
streams 2
join    1
kafka   3
summit  1

Затем, не прерывая команду bin/kafka-console-consumer.sh, я повторно запускаю производителя консоли следующим образом:

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic streams-file-input < file-input.txt

Я удивлен, что стандартный вывод не меняется, чтобы отразить изменения, внесенные этим новым дополнением. Насколько я понимаю, файл-input.txt использовался для получения дополнительных данных, поэтому количество слов должно было обновиться (теперь все токены должны учитываться дважды). Что не так с моими рассуждениями?


person SCO    schedule 30.08.2017    source источник
comment
и, конечно же, bin/kafka-run-class.sh org.apache.kafka.streams.examples.wordcount.WordCountDemo все это время работает? Просто для двойной проверки вам также следует запустить потребителя в теме streams-file-input, чтобы убедиться, что вы действительно добавляете туда новые значения...   -  person Frederic A.    schedule 30.08.2017
comment
о, о ... Я не заметил, что WordCountDemo больше не работает. Запустив его снова, вывод выглядел правильно. Благодарю вас ! Однако bin/kafka-run-class.sh org.apache.kafka.streams.examples.wordcount.WordCountDemo останавливается примерно через 5 секунд. В моем понимании, он должен работать вечно. Я что-то упустил?   -  person SCO    schedule 30.08.2017


Ответы (1)


Демонстрация подсчета слов предназначена для остановки через 5 секунд, как показано в источнике: https://github.com/apache/kafka/blob/0.11.0.0/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java#L88

См. последнюю версию вышеуказанного источника, которая не останавливается через 5 секунд, а только при нажатии ctrl-c: https://github.com/apache/kafka/blob/0.11.0/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java

person Frederic A.    schedule 30.08.2017