Как я могу присоединиться к искровому живому потоку со всеми данными, собранными другим потоком за весь его жизненный цикл?

У меня есть два искровых потока, в первом идут данные, связанные с продуктами: их цена поставщику, валюта, их описание, идентификатор поставщика. Эти данные обогащаются категорией, угадываемой по анализу описания и цене в долларах. Затем они сохраняются в наборе данных паркета.

Второй поток содержит данные о продаже этих товаров с аукциона, затем стоимость, по которой они были проданы, и дату.

Учитывая тот факт, что продукт может поступить в первый поток сегодня и быть проданным через год, как я могу присоединиться ко второму потоку со всей историей, содержащейся в наборе данных паркета первого потока?

Результат, чтобы было понятно, должен быть средним дневным заработком в ценовом диапазоне ...


person Claudio D'Alicandro    schedule 17.01.2018    source источник


Ответы (2)


Я нашел возможное решение с snappydata, используя его изменяемый DataFrame:

https://www.snappydata.io/blog/how-mutable-dataframes-improve-join-performance-spark-sql

Приведенный пример очень похож на описанный claudio-dalicandro.

person giorrrgio    schedule 22.01.2018
comment
Это именно то, что мне нужно - person Claudio D'Alicandro; 22.01.2018

Если вы используете структурированную потоковую передачу в Spark, вы можете загрузить файлы паркета первого потока в кадр данных.

parquetFileDF = spark.read.parquet("products.parquet")

Затем вы можете получить свой второй поток и присоединиться к файлу паркета.

streamingDF = spark.readStream. ...
streamingDF.join(parquetFileDF, "type", "right_join")

Даже вы можете присоединиться со своим первым потоком ко второму потоку.

Надеюсь это поможет.

person Gourav Dutta    schedule 18.01.2018
comment
К сожалению использую классические потоки, но даже если бы с вашим решением было не так я бы не рассматривал обновления паркета, который переписывается с окном 30 минут от первого потока... Если бы был способ читать фрейм данных каждые 30 минут было бы здорово! В качестве альтернативы мне было бы достаточно иметь возможность присоединиться к двум структурированным потокам... - person Claudio D'Alicandro; 18.01.2018