Преобразование папки данных S3 в секционированное и секционированное хранилище данных

У меня есть папка с ТБ структурированных данных, и все они придерживаются фиксированной схемы. Структура папок такая:

s3://main-bucket/
                folder-1/ <= One folder will only contain data from one algorithm
                                part-0000-1.csv <= Created on 06/01/2019 by algorithm1
                                part-0000-2.csv <= Created on 06/01/2019 by algorithm1
                folder-2/
                                part-1000-1.csv <= Created on 06/02/2019 by algorithm2
                                part-1000-2.csv <= Created on 06/02/2019 by algorithm2

Запросы в main-bucket с использованием Spark или Athena очень неэффективны, и я хотел преобразовать его в такой формат:

s3://data-bucket/
                algorithm1/ <= derived from a column in folder-1/part files and all rows in this folder has the same algorithm value
                  2019/                   
                    06/
                      01/
                        part-0001.parquet
                algorithm2/
                    2019/                   
                       06/
                         02/
                            part-0001.parquet

Как бы то ни было, чтобы это сделать?


person Salil Surendran    schedule 31.12.2019    source источник


Ответы (3)


Предположительно, в ваших файлах данных есть поле, которое содержит элементы даты или, по крайней мере, поле даты.

Папки разбиения на разделы обычно включают имя поля, например:

algorithm1/year=2019/month=06/day=01/part-0001.parquet

Таким образом, Афина может «знать», что находится в каждом каталоге, просто просматривая имена путей.

Самый простой способ преобразовать этот формат - с помощью самой Athena. В основном:

  • Определите таблицу ввода, которая описывает существующие данные и указывает на них.
  • Определите выходную таблицу, настроенную как Паркет, определяя поля, которые будут использоваться для разделов
  • Выбрать данные из входной таблицы в выходную таблицу

См .: Преобразование в столбцы - Amazon Athena

person John Rotenstein    schedule 31.12.2019
comment
К сожалению, проблема заключается в том, что в моих файлах данных нет поля, содержащего дату. Единственная временная метка, которую я могу использовать, - это метка в файле s3. - person Salil Surendran; 31.12.2019
comment
Если каждый файл данных относится к определенному дню, вы можете переместить файлы в иерархию, например algorithm1/year=2019/month=06/day=01/part-0001.parquet или даже algorithm1/date=2019-06-01/part-0001.parquet. Это позволит Афине создавать поля даты на основе имени папки. Работает неплохо. - person John Rotenstein; 01.01.2020
comment
Проблема в том, что это 7,7 ТБ данных, и перемещение всех этих файлов в S3 с помощью одного скрипта или программы займет вечность, потому что в S3 нет переименования. Все эти файлы нужно будет переместить. Единственный способ сделать это - использовать какой-нибудь инструмент, например AWS Glue или Spark. - person Salil Surendran; 01.01.2020
comment
В любом случае вам нужно будет обработать все данные, поскольку вы хотите преобразовать из CSV в Parquet. После того, как вы переместили исторические данные, данных за каждый новый день будет довольно мало. Или, если вы просто хотите переместить данные, не обрабатывая их, вы можете написать сценарий, который использует aws s3 mv - за кулисами он фактически будет выполнять копирование и удаление. - person John Rotenstein; 01.01.2020

Поскольку у вас нет поля даты в ваших данных, вы не сможете создавать эти разделы только с помощью Spark.

Одно из возможных решений состоит в том, чтобы перечислить все файлы CSV из корзины S3 и собрать метаданные: creation/modification time и owner. Для этого вы можете использовать API файловой системы Hadoop. Что-то вроде этого:

import org.apache.hadoop.fs._

val path = new Path("s3://main-bucket/")
val fs = path.getFileSystem(spark.sessionState.newHadoopConf)
val files = fs.listStatus(path)

val filesMeta = files.map{f => (f.getPath().toString, f.getModificationTime(), f.getOwner())}

Для рекурсивного листинга вы можете использовать _ 4_.

С помощью этого списка вы можете создать DataFrame со столбцами: file_path, timestamp, owner, который вы можете объединить с DataFrame, который вы получаете из всех данных, используя столбец input_file_name(), и добавить столбцы timestamp и creator с некоторым форматированием даты.

Теперь у вас есть создатель столбца и дата, поэтому вы можете написать паркет и разделение по этим двум столбцам, чтобы получить структуру, которую вы ищете.

