Използвам AWS EMR със Spark 1.4.0, работещ като самостоятелен клъстер (не се управлява от Yarn или Mesos), и използвам couchbase-spark-connector версия 1.0.0-beta към момента на писане и couchbase-java-client версия 2.2.0-dp2 към момента на писане.
Имам изглед, написан в Couchbase, който ми връща само ключовете на моите документи (около 300 милиона+ ключа).
Написах програма Spark, написана на Scala 2.10.4, която за всеки ключ, върнат от изгледа, получавам документа и искам да го запазя целия във файловата система AWS S3.
Проблемът ми е, че Couchbase ми хвърля изключение от тип: com.couchbase.client.core.BackpressureException, което казва от документация:
Идентифицира необходимостта от отстъпление от страна на доставчика при използване на услуга, тъй като потребителят е претоварен.
Така че въпросът ми е как мога да поправя това изключение, за да не бъде хвърлено. Предполагам, че бих искал да забавя заявките си, но не виждам как да го направя.
Приложен по-долу е моят код на Spark:
val couchbaseKeys = sparkContext.couchbaseView(ViewQuery.from(couchbaseDesignName, couchbaseViewName)).map(_.id).couchbaseGet[JsonDocument]()
couchbaseKeys.map(Projection.projectCouchbaseObject(_)).filter(_ != null).saveAsTextFile(pathForExportedOutput)
И обектът Projection с неговия метод:
object Projection {
val logger: Logger = LoggerFactory.getLogger(this.getClass)
def projectCouchbaseObject(couchbaseObject: JsonDocument): String = {
try {
return couchbaseObject.id() + '\t' + couchbaseObject.content()
}
catch {
case exception: Throwable => {
logger.error("Failed project couchbase object. key was: " + couchbaseObject.id(), exception)
}
}
return null
}
}