Рефакторинг pyspark joinWithCassandraTable без карт

Я новичок в использовании spark/scala здесь, и у меня проблемы с рефакторингом некоторого моего кода здесь. Я использую Scala 2.11, используя pyspark и настройку spark/yarn. Следующее работает, но мне бы хотелось его очистить и получить от этого максимальную производительность. Я читал в другом месте, что pyspark udf и lambdas могут сильно повлиять на производительность, поэтому я пытался уменьшить или удалить их.

# Reduce ingest df1 data by joining on allowed table df2
to_process = df2\
    .join(
        sf.broadcast(df1),
        df2.secondary_id == df1.secondary_id,
        how="inner")\
    .rdd\
    .map(lambda r: Row(tag=r['tag_id'], user_uuid=r['user_uuid']))

# Type column fixed to type=2, and tag==key
ready_to_join = to_process.map(lambda r: (r[0], 2, r[1]))

# Join with cassandra table to find matches
exists_in_cass = ready_to_join\
    .joinWithCassandraTable(keyspace, table3)\
    .on("user_uuid", "type")\
    .select("user_uuid")

log.error(f"TEST PRINT - [{exists_in_cass.count()}]")

таблица Кассандры такова, что

CREATE TABLE keyspace.table3 (
    user_uuid uuid,
    type int,
    key text,
    value text,
    PRIMARY KEY (user_uuid, type, key)
) WITH CLUSTERING ORDER BY (type ASC, key ASC)

в настоящее время я получил

to_process = df2\
    .join(
        sf.broadcast(df1),
        df2.secondary_id == df1.secondary_id,
        how="inner")\
    .select(col("user_uuid"), col("tag_id").alias("tag"))

ready_to_join = to_process\
        .withColumn("type", sf.lit(2))\
        .select('user_uuid', 'type', col('tag').alias("key"))\
        .rdd\
        .map(lambda x: Row(x))

# planning on using repartitionByCassandraReplica here after I get it logically working
exists_in_cass = ready_to_join\
        .joinWithCassandraTable(keyspace, table3)\
        .on("user_uuid", "type")\
        .select("user_uuid")

log.error(f"TEST PRINT - [{exists_in_cass.count()}]")

но я получаю такие ошибки, как

2020-10-30 15:10:42 WARN  TaskSetManager:66 - Lost task 148.0 in stage 22.0 (TID ----, ---, executor 9): net.razorvine.pickle.PickleException: expected zero arguments for construction of ClassDict (for pyspark.sql.types._create_row)
    at net.razorvine.pickle.objects.ClassDictConstructor.construct(ClassDictConstructor.java:23)

ищу помощи от любых искровых гуру, чтобы указать мне на что-нибудь глупое, что я здесь делаю.

Обновлять

Благодаря предложению Алекса использование spark-cassandra-connector v2.5+ дает возможность напрямую соединять фреймы данных. Я обновил свой код, чтобы использовать это вместо этого.

to_process = df2\
    .join(
        sf.broadcast(df1),
        df2.secondary_id == df1.secondary_id,
        how="inner")\
    .select(col("user_uuid"), col("tag_id").alias("tag"))

ready_to_join = to_process\
        .withColumn("type", sf.lit(2))\
        .select(col('user_uuid').alias('c1_user_uuid'), 'type', col('tag').alias("key"))\

cass_table = spark_session
        .read \
    .format("org.apache.spark.sql.cassandra") \
    .options(table=config.table, keyspace=config.keyspace) \
    .load()

exists_in_cass = ready_to_join\
        .join(
            cass_table,
            [(cass_table["user_uuid"] == ready_to_join["c1_user_uuid"]) &
             (cass_table["key"]  == ready_to_join["key"]) &
             (cass_table["type"] == ready_to_join["type"])])\
        .select(col("c1_user_uuid").alias("user_uuid"))

    
exists_in_cass.explain()
log.error(f"TEST PRINT - [{exists_in_cass.count()}]")

