Error writing a DataFrame to Avro in Spark: NoSuchElementException

I am trying to save a DataFrame to an Avro file in Spark, but it fails with one particular DataFrame only. The code seems correct, because it works fine with other DataFrames and even with a subset of the very same DataFrame:

import os

def write_df_avro(df, outputFolder, outputFile):
    # build the full output path for the Avro directory
    outputFile = os.path.join(outputFolder, outputFile)
    df_coal = df
    df_coal.registerTempTable('table')  # not strictly needed for the write itself
    # write using the databricks spark-avro package (Spark 1.x syntax)
    df_coal.write.format("com.databricks.spark.avro").mode('overwrite').save(outputFile)
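
For context, the function is invoked like this (the call is taken verbatim from the Python traceback further down; new_df is the DataFrame that triggers the failure):

import tempfile

# call from the failing run; the Avro output lands in the system temp directory
write_df_avro(new_df, tempfile.gettempdir(), 'sacame_de_aquiiii')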

The write fails with "Task failed while writing rows", caused by java.util.NoSuchElementException: next on empty iterator.

Any ideas?

This is the stack trace:

  WARN TaskSetManager: Lost task 48.0 in stage 18.0 (TID 605, 192.168.0.1): org.apache.spark.SparkException: Task failed while writing rows.
          at org.apache.spark.sql.execution.datasources.DefaultWriterContainer.writeRows(WriterContainer.scala:250)
          at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(InsertIntoHadoopFsRelation.scala:150)
          at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(InsertIntoHadoopFsRelation.scala:150)
          at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
          at org.apache.spark.scheduler.Task.run(Task.scala:88)
          at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
          at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
          at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
          at java.lang.Thread.run(Thread.java:745)
  Caused by: java.util.NoSuchElementException: next on empty iterator
          at scala.collection.Iterator$$anon$2.next(Iterator.scala:39)
          at scala.collection.Iterator$$anon$2.next(Iterator.scala:37)
          at scala.collection.IndexedSeqLike$Elements.next(IndexedSeqLike.scala:64)
          at com.databricks.spark.avro.AvroOutputWriter$$anonfun$com$databricks$spark$avro$AvroOutputWriter$$createConverterToAvro$7.apply(AvroOutputWriter.scala:141)
          at com.databricks.spark.avro.AvroOutputWriter.write(AvroOutputWriter.scala:70)
          at org.apache.spark.sql.sources.OutputWriter.writeInternal(interfaces.scala:380)
          at org.apache.spark.sql.execution.datasources.DefaultWriterContainer.writeRows(WriterContainer.scala:242)
          ... 8 more

  16/10/13 18:08:29 ERROR TaskSetManager: Task 48 in stage 18.0 failed 4 times; aborting job
  16/10/13 18:08:29 ERROR InsertIntoHadoopFsRelation: Aborting job.
  org.apache.spark.SparkException: Job aborted due to stage failure: Task 48 in stage 18.0 failed 4 times, most recent failure: Lost task 48.3 in stage 18.0 (TID 671, 192.168.0.3): org.apache.spark.SparkException: Task failed while writing rows.
          at org.apache.spark.sql.execution.datasources.DefaultWriterContainer.writeRows(WriterContainer.scala:250)
          at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(InsertIntoHadoopFsRelation.scala:150)
          at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(InsertIntoHadoopFsRelation.scala:150)
          at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
          at org.apache.spark.scheduler.Task.run(Task.scala:88)
          at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
          at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
          at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
          at java.lang.Thread.run(Thread.java:745)
  Caused by: java.util.NoSuchElementException: next on empty iterator
          at scala.collection.Iterator$$anon$2.next(Iterator.scala:39)
          at scala.collection.Iterator$$anon$2.next(Iterator.scala:37)
          at scala.collection.IndexedSeqLike$Elements.next(IndexedSeqLike.scala:64)
          at com.databricks.spark.avro.AvroOutputWriter$$anonfun$com$databricks$spark$avro$AvroOutputWriter$$createConverterToAvro$7.apply(AvroOutputWriter.scala:141)
          at com.databricks.spark.avro.AvroOutputWriter.write(AvroOutputWriter.scala:70)
          at org.apache.spark.sql.sources.OutputWriter.writeInternal(interfaces.scala:380)
          at org.apache.spark.sql.execution.datasources.DefaultWriterContainer.writeRows(WriterContainer.scala:242)
          ... 8 more

  Driver stacktrace:
          at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1294)
          at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1282)
          at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1281)
          at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
          at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
          at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1281)
          at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697)
          at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697)
          at scala.Option.foreach(Option.scala:236)
          at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:697)
          at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1507)
          at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1469)
          at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1458)
          at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
          at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:567)
          at org.apache.spark.SparkContext.runJob(SparkContext.scala:1824)
          at org.apache.spark.SparkContext.runJob(SparkContext.scala:1837)
          at org.apache.spark.SparkContext.runJob(SparkContext.scala:1914)
          at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply$mcV$sp(InsertIntoHadoopFsRelation.scala:150)
          at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply(InsertIntoHadoopFsRelation.scala:108)
          at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply(InsertIntoHadoopFsRelation.scala:108)
          at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:56)
          at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation.run(InsertIntoHadoopFsRelation.scala:108)
          at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult$lzycompute(commands.scala:57)
          at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult(commands.scala:57)
          at org.apache.spark.sql.execution.ExecutedCommand.doExecute(commands.scala:69)
          at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:140)
          at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:138)
          at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
          at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:138)
          at org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute(SQLContext.scala:933)
          at org.apache.spark.sql.SQLContext$QueryExecution.toRdd(SQLContext.scala:933)
          at org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:197)
          at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:146)
          at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:137)
          at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
          at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
          at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
          at java.lang.reflect.Method.invoke(Method.java:497)
          at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
          at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
          at py4j.Gateway.invoke(Gateway.java:259)
          at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
          at py4j.commands.CallCommand.execute(CallCommand.java:79)
          at py4j.GatewayConnection.run(GatewayConnection.java:207)
          at java.lang.Thread.run(Thread.java:745)
  Caused by: org.apache.spark.SparkException: Task failed while writing rows.
          at org.apache.spark.sql.execution.datasources.DefaultWriterContainer.writeRows(WriterContainer.scala:250)
          at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(InsertIntoHadoopFsRelation.scala:150)
          at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(InsertIntoHadoopFsRelation.scala:150)
          at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
          at org.apache.spark.scheduler.Task.run(Task.scala:88)
          at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
          at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
          at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
          ... 1 more
  Caused by: java.util.NoSuchElementException: next on empty iterator
          at scala.collection.Iterator$$anon$2.next(Iterator.scala:39)
          at scala.collection.Iterator$$anon$2.next(Iterator.scala:37)
          at scala.collection.IndexedSeqLike$Elements.next(IndexedSeqLike.scala:64)
          at com.databricks.spark.avro.AvroOutputWriter$$anonfun$com$databricks$spark$avro$AvroOutputWriter$$createConverterToAvro$7.apply(AvroOutputWriter.scala:141)
          at com.databricks.spark.avro.AvroOutputWriter.write(AvroOutputWriter.scala:70)
          at org.apache.spark.sql.sources.OutputWriter.writeInternal(interfaces.scala:380)
          at org.apache.spark.sql.execution.datasources.DefaultWriterContainer.writeRows(WriterContainer.scala:242)
          ... 8 more
  16/10/13 18:08:29 WARN TaskSetManager: Lost task 100.0 in stage 18.0 (TID 657, 192.168.0.3): TaskKilled (killed intentionally)
  16/10/13 18:08:29 ERROR DefaultWriterContainer: Job job_201610131808_0000 aborted.
  Traceback (most recent call last):
    File "/home/xxx/xxx/avro_to_df.py", line 194, in <module>
      main()
    File "/home/xxx/xxx/avro_to_df.py", line 172, in main
      write_df_avro(new_df, tempfile.gettempdir() , 'sacame_de_aquiiii' )
    File "/home/xxx/xxx/avro_to_df.py", line 92, in write_df_avro
      df_coal.write.format("com.databricks.spark.avro").mode('overwrite').save(outputFile)
    File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/readwriter.py", line 332, in save
    File "/usr/lib/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 538, in __call__
    File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 36, in deco
      w.close()
    File "/usr/lib/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line 300, in get_return_value
  py4j.protocol.Py4JJavaError: An error occurred while calling o264.save.
  : org.apache.spark.SparkException: Job aborted.
          at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply$mcV$sp(InsertIntoHadoopFsRelation.scala:156)
          at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply(InsertIntoHadoopFsRelation.scala:108)
          at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply(InsertIntoHadoopFsRelation.scala:108)
          at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:56)
          at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation.run(InsertIntoHadoopFsRelation.scala:108)
          at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult$lzycompute(commands.scala:57)
          at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult(commands.scala:57)
          at org.apache.spark.sql.execution.ExecutedCommand.doExecute(commands.scala:69)
          at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:140)
          at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:138)
          at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
          at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:138)
          at org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute(SQLContext.scala:933)
          at org.apache.spark.sql.SQLContext$QueryExecution.toRdd(SQLContext.scala:933)
          at org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:197)
          at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:146)
          at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:137)
          at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
          at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
          at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
          at java.lang.reflect.Method.invoke(Method.java:497)
          at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
          at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
          at py4j.Gateway.invoke(Gateway.java:259)
          at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
          at py4j.commands.CallCommand.execute(CallCommand.java:79)
          at py4j.GatewayConnection.run(GatewayConnection.java:207)
          at java.lang.Thread.run(Thread.java:745)
  Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 48 in stage 18.0 failed 4 times, most recent failure: Lost task 48.3 in stage 18.0 (TID 671, 192.168.0.3): org.apache.spark.SparkException: Task failed while writing rows.
          at org.apache.spark.sql.execution.datasources.DefaultWriterContainer.writeRows(WriterContainer.scala:250)
          at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(InsertIntoHadoopFsRelation.scala:150)
          at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(InsertIntoHadoopFsRelation.scala:150)
          at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
          at org.apache.spark.scheduler.Task.run(Task.scala:88)
          at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
          at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
          at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
          at java.lang.Thread.run(Thread.java:745)
  Caused by: java.util.NoSuchElementException: next on empty iterator
          at scala.collection.Iterator$$anon$2.next(Iterator.scala:39)
          at scala.collection.Iterator$$anon$2.next(Iterator.scala:37)
          at scala.collection.IndexedSeqLike$Elements.next(IndexedSeqLike.scala:64)
          at com.databricks.spark.avro.AvroOutputWriter$$anonfun$com$databricks$spark$avro$AvroOutputWriter$$createConverterToAvro$7.apply(AvroOutputWriter.scala:141)
          at com.databricks.spark.avro.AvroOutputWriter.write(AvroOutputWriter.scala:70)
          at org.apache.spark.sql.sources.OutputWriter.writeInternal(interfaces.scala:380)
          at org.apache.spark.sql.execution.datasources.DefaultWriterContainer.writeRows(WriterContainer.scala:242)
          ... 8 more

  Driver stacktrace:
          at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1294)
          at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1282)
          at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1281)
          at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
          at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
          at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1281)
          at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697)
          at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697)
          at scala.Option.foreach(Option.scala:236)
          at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:697)
          at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1507)
          at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1469)
          at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1458)
          at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
          at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:567)
          at org.apache.spark.SparkContext.runJob(SparkContext.scala:1824)
          at org.apache.spark.SparkContext.runJob(SparkContext.scala:1837)
          at org.apache.spark.SparkContext.runJob(SparkContext.scala:1914)
          at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply$mcV$sp(InsertIntoHadoopFsRelation.scala:150)
          ... 27 more
  Caused by: org.apache.spark.SparkException: Task failed while writing rows.
          at org.apache.spark.sql.execution.datasources.DefaultWriterContainer.writeRows(WriterContainer.scala:250)
          at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(InsertIntoHadoopFsRelation.scala:150)
          at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(InsertIntoHadoopFsRelation.scala:150)
          at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
          at org.apache.spark.scheduler.Task.run(Task.scala:88)
          at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
          at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
          at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
          ... 1 more
  Caused by: java.util.NoSuchElementException: next on empty iterator
          at scala.collection.Iterator$$anon$2.next(Iterator.scala:39)
          at scala.collection.Iterator$$anon$2.next(Iterator.scala:37)
          at scala.collection.IndexedSeqLike$Elements.next(IndexedSeqLike.scala:64)
          at com.databricks.spark.avro.AvroOutputWriter$$anonfun$com$databricks$spark$avro$AvroOutputWriter$$createConverterToAvro$7.apply(AvroOutputWriter.scala:141)
          at com.databricks.spark.avro.AvroOutputWriter.write(AvroOutputWriter.scala:70)
          at org.apache.spark.sql.sources.OutputWriter.writeInternal(interfaces.scala:380)
          at org.apache.spark.sql.execution.datasources.DefaultWriterContainer.writeRows(WriterContainer.scala:242)
          ... 8 more

asked by Daniel Argüelles, 13.10.2016
Comment: Can you post a copy of the schema you are trying to save (use df.printSchema())? The error happens while converting your DataFrame data types into the internal Avro types that get written.  - Ryan Widmaier, 13.10.2016
Comment: Thanks @RyanW for your help. The schema was simple, only doubles and strings.  - Daniel Argüelles, 21.10.2016


Answers (1)


I found the problem: the data was corrupt, and some rows had fewer columns than the rest of the dataset.
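
A rough way to spot such records before writing is to compare each row's length against the schema (a minimal sketch; the variable names are illustrative and not from my actual script):

expected = len(df.columns)

# find underlying Rows that carry fewer fields than the schema declares;
# these are the records that make AvroOutputWriter fail with
# "next on empty iterator"
bad_rows = df.rdd.filter(lambda row: len(row) != expected)
print(bad_rows.count())
print(bad_rows.take(5))

Repairing or filtering out those rows before calling write_df_avro should avoid the exception.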

answered by Daniel Argüelles, 21.10.2016