Spark Открытие нескольких потоков для одного задания при попытке запуска параллельных заданий

У нас есть вариант использования, в котором нам нужно запускать параллельные SQL-запросы Spark в одном сеансе Spark через rest-api (akka http).

Конфигурация приложения

my-blocking-dispatcher {
  type = Dispatcher
  executor = "thread-pool-executor"
  thread-pool-executor {
    // or in Akka 2.4.2+
    fixed-pool-size = 4
  }
  throughput = 100
} 

Служба Spark

import org.apache.spark.scheduler.{SparkListener, SparkListenerJobEnd, SparkListenerJobStart}
import org.apache.spark.sql.execution.ui.CustomSqlListener
import org.apache.spark.sql.{Row, SparkSession}
import scala.collection.mutable.ListBuffer
import scala.concurrent.{ExecutionContext, Future}
import scala.util.parsing.json.JSON

trait SparkService {

  val session = SparkSession
                .builder()
                .config("spark.scheduler.mode", "FAIR")
                .appName("QueryCancellation")
                .master("local[*]")
                .enableHiveSupport()
                .getOrCreate()

  var queryJobMapStart = Map[String, String]()
  var queryStatusMap = Map[String,String]()
  session.sparkContext.setLogLevel("ERROR")
  session.sparkContext.setCallSite("Reading the file")
  val dataDF = session.read
      .format("csv")
      .option("inferSchema","true")
      .option("header","true")
      .load("C:\\dev\\QueryCancellation\\src\\main\\resources\\Baby_Names__Beginning_2007.csv")



  dataDF.createOrReplaceTempView("data_tbl")


  //dataDF.printSchema()

  val customListener = new CustomSqlListener(session.sparkContext.getConf,queryJobMapStart,queryStatusMap)
  val appListener = session.sparkContext.addSparkListener(customListener)


  def runQuery(query : String, queryId: String)(implicit  ec : ExecutionContext)=  {


  //  println("queryId: "+ queryId +" query:" + query)
    session.sparkContext.setLocalProperty("callSite.short",queryId)
    session.sparkContext.setLocalProperty("callSite.long",query)

     session.sql(query).show(2)
    //Thread.sleep(60000)
   // Future(data)

  }

}

object SparkService extends SparkService 

Служба запросов

import java.util.UUID
import java.util.concurrent.ConcurrentHashMap
import akka.actor.ActorSystem
import akka.http.scaladsl.server.Route
import akka.http.scaladsl.server.Directives._
import akka.stream.ActorMaterializer
trait QueryService extends SparkService {
  implicit val system: ActorSystem
  implicit val materializer : ActorMaterializer
  // implicit val sparkSession: SparkSession
  // val datasetMap = new ConcurrentHashMap[String, Dataset[Row]]()
  implicit val blockingDispatcher = system.dispatchers.lookup("my-blocking-dispatcher")
  val route: Route =
    pathSingleSlash {
      get {
        complete {
          "welcome to rest service"
        }
      }
    } ~
      path("runQuery" / "county"/Segment) { county  =>
        get {
          complete{
            var res= ""
            val documentId = "user ::" + UUID.randomUUID().toString
            val queryId = System.nanoTime().toString
            val stmt = "select a.sex,count(*) from data_tbl a,data_tbl b where b.county=a.county and a.country= '"+county+"' group by a.sex"
            val result = runQuery(stmt,queryId)
            /* var entity = queryResult match {
               case Some(result) =>s"Query : $stmt  is submitted. Query id is $result. User id is $documentId"
               case None => s"Query : $stmt could not be submitted. User id is $documentId"
             }*/
            /*result.onComplete{
              case Success(value) => println(s"Query completed")
              case Failure(e) =>  None
            }*/
            var entity = s"Query : $stmt  is submitted. Query id is $queryId. User id is $documentId"
            entity
          }
        }
      } ~
      path("getStatus" / """[\w[0-9]-_]+""".r) { id =>
        get {
          complete {
            val statusResult = getStatus(id)
            var res = statusResult match {
              case Some(result) =>  s"Status for query id : $id is $result"
              case None =>  s"Could not find the status of the query id : $id"
            }
            res
          }
        }
      } ~
      path("killQuery" / """[\w[0-9]-_]+""".r) { id =>
        get {
          complete {
            val statusResult = killQuery(id)
            s"Query id $id is cancelled."
          }
        }
      }
}