Насколько я знаю, теоретически это должно быть намного быстрее! Но я получаю ошибки во время выполнения с тайм-аутом базы данных.

WARN  TaskSetManager:66 - Lost task 827.0 in stage 12.0 (TID 9946, , executor 4): java.io.IOException: Exception during execution of SELECT "user_uuid", "key" FROM "keyspace"."table3" WHERE token("user_uuid") > ? AND token("user_uuid") <= ? AND "type" = ?   ALLOW FILTERING: Query timed out after PT2M


TaskSetManager:66 - Lost task 125.0 in stage 12.0 (TID 9215, , executor 7): com.datastax.oss.driver.api.core.DriverTimeoutException: Query timed out after PT2M

etc

У меня есть конфигурация для настройки искры, позволяющая использовать расширения искры.

--packages mysql:mysql-connector-java:5.1.47,com.datastax.spark:spark-cassandra-connector_2.11:2.5.1  \

--conf spark.sql.extensions=com.datastax.spark.connector.CassandraSparkExtensions \

DAG от spark показывает, что все узлы полностью исчерпаны. Должен ли я разбивать свои данные перед запуском соединения здесь?

Объяснение для этого также не показывает прямое соединение (в объяснении больше кода, чем во фрагменте выше)

== Physical Plan ==
*(6) Project [c1_user_uuid#124 AS user_uuid#158]
+- *(6) SortMergeJoin [c1_user_uuid#124, key#125L], [user_uuid#129, cast(key#131 as bigint)], Inner
   :- *(3) Sort [c1_user_uuid#124 ASC NULLS FIRST, key#125L ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(c1_user_uuid#124, key#125L, 200)
   :     +- *(2) Project [id#0 AS c1_user_uuid#124, tag_id#101L AS key#125L]
   :        +- *(2) BroadcastHashJoin [secondary_id#60], [secondary_id#100], Inner, BuildRight
   :           :- *(2) Filter (isnotnull(secondary_id#60) && isnotnull(id#0))
   :           :  +- InMemoryTableScan [secondary_id#60, id#0], [isnotnull(secondary_id#60), isnotnull(id#0)]
   :           :        +- InMemoryRelation [secondary_id#60, id#0], StorageLevel(disk, memory, deserialized, 1 replicas)
   :           :              +- *(7) Project [secondary_id#60, id#0]
   :           :                 +- Generate explode(split(secondary_ids#1, \|)), [id#0], false, [secondary_id#60]
   :           :                    +- *(6) Project [id#0, secondary_ids#1]
   :           :                       +- *(6) SortMergeJoin [id#0], [guid#46], Inner
   :           :                          :- *(2) Sort [id#0 ASC NULLS FIRST], false, 0
   :           :                          :  +- Exchange hashpartitioning(id#0, 200)
   :           :                          :     +- *(1) Filter (isnotnull(id#0) && id#0 RLIKE [0-9a-fA-F]{8}\-[0-9a-fA-F]{4}\-[0-9a-fA-F]{4}\-[0-9a-fA-F]{4}\-[0-9a-fA-F]{12})
   :           :                          :        +- InMemoryTableScan [id#0, secondary_ids#1], [isnotnull(id#0), id#0 RLIKE [0-9a-fA-F]{8}\-[0-9a-fA-F]{4}\-[0-9a-fA-F]{4}\-[0-9a-fA-F]{4}\-[0-9a-fA-F]{12}]
   :           :                          :              +- InMemoryRelation [id#0, secondary_ids#1], StorageLevel(disk, memory, deserialized, 1 replicas)
   :           :                          :                    +- Exchange RoundRobinPartitioning(3840)
   :           :                          :                       +- *(1) Filter AtLeastNNulls(n, id#0,secondary_ids#1)
   :           :                          :                          +- *(1) FileScan csv [id#0,secondary_ids#1] Batched: false, Format: CSV, Location: InMemoryFileIndex[inputdata_file, PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:string,secondary_ids:string>
   :           :                          +- *(5) Sort [guid#46 ASC NULLS FIRST], false, 0
   :           :                             +- Exchange hashpartitioning(guid#46, 200)
   :           :                                +- *(4) Filter (guid#46 RLIKE [0-9a-fA-F]{8}\-[0-9a-fA-F]{4}\-[0-9a-fA-F]{4}\-[0-9a-fA-F]{4}\-[0-9a-fA-F]{12} && isnotnull(guid#46))
   :           :                                   +- Generate explode(set_guid#36), false, [guid#46]
   :           :                                      +- *(3) Project [set_guid#36]
   :           :                                         +- *(3) Filter (isnotnull(allowed#39) && (allowed#39 = 1))
   :           :                                            +- *(3) FileScan orc whitelist.whitelist1[set_guid#36,region#39,timestamp#43] Batched: false, Format: ORC, Location: PrunedInMemoryFileIndex[hdfs://file, PartitionCount: 1, PartitionFilters: [isnotnull(timestamp#43), (timestamp#43 = 18567)], PushedFilters: [IsNotNull(region), EqualTo(region,1)], ReadSchema: struct<set_guid:array<string>,region:int>
   :           +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, true]))
    FROM TAG as T
    JOIN MAP as M
    ON T.tag_id = M.tag_id
    WHERE (expire >= NOW() OR expire IS NULL)
    ORDER BY T.tag_id) AS subset) [numPartitions=1] [secondary_id#100,tag_id#101L] PushedFilters: [*IsNotNull(secondary_id), *IsNotNull(tag_id)], ReadSchema: struct<secondary_id:string,tag_id:bigint>
   +- *(5) Sort [user_uuid#129 ASC NULLS FIRST, cast(key#131 as bigint) ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(user_uuid#129, cast(key#131 as bigint), 200)
         +- *(4) Project [user_uuid#129, key#131]
            +- *(4) Scan org.apache.spark.sql.cassandra.CassandraSourceRelation [user_uuid#129,key#131] PushedFilters: [*EqualTo(type,2)], ReadSchema: struct<user_uuid:string,key:string>

У меня не работают прямые соединения, что вызывает тайм-ауты.

Обновление 2

Я думаю, что это не разрешает прямые соединения, поскольку мои типы данных в кадрах данных отключены. В частности, тип uuid


person mf42    schedule 30.10.2020    source источник


Ответы (1)


Вместо использования RDD API с PySpark я предлагаю использовать Spark Cassandra Connector (SCC) 2.5.x или 3.0.x (объявление о выпуске), которые содержат реализацию соединения Dataframe с Cassandra - в этом случае вам не нужно будет спускаться к RDD , но просто используйте обычные соединения Dataframe API.

Обратите внимание, что это не включено по умолчанию, поэтому вам нужно будет запустить pyspark или spark-submit со специальной конфигурацией, например:

pyspark --packages com.datastax.spark:spark-cassandra-connector_2.11:2.5.1 \
   --conf spark.sql.extensions=com.datastax.spark.connector.CassandraSparkExtensions

Подробнее о присоединении к Cassandra можно узнать в моем недавнем блоге. пост на эту тему (хотя он использует Scala, часть Dataframe должна быть переведена почти один в один в PySpark)

person Alex Ott    schedule 31.10.2020
comment
Отличное предложение! Я не знал, что в новом коннекторе есть поддержка соединения фреймов данных. Спасибо за отличный комментарий, сообщение в блоге также было хорошо прочитано. Я обновил вопрос, чтобы использовать новый разъем, но у меня все еще низкая производительность, и теперь новые тайм-ауты - person mf42; 03.11.2020
comment
когда вы выполняете .explain, вы видите DirectJoin в выводе? - person Alex Ott; 03.11.2020
comment
Я не знаю (фрагмент выше), у меня есть искровые расширения, и таблица содержит ‹ 90% данных базы данных. - person mf42; 03.11.2020
comment
добавлено полное объяснение вопроса для отслеживания - person mf42; 03.11.2020