Луч Apache: программное создание секционированных таблиц

Я пишу облачный поток данных, который считывает сообщения из Pubsub и сохраняет их в BigQuery. Я хочу использовать секционированную таблицу (по дате), и я использую Timestamp, связанный с сообщением, чтобы определить, в какой раздел следует поместить сообщение. Ниже мой код:

      BigQueryIO.writeTableRows()
        .to(new SerializableFunction<ValueInSingleWindow<TableRow>, TableDestination>() {
            private static final long serialVersionUID = 1L;

            @Override
              public TableDestination apply(ValueInSingleWindow<TableRow> value) {
                log.info("Row value : {}", value.getValue());
                Instant timestamp = value.getTimestamp();
                String partition = DateTimeFormat.forPattern("yyyyMMdd").print(timestamp);
                TableDestination td = new TableDestination(
                    "<project>:<dataset>.<table>" + "$" + partition, null);
                log.info("Table Destination : {}", td);
                return td;
              }
          })            
    .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)         
    .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND) 
    .withSchema(tableSchema);

Когда я развертываю поток данных, я вижу операторы журнала в Stackdriver, однако сообщения не вставляются в таблицы BigQuery, и я получаю следующую ошибку:

Request failed with code 400, will NOT retry: https://www.googleapis.com/bigquery/v2/projects/<project_id>/datasets/<dataset_id>/tables
severity:  "WARNING"  

Итак, похоже, что он не может создать таблицу, что привело к ошибке вставки. Нужно ли мне изменить определение потока данных, чтобы это работало? Если нет, есть ли другой способ программно создать секционированные таблицы?

Я использую луч Apache 2.0.0.


person Darshan Mehta    schedule 31.10.2017    source источник


Ответы (1)


Это была ошибка в BigQueryIO, которая была исправлена ​​в Beam 2.2. Вы можете использовать версию Beam со снимками или дождаться завершения выпуска 2.2 (процесс выпуска в настоящее время находится в процессе).

person jkff    schedule 31.10.2017
comment
Спасибо за обновления. Пара дополнительных вопросов (извините): 1. Есть ли другой способ создания таблиц в конвейере потока данных (кроме использования Apache Beam) и 2. Когда мы ожидаем выхода луча 2.2? - person Darshan Mehta; 31.10.2017
comment
Вы можете создавать таблицы, используя API BigQuery напрямую cloud.google.com/bigquery/docs/ справочники / библиотеки. Выпуск Beam - это процесс сообщества Apache, поэтому жестких гарантий быть не может, но, по-видимому, это произойдет в ближайшие недели или две, я думаю. Вы можете следить за потоком thread.html / - person jkff; 01.11.2017
comment
Спасибо за ваш вклад @jkff. - person Darshan Mehta; 02.11.2017
comment
Обновил библиотеку до 2.2.0. Теперь с тем же кодом мы получаем следующую ошибку: Cannot read partition information from a table that is not partitioned:. Итак, похоже, создается таблица без раздела? - person Darshan Mehta; 19.01.2018
comment
Не могли бы вы поделиться полной трассировкой стека ошибки и идентификатором задания потока данных? - person jkff; 20.01.2018