Elasticsearch + производительность Apache Spark

Я пытаюсь использовать Apache Spark для запроса моих данных в Elasticsearch, но моя искровая работа занимает около 20 часов, чтобы выполнить агрегацию, и она все еще выполняется. Такой же запрос в ES занимает около 6 секунд.

Я понимаю, что данные должны переместиться из кластера Elasticsearch в мой искровый кластер, а некоторые данные перетасовываются в Spark.

Данные внутри моего индекса ES составляют прибл. 300 миллионов документов, и каждый документ имеет около 400 полей (1,4 Терабайта).

У меня есть искровой кластер с 3 узлами (1 мастер, 2 рабочих) с 60 ГБ памяти и 8 ядрами в целом.

Время, необходимое для запуска, неприемлемо, есть ли способ ускорить выполнение моего искрового задания?

Вот моя конфигурация искры:

SparkConf sparkConf = new SparkConf(true).setAppName("SparkQueryApp")
                 .setMaster("spark://10.0.0.203:7077")    
                 .set("es.nodes", "10.0.0.207")
                 .set("es.cluster", "wp-es-reporting-prod")              
                .setJars(JavaSparkContext.jarOfClass(Demo.class))
                .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
                .set("spark.default.parallelism", String.valueOf(cpus * 2))
                .set("spark.executor.memory", "8g");

Отредактировано

    SparkContext sparkCtx = new SparkContext(sparkConf);

    SQLContext sqlContext = new SQLContext(sparkCtx);
    DataFrame df = JavaEsSparkSQL.esDF(sqlContext, "customer-rpts01-201510/sample");

    DataFrame dfCleaned = cleanSchema(sqlContext, df);

    dfCleaned.registerTempTable("RPT");

    DataFrame sqlDFTest = sqlContext.sql("SELECT agent, count(request_type) FROM RPT group by agent");

    for (Row row : sqlDFTest.collect()) {
        System.out.println(">> " + row);
    }

person Adetiloye Philip Kehinde    schedule 17.07.2015    source источник
comment
Как так получилось, что у вас есть теги datastax? Если вы используете DSE, лучшая производительность, которую вы получите, - это поиск DSE, запрашиваемый встроенным DSE Spark.   -  person phact    schedule 17.07.2015
comment
Можете ли вы включить фрагмент кода в то место, где вы запрашиваете в Spark?   -  person phact    schedule 17.07.2015
comment
@phat, отредактировано. Спасибо   -  person Adetiloye Philip Kehinde    schedule 17.07.2015
comment
Хорошо, спасибо :) fwiw, я предполагаю, что вы не выполняете фильтрацию на уровне ES. Хотя я не специалист по коннектору ES Spark.   -  person phact    schedule 17.07.2015


Ответы (2)


Я понял, что происходит, в основном, я пытался манипулировать схемой фрейма данных, потому что у меня есть некоторые поля с точкой, например user.firstname. Похоже, это вызывает проблемы на этапе сбора искры. Чтобы решить эту проблему, мне пришлось просто повторно проиндексировать мои данные, чтобы в моих полях больше не было точки, а было подчеркивание, например user_firstname.

person Adetiloye Philip Kehinde    schedule 25.08.2015

Боюсь, что вы не сможете выполнить группу размером более 1,4 ТБ с общим объемом оперативной памяти всего 120 ГБ и добиться хорошей производительности. DF попытается загрузить все данные в память / диск и только после этого выполнит группировку по. Не думаю, что на данный момент соединитель spark / ES переводит синтаксис sql на язык запросов ES.

person axlpado - Agile Lab    schedule 24.08.2015
comment
Да, он переводит синтаксис sql в запрос ES! - person Adetiloye Philip Kehinde; 25.08.2015
comment
Ах, вы используете 2.1 RC ... вы включили раскрытие параметров - ›правда? - person axlpado - Agile Lab; 25.08.2015