Как заставить потребителей работать в Kafka 0.8 API

Я собираюсь написать прототип для публикации и использования сообщений kafka. У нас уже настроена инфраструктура Cloudera (зоопарки, брокеры и т. д.), и я уже успешно поиграл с инструментами командной строки Kafka для создания и использования сообщений.

Я использую [org.apache.kafka/kafka_2.10 "0.8.2.1"] в качестве зависимости и уже смог использовать клиентский API для настройки KafkaProducer, который публикует сообщения с простым строковым содержимым и может быть успешно прочитан потребителем командной строки на другой стороне.

Мой вопрос: есть ли один пример кода в интернетах, чтобы показать, как инициализировать KafkaConsumer, и прочитайте это сообщение на другой стороне, потому что я искал его несколько дней, и ни один из примеры кода вроде бы работают:

  • Они используют классы или методы, которые даже не существуют в самом API (например, они, по-видимому, передают карту свойств в конструктор org.apache.kafka.clients.consumer.ConsumerConfig, но такого конструктора не существует;
  • вызов статического метода createJavaConsumerConnector для класса kafka.consumer.Consumer... в какой вселенной существуют эти вещи?).

И обычно каждый пример выглядит чрезвычайно сложным. Я ожидаю, что инфраструктуре обмена сообщениями потребуется несколько строк конфигурации для подключения к брокерам и некоторая функция для помещения и извлечения в/из очереди или темы. Настройка Producer для Kafka не была чем-то чрезвычайно сложным, и я ожидал, что Consumer будет таким же.

Также кажется, что я не одинок с это.


person Daniel Dinnyes    schedule 19.05.2016    source источник
comment
В разных версиях Kafka есть изменения API. Следите за версией примеров, которые вы просматриваете. Посмотрите здесь API 0.8.2 webhostingreviewjam.com/mirror/apache/kafka/0.8.2-beta/java-doc/ Если это не поможет, задайте более точный вопрос...   -  person Matthias J. Sax    schedule 21.05.2016
comment
@MatthiasJ.Sax Итак, если я правильно понимаю, вывод состоит в том, что Kafka определенно не то, что вы бы рекомендовали для запуска в производство на данном этапе. Раньше я использовал много систем обмена сообщениями, и они не слишком сложны, вы можете put и take из очереди/темы после нескольких строк конфигурации/аутентификации (1 или 2, а не 100), и все. Kafka приближается к этому набору функций, но я бы постарался не называть ее платформой обмена сообщениями без этих самых базовых функций.   -  person Daniel Dinnyes    schedule 21.05.2016
comment
@MatthiasJ.Sax Что касается конкретных примеров, я написал зависимость maven, с которой я работаю, и пример кода, на который я ссылался (для версии 0.8.0 и на основе концепции семантическое управление версиями, я ожидаю, что оно будет работать с версией моей зависимости maven), но оно даже не компилируется. Если бы вы могли указать мне пример кода, который работает с 0.8.2.1., я был бы очень признателен.   -  person Daniel Dinnyes    schedule 21.05.2016
comment
Kafka полностью готова к производству — уже какое-то время. См. здесь список компаний, использующих его: cwiki.apache.org/confluence/ display/KAFKA/Powered+By Конечно, вам нужно помнить о своем сценарии использования, и может случиться так, что какая-то другая система лучше всего соответствует вашим требованиям. Примеры для 0.8.2 смотрите здесь: github.com/apache/kafka/tree/0.8.2/examples/src/main/java/kafka/   -  person Matthias J. Sax    schedule 22.05.2016
comment
Вот хорошее сравнение Kafka и AMQP (смотреть с 28:45) infoq.com /презентации/event-streams-kafka   -  person Matthias J. Sax    schedule 22.05.2016
comment
@MatthiasJ.Sax Интересно, почему в большинстве случаев мы думаем, что Кафка — это решение нашей проблемы? вопрос задан, ответ эти ведущие компании используют его, мы должны иметь его слишком часто всплывает. Не уверен, что это веский аргумент против того факта, что я никогда не видел примера кода, который компилируется для 8.2.1. и способен читать из темы Кафки. Также кажется, что у нас разные взгляды на то, что такое готовая продукция. Другой вопрос, почему релиз 0.x где-то находится в производстве.   -  person Daniel Dinnyes    schedule 22.05.2016
comment
@MatthiasJ.Sax Спасибо за ссылку на примеры кода, я попробую их.   -  person Daniel Dinnyes    schedule 22.05.2016


Ответы (1)


Во-первых, я хочу упомянуть, что между Kafka 0.8.0, 0.8.1 и 0.8.2 есть несколько изменений API (главное переписывание и упрощение произошло для 0.9.0 и 0.10.0) - таким образом, ваш вопрос немного открыт, просто спрашивая 0.8.

Чтобы написать потребителя Java для 0.8.2.2, вам нужно включить зависимость:

Это для Scala 2.11 — доступны и другие версии Scala.

<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka_2.11</artifactId>
  <version>0.8.2.2</version>
</dependency>

Не используйте kafka-clients в качестве идентификатора артефакта для версии 0.8.x.

Минимальный пример для потребителя, получающего <String,String> парных сообщений "ключ-значение" и выводящего их на stdout, выглядит следующим образом:

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

public class ConsumerExample {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("zookeeper.connect", "localhost:2181");
        props.put("group.id", "myGroup");

        final String topic = "test";

