Варианты объединения искр

У меня возникли проблемы с пониманием того, как использовать настройки для параметров пула и определить, работают ли они из этого источника: https://docs.datastax.com/en/developer/java-driver/3.4/manual/pooling/

Будет ли SparkSession val учитывать параметры пула из кластера?

Мой Scala-код:

package com.zeropoints.processing

import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import com.datastax.spark.connector._
import com.datastax.driver.core.Cluster
import com.datastax.driver.core.PoolingOptions
import com.datastax.driver.core.HostDistance

//This object provides the main entry point into spark processing
object main {

  var appName = "Processing" 

  lazy val sparkconf:SparkConf = new SparkConf(true).setAppName(appName)

  lazy val poolingOptions:PoolingOptions = new PoolingOptions()

  lazy val cluster:Cluster = Cluster.builder().withPoolingOptions(poolingOptions).build()

  lazy val spark:SparkSession = SparkSession.builder().config(sparkconf).getOrCreate

  lazy val sc:SparkContext = spark.sparkContext

  def main(args: Array[String]) {
    //Set pooling stuff
    poolingOptions.setConnectionsPerHost(HostDistance.LOCAL, 6, 60)

    //DF and RDDs tasks...
    spark.sql("select * from data.raw").groupBy("key1,key2").agg(sum("views")).
        write.format("org.apache.spark.sql.cassandra").options(Map( "table" -> "summary", "keyspace" -> "data")).
        mode(org.apache.spark.sql.SaveMode.Append).save()

    //..more stuff
  }

}

person ZeroPoints    schedule 26.07.2018    source источник


Ответы (1)


Нет, Spark Connector не будет учитывать вашу конфигурацию пула - он работает по-другому, особенно если вы думаете о выполнении своего кода в распределенной среде - ваш setConnectionsPerHost выполняется только в драйвере и не влияет на исполнителей.

Правильный способ — указать необходимые настройки через параметры конфигурации Spark. В документации есть отдельный раздел о параметры подключения, и connection.connections_per_executor_max может быть тем, что вам нужно. Вы также можете написать свой собственный класс, реализующий трейт CassandraConnectionFactory и обеспечивающий реализацию функции createCluster. Затем вы можете указать это имя класса в качестве параметра конфигурации connection.factory.

Но главный вопрос — действительно ли вам нужно настраивать эти параметры? Вы думаете, что обработка идет медленно? Документация по драйверу Java рекомендует иметь 1 соединение на хост, чтобы избежать дополнительной нагрузки на Cassandra.

person Alex Ott    schedule 26.07.2018
comment
Полностью согласен, маловероятно, что увеличение количества подключений на хост улучшит производительность. При тестировании я обнаружил, что вы можете получить некоторое улучшение пропускной способности, увеличив значение, но обычно это происходит только тогда, когда вы загружаетесь на полной остановке, и даже тогда улучшение довольно незначительно. Я рассмотрю обновление документации драйвера datastax, чтобы сделать это более понятным. - person Andy Tolbert; 26.07.2018
comment
Хорошо, похоже, что ваш код Spark достигает удаленного контроллера домена, который по умолчанию имеет ограничение в 256 текущих запросов. Вы можете попробовать установить параметр connection.local_dc на имя вашего локального центра обработки данных Casasndra — в некоторых случаях драйвер не может угадать, где находится ваш код. - person Alex Ott; 27.07.2018
comment
Извините за задержку с ответом. Попробую настроить, посмотрю как пойдет. Спасибо за помощь - person ZeroPoints; 30.07.2018