kafka получить количество разделов для темы

Как я могу получить количество разделов для любой темы kafka из кода. Я изучил много ссылок, но, похоже, ни одна из них не работает.

Упомянув несколько:

http://grokbase.com/t/kafka/users/148132gdzk/find-topic-partition-count-through-simpleclient-api

http://grokbase.com/t/kafka/users/151cv3htga/get-replication-and-partition-count-of-a-topic

http://qnalist.com/questions/5809219/get-replication-and-partition-count-of-a-topic

которые выглядят как аналогичные обсуждения.

Также есть аналогичные ссылки на SO, у которых нет рабочего решения для этого.


person vish4071    schedule 16.02.2016    source источник
comment
Какая версия Кафки?   -  person Marko Bonaci    schedule 16.02.2016
comment
vish4071, как насчет того, чтобы принять решение, которым вы в конечном итоге воспользовались?   -  person Marko Bonaci    schedule 07.10.2018


Ответы (14)


В 0.82 Producer API и 0.9 Consumer api вы можете использовать что-то вроде

Properties configProperties = new Properties();
configProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");
configProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.ByteArraySerializer");
configProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");

org.apache.kafka.clients.producer.Producer producer = new KafkaProducer(configProperties);
producer.partitionsFor("test")
person Sunil Patil    schedule 16.02.2016
comment
Это предполагает, что я использую KafkaConsumer для своего потребителя, но для него я использую ConsumerConnector. см .: cwiki.apache.org/confluence/display/KAFKA/ - person vish4071; 17.02.2016

Перейдите в свой kafka/bin каталог.

Затем запустите это:

./kafka-topics.sh --describe --zookeeper localhost:2181 --topic topic_name

Вы должны увидеть то, что вам нужно, под PartitionCount.

Topic:topic_name        PartitionCount:5        ReplicationFactor:1     Configs:
        Topic: topic_name       Partition: 0    Leader: 1001    Replicas: 1001  Isr: 1001
        Topic: topic_name       Partition: 1    Leader: 1001    Replicas: 1001  Isr: 1001
        Topic: topic_name       Partition: 2    Leader: 1001    Replicas: 1001  Isr: 1001
        Topic: topic_name       Partition: 3    Leader: 1001    Replicas: 1001  Isr: 1001
        Topic: topic_name       Partition: 4    Leader: 1001    Replicas: 1001  Isr: 1001
person peter.petrov    schedule 16.02.2016
comment
Голосование за поддержку, потому что я искал решение, не связанное с кодом, и это было идеально. - person David Kaczynski; 02.03.2017
comment
Если вы хотите суммировать все разделы (включая реплики) для регулярного выражения темы, см. Мой ответ ниже. - person PragmaticProgrammer; 14.01.2021

Вот как я это делаю:

  /**
   * Retrieves list of all partitions IDs of the given {@code topic}.
   * 
   * @param topic
   * @param seedBrokers List of known brokers of a Kafka cluster
   * @return list of partitions or empty list if none found
   */
  public static List<Integer> getPartitionsForTopic(String topic, List<BrokerInfo> seedBrokers) {
    for (BrokerInfo seed : seedBrokers) {
      SimpleConsumer consumer = null;
      try {
        consumer = new SimpleConsumer(seed.getHost(), seed.getPort(), 20000, 128 * 1024, "partitionLookup");
        List<String> topics = Collections.singletonList(topic);
        TopicMetadataRequest req = new TopicMetadataRequest(topics);
        kafka.javaapi.TopicMetadataResponse resp = consumer.send(req);

        List<Integer> partitions = new ArrayList<>();
        // find our partition's metadata
        List<TopicMetadata> metaData = resp.topicsMetadata();
        for (TopicMetadata item : metaData) {
          for (PartitionMetadata part : item.partitionsMetadata()) {
            partitions.add(part.partitionId());
          }
        }
        return partitions;  // leave on first successful broker (every broker has this info)
      } catch (Exception e) {
        // try all available brokers, so just report error and go to next one
        LOG.error("Error communicating with broker [" + seed + "] to find list of partitions for [" + topic + "]. Reason: " + e);
      } finally {
        if (consumer != null)
          consumer.close();
      }
    }
    throw new RuntimeError("Could not get partitions");
  }

Обратите внимание, что мне просто нужно было извлечь идентификаторы разделов, но вы можете дополнительно получить любые другие метаданные разделов, например leader, isr, replicas, ...
И BrokerInfo - это просто простой POJO с полями host и port.

person Marko Bonaci    schedule 17.02.2016

Ниже оболочки cmd может вывести количество разделов. Вы должны быть в каталоге bin kafka перед выполнением cmd:

sh kafka-topics.sh --describe --zookeeper localhost:2181 --topic **TopicName** | awk '{print $2}' | uniq -c |awk 'NR==2{print "count of partitions=" $1}'

