Datasource V2 Reader (структурированная потоковая передача Spark) — смещения не по порядку

В настоящее время я реализую два пользовательских считывателя, используя API V2 для задания структурированной потоковой передачи искры. После того, как задание работает в течение ~ 30-60 минут, оно выдает:

Caused by: java.lang.RuntimeException: Offsets committed out of order: 608799 followed by 2982

Я повторно использую найденные примеры здесь и он бомбит строку: 206.

Вместо использования потока твиттера, представленного в примере, я реализую его для JMS и SQS.

У меня вопрос: кто-нибудь сталкивался с этой проблемой? Или что-то не так с этой реализацией?

Фрагмент кода:

override def commit(end: Offset): Unit = {
    internalLog(s"** commit($end) lastOffsetCommitted: $lastOffsetCommitted")

    val newOffset = TwitterOffset.convert(end).getOrElse(
      sys.error(s"TwitterStreamMicroBatchReader.commit() received an offset ($end) that did not " +
        s"originate with an instance of this class")
    )

    val offsetDiff = (newOffset.offset - lastOffsetCommitted.offset).toInt

    if (offsetDiff < 0) {
      sys.error(s"Offsets committed out of order: $lastOffsetCommitted followed by $end")
    }

    tweetList.trimStart(offsetDiff)
    lastOffsetCommitted = newOffset
}

Я не могу найти ответ с моими обычными розетками. Однако я видел это. Был сделан вывод об удалении данных контрольных точек, что не похоже на жизнеспособное решение в производственной системе. Другой заключался в том, что исходная система не поддерживает информацию о смещении? У меня сложилось впечатление, что искра будет обрабатывать информацию о смещении сама по себе. Если этот второй пункт является проблемой, как я могу гарантировать, что исходная система обрабатывает эту парадигму.

Пожалуйста, дайте мне знать, если я могу предоставить больше информации.

Редактировать: Глядя на интерфейс MicroBatchReader, документация для фиксации говорит:

    /**
     * Informs the source that Spark has completed processing all data for offsets less than or
     * equal to `end` and will only request offsets greater than `end` in the future.
     */
    void commit(Offset end);

Таким образом, возникает вопрос, почему spark отправляет мне смещения коммитов, которые уже были зафиксированы?


person taylorcressy    schedule 22.05.2019    source источник


Ответы (1)


Отвечая на мой собственный вопрос, если это кому-то поможет,

Я должен был добавить больше информации к вопросу - это задание выполняется в EMR и использует EFS для данных контрольных точек.

Проблема возникла, когда я использовал Amazon amazon-efs-utils для монтирования EFS. По какой-то причине каждый воркер не мог видеть операции чтения и записи других воркеров, как будто EFS не монтировалась.

Решение состояло в том, чтобы переключиться на nfs-utils для монтирования EFS (в соответствии с инструкциями AWS), чтобы каждый воркер мог точно читать данные контрольной точки.

person taylorcressy    schedule 30.05.2019