Компенсация среды потребительского кластера Kafka

Я пытаюсь сделать так, чтобы x потребителей обращались к указанной теме в kafka, но не использовали одни и те же сообщения. Я хочу например ...

Смещение подбора потребителя 1 1 Смещение подбора потребителя 2 2 Смещение подбора потребителя 1 3 Смещение подбора потребителя 2 4

Я хочу, чтобы кафка действовал как очередь для этих двух потребителей. Я заметил конфигурацию group.id и предположил, что вы можете использовать ту же группу, и она будет обрабатывать ее соответствующим образом, но, похоже, она не работает так, как я думал.

Вот код, который я использую ...

     public void init(){
            Properties props = new Properties();
            props.put("bootstrap.servers", kafkaUrl);
            props.put("key.deserializer", StringDeserializer.class.getName());
            props.put("value.deserializer", StringDeserializer.class.getName());
            props.put("enable.auto.commit", "true");
            props.put("group.id", "group1");
            props.put("client.id", "KafkaConsumer-" + InetAddress.getLocalHost().getHostAddress());

            consumer = new KafkaConsumer<>(props);
            consumer.subscribe(Arrays.asList("event1", "event2"));

            Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(this::pollTopics, 1, 10, TimeUnit.SECONDS);
     }

     public void pollTopics() {
        try {
            ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);

            for (ConsumerRecord<String, String> record : records) {
                AbstractProcessor processor = Processor.getProcessor(record.value(), record.topic(), mqttMapping, crudRepositoryStore);
                if(processor != null) {
                    kafkaThreadPool.execute(processor);
                }
            }
        }catch (Exception e){
            LOG.error("Polling exception occurred", e);
        }
    }

Я хочу иметь возможность запускать этот код в кластерной среде и иметь очередь kafka. Я хочу, чтобы он вытащил сообщение и одновременно перешел к следующему смещению, тогда следующий опрос kafka захватит следующее смещение. Это возможно? И если да, то что я делаю не так?


person Justin Smith    schedule 06.12.2016    source источник


Ответы (1)


Это невозможно в Кафке (как вы это описываете).

Если вы используете группы потребителей, один раздел может быть прочитан только одним потребителем. Таким образом, Kafka масштабируется по разделам, т. Е. Если вы хотите иметь несколько потребителей (читающих разные данные), вам нужен как минимум один раздел для каждого потребителя. Если у вас больше разделов, чем потребителей, некоторые (или все) потребители будут читать несколько разделов одновременно.

Решение для вас - создать тему с несколькими разделами (или использовать несколько тем и позволить всем потребителям вашей группы подписаться на темы).

person Matthias J. Sax    schedule 06.12.2016
comment
Хорошо, это имеет смысл, но я читал, что если у вас есть 2 раздела, у вас должно быть как минимум 2 потребителя. Так что же произойдет, если один из потребителей отключится на час? Другой потребитель не улавливает эти сообщения, верно? - person Justin Smith; 07.12.2016
comment
Подождите, я думаю, вы говорите, что если есть 2 раздела и только один потребитель, он будет выбирать из обоих? Просто чтобы убедиться, что я правильно понимаю. Если я создаю два раздела и имею двух потребителей, он должен анализировать разные сообщения, и если один действительно выйдет из строя, то другой потребитель заберет все сообщения из обоих разделов? Если это верно, есть ли у вас пример того, что необходимо для подписки на определенные разделы? Если да, то я приму ваш ответ :). - person Justin Smith; 07.12.2016
comment
Я также только что прочитал этот абзац с сайта kafkas. Концепция группы потребителей в Kafka обобщает эти две концепции. Как и в случае с очередью, группа потребителей позволяет разделить обработку на набор процессов (членов группы потребителей). Как и в случае публикации-подписки, Kafka позволяет рассылать сообщения нескольким группам потребителей. Это говорит о том, что то, что я пытаюсь достичь, возможно. Просто не знаю, как это сделать. - person Justin Smith; 07.12.2016
comment
Ваш второй комментарий верен. Потребитель может читать несколько тем. Если вы используете групповое управление и после отказа потребителя, другой берет на себя разделы отказавшего потребителя. Вы помещаете разных потребителей в одну и ту же группу потребителей, указывая в их конфигурации один и тот же group.id. - person Matthias J. Sax; 08.12.2016
comment
В случае широковещательной рассылки вы в основном используете разные группы потребителей, поскольку каждая группа будет читать все разделы. Если вы хотите читать только специальные разделы в одном потребителе, вам необходимо вручную назначить, что автоматически отключает управление группами. См. Этот вопрос: заголовок stackoverflow.com/questions/41008610/ - person Matthias J. Sax; 08.12.2016
comment
Потрясающе это то, что я хотел. Мне удалось заставить его работать, используя тот же group.id, я также где-то читал, что они должны быть в разных потоках, что могло быть частью моей проблемы. Я тестировал отдельные потоки с одним и тем же group.id, и, похоже, он отлично балансирует нагрузку. - person Justin Smith; 08.12.2016