Spark (2.3.1) Ошибка преобразования памяти LDA (java.lang.OutOfMemoryError в java.io.ByteArrayOutputStream.hugeCapacity (ByteArrayOutputStream.java:123)

Я обучил LDA (размерность: 100, итерация: 100, распределенная версия, мл) с помощью Spark 2.3.2. После этого я хочу преобразовать новый DataSet, используя эту модель. Но когда я трансформирую новые данные, я всегда получаю ошибку памяти, связанную с ошибкой. Я изменил размер данных с x 0,1 на x 0,01. Но всегда получаю ошибку памяти (java.lang.OutOfMemoryError в java.io.ByteArrayOutputStream.hugeCapacity (ByteArrayOutputStream.java:123)

Эта ошибка hugeCapacity (переполнение) происходит, когда размер массива превышает Integer.MAX_VALUE - 8. Но я изменил размер данных на маленький. Я не могу понять, почему произошла эта ошибка.

Если у вас есть какие-либо идеи по этому поводу, приветствую все.

Ниже код

val countvModel = CountVectorizerModel.load("s3://~/")
val ldaModel = DistributedLDAModel.load("s3://~/")
val transformeddata=countvModel.transform(inputData).select("productid", "itemid", "ptkString", "features")
var featureldaDF = ldaModel.transform(transformeddata).select("productid", "itemid", "topicDistribution", "ptkString").toDF("productid", "itemid", "features", "ptkString")
featureldaDF=featureldaDF.persist //this is 328 line

Ниже моя среда

DataSet

  1. Документ: около 100000000 -> 10000000 -> 1000000 (все не пройдены)

  2. Слово: около 3553918 (изменить нельзя)

Искровая среда

  1. память-исполнитель, память-драйвер: 18 ГБ -> 32 г -> 64 -> 128 г (все терпят неудачу)

  2. ядро исполнителя, ядро ​​драйвера: 3

  3. spark.serializer: по умолчанию и org.apache.spark.serializer.KryoSerializer (оба не работают)

  4. spark.executor.memoryOverhead: 18 ГБ -> 36 ГБ сбой

Версия Jave: 1.8.0_191 (Oracle Corporation)

Другое тестирование

  1. Вариант Java: UseParallelGC, UseG1GC (все терпят неудачу)

Ниже журнал

19/03/05 20:59:03 ERROR ApplicationMaster: User class threw exception: java.lang.OutOfMemoryError
java.lang.OutOfMemoryError
    at java.io.ByteArrayOutputStream.hugeCapacity(ByteArrayOutputStream.java:123)
    at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:117)
    at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
    at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153)
    at org.apache.spark.util.ByteBufferOutputStream.write(ByteBufferOutputStream.scala:41)
    at java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1877)
    at java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1786)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1189)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:43)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:342)
    at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:335)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:159)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:2299)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1.apply(RDD.scala:850)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1.apply(RDD.scala:849)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
    at org.apache.spark.rdd.RDD.mapPartitionsWithIndex(RDD.scala:849)
    at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:608)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
    at org.apache.spark.sql.execution.columnar.InMemoryRelation.buildBuffers(InMemoryRelation.scala:107)
    at org.apache.spark.sql.execution.columnar.InMemoryRelation.<init>(InMemoryRelation.scala:102)
    at org.apache.spark.sql.execution.columnar.InMemoryRelation$.apply(InMemoryRelation.scala:43)
    at org.apache.spark.sql.execution.CacheManager$$anonfun$cacheQuery$1.apply(CacheManager.scala:97)
    at org.apache.spark.sql.execution.CacheManager.writeLock(CacheManager.scala:67)
    at org.apache.spark.sql.execution.CacheManager.cacheQuery(CacheManager.scala:91)
    at org.apache.spark.sql.Dataset.persist(Dataset.scala:2907)
    at coupang.cs.predictforxgboost.App$.main(App.scala:328)
    at coupang.cs.predictforxgboost.App.main(App.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$4.run(ApplicationMaster.scala:721)

Для @Ilya Brodezki

Это ошибка сохранения.

java.lang.OutOfMemoryError
at java.io.ByteArrayOutputStream.hugeCapacity(ByteArrayOutputStream.java:123)
at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:117)
at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153)
at org.apache.spark.util.ByteBufferOutputStream.write(ByteBufferOutputStream.scala:41)
at java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1877)
at java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1786)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1189)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:43)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:342)
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:335)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:159)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2299)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1.apply(RDD.scala:850)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1.apply(RDD.scala:849)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
at org.apache.spark.rdd.RDD.mapPartitionsWithIndex(RDD.scala:849)
at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:608)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
at org.apache.spark.sql.execution.SortExec.doExecute(SortExec.scala:101)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:191)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:154)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:104)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:102)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:122)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:656)
at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:656)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:656)
at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:273)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:267)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:225)
at org.apache.spark.sql.DataFrameWriter.orc(DataFrameWriter.scala:572)
at coupang.cs.predictforxgboost.App$.main(App.scala:361)
at coupang.cs.predictforxgboost.App.main(App.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$4.run(ApplicationMaster.scala:721)

person Tae-suk Kim    schedule 05.03.2019    source источник


Ответы (1)


Существует множество конфигураций Hadoop, которые могут контролировать или помогать с проблемами ООО, но я думаю, что в вашем случае вы можете просто установить sqlDF.persist (StorageLevel.MEMORY_AND_DISK_SER), чтобы, когда вы выполняете постоянное действие, а не пытаетесь сохранить все в памяти, он будет проливать дополнительные данные на диск. есть много разных флагов для сохранения, которые вы также можете использовать: sqlDF.persist (StorageLevel.MEMORY_AND_DISK), который сохраняет данные несериализованными

person Ilya Brodezki    schedule 05.03.2019
comment
Этот OOM не имеет отношения к сохранению. Если я использую какое-то действие Dataframe, это OOM всегда происходит (например, показать или сохранить). Но я опробую ваш вариант. - person Tae-suk Kim; 06.03.2019
comment
Сообщите мне, поможет ли это, а если нет, добавьте ошибку для сбоя, который вы получили при показе / сохранении - person Ilya Brodezki; 06.03.2019
comment
Добавляю еще журнал при сохранении фрейма данных - person Tae-suk Kim; 07.03.2019
comment
Привет! Удалось ли вам это решить? Я получаю точно такую ​​же ошибку, когда делаю PCA для функций 40.107. У моего хозяина 235 г, а у трех исполнителей по 115 г! - person Des0lat0r; 22.11.2020