Я пишу облачный поток данных, который считывает сообщения из Pubsub и сохраняет их в BigQuery. Я хочу использовать секционированную таблицу (по дате), и я использую Timestamp
, связанный с сообщением, чтобы определить, в какой раздел следует поместить сообщение. Ниже мой код:
BigQueryIO.writeTableRows()
.to(new SerializableFunction<ValueInSingleWindow<TableRow>, TableDestination>() {
private static final long serialVersionUID = 1L;
@Override
public TableDestination apply(ValueInSingleWindow<TableRow> value) {
log.info("Row value : {}", value.getValue());
Instant timestamp = value.getTimestamp();
String partition = DateTimeFormat.forPattern("yyyyMMdd").print(timestamp);
TableDestination td = new TableDestination(
"<project>:<dataset>.<table>" + "$" + partition, null);
log.info("Table Destination : {}", td);
return td;
}
})
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
.withSchema(tableSchema);
Когда я развертываю поток данных, я вижу операторы журнала в Stackdriver, однако сообщения не вставляются в таблицы BigQuery, и я получаю следующую ошибку:
Request failed with code 400, will NOT retry: https://www.googleapis.com/bigquery/v2/projects/<project_id>/datasets/<dataset_id>/tables
severity: "WARNING"
Итак, похоже, что он не может создать таблицу, что привело к ошибке вставки. Нужно ли мне изменить определение потока данных, чтобы это работало? Если нет, есть ли другой способ программно создать секционированные таблицы?
Я использую луч Apache 2.0.0.