Можем ли мы записать файлы avro в динамически создаваемые корзины GCS (на основе tenantID)?

В основном то, что я пытаюсь сделать, это создать корзину GCS на основе tenantID (является частью события) и записать эти события с помощью FileIO.writeDynamic с использованием динамического именования файлов в задании потока данных Google.

Проблема, с которой я столкнулся,

srcEvents.apply("Window", Window
                        .<MyEvent>into(FixedWindows.of(Duration.standardSeconds(60))))
                        .apply("WriteAvro", FileIO.<MyEventDestination, MyEvent>writeDynamic()
                                        .by(groupFn).via(outputFn, sinkFn)
                                        **.to()** // what to pass as here as i want it to be based on event.getTenantId (gs://test-123)
                                        .withDestinationCoder(destinationCoder)
                                        .withNumShards(100).withNaming(namingFn));

Я создаю ведро gcs до того, как описано выше, вызывая PTranform of srcEvents


person nocturnal    schedule 19.05.2020    source источник
comment
Внутри метода to() вы должны написать папку/корзину, в которую вы хотите сохранить свои файлы. Здесь это документация для writeDynamic(), вы можете найти примеры использования. Это то, что вы ищете?   -  person Alexandre Moraes    schedule 20.05.2020
comment
ДА, я знаю, что нам нужно указать путь к каталогу корзины GCS. Мой вопрос заключается в том, как я могу генерировать имя корзины динамически, используя tenantId?   -  person nocturnal    schedule 20.05.2020
comment
Я думаю, что метод .to() работает только с одним статическим каталогом, определенным во время создания конвейера, поэтому он не работает для этого варианта использования. Но, возможно, вы могли бы попробовать префикс имен файлов с желаемой структурой ведра + каталога, поскольку это делается для каждого элемента. (См. эту статью: medium.com/@imrenagi/). Если это сработает, это должен быть ответ.   -  person Daniel Oliveira    schedule 21.05.2020


Ответы (1)


Я смог решить эту проблему, используя опцию withTempDirectory, где я указал путь к временной корзине gcs и использовал именование файлов для создания динамического пути корзины для каждого домена.

srcEvents.apply("Window", Window .<MyEvent>into(FixedWindows.of(Duration.standardSeconds(60)))) .apply("WriteAvro", FileIO.<MyEventDestination, MyEvent>writeDynamic() .by(groupFn).via(outputFn, sinkFn) .withTempDirectory("gs://temp-blah/") .withDestinationCoder(destinationCoder) .withNumShards(100).withNaming(namingFn)); namingFn to build filename gs://domain-123/2020-05-01/event.avro

person nocturnal    schedule 03.06.2020