Как ограничить предикат pushdown для Cassandra при использовании фреймов данных?

У меня есть большой стол Cassandra. Я хочу загрузить только 50 строк из Cassandra. Следующий код

val ds = sparkSession.read
      .format("org.apache.spark.sql.cassandra")
      .options(Map("table" -> s"$Aggregates", "keyspace" -> s"$KeySpace"))
      .load()
      .where(col("aggregate_type") === "DAY")
      .where(col("start_time") <= "2018-03-28")
      .limit(50).collect()

Следующий код выдвигает оба предиката из методов where, но не ограничивает их. Правда ли, что извлекаются все данные (1 миллион записей)? Если нет, то почему время выполнения этого кода и кода без limit(50) примерно одинаковое.


person addmeaning    schedule 28.03.2018    source источник
comment
Просто предположение: если у вас на самом деле меньше 50 записей, соответствующих предикату, Spark должен просмотреть всю таблицу, чтобы проверить, есть ли больше   -  person dk14    schedule 01.04.2018
comment
@ dk14 нет, это не так, более 10 тысяч записей соответствует предикату   -  person addmeaning    schedule 01.04.2018
comment
limit по умолчанию не преобразуется в ограничение CQL. Однако, если вы работаете с базовым RDD, вы можете asInstanceOf[CassandraRDD] там, где для CQL доступен специально определенный limit метод. Смотрите отредактированный ответ   -  person dk14    schedule 02.04.2018


Ответы (1)


В отличие от Spark Streaming, Spark сам пытается предварительно загрузить как можно больше данных и как можно быстрее, чтобы иметь возможность работать с ними параллельно. Таким образом, предварительная загрузка ленива, но жадна, когда она срабатывает. Однако существуют специфические факторы cassandra-conector:

  • Автоматический ввод предиката действительных предложений "где".

  • Согласно этот ответ limit(...) не переводится в CQL LIMIT, поэтому его поведение зависит от того, сколько заданий на выборку создано после достаточного данные загружаются. Цитировать:

ограничение вызовов позволит Spark пропустить чтение некоторых частей из базового источника данных. Это ограничит объем данных, считываемых из Cassandra, путем отмены выполнения задач.

Возможные решения:

  • Ограничениями DataFrame можно частично управлять, ограничивая numPartitions и скорость обмена данными (concurrent.reads и другие параметры). Если вас устраивает n ~ 50 «в большинстве случаев», вы также можете ограничить что-то вроде where(dayIndex < 50 * factor * num_records).

  • Существует способ установить CQL с LIMIT по SparkPartitionLimit, что напрямую влияет на каждый запрос CQL (см. подробнее) — помните, что запросы относятся к разделу Spark. Он доступен в CassandraRdd, поэтому сначала вам придется преобразовать его в RDD.

Код будет примерно таким:

filteredDataFrame.rdd.asInstanceOf[CassandraRDD].limit(n).take(n).collect()

Это добавит LIMIT $N к каждому CQL-запросу. В отличие от ограничения DataFrame, если вы укажете CassandraRDD limit несколько раз (.limit(10).limit(20)) - будет добавлен только последний. Кроме того, я использовал n вместо n / numPartitions + 1, так как это (даже если разделы Spark и Cassandra совпадают) может возвращать меньше результатов для каждого раздела. В результате мне пришлось добавить take(n), чтобы сократить <= numPartitions * n до n.

Предупреждение дважды проверьте, что ваши where могут быть переведены в CQL (используя explain()) — в противном случае перед фильтрацией будет применен LIMIT.

P.S. Вы также можете попробовать запустить CQL напрямую, используя sparkSession.sql(...) (как здесь) и сравните результаты.

person dk14    schedule 01.04.2018