Настройка сохранения журнала темы в confluent-kafka-python

Я не смог найти в документации, как задать время хранения при создании производителя с помощью confluent-kafka.

Если я просто укажу «загрузочные серверы», время хранения по умолчанию составляет 1 день. Я хотел бы иметь возможность изменить это.

(Я хочу сделать это в API Python, а не в командной строке.)


person Matyas    schedule 17.05.2017    source источник


Ответы (2)


Срок хранения устанавливается при создании темы, а не в конфигурации производителя.

Если ваш server.properties позволяет автоматически создавать темы, вы получите там настройки по умолчанию.

В противном случае вы можете использовать AdminClient API для отправки запроса NewTopic, который поддерживает атрибут config dict<str,str>

from confluent_kafka.admin import AdminClient, NewTopic

# a = AdminClient(...) 

topics = list()
t = NewTopic(topic, num_partitions=3, replication_factor=1, config={'log.retention.hours': '168'})
topics.append(t)

# Call create_topics to asynchronously create topics, a dict
# of <topic,future> is returned.
fs = a.create_topics(topics)

# Wait for operation to finish.
# Timeouts are preferably controlled by passing request_timeout=15.0
# to the create_topics() call.
# All futures will finish at the same time.
for topic, f in fs.items():
    try:
        f.result()  # The result itself is None
        print("Topic {} created".format(topic))
    except Exception as e:
        print("Failed to create topic {}: {}".format(topic, e))

По этой же ссылке вы можете найти запрос на изменение темы

person OneCricketeer    schedule 27.09.2018
comment
Я знаю, что не должен публиковать сообщения с благодарностью, но этот ответ был очень полезным - person Akash Roy Choudhury; 31.05.2019
comment
В нем говорится, что NewTopic не имеет ключевого аргумента с именем config. Не могли бы вы помочь? Тай! - person Dhiraj Gandhi; 28.12.2020
comment
@Dhiraj Я не уверен в каких-либо обновлениях, но класс задокументирован из репозитория github. - person OneCricketeer; 29.12.2020
comment
Насколько я вижу, аргумент config больше не поддерживается. kafka-python.readthedocs.io/ en/master/_modules/kafka/admin/ - person rodoherty1; 19.02.2021
comment
@rodo Этот ответ предназначен для библиотеки Confluent Python. - person OneCricketeer; 20.02.2021

время удерживания не является собственностью производителя. Время хранения по умолчанию задается в файле конфигурации брокера server.properties и таких свойствах, как log.retention.hours, например. /etc/kafka/server.properties ... в зависимости от вашей установки.

Вы можете изменить время хранения для каждой темы, например, с помощью

$ <path-to-kafka>/bin/kafka-topics.sh --zookeeper <zookeeper-quorum> --alter --topic <topic-name> --config retention.ms=<your-desired-retention-in-ms>

ХТХ....

person GeKo    schedule 18.05.2017
comment
Спасибо. Но как я могу сделать это из программы? Мне действительно нужно использовать системный вызов (скажем, в python) для вызова сценария оболочки? - person Matyas; 19.05.2017
comment
Добавлен ответ для этого из Python - person OneCricketeer; 27.09.2018