Сервер запросов

import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.stream.ActorMaterializer

import scala.concurrent.Future
//import scala.concurrent.ExecutionContext.Implicits.global

class QueryServer (implicit val system:ActorSystem ,
                   implicit val materializer: ActorMaterializer) extends QueryService {

  def startServer(address : String, port: Int) = {
      Http().bindAndHandle(route,address,port)
  }

}

    object QueryServer extends App {

      implicit val actorSystem = ActorSystem("query-server")
      implicit val materializer = ActorMaterializer()
      val server = new QueryServer()
      server.startServer("localhost",8080)
      println("running server at localhost 8080")

    }

Когда я пытаюсь выполнить запрос к Spark SQL через http:localhost:8080/runQuery/county/'KINGS', создается несколько идентификаторов заданий, из которых максимальное количество пропускается.

Ниже приведен снимок экрана пользовательского интерфейса Spark. Я не могу понять, почему создается выделенная задача.

введите здесь описание изображения

Ниже приведен журнал консоли, который показывает, что задание было выполнено только один раз:

"running server at localhost 8080
173859599588358->2
****************************************************************************************
****************************************************************************************
Job id 2 is completed
--------------------------------------------------------
173859599588358->3
****************************************************************************************
****************************************************************************************
Job id 3 is completed
--------------------------------------------------------
173859599588358->4
****************************************************************************************
****************************************************************************************
Job id 4 is completed
--------------------------------------------------------
173859599588358->5
****************************************************************************************
****************************************************************************************
Job id 5 is completed
--------------------------------------------------------
173859599588358->6
****************************************************************************************
****************************************************************************************
Job id 6 is completed
--------------------------------------------------------
173859599588358->7
****************************************************************************************
****************************************************************************************
Job id 7 is completed
--------------------------------------------------------
+---+--------+
|sex|count(1)|
+---+--------+
|  F|12476769|
|  M|12095080|
+---+--------+

Версия Spark: 2.2.1


person Rajat Mishra    schedule 18.05.2018    source источник
comment
В чем вопрос?   -  person Alper t. Turker    schedule 18.05.2018
comment
Почему в sparkUI создается дополнительный идентификатор задания? Я выделил это на скриншоте   -  person Rajat Mishra    schedule 18.05.2018


Ответы (2)


Похоже, оптимизатор искрового катализатора оптимизирует запрос. Создается более одной DAG и, вероятно, выбирается лучший план выполнения. Вы видите план выполнения, и их может быть не один. Я думаю, что здесь нет никакого отношения к akka http. Попробуйте запустить код в искровой оболочке и можете проверить утверждение.

person Amit Joshi    schedule 18.05.2018
comment
Согласен с вами, это больше связано с проблемой Spark Catalyst, а не с akka. Потому что, когда я делаю подсчет, создаются только 2 jobId. - person Rajat Mishra; 18.05.2018

Часть option("inferSchema","true") вызовет Spark Job в дополнение к остальной части вашей логики. См. Spark API для получения подробной информации.

Из-за того, что Джобы недолговечны, они могут быть последовательными, а не параллельными (говоря с самого начала). Вы смотрели на вкладку SQL пользовательского интерфейса Spark. Вполне возможно, что все эти задания (кроме задания на вывод схемы) являются частью одного выполняемого SQL-запроса.

person OL_FK    schedule 18.05.2018
comment
Задания Inferschema — это два нижних задания с файлом чтения описания. - person Rajat Mishra; 18.05.2018
comment
Ага, хорошо. Тем не менее, вы можете взглянуть на вкладку SQL. Выполнение запроса может состоять из нескольких заданий. - person OL_FK; 18.05.2018