Можем ли да пишем avro файлове в динамично създадени GCS кофи (базирани на tenantID)?

По принцип това, което се опитвам да направя, е да създам GCS кофа въз основа на tenantID (идва като част от събитието) и да запиша тези събития с помощта на FileIO.writeDynamic, използвайки динамично именуване на файлове в задание на google dataflow.

Проблемът, пред който съм изправен е

srcEvents.apply("Window", Window
                        .<MyEvent>into(FixedWindows.of(Duration.standardSeconds(60))))
                        .apply("WriteAvro", FileIO.<MyEventDestination, MyEvent>writeDynamic()
                                        .by(groupFn).via(outputFn, sinkFn)
                                        **.to()** // what to pass as here as i want it to be based on event.getTenantId (gs://test-123)
                                        .withDestinationCoder(destinationCoder)
                                        .withNumShards(100).withNaming(namingFn));

Създавам gcs контейнер преди горното, като извиквам PTranform of srcEvents


person nocturnal    schedule 19.05.2020    source източник
comment
Вътре в метода to() трябва да напишете папката/кофата, в която искате да запазите вашите файлове. Тук това е документацията за writeDynamic(), можете да намерите примери за използване. Това ли търсите?   -  person Alexandre Moraes    schedule 20.05.2020
comment
ДА, наясно съм, че трябва да дадем пътя на директорията на кофата GCS.. Въпросът ми е как да генерирам името на кофата динамично с помощта на tenantId?   -  person nocturnal    schedule 20.05.2020
comment
Мисля, че методът .to() работи само в една статична директория, дефинирана по време на изграждането на тръбопровода, така че не работи за този случай на употреба. Но може би бихте могли да опитате да добавите префикс към имената на файловете с желаната от вас структура на кофа+директория, тъй като това се прави за всеки елемент. (Вижте тази статия: medium.com/@imrenagi/). Ако това работи, това трябва да е отговорът.   -  person Daniel Oliveira    schedule 21.05.2020


Отговори (1)


Успях да разреша това, като използвах опцията withTempDirectory, където предоставих временен път на кофата gcs и използването на именуването на файлове за изграждане на динамичен път на кофа за домейн

srcEvents.apply("Window", Window .<MyEvent>into(FixedWindows.of(Duration.standardSeconds(60)))) .apply("WriteAvro", FileIO.<MyEventDestination, MyEvent>writeDynamic() .by(groupFn).via(outputFn, sinkFn) .withTempDirectory("gs://temp-blah/") .withDestinationCoder(destinationCoder) .withNumShards(100).withNaming(namingFn)); namingFn to build filename gs://domain-123/2020-05-01/event.avro

person nocturnal    schedule 03.06.2020