Как настроить Apache Kafka для отправки данных в указанное время?

Рассмотрим следующую абстрактную схему работы Apache Kafka:

Poducers ->(Send messages) -> Apache Kafka -> (Resend to customers) -> Customers 

Можно ли настроить Kafka для отправки сообщений клиентам в указанное время?

Второй вопрос, реально ли откатить сообщение к Кафке от заказчика?


person Jessie    schedule 08.12.2017    source источник
comment
Можете ли вы уточнить, что вы подразумеваете под откатом сообщения к Кафке? Вы имеете в виду прочитать предыдущее сообщение? Или поместить сообщение обратно в Kafka от потребителя?   -  person Robin Moffatt    schedule 08.12.2017


Ответы (4)


Если я получу вопрос, вы хотите отправить данные клиенту в определенный момент времени. При использовании Lenses для Apache Kafka это может быть так же просто, как

#cron the following to execute daily at 24:00 curl -XGET http://lenses-host:port/api/sql/data?sql=SELECT * from topicA WHERE customer = 'customerA WHERE _ts > 'yyyy-mm-dd hh:mm:ss'' > customerA.json send [email protected] customerA.json

Итак, чтобы ответить на первую часть вопроса, вам нужно построить свою потребительскую логику. Откат не поддерживается в Kafka, хотя вы легко можете сделать что-то вроде:

INSERT INTO topicB SELECT * from topicA WHERE _ts < '2017-12-10 00:00:00'

Таким образом, вы можете легко создать новую тему из другой, но нет семантики отката.

person Antonios Chalkiopoulos    schedule 09.12.2017

Потребители получают сообщения от Kafka; Кафка их не подталкивает («отправляет»). Таким образом, ваши потребители могут извлекать данные, когда они этого хотят.

person Robin Moffatt    schedule 08.12.2017
comment
Итак, как же извлекать данные каждый раз (каждые миллисекунды) и откатывать сообщение в Кафку? Может ли клиент вытащить данные по условию? - person Jessie; 08.12.2017

Как уже ответили другие, Kafka не отправляет сообщения потребителям, а потребители получают сообщения от Kafka; это означает, что вам нужно написать своего потребителя, чтобы получать сообщения из темы Kafka в определенное время (или интервалы). Про откат что вы имеете в виду? Может быть, этот потребитель получает сообщения от Kafka, но затем хочет перечитать те же сообщения позже, потому что во время первой обработки возникают ошибки? Если да, то есть два аспекта, которые следует учитывать в отношении Кафки:

  • В Kafka есть настраиваемое сохранение сообщений (даже в течение нескольких дней), это означает, что когда потребитель получает сообщения, они не удаляются из раздела темы.
  • вместо этого, когда потребитель получает сообщения, он должен зафиксировать смещение, чтобы он мог отслеживать, какое последнее сообщение было прочитано из раздела темы. Эта фиксация может быть выполнена автоматически или вручную, поэтому вы можете зафиксировать смещение только в том случае, если ваш процесс прошел хорошо. В любом случае вы можете перемотать поток и решить заново начать чтение раздела темы с определенного смещения.
person ppatierno    schedule 08.12.2017
comment
На самом деле вы правы здесь ; it means that you need to write your consumer in order to pull messages . Это был изначально вопрос, который привел меня к Кафке. Можете ли вы другой механизм для издателя? - person Jessie; 08.12.2017

Распространяясь на Робина,

Kafka не будет отправлять сообщения потребителю, потребителю нужно получать сообщения от Kafka.

Посмотрите на фрагмент кода Python ниже, чтобы прочитать сообщения от Kafka:

running = True
while running:
    msg = c.poll(timeout=1.0)
    if not msg.error():
        print('Received message: %s' % msg.value().decode('utf-8'))
    elif msg.error().code() != KafkaError._PARTITION_EOF:
        print(msg.error())
        running = False

В приведенном выше фрагменте msg = c.poll(timeout=1.0) используется для получения сообщения от Kafka каждую секунду. если вы хотите увеличить время ожидания до любого количества секунд. это означает, что потребитель Kafka будет извлекать сообщения из каждого временного интервала.

если вы хотите сделать расписание, вы должны вызвать метод опроса в расписании.

Примечание: значение session.timeout.ms должно быть больше, чем время опроса

person shakeel    schedule 08.12.2017
comment
Я не спрашиваю, как читать сообщения на стороне клиента. Я задал другой, пожалуйста, прочитайте вопрос еще раз - person Jessie; 08.12.2017
comment
Вы ответили частично, ничего не сказали про откат, если это возможно в Кафке. - person Jessie; 08.12.2017
comment
откатиться значит?? что именно вы имеете в виду? можете привести пример? потребители значит не потребители? - person shakeel; 08.12.2017