Трансляция нескольких представлений в SQL в pyspark

Я хочу использовать подсказку BROADCAST для нескольких небольших таблиц при объединении с большой таблицей. В приведенном ниже примере SMALLTABLE2 объединяется несколько раз с LARGETABLE в разных столбцах соединения. Теперь, чтобы получить лучшую производительность, я хочу, чтобы и SMALLTABLE1, и SMALLTABLE2 транслировались. Можно ли этого добиться, просто добавив подсказку /* BROADCAST (B,C,D,E) */ или есть лучшее решение? SMALLTABLE1 и SMALLTABLE2 Я получаю данные, запрашивая таблицы HIVE в Dataframe, а затем использую createOrReplaceTempView для создания представления как SMALLTABLE1 и SMALLTABLE2; который позже используется в запросе, как показано ниже.

Есть ли представление BROADCASTING, созданное с помощью функции createOrReplaceTempView?

SELECT A.COL1, A.COL2, A.COL3, B.COL4, C.COL5, D.COL6, E.COL7
FROM LARGETABLE A
JOIN SMALLTABLE1 B
ON A.LCOL = B.SCOL
JOIN SMALLTABLE2 C
ON A.LCOL1 = C.SCOL
JOIN SMALLTABLE2 D
ON A.LCOL2 = D.SCOL
JOIN SMALLTABLE2 E
ON A.LCOL3 = E.SCOL

person Koushik Chandra    schedule 28.07.2018    source источник


Ответы (1)


Если вы используете spark 2.2+, вы можете использовать любую из этих MAPJOIN/BROADCAST/BROADCASTJOIN подсказок.

См. эту Jira и это для получения дополнительных сведений об этой функции.

Пример: ниже я использовал broadcast, но вы можете использовать подсказки mapjoin/broadcastjoin, чтобы получить тот же план объяснения.

>>> spark.range(1000000000).createOrReplaceTempView("t")
>>> spark.range(1000000000).createOrReplaceTempView("u")
>>>sql("select /*+ Broadcast(t,u) */* from t join u on t.id=u.id").explain()
== Physical Plan ==
*BroadcastHashJoin [id#0L], [id#16L], Inner, BuildRight
:- *Range (0, 1000000000, step=1, splits=56)
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]))
   +- *Range (0, 1000000000, step=1, splits=56)

(or)

если вы используете Spark ‹ 2, тогда нам нужно использовать API-интерфейс dataframe для сохранения, а затем зарегистрировавшись в качестве временной таблицы, мы можем добиться присоединения к памяти.

>>> df=hc.range(10000000)
>>> df.persist() --persist the df in memory
>>> df.registerTempTable("u") --register temp table
>>> df1=hc.range(10000000)
>>> df1.persist()
>>> df1.registerTempTable("t")
>>> hc.sql("select * from t join u on t.id=u.id").explain()
== Physical Plan ==
Project [id#11L,id#26L]
+- SortMergeJoin [id#11L], [id#26L]
   :- Sort [id#11L ASC], false, 0
   :  +- TungstenExchange hashpartitioning(id#11L,200), None
   :     +- InMemoryColumnarTableScan [id#11L], InMemoryRelation [id#11L], true, 10000, StorageLevel(false, true, false, false, 1), ConvertToUnsafe, None
   +- Sort [id#26L ASC], false, 0
      +- TungstenExchange hashpartitioning(id#26L,200), None
         +- InMemoryColumnarTableScan [id#26L], InMemoryRelation [id#26L], true, 10000, StorageLevel(false, true, false, false, 1), ConvertToUnsafe, None

С помощью DataFrames без создания временных таблиц

>>>from pyspark.sql.functions import *
>>> df=hc.range(10000000)
>>> df1=hc.range(10000000)
>>> df.join(broadcast(df1),['id']).explain()
== Physical Plan ==
Project [id#26L]
+- BroadcastHashJoin [id#26L], [id#11L], BuildRight
   :- ConvertToUnsafe
   :  +- Scan ExistingRDD[id#26L]
   +- ConvertToUnsafe
      +- Scan ExistingRDD[id#11L]

кроме того, широковещательные соединения выполняются в Spark автоматически.

Существует параметр spark.sql.autoBroadcastJoinThreshold, для которого по умолчанию задано значение 10 МБ.

Чтобы изменить значение по умолчанию, затем

conf.set("spark.sql.autoBroadcastJoinThreshold", 1024*1024*<mb_value>)

для получения дополнительной информации см. эту ссылка на spark.sql.autoBroadcastJoinThreshold.

person Shu    schedule 29.07.2018