У меня есть поток, который выполняет искровые задания на кластерах Dataproc параллельно для разных зон. Для каждой зоны он создает кластер, выполняет искровое задание и удаляет кластер после его завершения.
В задании Spark используется метод org.apache.spark.rdd.PairRDDFunctions.saveAsNewAPIHadoopDataset
, передающий BigQuery Configuration для сохранения данных в таблице BigQuery. Задание сохраняет данные в нескольких таблицах, вызывая метод saveAsNewAPIHadoopDataset более одного раза для каждого задания.
Проблема в том, что иногда я получаю сообщение об ошибке, вызванное конфликтом во временном наборе данных BigQuery Hadoop, который он внутренне создает для выполнения заданий:
Exception in thread "main" com.google.api.client.googleapis.json.GoogleJsonResponseException: 409 Conflict
{
"code" : 409,
"errors" : [ {
"domain" : "global",
"message" : "Already Exists: Dataset <my-gcp-project>:<MY-DATASET>_hadoop_temporary_job_201802250620_0013",
"reason" : "duplicate"
} ],
"message" : "Already Exists: Dataset <my-gcp-project>:<MY-DATASET>_hadoop_temporary_job_201802250620_0013"
}
at com.google.api.client.googleapis.json.GoogleJsonResponseException.from(GoogleJsonResponseException.java:145)
at com.google.api.client.googleapis.services.json.AbstractGoogleJsonClientRequest.newExceptionOnError(AbstractGoogleJsonClientRequest.java:113)
at com.google.api.client.googleapis.services.json.AbstractGoogleJsonClientRequest.newExceptionOnError(AbstractGoogleJsonClientRequest.java:40)
at com.google.api.client.googleapis.services.AbstractGoogleClientRequest$1.interceptResponse(AbstractGoogleClientRequest.java:321)
at com.google.api.client.http.HttpRequest.execute(HttpRequest.java:1056)
at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:419)
at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:352)
at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.execute(AbstractGoogleClientRequest.java:469)
at com.google.cloud.hadoop.io.bigquery.BigQueryOutputCommitter.setupJob(BigQueryOutputCommitter.java:107)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply$mcV$sp(PairRDDFunctions.scala:1150)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply(PairRDDFunctions.scala:1078)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply(PairRDDFunctions.scala:1078)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:358)
at org.apache.spark.rdd.PairRDDFunctions.saveAsNewAPIHadoopDataset(PairRDDFunctions.scala:1078)
at org.apache.spark.api.java.JavaPairRDD.saveAsNewAPIHadoopDataset(JavaPairRDD.scala:819)
...
Временная метка 201802250620_0013 в приведенном выше исключении имеет суфикс _0013, который, я не уверен, представляет время.
Я думаю, что иногда задания выполняются одновременно и пытаются создать набор данных с той же временной меткой в имени. Либо в параллельном задании, либо внутри того же задания при другом вызове saveAsNewAPIHadoopDataset.
Как избежать этой ошибки, не задерживая выполнение задания?
Я использую следующую зависимость:
<dependency>
<groupId>com.google.cloud.bigdataoss</groupId>
<artifactId>bigquery-connector</artifactId>
<version>0.10.2-hadoop2</version>
<scope>provided</scope>
</dependency>
Версия образа Dataproc - 1.1.
Изменить 1:
Я пробовал использовать IndirectBigQueryOutputFormat
, но теперь я получаю сообщение об ошибке, говорящее, что выходной путь gcs уже существует, даже передавая его в разное время при каждом вызове saveAsNewAPIHadoopDataset
.
Вот мой код: SparkConf sc = new SparkConf (). SetAppName ("MyApp");
try (JavaSparkContext jsc = new JavaSparkContext(sc)) {
JavaPairRDD<String, String> filesJson = jsc.wholeTextFiles(jsonFolder, parts);
JavaPairRDD<String, String> jsons = filesJson.flatMapToPair(new FileSplitter()).repartition(parts);
JavaPairRDD<Object, JsonObject> objsJson = jsons.flatMapToPair(new JsonParser()).filter(t -> t._2() != null).cache();
objsJson
.filter(new FilterType(MSG_TYPE1))
.saveAsNewAPIHadoopDataset(createConf("my-project:MY_DATASET.MY_TABLE1", "gs://my-bucket/tmp1"));
objsJson
.filter(new FilterType(MSG_TYPE2))
.saveAsNewAPIHadoopDataset(createConf("my-project:MY_DATASET.MY_TABLE2", "gs://my-bucket/tmp2"));
objsJson
.filter(new FilterType(MSG_TYPE3))
.saveAsNewAPIHadoopDataset(createConf("my-project:MY_DATASET.MY_TABLE3", "gs://my-bucket/tmp3"));
// here goes another ingestion process. same code as above but diferrent params, parsers, etc.
}
Configuration createConf(String table, String outGCS) {
Configuration conf = new Configuration();
BigQueryOutputConfiguration.configure(conf, table, null, outGCS, BigQueryFileFormat.NEWLINE_DELIMITED_JSON, TextOutputFormat.class);
conf.set("mapreduce.job.outputformat.class", IndirectBigQueryOutputFormat.class.getName());
return conf;
}