        ConsumerConnector consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(props));

        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, new Integer(1)); // number of consumer threads

        KafkaStream<byte[], byte[]> stream = consumer.createMessageStreams(topicCountMap).get(topic).get(0);

        ConsumerIterator<byte[], byte[]> it = stream.iterator();

        // infinite loop
        while(it.hasNext()) {
            System.out.println(new String(it.next().message()));
        }

        // non-reachable code...
        consumer.shutdown();
    }
}

Полный пример — использование нескольких потребительских потоков, включая правильное завершение работы — можно найти здесь: https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example

Чтобы проверить это, следуйте краткому руководству и отправляйте сообщения через консоль Kafka. -продюсер.

person Matthias J. Sax    schedule 23.05.2016
comment
Пробовал пример, к сожалению все та же проблема: против 0.8.2 не компилируется. нет такого класса как kafka.consumer.ConsumerConfig, ближайший org.apache.kafka.clients.consumer.ConsumerConfig у которого нет такого конструктора как в пример. - person Daniel Dinnyes; 23.05.2016
comment
Интересно org.apache.kafka .clients.consumer.ConsumerConfig, по-видимому, определен в Kafka 0.9.0.1 API, что определенно не то, что я использую. - person Daniel Dinnyes; 23.05.2016
comment
Я не уверен, почему это не работает для вас. kafka.consumer.ConsumerConfig есть: github.com/apache/kafka/blob/0.8.2/core/src/main/scala/kafka/ - person Matthias J. Sax; 23.05.2016
comment
Вы включили зависимости kafka-clients? mvnrepository.com/artifact/org.apache.kafka/kafka-clients - person Matthias J. Sax; 23.05.2016
comment
Нет, я использую только зависимость, упомянутую в вопросе. Кроме того, только что напечатал транзитивное дерево зависимостей, и оно не показывает kafka-clients ни одной версии. - person Daniel Dinnyes; 23.05.2016
comment
kafka-clients зависит от kafka, а не наоборот. Таким образом, дерево зависимостей этого не показывает. Включение kafka-clients должно решить вашу проблему с отсутствующим классом ConsumerConfig. - person Matthias J. Sax; 23.05.2016
comment
Пробовал с org.apache.kafka/kafka-clients 0.8.2.1 как зависимость от пути к классам. По-прежнему не существует пространства имен, такого как kafka.consumer. С какой зависимостью от maven вы ее пробовали? - person Daniel Dinnyes; 23.05.2016
comment
Согласно файлу jar (из ссылки на репозиторий mvn сверху) это org/apache/kafka/clients/consumer/ConsumerConfig.class - person Matthias J. Sax; 23.05.2016
comment
Вы правы - это немного странно... Не знаю, почему импорт неверен... - person Matthias J. Sax; 23.05.2016
comment
К сожалению, у org.apache.kafka.clients.consumer.ConsumerConfig еще нет такого конструктора, как в примере. Пример, возможно, был написан в альтернативной вселенной, а не в нашей. Хорошее видео, Кафка против AMQP. - person Daniel Dinnyes; 24.05.2016
comment
Итак, остались какие-либо идеи, какая зависимость мне нужна, которая имеет класс kafka.consumer.ConsumerConfig? Как я уже сказал, раньше мне удавалось заставить работать производителя, поэтому мне просто любопытно, по крайней мере, зависимость, которую я использую, подходит для получения половины предполагаемой цели работы Kafka. - person Daniel Dinnyes; 24.05.2016
comment
Во-первых, я хочу извиниться за всю путаницу в этой теме... Я никогда раньше не использовал Kafka 0.8.x. Тем не менее, я покопался в теме и нашел правильный ответ. Заменил мой предыдущий ответ совершенно новым... (также проверил пример на своем ноутбуке). - person Matthias J. Sax; 25.05.2016
comment
Использование версии 0.8.2.2 определенно выглядит лучше, так как пространство имен kafka.consumer по крайней мере существует. Я дошел до того момента, когда вернул ConsumerIterator из KafkaStream. Проблема сейчас в том, что я никогда ничего не получаю обратно от него. hasNext всегда возвращает true, а next никогда ничего не возвращает (продолжает работать без тайм-аута). Я публикую с помощью своего кода издателя с другого терминала, и если я использую консоль-потребитель, он получает сообщения, поэтому я не понимаю, почему итератор не возвращается. - person Daniel Dinnyes; 27.05.2016
comment
Нет сообщения об ошибке? Вы проверяли журналы? Трудно сказать... Можешь попробовать отправить через производителя консолей? Вы дважды проверяли название темы и т.д.? - person Matthias J. Sax; 27.05.2016
comment
hasNext по-видимому, всегда возвращает true, даже до того, как я отправил какое-либо сообщение, а next просто никогда не завершается, и мне нужно выйти. Когда я нарушаю, я получаю IllegalMonitorStateException: java.util.concurrent.locks.ReentrantLock$Sync.tryRelease (:-1), но я не думаю, что это важно. - person Daniel Dinnyes; 27.05.2016
comment
Если я попытаюсь получить сообщение еще раз, оно скажет IllegalStateException: Iterator is in failed state kafka.utils.IteratorTemplate.hasNext (IteratorTemplate.scala:54) - person Daniel Dinnyes; 27.05.2016
comment
Без понятия. Пример у меня работает. Вы используете точно такой же код? - person Matthias J. Sax; 30.05.2016