Pyspark изменяет значение столбца, если значение из списка находится в другом столбце

У меня есть такой кадр данных:

+-------+----------------+
|Name   |Source          |
+-------+----------------+
|Tom    |clientA-incoming|
|Dick   |clientB-incoming|
|Harry  |c-abc-incoming  |

и я хотел бы добавить столбец slug, чтобы получить этот фрейм данных:

+-------+----------------+--------+
|Name   |Source          |slug    |
+-------+----------------+--------+
|Tom    |clientA-incoming|clientA |
|Dick   |clientB-incoming|clientB |
|Harry  |c-abc-incoming  |c-abc   |

и у меня есть список значений, в которых есть слаги:

slugs = ['clientA', 'clientB', 'c-abc']

в основном я думаю что-то вроде этого псевдокода:

for i in slugs:
    if i in df['Source']:
        df['Slug'] = i

может кто-нибудь помочь мне пересечь финишную черту?

РЕДАКТИРОВАТЬ:

Я хочу обновить столбец slug значением из списка slugs. Конкретное значение, которое входит в столбец slug, определяется на основе столбца Source.

Например, поскольку slugs[0] = 'clientA' и clientA являются подстрокой clientA-incoming, я хотел бы обновить значение этой строки в столбце slug до clientA.


person DBA108642    schedule 20.02.2020    source источник
comment
Итак, вероятно, у вас будут строки, которых нет в slugs, и они будут разной длины?   -  person roganjosh    schedule 20.02.2020
comment
Вы хотите заполнить столбец slug на основе столбца Source? или из slugs списка..?   -  person Shu    schedule 20.02.2020
comment
каждая строка имеет соответствующее значение в slugs, но будут значения в slugs, у которых нет строки   -  person DBA108642    schedule 20.02.2020
comment
@Shu Я хочу заполнить slug данными из списка slugs на основе значений из Source   -  person DBA108642    schedule 20.02.2020
comment
Используйте композицию списка с coalesce, contains (или startswith) и when для выполнения if-then -else логика: Что-то вроде: df.withColumn('slug', coalesce(*[when(col('Source').contains(slug), lit(slug)) for slug in slugs]))   -  person pault    schedule 21.02.2020


Ответы (1)


Это можно решить с помощью левого или внутреннего соединения в зависимости от ваших требований:

from pyspark.sql.functions import broadcast

slugs = ['clientA', 'clientB', 'c-abc', 'f-gd']
sdf = spark.createDataFrame(slugs, "string").withColumnRenamed("value", "slug")

df = spark.createDataFrame([
  ["Tom", "clientA-incoming"],
  ["Dick", "clientB-incoming"],
  ["Harry", "c-abc-incoming"],
  ["Harry", "c-dgl-incoming"]
], ["Name", "Source"])

df.join(broadcast(sdf), df["Source"].contains(sdf["slug"]), "left").show()

# +-----+----------------+-------+
# | Name|          Source|   slug|
# +-----+----------------+-------+
# |  Tom|clientA-incoming|clientA|
# | Dick|clientB-incoming|clientB|
# |Harry|  c-abc-incoming|  c-abc|
# |Harry|  c-dgl-incoming|   null|
# +-----+----------------+-------+

Обратите внимание, что мы транслируем меньший df, чтобы предотвратить перетасовку.

person abiratsis    schedule 21.02.2020