Ни один из элементов в PCollection не влияет на выделенных рабочих

У меня есть конвейер, состоящий из трех этапов. На первом этапе это ParDo, который принимает 5 URL-адресов в коллекции PCollection. И каждый из 5 элементов генерирует тысячи URL-адресов и выводит их. Таким образом, ввод второго шага - это еще одна коллекция PCollection, размер которой может составлять 100-400 КБ. На последнем этапе извлеченные данные каждого URL-адреса сохраняются в службе хранилища.

Я заметил, что первый шаг, который генерирует список URL-адресов из 5 входных URL-адресов, выделил 5 рабочих и генерирует новый набор URL-адресов. Но после завершения первого шага количество рабочих не уменьшается и достигает 1. А пока выполняется второй этап, он выполняется только в 1 рабочем (с одним рабочим мой поток данных выполняется в течение последних 2 дней. Поэтому, просматривая журналы, я делаю логическое предположение, что первый шаг выполнен).

Итак, мой вопрос: несмотря на то, что размер коллекции PCollection большой, почему он не разделен между рабочими или почему не выделяется больше рабочих? Шаг 2 - это простой веб-скребок, который очищает заданный URL и выводит строку. Которая затем сохраняется в хранилище.


person ted    schedule 05.11.2020    source источник


Ответы (1)


Dataflow пытается соединить шаги вместе, чтобы создать объединенные шаги. Таким образом, даже если у вас есть несколько ParDo в конвейере, они будут объединены вместе и будут выполняться как один шаг.

Также после объединения масштабирование потока данных ограничивается шагом в начале объединенного шага.

Я подозреваю, что у вас есть Create преобразование, состоящее из нескольких элементов в верхней части вашего конвейера. В этом случае поток данных может масштабироваться только до количества элементов в этом Create преобразовании.

Один из способов предотвратить такое поведение - это разрыв слияния после одного (или нескольких) преобразований ParDo с большим разветвлением. Это можно сделать, добавив Reshuffle.viaRandomKey () преобразовать после него (который содержит GroupByKey). Учитывая, что Reshuffle является преобразованием идентичности, ваш конвейер не требует дополнительных изменений.

Дополнительную информацию о слиянии и способы предотвратить это.

person chamikara    schedule 09.11.2020