Пиша облачен поток от данни, който чете съобщения от Pubsub и ги съхранява в BigQuery. Искам да използвам разделена таблица (по дата) и използвам Timestamp
, свързан със съобщението, за да определя в кой дял трябва да влезе съобщението. По-долу е моят код:
BigQueryIO.writeTableRows()
.to(new SerializableFunction<ValueInSingleWindow<TableRow>, TableDestination>() {
private static final long serialVersionUID = 1L;
@Override
public TableDestination apply(ValueInSingleWindow<TableRow> value) {
log.info("Row value : {}", value.getValue());
Instant timestamp = value.getTimestamp();
String partition = DateTimeFormat.forPattern("yyyyMMdd").print(timestamp);
TableDestination td = new TableDestination(
"<project>:<dataset>.<table>" + "$" + partition, null);
log.info("Table Destination : {}", td);
return td;
}
})
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
.withSchema(tableSchema);
Когато разположа потока от данни, мога да видя отчетите за регистрационни файлове в Stackdriver, но съобщенията не се вмъкват в таблиците на BigQuery и получавам следната грешка:
Request failed with code 400, will NOT retry: https://www.googleapis.com/bigquery/v2/projects/<project_id>/datasets/<dataset_id>/tables
severity: "WARNING"
Така че изглежда, че не е в състояние да създаде таблица, което води до грешка при вмъкване. Трябва ли да променя дефиницията на потока от данни, за да може това да работи? Ако не, има ли друг начин за програмно създаване на разделените таблици?
Използвам Apache beam 2.0.0.