производитель java kafka застрял при создании сообщения

Я реализую производителя apache kafka, используя java api. Apache Kafka установлен на локальном хосте. Zookeeper также работает, но функция Producer.send () по-прежнему зависает при отправке сообщения, и сообщение не публикуется.

Я уже создал тему "быстрых сообщений".

Ниже приведен код.

package com.hsahu.kafka.producer;

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class KafkaProducerExample {
public static void main(String[] args) {

    Properties props = new Properties();

    props.put("bootstrap.servers", "localhost:9092");
    props.put("acks", "all");
    props.put("retries", 0);
    props.put("batch.size", 16384);
    props.put("linger.ms", 0);
    props.put("buffer.memory", 33554432);
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

    KafkaProducer<String, String> producer = new KafkaProducer<>(props);

    try {
        producer.send(new ProducerRecord<String, String>("fast-messages", "This is a dummy message"));
    } catch(Exception ex) {
        System.out.println(ex);
    }

    System.out.println("message publisher");

    producer.close();
}

}

Что я должен делать ? мой код неправильный, или какие-либо свойства установлены неправильно или отсутствуют?


person Himanshu Sahu    schedule 28.08.2016    source источник


Ответы (4)


В коде не было никаких проблем. только версия api и версия сервера kafka не совпадали. Итак, я исправил только версию api, и теперь производитель работает.

person Himanshu Sahu    schedule 31.08.2016
comment
Я согласен. Я использовал клиент kafka 0.10; однако моя версия брокера была .9. Я обновился до .10, и все отлично заработало. - person Ryan Stack; 19.05.2017

тест ниже

если версия выше 0.9, вам нужна конфигурация "Advertised.host.name" в брокере

person zpc    schedule 29.08.2016

Можете ли вы попробовать Producer.flush () вместо Producer.close (). Flush () блокирует до тех пор, пока сообщения не будут отправлены брокеру kafka ??? Я не вижу ничего странного, кроме этого ..

person amateur    schedule 29.08.2016

В моем случае это произошло потому, что тема не была создана. Я испортил конфигурацию bootstrap-серверов для производителя (использовал другой env, где такой темы не было). После исправления это сработало.

person Reynard    schedule 03.12.2018