Я новичок в использовании spark/scala здесь, и у меня проблемы с рефакторингом некоторого моего кода здесь. Я использую Scala 2.11, используя pyspark и настройку spark/yarn. Следующее работает, но мне бы хотелось его очистить и получить от этого максимальную производительность. Я читал в другом месте, что pyspark udf и lambdas могут сильно повлиять на производительность, поэтому я пытался уменьшить или удалить их.
# Reduce ingest df1 data by joining on allowed table df2
to_process = df2\
.join(
sf.broadcast(df1),
df2.secondary_id == df1.secondary_id,
how="inner")\
.rdd\
.map(lambda r: Row(tag=r['tag_id'], user_uuid=r['user_uuid']))
# Type column fixed to type=2, and tag==key
ready_to_join = to_process.map(lambda r: (r[0], 2, r[1]))
# Join with cassandra table to find matches
exists_in_cass = ready_to_join\
.joinWithCassandraTable(keyspace, table3)\
.on("user_uuid", "type")\
.select("user_uuid")
log.error(f"TEST PRINT - [{exists_in_cass.count()}]")
таблица Кассандры такова, что
CREATE TABLE keyspace.table3 (
user_uuid uuid,
type int,
key text,
value text,
PRIMARY KEY (user_uuid, type, key)
) WITH CLUSTERING ORDER BY (type ASC, key ASC)
в настоящее время я получил
to_process = df2\
.join(
sf.broadcast(df1),
df2.secondary_id == df1.secondary_id,
how="inner")\
.select(col("user_uuid"), col("tag_id").alias("tag"))
ready_to_join = to_process\
.withColumn("type", sf.lit(2))\
.select('user_uuid', 'type', col('tag').alias("key"))\
.rdd\
.map(lambda x: Row(x))
# planning on using repartitionByCassandraReplica here after I get it logically working
exists_in_cass = ready_to_join\
.joinWithCassandraTable(keyspace, table3)\
.on("user_uuid", "type")\
.select("user_uuid")
log.error(f"TEST PRINT - [{exists_in_cass.count()}]")
но я получаю такие ошибки, как
2020-10-30 15:10:42 WARN TaskSetManager:66 - Lost task 148.0 in stage 22.0 (TID ----, ---, executor 9): net.razorvine.pickle.PickleException: expected zero arguments for construction of ClassDict (for pyspark.sql.types._create_row)
at net.razorvine.pickle.objects.ClassDictConstructor.construct(ClassDictConstructor.java:23)
ищу помощи от любых искровых гуру, чтобы указать мне на что-нибудь глупое, что я здесь делаю.
Обновлять
Благодаря предложению Алекса использование spark-cassandra-connector v2.5+ дает возможность напрямую соединять фреймы данных. Я обновил свой код, чтобы использовать это вместо этого.
to_process = df2\
.join(
sf.broadcast(df1),
df2.secondary_id == df1.secondary_id,
how="inner")\
.select(col("user_uuid"), col("tag_id").alias("tag"))
ready_to_join = to_process\
.withColumn("type", sf.lit(2))\
.select(col('user_uuid').alias('c1_user_uuid'), 'type', col('tag').alias("key"))\
cass_table = spark_session
.read \
.format("org.apache.spark.sql.cassandra") \
.options(table=config.table, keyspace=config.keyspace) \
.load()
exists_in_cass = ready_to_join\
.join(
cass_table,
[(cass_table["user_uuid"] == ready_to_join["c1_user_uuid"]) &
(cass_table["key"] == ready_to_join["key"]) &
(cass_table["type"] == ready_to_join["type"])])\
.select(col("c1_user_uuid").alias("user_uuid"))
exists_in_cass.explain()
log.error(f"TEST PRINT - [{exists_in_cass.count()}]")
Насколько я знаю, теоретически это должно быть намного быстрее! Но я получаю ошибки во время выполнения с тайм-аутом базы данных.
WARN TaskSetManager:66 - Lost task 827.0 in stage 12.0 (TID 9946, , executor 4): java.io.IOException: Exception during execution of SELECT "user_uuid", "key" FROM "keyspace"."table3" WHERE token("user_uuid") > ? AND token("user_uuid") <= ? AND "type" = ? ALLOW FILTERING: Query timed out after PT2M
TaskSetManager:66 - Lost task 125.0 in stage 12.0 (TID 9215, , executor 7): com.datastax.oss.driver.api.core.DriverTimeoutException: Query timed out after PT2M
etc
У меня есть конфигурация для настройки искры, позволяющая использовать расширения искры.
--packages mysql:mysql-connector-java:5.1.47,com.datastax.spark:spark-cassandra-connector_2.11:2.5.1 \
--conf spark.sql.extensions=com.datastax.spark.connector.CassandraSparkExtensions \
DAG от spark показывает, что все узлы полностью исчерпаны. Должен ли я разбивать свои данные перед запуском соединения здесь?
Объяснение для этого также не показывает прямое соединение (в объяснении больше кода, чем во фрагменте выше)
== Physical Plan ==
*(6) Project [c1_user_uuid#124 AS user_uuid#158]
+- *(6) SortMergeJoin [c1_user_uuid#124, key#125L], [user_uuid#129, cast(key#131 as bigint)], Inner
:- *(3) Sort [c1_user_uuid#124 ASC NULLS FIRST, key#125L ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(c1_user_uuid#124, key#125L, 200)
: +- *(2) Project [id#0 AS c1_user_uuid#124, tag_id#101L AS key#125L]
: +- *(2) BroadcastHashJoin [secondary_id#60], [secondary_id#100], Inner, BuildRight
: :- *(2) Filter (isnotnull(secondary_id#60) && isnotnull(id#0))
: : +- InMemoryTableScan [secondary_id#60, id#0], [isnotnull(secondary_id#60), isnotnull(id#0)]
: : +- InMemoryRelation [secondary_id#60, id#0], StorageLevel(disk, memory, deserialized, 1 replicas)
: : +- *(7) Project [secondary_id#60, id#0]
: : +- Generate explode(split(secondary_ids#1, \|)), [id#0], false, [secondary_id#60]
: : +- *(6) Project [id#0, secondary_ids#1]
: : +- *(6) SortMergeJoin [id#0], [guid#46], Inner
: : :- *(2) Sort [id#0 ASC NULLS FIRST], false, 0
: : : +- Exchange hashpartitioning(id#0, 200)
: : : +- *(1) Filter (isnotnull(id#0) && id#0 RLIKE [0-9a-fA-F]{8}\-[0-9a-fA-F]{4}\-[0-9a-fA-F]{4}\-[0-9a-fA-F]{4}\-[0-9a-fA-F]{12})
: : : +- InMemoryTableScan [id#0, secondary_ids#1], [isnotnull(id#0), id#0 RLIKE [0-9a-fA-F]{8}\-[0-9a-fA-F]{4}\-[0-9a-fA-F]{4}\-[0-9a-fA-F]{4}\-[0-9a-fA-F]{12}]
: : : +- InMemoryRelation [id#0, secondary_ids#1], StorageLevel(disk, memory, deserialized, 1 replicas)
: : : +- Exchange RoundRobinPartitioning(3840)
: : : +- *(1) Filter AtLeastNNulls(n, id#0,secondary_ids#1)
: : : +- *(1) FileScan csv [id#0,secondary_ids#1] Batched: false, Format: CSV, Location: InMemoryFileIndex[inputdata_file, PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:string,secondary_ids:string>
: : +- *(5) Sort [guid#46 ASC NULLS FIRST], false, 0
: : +- Exchange hashpartitioning(guid#46, 200)
: : +- *(4) Filter (guid#46 RLIKE [0-9a-fA-F]{8}\-[0-9a-fA-F]{4}\-[0-9a-fA-F]{4}\-[0-9a-fA-F]{4}\-[0-9a-fA-F]{12} && isnotnull(guid#46))
: : +- Generate explode(set_guid#36), false, [guid#46]
: : +- *(3) Project [set_guid#36]
: : +- *(3) Filter (isnotnull(allowed#39) && (allowed#39 = 1))
: : +- *(3) FileScan orc whitelist.whitelist1[set_guid#36,region#39,timestamp#43] Batched: false, Format: ORC, Location: PrunedInMemoryFileIndex[hdfs://file, PartitionCount: 1, PartitionFilters: [isnotnull(timestamp#43), (timestamp#43 = 18567)], PushedFilters: [IsNotNull(region), EqualTo(region,1)], ReadSchema: struct<set_guid:array<string>,region:int>
: +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, true]))
FROM TAG as T
JOIN MAP as M
ON T.tag_id = M.tag_id
WHERE (expire >= NOW() OR expire IS NULL)
ORDER BY T.tag_id) AS subset) [numPartitions=1] [secondary_id#100,tag_id#101L] PushedFilters: [*IsNotNull(secondary_id), *IsNotNull(tag_id)], ReadSchema: struct<secondary_id:string,tag_id:bigint>
+- *(5) Sort [user_uuid#129 ASC NULLS FIRST, cast(key#131 as bigint) ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(user_uuid#129, cast(key#131 as bigint), 200)
+- *(4) Project [user_uuid#129, key#131]
+- *(4) Scan org.apache.spark.sql.cassandra.CassandraSourceRelation [user_uuid#129,key#131] PushedFilters: [*EqualTo(type,2)], ReadSchema: struct<user_uuid:string,key:string>
У меня не работают прямые соединения, что вызывает тайм-ауты.
Обновление 2
Я думаю, что это не разрешает прямые соединения, поскольку мои типы данных в кадрах данных отключены. В частности, тип uuid