Обратите внимание, что вы должны изменить название темы в соответствии с вашими потребностями. Вы можете дополнительно проверить это, используя условие if:

sh kafka-topics.sh --describe --zookeeper localhost:2181 --topic **TopicName** | awk '{print $2}' | uniq -c |awk 'NR==2{if ($1=="16") print "valid partitions"}'

Приведенная выше команда cmd печатает допустимые разделы, если количество равно 16. Вы можете изменить количество в зависимости от ваших требований.

person MD5    schedule 27.08.2017

В java-коде мы можем использовать AdminClient, чтобы получить суммарные доли одной темы.

Properties props = new Properties();
props.put("bootstrap.servers", "host:9092");
AdminClient client = AdminClient.create(props);

DescribeTopicsResult result = client.describeTopics(Arrays.asList("TEST"));
Map<String, KafkaFuture<TopicDescription>>  values = result.values();
KafkaFuture<TopicDescription> topicDescription = values.get("TEST");
int partitions = topicDescription.get().partitions().size();
System.out.println(partitions);
person TongChen    schedule 22.01.2019

Используйте PartitionList от KafkaConsumer

     //create consumer then loop through topics
    KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
    List<PartitionInfo> partitions = consumer.partitionsFor(topic);

    ArrayList<Integer> partitionList = new ArrayList<>();
    System.out.println(partitions.get(0).partition());

    for(int i = 0; i < partitions.size(); i++){
        partitionList.add(partitions.get(i).partition());
    }

    Collections.sort(partitionList);

Должен работать как шарм. Сообщите мне, есть ли более простой способ получить доступ к списку разделов из темы.

person pjkmgs    schedule 22.07.2019

Таким образом, следующий подход работает для kafka 0.10 и не использует API-интерфейсы производителя или потребителя. Он использует некоторые классы из scala API в kafka, такие как ZkConnection и ZkUtils.

    ZkConnection zkConnection = new ZkConnection(zkConnect);
    ZkUtils zkUtils = new ZkUtils(zkClient,zkConnection,false);
    System.out.println(JavaConversions.mapAsJavaMap(zkUtils.getPartitionAssignmentForTopics(
         JavaConversions.asScalaBuffer(topicList))).get("bidlogs_kafka10").size());
person Avinash Kumar Pandey    schedule 26.10.2016

У меня была такая же проблема, когда мне нужно было получить разделы для темы.

С помощью ответа здесь я смог получить информацию от Zookeeper.

Вот мой код на Scala (но его можно легко перевести на Java)

import org.apache.zookeeper.ZooKeeper

def extractPartitionNumberForTopic(topicName: String, zookeeperQurom: String): Int = {
  val zk = new ZooKeeper(zookeeperQurom, 10000, null);
  val zkNodeName = s"/brokers/topics/$topicName/partitions"
  val numPartitions = zk.getChildren(zkNodeName, false).size
  zk.close()
  numPartitions
}

Использование этого подхода позволило мне получить доступ к информации о темах Kafka, а также другой информации о брокерах Kafka ...

В Zookeeper вы можете проверить количество разделов для темы, перейдя на /brokers/topics/MY_TOPIC_NAME/partitions

Использование zookeeper-client.sh для подключения к вашему zookeeper:

[zk: ZkServer:2181(CONNECTED) 5] ls /brokers/topics/MY_TOPIC_NAME/partitions
[0, 1, 2]

Это показывает нам, что для темы MY_TOPIC_NAME есть 3 раздела.

person user1314742    schedule 30.01.2017

Количество разделов можно получить из zookeeper-shell

Syntax: ls /brokers/topics/<topic_name>/partitions

Ниже приведен пример:

root@zookeeper-01:/opt/kafka_2.11-2.0.0# bin/zookeeper-shell.sh zookeeper-01:2181
Connecting to zookeeper-01:2181
Welcome to ZooKeeper!
JLine support is disabled

WATCHER::

WatchedEvent state:SyncConnected type:None path:null
ls /brokers/topics/test/partitions
[0, 1, 2, 3, 4]
person Hackaholic    schedule 17.11.2018

Вы можете получить список разделов kafka из zookeeper следующим образом. Это реальный номер раздела на стороне сервера kafka.

