Сбой задания Spark с несколькими контекстами

В приложении spark мы создаем два контекста spark:

1) Для чтения данных из файловой системы.

2) Для подключения и загрузки данных в Cassandra.

В одном приложении мы можем запустить только один контекст искры искры, поэтому мы останавливаем 1-й, а затем запускаем второй.

Я получал следующую ошибку.

Error 1) 16/03/10 05:40:44 ERROR Utils: Uncaught exception in thread      Thread-2
java.io.IOException: Target log file already exists        (hdfs:///var/log/spark/apps/application_1457586850134_0001_2)
    at `org.apache.spark.scheduler.EventLoggingListener.stop(EventLoggingListener.scala:225)`

Это произошло потому, что в hadoop каталог/файл не обязательно должен присутствовать при выполнении задания (1-й контекст), поэтому для запуска 2-го контекста присутствовал каталог каталога/журнала журнала. Так была указанная выше ошибка.

Я решил проблему, установив spark.eventLog.overwrite=true


Error 2) WARN executor.CoarseGrainedExecutorBackend: An unknown (ip-10-93-141-13.ec2.internal:48849) driver disconnected. 16/03/10 06:47:37                                                            ERROR executor.CoarseGrainedExecutorBackend: Driver 10.93.141.13:48849disassociated! Shutting down.

Я попытался увеличить

spark.yarn.driver.memoryOverhead=1024

spark.yarn.executor.memoryOverhead=1024

Но проблема все еще существует.


Ошибка 3)

Exception in thread "main" java.io.IOException: Failed to connect to /10.93.141.13:46008
    at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:216)
    at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:167)
    at org.apache.spark.rpc.netty.NettyRpcEnv.createClient(NettyRpcEnv.scala:200)
    at org.apache.spark.rpc.netty.Outbox$$anon$1.call(Outbox.scala:187)
    at org.apache.spark.rpc.netty.Outbox$$anon$1.call(Outbox.scala:183)
    at java.util.concurrent.FutureTask.run(FutureTask.java:262)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)

Я зарегистрировался на основном узле, он не прослушивает порт 46008.


Ошибка 4 )

WARN YarnAllocator: Container marked as failed:              container_1457586850134_0006_01_000006 on host: ip-10-164-169-         46.ec2.internal. Exit status: 1. Diagnostics: Exception from container-   launch.
Container id: container_1457586850134_0006_01_000006
Exit code: 1
Stack trace: ExitCodeException exitCode=1:
at org.apache.hadoop.util.Shell.runCommand(Shell.java:545)
at org.apache.hadoop.util.Shell.run(Shell.java:456)
at   org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:722)
at org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor.launchContainer(DefaultContainerExecutor.java:211)
at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:302)
at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:82)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)


Container exited with a non-zero exit code 1

16/03/10 06:47:17 WARN ApplicationMaster: Reporter thread fails 1 time(s) in a row.
java.lang.IllegalStateException: RpcEnv already stopped.
at org.apache.spark.rpc.netty.Dispatcher.postMessage(Dispatcher.scala:159)
    at org.apache.spark.rpc.netty.Dispatcher.postOneWayMessage(Dispatcher.scala:131)
    at org.apache.spark.rpc.netty.NettyRpcEnv.send(NettyRpcEnv.scala:192)
    at org.apache.spark.rpc.netty.NettyRpcEndpointRef.send(NettyRpcEnv.scala:516)
    at org.apache.spark.deploy.yarn.YarnAllocator$$anonfun$processCompletedContainers$1$$anonfun$apply$7.apply(YarnAllocator.scala:531)
    at org.apache.spark.deploy.yarn.YarnAllocator$$anonfun$processCompletedContainers$1$$anonfun$apply$7.apply(YarnAllocator.scala:512)
    at scala.Option.foreach(Option.scala:236)
    at org.apache.spark.deploy.yarn.YarnAllocator$$anonfun$processCompletedContainers$1.apply(YarnAllocator.scala:512)
    at org.apache.spark.deploy.yarn.YarnAllocator$$anonfun$processCompletedContainers$1.apply(YarnAllocator.scala:442)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
    at org.apache.spark.deploy.yarn.YarnAllocator.processCompletedContainers(YarnAllocator.scala:442)
    at org.apache.spark.deploy.yarn.YarnAllocator.allocateResources(YarnAllocator.scala:242)
    at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$1.run(ApplicationMaster.scala:368)

Вышеупомянутая ошибка возникает из-за того, что контейнер постоянно выходит из строя.


Похоже, что вышеупомянутая проблема с искрой возникла из-за нескольких контекстов, о которых говорилось в списке рассылки.

https://mail-archives.apache.org/mod_mbox/spark-issues/201602.mbox/%3CJIRA.12936774.1454603470000.316442.1454707659978@Atlassian.JIRA%3E

