Как запускать параллельные потоки в AWS Glue PySpark?

У меня есть искровая работа, которая просто извлекает данные из нескольких таблиц с одинаковыми преобразованиями. По сути, цикл for, который выполняет итерацию по списку таблиц, запрашивает таблицу каталога, добавляет метку времени, а затем вставляет в Redshift (пример ниже).

На выполнение этой работы уйдет около 30 минут. Есть ли способ запустить их параллельно в одном контексте искры / клея? Я не хочу создавать отдельные работы с клеем, если я могу этого избежать.

import datetime
import os
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from awsglue.dynamicframe import DynamicFrame
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.context import SparkContext
from pyspark.sql.functions import *


# query the runtime arguments
args = getResolvedOptions(
    sys.argv,
    ["JOB_NAME", "redshift_catalog_connection", "target_database", "target_schema"],
)

# build the job session and context
glueContext = GlueContext(SparkContext.getOrCreate())
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args["JOB_NAME"], args)

# set the job execution timestamp
job_execution_timestamp = datetime.datetime.utcnow()

tables = []

for table in tables:
    catalog_table = glueContext.create_dynamic_frame.from_catalog(
        database="test", table_name=table, transformation_ctx=table
    )
    data_set = catalog_table.toDF().withColumn(
        "batchLoadTimestamp", lit(job_execution_timestamp)
    )

    # covert back to glue dynamic frame
    export_frame = DynamicFrame.fromDF(data_set, glueContext, "export_frame")

    # remove null rows from dynamic frame
    non_null_records = DropNullFields.apply(
        frame=export_frame, transformation_ctx="non_null_records"
    )

    temp_dir = os.path.join(args["TempDir"], redshift_table_name)

    stores_redshiftSink = glueContext.write_dynamic_frame.from_jdbc_conf(
        frame=non_null_records,
        catalog_connection=args["redshift_catalog_connection"],
        connection_options={
            "dbtable": f"{args['target_schema']}.{redshift_table_name}",
            "database": args["target_database"],
            "preactions": f"truncate table {args['target_schema']}.{redshift_table_name};",
        },
        redshift_tmp_dir=temp_dir,
        transformation_ctx="stores_redshiftSink",
    ) ```

person sewardth    schedule 03.07.2020    source источник


Ответы (1)


Вы можете сделать следующее, чтобы ускорить этот процесс.

  1. Разрешить одновременное выполнение задания.
  2. Выделите достаточное количество DPU.
  3. Передайте список таблиц в качестве параметра
  4. Выполняйте задание параллельно, используя рабочие процессы Glue или пошаговые функции.

Теперь предположим, что у вас есть 100 таблиц для приема, вы можете разделить список на 10 таблиц в каждой и запустить задание одновременно 10 раз.

Поскольку ваши данные будут загружаться параллельно, время выполнения задания Glue будет уменьшено, что приведет к меньшим затратам.

Альтернативный подход, который будет намного быстрее, - это прямое использование утилиты красного смещения.

  1. Создайте таблицу с красным смещением и оставьте столбец batchLoadTimestamp по умолчанию для current_timestamp.
  2. Теперь создайте команду копирования и загрузите данные в таблицу прямо из s3.
  3. Запустите команду копирования, используя задание оболочки Glue Python с использованием pg8000.

Почему этот подход будет быстрее ?? Поскольку соединитель jdbc с красным смещением искры сначала выгружает фрейм данных искры в s3, а затем подготавливает команду копирования в таблицу красного смещения. И при запуске команды копирования напрямую вы удаляете накладные расходы на выполнение команды выгрузки, а также считываете данные в искру df.

person Shubham Jain    schedule 04.07.2020
comment
Оба подхода очень хорошо сработали для моего варианта использования. Спасибо! - person sewardth; 05.07.2020