Я использую AWS EMR со Spark 1.4.0, работающим как автономный кластер (не управляемый Yarn или Mesos), и я использую couchbase-spark-connector версии 1.0.0-beta на момент написания этой статьи и couchbase-java-client версии 2.2.0-dp2 на момент написания этой статьи.
У меня есть представление, написанное в Couchbase, которое возвращает мне только ключи моих документов (около 300 миллионов ключей).
Я написал программу Spark, написанную на Scala 2.10.4, которая для каждого ключа, возвращаемого из представления, я получаю документ и хочу сохранить все это в файловой системе AWS S3.
Моя проблема в том, что Couchbase выдает мне исключение из типа: com.couchbase.client.core.BackpressureException, которое говорит документация:
Определяет необходимость отступить со стороны поставщика при использовании услуги, поскольку потребитель перегружен.
Итак, мой вопрос в том, как я могу исправить это исключение, чтобы оно не выбрасывалось. Я бы предположил, что хотел бы отложить свои запросы, но я не понимаю, как это сделать.
Ниже прикреплен мой код Spark:
val couchbaseKeys = sparkContext.couchbaseView(ViewQuery.from(couchbaseDesignName, couchbaseViewName)).map(_.id).couchbaseGet[JsonDocument]()
couchbaseKeys.map(Projection.projectCouchbaseObject(_)).filter(_ != null).saveAsTextFile(pathForExportedOutput)
И объект Projection со своим методом:
object Projection {
val logger: Logger = LoggerFactory.getLogger(this.getClass)
def projectCouchbaseObject(couchbaseObject: JsonDocument): String = {
try {
return couchbaseObject.id() + '\t' + couchbaseObject.content()
}
catch {
case exception: Throwable => {
logger.error("Failed project couchbase object. key was: " + couchbaseObject.id(), exception)
}
}
return null
}
}