Как можем да използваме функцията dense_rank() в pyspark?

Изпълнявам скрипт pyspark, в който изпълнявам sql заявка и създавам рамка с данни. В sql заявката има функция dense_rank(). Поради това пълното изпълнение на тази заявка отнема твърде много време.

Има ли някакъв начин да изпълним заявката бързо или можем да се справим с това на ниво pyspark? Налична ли е някаква функция или метод в pyspark за замяна на dense_rank() от sql?

SQL:

SELECT  DENSE_RANK() OVER(ORDER BY SOURCE_COLUMN_VALUE) AS SYSTEM_ID,SYSTEM_TABLE_NAME,SOURCE_ID,SOURCE_NAME,SOURCE_TABLE_NAME,SOURCE_COLUMN_NAME,SRC_VALUE AS SOURCE_COLUMN_VALUE,IM_INSERT_DT FROM (SELECT ID AS SOURCE_ID,'AMPIL' AS SOURCE_NAME,UPPER(concat(coalesce(addr_line_1,''),';',coalesce(addr_line_2,''),';',coalesce(city_1,''),';',coalesce(state_1,''),';',coalesce(zip_1,''),';',coalesce(cntry_1,''))) as  SOURCE_COLUMN_VALUE,concat(coalesce(addr_line1_src,''),';',coalesce(addr_line2_src,''),';',coalesce(city_src,''),';',coalesce(state_crc,''),';',coalesce(zip_1,''),';',coalesce(cntry_1,'')) as SRC_VALUE,SOURCE_TABLE_NAME,'ADDRESS' AS SYSTEM_TABLE_NAME,SOURCE_COLUMN_NAME,date_format(current_timestamp(),'yyyy-MM-dd hh:mm:ss') as IM_INSERT_DT from (SELECT ID,regexp_replace(addr_line_1,' ','') as addr_line_1,Upper(addr_line_1) as addr_line1_src,regexp_replace(addr_line_2,' ','') as addr_line_2 ,upper(addr_line_2) as addr_line2_src,regexp_replace(UPPER(coalesce(city,meli_city_nm)),' ','') as city_1,UPPER(coalesce(city,meli_city_nm)) as city_src,regexp_replace(coalesce(meli_stt_provncd,coalesce(vw_states_code.state_cd,state)),' ','') as state_1, coalesce(meli_stt_provncd,coalesce(vw_states_code.state_cd,state)) as state_crc,case when UPPER(coalesce(vw_states_code.country_cd,country)) = 'US' then 'USA' when UPPER(coalesce(vw_states_code.country_cd,country)) = 'CANADA' then 'CA' else regexp_replace(UPPER(coalesce(vw_states_code.country_cd,country)),' ','') end as cntry_1,case when UPPER(coalesce(vw_states_code.country_cd,country)) = 'US' then regexp_extract(substr(trim(regexp_replace(zip,' ','')),0,5),'^[0-9]{5}$',0) else regexp_replace(zip,' ','') end as zip_1,SOURCE_TABLE_NAME,SOURCE_COLUMN_NAME from vw_addr_stg LEFT JOIN (select * from vw_dmn_meli_zip where MELI_LAST_LN = 'L') vw_dmn_meli  on vw_addr_stg.zip=vw_dmn_meli.meli_zip_cd_base LEFT JOIN vw_states_code on (coalesce(meli_stt_provncd,state) = vw_states_code.state_cd or vw_states_code.state_nm = vw_addr_stg.state) LEFT JOIN vw_country_codes on vw_country_codes.country_name = vw_addr_stg.country))

person shivam    schedule 10.04.2020    source източник


Отговори (1)


В pyspark можете да използвате комбинация от Window функции и SQL функции, за да получите това, което искате. Не владея SQL и не съм тествал решението, но нещо подобно може да ви помогне:

import pyspark.sql.Window as psw
import pyspark.sql.functions as psf

w = psw.Window.partitionBy("SOURCE_COLUMN_VALUE")
df.withColumn("SYSTEM_ID", dense_rank().over(w))

Можете да намерите документа за dense_rank тук

person linog    schedule 10.04.2020
comment
Връзката към документацията изглежда е променена: spark.apache.org/docs/latest/api/python/reference/api/ - person karpan; 25.07.2021
comment