Конфликт данных во временных таблицах hadoop

У меня есть поток, который выполняет искровые задания на кластерах Dataproc параллельно для разных зон. Для каждой зоны он создает кластер, выполняет искровое задание и удаляет кластер после его завершения.

В задании Spark используется метод org.apache.spark.rdd.PairRDDFunctions.saveAsNewAPIHadoopDataset, передающий BigQuery Configuration для сохранения данных в таблице BigQuery. Задание сохраняет данные в нескольких таблицах, вызывая метод saveAsNewAPIHadoopDataset более одного раза для каждого задания.

Проблема в том, что иногда я получаю сообщение об ошибке, вызванное конфликтом во временном наборе данных BigQuery Hadoop, который он внутренне создает для выполнения заданий:

Exception in thread "main" com.google.api.client.googleapis.json.GoogleJsonResponseException: 409 Conflict
{
 "code" : 409,
 "errors" : [ {
   "domain" : "global",
   "message" : "Already Exists: Dataset <my-gcp-project>:<MY-DATASET>_hadoop_temporary_job_201802250620_0013",
   "reason" : "duplicate"
 } ],
 "message" : "Already Exists: Dataset <my-gcp-project>:<MY-DATASET>_hadoop_temporary_job_201802250620_0013"
}
    at com.google.api.client.googleapis.json.GoogleJsonResponseException.from(GoogleJsonResponseException.java:145)
    at com.google.api.client.googleapis.services.json.AbstractGoogleJsonClientRequest.newExceptionOnError(AbstractGoogleJsonClientRequest.java:113)
    at com.google.api.client.googleapis.services.json.AbstractGoogleJsonClientRequest.newExceptionOnError(AbstractGoogleJsonClientRequest.java:40)
    at com.google.api.client.googleapis.services.AbstractGoogleClientRequest$1.interceptResponse(AbstractGoogleClientRequest.java:321)
    at com.google.api.client.http.HttpRequest.execute(HttpRequest.java:1056)
    at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:419)
    at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:352)
    at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.execute(AbstractGoogleClientRequest.java:469)
    at com.google.cloud.hadoop.io.bigquery.BigQueryOutputCommitter.setupJob(BigQueryOutputCommitter.java:107)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply$mcV$sp(PairRDDFunctions.scala:1150)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply(PairRDDFunctions.scala:1078)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply(PairRDDFunctions.scala:1078)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:358)
    at org.apache.spark.rdd.PairRDDFunctions.saveAsNewAPIHadoopDataset(PairRDDFunctions.scala:1078)
    at org.apache.spark.api.java.JavaPairRDD.saveAsNewAPIHadoopDataset(JavaPairRDD.scala:819)
    ...

Временная метка 201802250620_0013 в приведенном выше исключении имеет суфикс _0013, который, я не уверен, представляет время.

Я думаю, что иногда задания выполняются одновременно и пытаются создать набор данных с той же временной меткой в ​​имени. Либо в параллельном задании, либо внутри того же задания при другом вызове saveAsNewAPIHadoopDataset.

Как избежать этой ошибки, не задерживая выполнение задания?

Я использую следующую зависимость:

<dependency>
    <groupId>com.google.cloud.bigdataoss</groupId>
    <artifactId>bigquery-connector</artifactId>
    <version>0.10.2-hadoop2</version>
    <scope>provided</scope>
</dependency>

Версия образа Dataproc - 1.1.

Изменить 1:

Я пробовал использовать IndirectBigQueryOutputFormat, но теперь я получаю сообщение об ошибке, говорящее, что выходной путь gcs уже существует, даже передавая его в разное время при каждом вызове saveAsNewAPIHadoopDataset.

Вот мой код: SparkConf sc = new SparkConf (). SetAppName ("MyApp");

try (JavaSparkContext jsc = new JavaSparkContext(sc)) {
    JavaPairRDD<String, String> filesJson = jsc.wholeTextFiles(jsonFolder, parts);
    JavaPairRDD<String, String> jsons = filesJson.flatMapToPair(new FileSplitter()).repartition(parts);
    JavaPairRDD<Object, JsonObject> objsJson = jsons.flatMapToPair(new JsonParser()).filter(t -> t._2() != null).cache();

    objsJson
    .filter(new FilterType(MSG_TYPE1))
    .saveAsNewAPIHadoopDataset(createConf("my-project:MY_DATASET.MY_TABLE1", "gs://my-bucket/tmp1"));

    objsJson
    .filter(new FilterType(MSG_TYPE2))
    .saveAsNewAPIHadoopDataset(createConf("my-project:MY_DATASET.MY_TABLE2", "gs://my-bucket/tmp2"));

    objsJson
    .filter(new FilterType(MSG_TYPE3))
    .saveAsNewAPIHadoopDataset(createConf("my-project:MY_DATASET.MY_TABLE3", "gs://my-bucket/tmp3"));

    // here goes another ingestion process. same code as above but diferrent params, parsers, etc.
}

Configuration createConf(String table, String outGCS) {
  Configuration conf = new Configuration();
  BigQueryOutputConfiguration.configure(conf, table, null, outGCS, BigQueryFileFormat.NEWLINE_DELIMITED_JSON, TextOutputFormat.class);
  conf.set("mapreduce.job.outputformat.class", IndirectBigQueryOutputFormat.class.getName());
  return conf;
}

person Bruno    schedule 13.03.2018    source источник


Ответы (1)


Я считаю, что может происходить то, что каждый картограф пытается создать свой собственный набор данных. Это довольно неэффективно (и сжигает вашу дневную квоту пропорционально количеству картографов).

Альтернативой является использование IndirectBigQueryOutputFormat для класса вывода:

IndirectBigQueryOutputFormat сначала буферизует все данные во временную таблицу облачного хранилища, а затем, при commitJob, копирует все данные из облачного хранилища в BigQuery за одну операцию. Его использование рекомендуется для больших заданий, поскольку для каждого задания Hadoop / Spark требуется только одно задание «загрузки» BigQuery, по сравнению с BigQueryOutputFormat, который выполняет одно задание BigQuery для каждой задачи Hadoop / Spark.

См. Пример здесь: https://cloud.google.com/dataproc/docs/tutorials/bigquery-connector-spark-example.

person tix    schedule 14.03.2018
comment
Я попробовал, как в примере, но шесть раз вызвал saveAsNewAPIHadoopDataset, по одному для каждой целевой таблицы. Для каждого вызова я передаю другую конфигурацию с другой таблицей и параметром outputGcsPath, но теперь я получаю сообщение об ошибке, в котором говорится, что выходной путь уже существует, даже если он передается по-разному при каждом вызове. - person Bruno; 17.03.2018
comment
Я просто попробовал этот пример с двумя вызовами, записывающими последовательно в две разные таблицы / пути GCS, и он работал нормально. Можете ли вы поделиться кодом, который вы используете? Вы снова звоните BigQueryOutputConfiguration.configure? - person Guillem Xercavins; 19.03.2018