sparkContext.stop() не может освободить ресурсы.


Я использую следующие параметры

--class com.mobi.vserv.driver.Query5kPids                            
  --conf spark.eventLog.overwrite=true  
  --conf spark.yarn.executor.memoryOverhead=1024                       
  --conf spark.yarn.driver.memoryOverhead=1024                             
  --num-executors 4                
  --executor-memory 3g            
  --executor-cores 2                  
  --driver-memory 3g

Я работаю на EMR с ведущим и двумя подчиненными узлами, главный имеет 8 ядер и 16 ГБ памяти, а каждый подчиненный имеет 4 ядра по 5120 МБ доступной памяти.

Ниже мой код.

    public class Query5kPids implements Serializable{

    static List<UserSetGet> ListFromS3 = new ArrayList<UserSetGet>();

    public static void main(String[] args) throws JSONException, IOException, InterruptedException, URISyntaxException {


    SparkConf conf = new SparkConf();
    conf.setAppName("Spark-Cassandra Integration");
    conf.setMaster("yarn-cluster");
    conf.set("spark.cassandra.connection.host", "12.16.193.19");
    conf.set("spark.cassandra.connection.port", "9042");


    SparkConf conf1 = new SparkConf().setAppName("SparkAutomation").setMaster("yarn-cluster");

    Query5kPids app1 = new Query5kPids(conf1);
    app1.run1(file);

    Query5kPids app = new Query5kPids(conf);
    System.out.println("Both RDD has been generated");
    app.run();

   }

   private void run() throws JSONException, IOException, InterruptedException {

   JavaSparkContext sc = new JavaSparkContext(conf);
   query(sc);
   sc.stop();
   }

   private void run1(File file) throws JSONException, IOException,     InterruptedException {
   JavaSparkContext sc = new JavaSparkContext(conf);
   getData(sc,file);
   sc.stop();

  }

   private void getData(JavaSparkContext sc, File file) {

   JavaRDD<String> Data = sc.textFile(file.toString());
   System.out.println("RDD Count is " + Data.count());

   // Other map opetations to convert to UserSetGet RDD.
   ListFromS3 = Data.collect();

   }
   private void query(JavaSparkContext sc) {

   System.out.prin`enter code here`tln("RDD Count is " +  ListFromS3.size());
   //This gets printed. 
   //Which means it application is coming to the second part of the program.

   for (int i = 0; i < ListFromS3.size(); i++) {

   sb.append(ListFromS3.get(i).getApnid());
   sb.append(',');
   }
   sb.setLength(sb.length() - 3);

   JavaRDD<CassandraRow> cassandraRDD = CassandraJavaUtil.javaFunctions(sc).cassandraTable("dmp", "user_profile_spark_test").select       ("app_day_count", "app_first_seen","app_last_seen", "app_usage_count", "total_day_count", "total_usage_count")
 .where("apnid IN ('" + sb + "')");

   if(cassandraRDD.isEmpty()){

   JavaRDD<UserSetGet> rddFromGz = sc.parallelize(ListFromS3);

   CassandraJavaUtil.javaFunctions(rddFromGz).writerBuilder("dmp", "user_profile_spark_test", mapToRow(UserSetGet.class)).saveToCassand();
        logger.info("DataSaved");
   }
   }

   }

Ниже мой ПОМ

   <dependencies>
   <dependency>
   <groupId>org.apache-extras.cassandra-jdbc</groupId>
   <artifactId>cassandra-jdbc</artifactId>
   <version>1.2.5</version>
   </dependency>

   <dependency>
   <groupId>junit</groupId>
   <artifactId>junit</artifactId>
   <version>3.8.1</version>
   <scope>test</scope>
   </dependency>
   <dependency>
   <groupId>org.codehaus.jettison</groupId>
   <artifactId>jettison</artifactId>
   <version>1.3.7</version>
   </dependency>
   <dependency>
   <groupId>log4j</groupId>
   <artifactId>log4j</artifactId>
   <version>1.2.17</version>
   </dependency>


   <dependency>
   <groupId>org.apache.spark</groupId>
   <artifactId>spark-core_2.10</artifactId>
   <version>1.6.0</version>
   </dependency>

   <dependency>
   <groupId>com.datastax.spark</groupId>
   <artifactId>spark-cassandra-connector_2.10</artifactId>
   <version>1.5.0-M1</version>
   </dependency>

   <dependency>
   <groupId>com.datastax.cassandra</groupId>
   <artifactId>cassandra-driver-core</artifactId>
   <version>2.1.6</version>
   </dependency>

   <dependency>
   <groupId>com.datastax.spark</groupId>
   <artifactId>spark-cassandra-connector-java_2.10</artifactId>
   <version>1.5.0-M3</version>
   </dependency>

   <dependency>
   <groupId>org.apache.commons</groupId>
   <artifactId>commons-collections4</artifactId>
   <version>4.1</version>
   </dependency>
   </dependencies>

