Pyspark: присоединитесь к 2 фреймворкам данных, чтобы получать только новые записи из 2-го фрейма данных (историзация)

У меня есть 2 фрейма данных df1 и df2. Я хочу, чтобы результат этого фрейма был таким: 1. Возьмите все записи df1. 2. Возьмите только новые записи из df2 (записи, которых нет в df1) 3. Создайте новый фрейм данных этой логики

Примечание. Первичный ключ - id. Я хочу проверить только идентификатор, а не полную строку. Если Id недоступен в df1, то только сказка из df2.

df1

    +------+-------------+-----+
    |  id  |time         |other|
    +------+-------------+-----+
    |   111|  29-12-2019 |   p1|
    |   222|  29-12-2019 |   p2|
    |   333|  29-12-2019 |   p3|
    +----+-----+-----+---------+

df2

    +------+-------------+-----+
    |  id  |time         |other|
    +------+-------------+-----+
    |   111|  30-12-2019 |   p7|
    |   222|  30-12-2019 |   p8|
    |   444|  30-12-2019 |   p0|
    +----+-----+-----+---------+

Результат

+------+-------------+-----+
|  id  |time         |other|
+------+-------------+-----+
|   111|  29-12-2019 |   p1|
|   222|  29-12-2019 |   p2|
|   333|  29-12-2019 |   p3|
|   444|  30-12-2019 |   p0|
+----+-----+-----+---------+

Не могли бы вы помочь мне, как это сделать в pyspark. Я планирую использовать соединение.


person Hina Patidar    schedule 30.12.2019    source источник


Ответы (2)


Наконец, я написал этот код и, похоже, отлично работает для 12 000 000 строк, его сборка занимает всего 5 минут. Надеюсь, это поможет другим:

df1=spark.createDataFrame([(111,'29-12-2019','p1'),(222,'29-12-2019','p2'),(333,'29-12-2019','p3')],['id','time','other'])
df2=spark.createDataFrame([(111,'30-12-2019','p7'),(222,'30-12-2019','p8'),(444,'30-12-2019','p0')],['id','time','other'])

#So this is giving me all records which are not available in df1 dataset
new_input_df = df2.join(df1, on=['id'], how='left_anti')

#Now union df1(historic reocrds) and new_input_df  which contains only new 
final_df = df1.union(new_input_df)

final_df.show()
person Hina Patidar    schedule 02.01.2020

person    schedule
comment
Спасибо за ответ, но у меня миллионы записей, и объединение вызывает проблемы с памятью. Медленнее использовать объединение для такого количества записей. как насчет соединения и использования вычитания или исключения? - person Hina Patidar; 30.12.2019
comment
Присоединение - одна из самых дорогостоящих операций, которые вы обычно используете в Spark, поэтому перед выполнением соединения стоит сделать все возможное, чтобы сжать данные. - person Prathik Kini; 31.12.2019
comment
Думаю, да, объединение двух таблиц с таким количеством записей может быть очень дорогостоящим, поэтому я использовал объединение только для фильтрации новых записей и использования UNION для окончательного вывода. - person Hina Patidar; 02.01.2020