В настоящее время я реализую два пользовательских считывателя, используя API V2 для задания структурированной потоковой передачи искры. После того, как задание работает в течение ~ 30-60 минут, оно выдает:
Caused by: java.lang.RuntimeException: Offsets committed out of order: 608799 followed by 2982
Я повторно использую найденные примеры здесь и он бомбит строку: 206.
Вместо использования потока твиттера, представленного в примере, я реализую его для JMS и SQS.
У меня вопрос: кто-нибудь сталкивался с этой проблемой? Или что-то не так с этой реализацией?
Фрагмент кода:
override def commit(end: Offset): Unit = {
internalLog(s"** commit($end) lastOffsetCommitted: $lastOffsetCommitted")
val newOffset = TwitterOffset.convert(end).getOrElse(
sys.error(s"TwitterStreamMicroBatchReader.commit() received an offset ($end) that did not " +
s"originate with an instance of this class")
)
val offsetDiff = (newOffset.offset - lastOffsetCommitted.offset).toInt
if (offsetDiff < 0) {
sys.error(s"Offsets committed out of order: $lastOffsetCommitted followed by $end")
}
tweetList.trimStart(offsetDiff)
lastOffsetCommitted = newOffset
}
Я не могу найти ответ с моими обычными розетками. Однако я видел это. Был сделан вывод об удалении данных контрольных точек, что не похоже на жизнеспособное решение в производственной системе. Другой заключался в том, что исходная система не поддерживает информацию о смещении? У меня сложилось впечатление, что искра будет обрабатывать информацию о смещении сама по себе. Если этот второй пункт является проблемой, как я могу гарантировать, что исходная система обрабатывает эту парадигму.
Пожалуйста, дайте мне знать, если я могу предоставить больше информации.
Редактировать: Глядя на интерфейс MicroBatchReader, документация для фиксации говорит:
/**
* Informs the source that Spark has completed processing all data for offsets less than or
* equal to `end` and will only request offsets greater than `end` in the future.
*/
void commit(Offset end);
Таким образом, возникает вопрос, почему spark отправляет мне смещения коммитов, которые уже были зафиксированы?