Всем.
Пожалуйста, помогите мне.
Я пишу задание потоковой передачи apache flink, которое считывает json-сообщения из apache kafka (500-1000 сообщений в секундах), десериализует их в POJO и выполняет некоторые операции (фильтр-ключ-приемник-процесс). Я использовал бэкэнд состояния RocksDB с семантикой ExactlyOnce. Но я не понимаю, какой интервал контрольных точек мне нужно установить?
На некоторых форумах пишут в основном 1000 или 5000 мс. Я пытался установить интервал 10 мс, 100 мс, 500 мс, 1000 мс, 5000 мс. Никаких отличий не заметил.
Какой установить интервал контрольной точки (мс)?
Ответы (2)
Два фактора говорят в пользу разумно небольшого интервала между контрольными точками:
(1) Если вы используете приемник, который выполняет двухфазную фиксацию транзакций, например Kafka или StreamingFileSink, то эти транзакции будут фиксироваться только во время контрольной точки. Таким образом, любые последующие потребители вывода вашего задания будут испытывать задержку, которая определяется интервалом контрольной точки.
Обратите внимание, что у вас не будет этой задержки с Kafka, если вы не выполните все шаги, необходимые для семантики ровно один раз, от конца до конца. Это означает, что вы должны установить Semantic.EXACTLY_ONCE
в производителе Kafka и установить isolation.level
в нижестоящих потребителях на read_committed
. И если вы делаете это, вам также следует увеличить transaction.max.timeout.ms
сверх значения по умолчанию (15 минут). См. документы, чтобы узнать больше.
(2) Если ваша работа не удалась и вам необходимо восстановить данные с контрольной точки, входные данные будут перемотаны на смещения, записанные в контрольной точке, и обработка будет продолжена оттуда. Если интервал между контрольными точками очень большой (например, 30 минут), то вашей работе может потребоваться довольно много времени, чтобы вернуться к точке, где она снова будет обрабатывать события почти в реальном времени (при условии, что вы обрабатываете данные в реальном времени).
С другой стороны, установка контрольных точек добавляет некоторые накладные расходы, поэтому выполнение ее чаще, чем необходимо, влияет на производительность.
В дополнение к пунктам, описанным @David, я предлагаю также использовать следующую функцию для настройки времени контрольной точки:
StreamExecutionEnvironment.getCheckpointConfig().setMinPauseBetweenCheckpoints(milliseconds)
Таким образом, вы гарантируете, что ваша работа сможет добиться определенного прогресса в случае, если состояние станет больше, чем планировалось, или если хранилище, в котором выполняются контрольные точки, будет медленным.
Я рекомендую прочитать документацию Flink на Настройка контрольных точек, чтобы лучше понять эти сценарии.