Я пытаюсь использовать Apache Spark для запроса моих данных в Elasticsearch, но моя искровая работа занимает около 20 часов, чтобы выполнить агрегацию, и она все еще выполняется. Такой же запрос в ES занимает около 6 секунд.
Я понимаю, что данные должны переместиться из кластера Elasticsearch в мой искровый кластер, а некоторые данные перетасовываются в Spark.
Данные внутри моего индекса ES составляют прибл. 300 миллионов документов, и каждый документ имеет около 400 полей (1,4 Терабайта).
У меня есть искровой кластер с 3 узлами (1 мастер, 2 рабочих) с 60 ГБ памяти и 8 ядрами в целом.
Время, необходимое для запуска, неприемлемо, есть ли способ ускорить выполнение моего искрового задания?
Вот моя конфигурация искры:
SparkConf sparkConf = new SparkConf(true).setAppName("SparkQueryApp")
.setMaster("spark://10.0.0.203:7077")
.set("es.nodes", "10.0.0.207")
.set("es.cluster", "wp-es-reporting-prod")
.setJars(JavaSparkContext.jarOfClass(Demo.class))
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.set("spark.default.parallelism", String.valueOf(cpus * 2))
.set("spark.executor.memory", "8g");
Отредактировано
SparkContext sparkCtx = new SparkContext(sparkConf);
SQLContext sqlContext = new SQLContext(sparkCtx);
DataFrame df = JavaEsSparkSQL.esDF(sqlContext, "customer-rpts01-201510/sample");
DataFrame dfCleaned = cleanSchema(sqlContext, df);
dfCleaned.registerTempTable("RPT");
DataFrame sqlDFTest = sqlContext.sql("SELECT agent, count(request_type) FROM RPT group by agent");
for (Row row : sqlDFTest.collect()) {
System.out.println(">> " + row);
}