person blackbishop    schedule 01.01.2020

При условии, что ваши файлы данных содержат дату и время, включенные в их имена, вы можете использовать «секретную» переменную $PATH в запросе Athena. Затем вы можете использовать regexp_extract и функции datetime для создания столбцов, которые затем можно использовать в запросе CTAS для разделения ваших данных.

Вот структура фиктивных файлов в моем S3:

s3://main-bucket/questions=59541533
├── folder-1
|    ├── file1-2019-01-01.json
|    ├── file1-2019-01-02.json
|    ├── file1-2019-02-01.json
|    └── file1-2019-02-02.json
├── folder-2
|    ├── file1-2019-01-01.json
|    ├── file1-2019-01-02.json
|    ├── file1-2019-02-01.json
|    └──  file1-2019-02-02.json

Затем я определил таблицу, которая указывает на folder-1:

CREATE EXTERNAL TABLE `stackoverflow`.`question_59541533_v1`(
  `foo` int,
  `bar` int)
ROW FORMAT SERDE
  'org.openx.data.jsonserde.JsonSerDe'
WITH SERDEPROPERTIES (
  'paths'='row,uf')
STORED AS INPUTFORMAT
  'org.apache.hadoop.mapred.TextInputFormat'
OUTPUTFORMAT
  'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION
  's3://main-bucket/questions=59541533/folder-1'
TBLPROPERTIES (
  'classification'='json',
  'compressionType'='none'
  'typeOfData'='file')

Вот как может выглядеть запрос, где я извлекаю дату из имени файла с довольно наивным regexp_extract < / a> поскольку имена файлов детерминированы.

SELECT
    YEAR("file_date") as "year",
    MONTH("file_date") as "month",  
    DAY("file_date") as "day"
FROM(
    SELECT
        DATE(regexp_extract("$PATH", '(.)*/(.)*([0-9]{4}-[0-9]{2}-[0-9]{2}).json', 3)) as "file_date",
    FROM
        "stackoverflow"."question_59541533_v1"
)
ORDER BY month, day

Результат:

year  | month | day
---------------------
2019  | 1     | 1
2019  | 1     | 2
2019  | 2     | 1
2019  | 2     | 2

Теперь вы можете преобразовать данные и разделить их за один раз с помощью запроса CTAS (не забудьте выбрать все исходные данные).

CREATE TABLE partitioned_and_in_parquet
WITH (
    format = 'PARQUET',
    external_location = 's3://new_s3_location/',
    partitioned_by = ARRAY['year', 'month', 'day']
) AS (
    SELECT
        * , -- select existing data
        YEAR("file_date") as "year",
        MONTH("file_date") as "month",  
        DAY("file_date") as "day"
    FROM(
        SELECT
            * ,  -- select existing data
            DATE(regexp_extract("$PATH", '(.)*/(.)*([0-9]{4}-[0-9]{2}-[0-9]{2}).json', 3)) as "file_date",
        FROM
            "stackoverflow"."question_59541533_v1"
    )    
)

Преимущество этого подхода в том, что вы разделяете данные и конвертируете их в паркет за один раз. Кроме того, вы должны заранее знать, сколько с вас будет взиматься плата, поскольку цены на Athena зависят от объема сканируемых данных, то есть 1 ТБ = 5 $.

Одним из основных недостатков этого подхода является то, что запрос CTAS имеет ограничение на запись только 100 разделов за раз. Поэтому, если у вас есть более 3 месяцев запроса данных, запрос завершится неудачно, если вы не добавите предложение WHERE, чтобы преодолеть это. Вы можете $PATH в пункте WHERE. Я помню, что я тестировал его некоторое время назад, и даже когда файлы находились в одной «папке», Афина не сканировала содержимое файлов, следовательно, с вас не будет взиматься многократная оплата. Но я предлагаю это проверить. Как вариант, просто остановитесь на месячном уровне.

Еще одна вещь, которую вам необходимо учитывать, - это количество выходных файлов запроса CTAS и их размеры. Как правило, вы можете использовать сегментирование, т.е. bucketed_by = ARRAY['some_column'], bucket_count = 3, чтобы управлять им, но это может увеличить время выполнения.

В любом случае, есть много вещей, с которыми можно поиграть, если вы решите сделать это с Афиной.

person Ilya Kisil    schedule 01.01.2020