В основном то, что я пытаюсь сделать, это создать корзину GCS на основе tenantID (является частью события) и записать эти события с помощью FileIO.writeDynamic с использованием динамического именования файлов в задании потока данных Google.
Проблема, с которой я столкнулся,
srcEvents.apply("Window", Window
.<MyEvent>into(FixedWindows.of(Duration.standardSeconds(60))))
.apply("WriteAvro", FileIO.<MyEventDestination, MyEvent>writeDynamic()
.by(groupFn).via(outputFn, sinkFn)
**.to()** // what to pass as here as i want it to be based on event.getTenantId (gs://test-123)
.withDestinationCoder(destinationCoder)
.withNumShards(100).withNaming(namingFn));
Я создаю ведро gcs до того, как описано выше, вызывая PTranform of srcEvents