[zk: zk.kafka:2181(CONNECTED) 43] ls /users/test_account/test_kafka_name/brokers/topics/test_kafka_topic_name/partitions
[35, 36, 159, 33, 34, 158, 157, 39, 156, 37, 155, 38, 154, 152, 153, 150, 151, 43, 42, 41, 40, 202, 203, 204, 205, 200, 201, 22, 23, 169, 24, 25, 26, 166, 206, 165, 27, 207, 168, 208, 28, 29, 167, 209, 161, 3, 2, 162, 1, 163, 0, 164, 7, 30, 6, 32, 5, 160, 31, 4, 9, 8, 211, 212, 210, 215, 216, 213, 19, 214, 17, 179, 219, 18, 178, 177, 15, 217, 218, 16, 176, 13, 14, 11, 12, 21, 170, 20, 171, 174, 175, 172, 173, 220, 221, 222, 223, 224, 225, 226, 227, 188, 228, 187, 229, 189, 180, 10, 181, 182, 183, 184, 185, 186, 116, 117, 79, 114, 78, 77, 115, 112, 113, 110, 111, 118, 119, 82, 83, 80, 81, 86, 87, 84, 85, 67, 125, 66, 126, 69, 127, 128, 68, 121, 122, 123, 124, 129, 70, 71, 120, 72, 73, 74, 75, 76, 134, 135, 132, 133, 59, 138, 58, 57, 139, 136, 56, 137, 55, 64, 65, 62, 63, 60, 131, 130, 61, 49, 143, 48, 144, 145, 146, 45, 147, 44, 148, 47, 149, 46, 51, 52, 53, 54, 140, 142, 141, 50, 109, 108, 107, 106, 105, 104, 103, 99, 102, 101, 100, 98, 97, 96, 95, 94, 93, 92, 91, 90, 88, 89, 195, 194, 197, 196, 191, 190, 193, 192, 198, 199, 230, 239, 232, 231, 234, 233, 236, 235, 238, 237]

И вы можете использовать количество разделов в потребительском коде.

  def getNumPartitions(topic: String): Int = {
    val zk = CuratorFrameworkFactory.newClient(zkHostList, new RetryNTimes(5, 1000))

    zk.start()
    var numPartitions: Int = 0
    val topicPartitionsPath = zkPath + "/brokers/topics/" + topic + "/partitions"

    if (zk.checkExists().forPath(topicPartitionsPath) != null) {
        try {
            val brokerIdList = zk.getChildren().forPath(topicPartitionsPath).asScala
            numPartitions = brokerIdList.length.toInt
        } catch {
            case e: Exception => {
                e.printStackTrace()
            }  
        }  
    }  
    zk.close()

    numPartitions
  }
person gyuseong    schedule 05.08.2019

@ Сунил-патил ответ остановился перед ответом на счет его части. Вы должны получить размер списка

Manufacturer.partitionsFor ("тест"). size ()

@ vish4071 нет смысла бодать Сунила, вы не упомянули, что используете ConsumerConnector в вопросе.

person Andy    schedule 17.09.2016

Вы можете изучить kafka.utils.ZkUtils, в котором есть множество методов, предназначенных для извлечения метаданных о кластере. Ответы здесь хорошие, поэтому я просто добавляю их для разнообразия:

import kafka.utils.ZkUtils
import org.I0Itec.zkclient.ZkClient

def getTopicPartitionCount(zookeeperQuorum: String, topic: String): Int = {
  val client = new ZkClient(zookeeperQuorum)
  val partitionCount = ZkUtils.getAllPartitions(client)
    .count(topicPartitionPair => topicPartitionPair.topic == topic)

  client.close
  partitionCount
}
person Danny Mor    schedule 05.04.2017

Чтобы получить список разделов, идеальный / фактический способ - использовать API AdminClients.

    Properties properties=new Properties();
    properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");
    AdminClient adminClient=KafkaAdminClient.create(properties);
    Map<String, TopicDescription> jension = adminClient.describeTopics(Collections.singletonList("jenison")).all().get();
    System.out.println(jension.get("jenison").partitions().size());

Его можно запустить как отдельный java-метод без зависимостей производителя / потребителя.

person D1 and 1ly    schedule 24.02.2020

Я обнаружил, что ни один из ответов не дает быстрого и простого способа подсчитать все разделы для данного регулярного выражения темы. В моем случае мне нужно было увидеть, сколько разделов было в моем кластере, включая реплики для определения размера.

Ниже приведена команда bash, которую вы можете запустить (никаких дополнительных инструментов не требуется):

kafka-topics --describe --bootstrap-server broker --topic ".*" | grep Configs | awk '{printf "%d\n", $4*$6}' | awk '{s+=$1} END {print s}'

Вы можете настроить регулярное выражение темы, заменив регулярное выражение .* на то, что вам нравится. Также не забудьте изменить broker на адрес вашего брокера.

Подробности:

  1. Поток kafka-topics описывает результаты по заданным интересующим темам
  2. Извлеките только первую строку для каждой темы, которая содержит количество разделов и коэффициент репликации.
  3. Умножьте PartitionCount на ReplicationFactor, чтобы получить общее количество разделов для темы.
  4. Суммируйте все подсчеты и распечатайте общее

Бонус:

Если у вас установлен докер, вам не нужно загружать двоичный файл Kafka:

docker run -it confluentinc/cp-kafka:6.0.0 /bin/bash

Затем вы можете запустить это, чтобы получить доступ ко всем скриптам Kafka:

cd /usr/bin
person PragmaticProgrammer    schedule 14.01.2021