Управление внутренними данными Kafka Streams

В моей компании мы широко используем Kafka, но мы использовали реляционную базу данных для хранения результатов нескольких промежуточных преобразований и агрегатов по причинам отказоустойчивости. Теперь мы исследуем Kafka Streams как более естественный способ сделать это. Часто наши потребности довольно просты - один из таких случаев

  • Прослушать входную очередь <K1,V1>, <K2,V2>, <K1,V2>, <K1,V3>...
  • Для каждой записи выполните операцию с высокой задержкой (вызовите удаленную службу)
  • Если к моменту обработки <K1,V1> и оба <K1,V2>, <K1,V3> были созданы, то я должен обработать V3, поскольку V2 уже устарел.

Для этого я читаю тему как KTable. Код выглядит так, как показано ниже

KStreamBuilder builder = new KStreamBuilder();
KTable<String, String> kTable = builder.table("input-topic");
kTable.toStream().foreach((K,V) -> client.post(V));
return builder;

Это работает так, как ожидалось, но мне непонятно, как Kafka автоматически этого добивается. Я предполагал, что Кафка создает внутренние темы для достижения этой цели, но я не вижу созданных внутренних тем. Javadoc для метода, похоже, подтверждают это наблюдение. Но потом я наткнулся на эту официальную страницу которые, похоже, предполагают, что Kafka использует отдельное хранилище данных, также известное как RocksDB, вместе с темой журнала изменений.

Теперь я не понимаю, при каких обстоятельствах создается тема журнала изменений. Мои вопросы

  1. Если поведение хранилища состояний по умолчанию является отказоустойчивым, как это предлагается на официальной странице, то где хранится это состояние? В RocksDB? В теме журнала изменений или и то, и другое?
  2. What are the implications of relying on RocksDB in production? (EDITED)
    1. As I understood, the dependency to rocksdb is transparent (just a jar file) and rocksdb stores data in local file system. But this would also means that in our case, that application will maintain a copy of the sharded data on the storage where application is running. When we replace a remote database with a KTable, it has storage implications and that is my point.
    2. Будут ли релизы Kafka позаботиться о том, чтобы RocksDB продолжала работать на различных платформах? (Поскольку кажется, что это зависит от платформы и написано не на Java)
  3. Имеет ли смысл уплотнять журнал входных тем?

Я использую v. 0.11.0


person senseiwu    schedule 09.05.2018    source источник


Ответы (1)


  1. Kafka Streams хранит состояние локально. По умолчанию используется RocksDB. Однако местное государство эфемерно. Для обеспечения отказоустойчивости все обновления магазина также записываются в раздел журнала изменений. Это позволяет перестроить и / или перенести хранилище в случае сбоя или увеличения / уменьшения масштаба. Для вашего особого случая тема журнала изменений не создается, потому что KTable не является результатом агрегирования, а заполняется непосредственно из темы - это только оптимизация. Поскольку тема журнала изменений будет содержать те же данные, что и тема ввода, Kafka Streams избегает дублирования данных и использует тему ввода в качестве темы журнала изменений в случае возникновения сценария ошибки.

  2. Не совсем понимаю, что вы имеете в виду под этим вопросом. Обратите внимание, что RocksDB считается временным хранилищем. Он используется по умолчанию по разным причинам, как описано здесь: Почему Apache Kafka Streams использует RocksDB и как его можно изменить? (например, он позволяет сохранять состояние больше, чем основная память, так как оно может попасть на диск). Вы можете заменить RocksDB любым другим магазином. Kafka Streams также поставляется с хранилищем в памяти. (Изменить)

    1. Это правильно. Вам необходимо соответствующим образом подготовить свое приложение, чтобы иметь возможность хранить локальный сегмент общего состояния. Для этого есть руководство по выбору размеров: https://docs.confluent.io/current/streams/sizing.html

    2. RocksDB написан на C ++ и подключается через привязку JNI. В Windows есть некоторые известные проблемы, поскольку RocksDB не предоставляет предварительно скомпилированные двоичные файлы для всех версий Windows. Пока вы используете платформу на базе Linux, она должна работать. Сообщество Kafka проводит тесты обновления для RocksDB, чтобы убедиться, что он совместим.

  3. да. Kafka Streams фактически предполагает, что входная тема для table() операции сжата в журнале. В противном случае существует риск потери данных в случае сбоя. (Изменить)

    1. If you enable log-compaction, retention time setting is ignored. Thus, yes, the latest update will be maintained forever (or until a tombstone message with value=null is written). Note, that when compaction is execute on the broker side, old data is garbage collected and thus, on restore, only the new version per key are read -- old versions got removed during compaction process. If you are not interested in some data after some period of time, you would need to write a tombstone into the source topic to make it work.
person Matthias J. Sax    schedule 10.05.2018
comment
спасибо за подробный ответ и за то, что вы являетесь чемпионом по этой теме! (2): Я редактировал вопрос. Ваши мысли очень ценятся. (3) Я не понимаю этого. Будет ли в сжатой теме журнала сохраняться последнее значение для каждого ключа даже по истечении срока хранения (в нашем случае 24 часа)? Если, скажем, нас не интересуют те key,value, срок хранения которых истек, ваш комментарий по-прежнему остается в силе? Как я понял, идея уплотнения состоит в том, чтобы сделать восстановление действительно эффективным, поскольку состояние может быть восстановлено без необходимости просматривать весь набор данных для каждого ключа. - person senseiwu; 10.05.2018
comment
Обновил свой ответ. - person Matthias J. Sax; 10.05.2018