person Rahul Koshaley    schedule 11.03.2016    source источник


Ответы (1)


Мы проводим тесты с локальным искровым контекстом и используем следующий «хак» для решения проблем с коллизиями:

sc.stop()
// To avoid Akka rebinding to the same port, since it doesn't unbind 
// immediately on shutdown

System.clearProperty("spark.driver.port")

По какой-то причине вы используете 2 разных искровых контекста? Почему нельзя использовать только 1?

person Igor Berman    schedule 11.03.2016
comment
Спасибо за ответ, мне вызвать System.clearProperty(spark.driver.port) до sc.stop() или после него, и извините за глупость, откуда System.clearProperty выберет spark.driver.port? также я использую 2 контекста искры, так как 1-й контекст считывает данные из файловой системы, а второй читает из cassandra. - person Rahul Koshaley; 11.03.2016
comment
Причина, по которой я спрашивал с тех пор, я использую порт драйвера по умолчанию и не указываю его явно, просто чтобы добавить, все ли в порядке, если я укажу его явно - person Rahul Koshaley; 11.03.2016
comment
Я явно указал порт драйвера для обоих контекстов, но теперь он дает Failed to connect to /10.230.132.121:51324, это IP-адрес одного из основных узлов. И контейнеры все еще терпят неудачу. - person Rahul Koshaley; 11.03.2016
comment
Я все еще не понимаю тебя. Вы пытались использовать только 1 контекст? Я понимаю, что вы читаете из hdfs, а затем из cassandra, но что произойдет, если вы будете использовать тот же контекст? Никогда не указывал порт явно. System.crearProperty(spark.driver.port); просто удаляет свойство spark.driver.port из зарегистрированных свойств системы (которое задается контекстом искры) - person Igor Berman; 11.03.2016
comment
Плохо, у меня сложилось впечатление, что для чтения из HDFS нам нужен другой контекст :(, но в любом случае в моем следующем задании мне нужно подключиться к MongoDB и cassandra, поэтому я думаю, мне придется использовать два разных контекста. - person Rahul Koshaley; 11.03.2016
comment
Я все еще не получаю, я должен сделать System.clearProperty(spark.driver.port) в коде после sc.stop() , но тогда откуда я должен удалить spark.driver.port ? Какой файл? - person Rahul Koshaley; 11.03.2016
comment
нет, имхо можно использовать только 1 контекст для всего. в вашем коде есть sc.stop(); просто поместите System.clearProperty(spark.driver.port) после него... вот и все (или просто используйте 1 контекст) - person Igor Berman; 11.03.2016
comment
Хорошо, спасибо, но тогда мне нужно создать две конфигурации: одну для mongodb, а затем для Cassandra, затем я создам объект контекста Spark с conf, привязанным к Mongodb, но затем для второй конфигурации, связанной с Cassandra, как я буду это использовать Объект контекста снова? - person Rahul Koshaley; 11.03.2016
comment
Juss, чтобы добавить изящный выход, все в порядке. Если я использую addShutdownHook, т.е. Runtime.getRuntime().addShutdownHook(new Thread() {public void run() {sc.stop(); System.clearProperty(spark.drive.port) ;System.out.println(Выполняется обработчик завершения работы); } }); - person Rahul Koshaley; 11.03.2016
comment
если ваша конфигурация для cassandra и mongo не конфликтует (т. е. у них разные свойства), то вы можете иметь 1 конфигурацию (унифицированную) для обоих вариантов использования. Что касается хука выключения, не уверен, что вам нужно очищать там свойство, так как при закрытии вашего jvm вы не сможете запустить другой контекст искры (т.е. этот System.clearProperty(spark.drive.port) необходим только тогда, когда вы запустить контекст искры - остановите его, а затем запустите другой контекст искры в том же процессе jvm) - person Igor Berman; 11.03.2016
comment
Большое спасибо, Игорь :) Вы спасатель, я протестировал его, используя один контекст, он работал, просто для эксперимента я также сделаю System.clearProperty() и поделюсь результатом, надеюсь, вопрос здесь поможет другим разработчикам. - person Rahul Koshaley; 11.03.2016
comment
здесь developer.couchbase.com/documentation/server /4.0/connectors/ , он говорит создать CouchbaseSparkContext csc = CouchbaseContext(sc); из JavaSparkContext , поэтому использование этого с cassandra будет проблемой. Пожалуйста, предложите, можно ли что-то сделать, используя один и тот же контекст как для cassandra, так и для coachdb. - person Rahul Koshaley; 14.03.2016
comment
Вы можете обернуть тот же контекст искры в контекст искры тренерской базы. - person Igor Berman; 14.03.2016