Spark job бросает NPE

В работе Spark я читаю из Kafka и после некоторых вычислений сохраняю данные в Cassandra.

Исходный код

import static com.datastax.spark.connector.japi.CassandraJavaUtil.javaFunctions;
import static com.datastax.spark.connector.japi.CassandraJavaUtil.mapToRow;

import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.datastax.spark.connector.japi.CassandraRow;
import com.datastax.spark.connector.japi.rdd.CassandraTableScanJavaRDD;
import com.edureka.capstone.Customer;
import com.edureka.capstone.FileProperties;
import com.twitter.bijection.Injection;
import com.twitter.bijection.avro.GenericAvroCodecs;

import kafka.serializer.StringDecoder;
import scala.Tuple2;

import org.apache.spark.streaming.Duration;

public class SparkStreamingCustomerJob {

    private final static Logger LOGGER = LoggerFactory.getLogger(SparkStreamingCustomerJob.class);

    private static SparkConf conf = null;

    private static Map<String, String> kafkaParams = new HashMap<>();

    private static JavaStreamingContext ssc = null;

    static {
        Properties prop = FileProperties.properties;
        if (Boolean.parseBoolean(prop.get("localmode").toString())) {
            conf = new SparkConf().setMaster("local[*]");
        } else {
            conf = new SparkConf();
        }
        conf.setAppName(SparkStreamingCustomerJob.class.getName());
        conf.set("spark.cassandra.connection.host", prop.get("com.smcc.app.cassandra.host").toString());
        if (prop.get("spark.cassandra.auth.username") != null) {
            conf.set("spark.cassandra.auth.username", prop.get("spark.cassandra.auth.username").toString());
            conf.set("spark.cassandra.auth.password", prop.get("spark.cassandra.auth.password").toString());
        } else {
            conf.set("hadoop.home.dir", "/");
        }
        conf.setAppName(SparkStreamingCardJob.class.getName());

        kafkaParams.put("metadata.broker.list", prop.get("metadata.broker.list").toString());
        kafkaParams.put("auto.offset.reset", prop.get("auto.offset.reset").toString());
        kafkaParams.put("group.id", prop.get("group.id").toString());
        kafkaParams.put("enable.auto.commit", prop.get("enable.auto.commit").toString());

    }

    static VoidFunction<Tuple2<String, String>> mapFunc = new VoidFunction<Tuple2<String, String>>() {

        private static final long serialVersionUID = 1L;

        @Override
        public void call(Tuple2<String, String> arg0) {
            try {
                Schema.Parser parser = new Schema.Parser();
                Schema schema = parser.parse(FileProperties.CUSTOMER_AVRO);
                Injection<GenericRecord, String> recordInjection = GenericAvroCodecs.toJson(schema);
                GenericRecord record = recordInjection.invert(arg0._2).get();

                Customer customer = new Customer(Long.parseLong(record.get("customerId").toString()),
                        record.get("customerName").toString(), record.get("mobileNumber").toString(),
                        record.get("gender").toString(), Long.parseLong(record.get("bithDate").toString()),
                        record.get("email").toString(), record.get("address").toString(),
                        record.get("state").toString(), record.get("country").toString(),
                        Long.parseLong(record.get("pincode").toString()));

                    List<Customer> customerList = Arrays.asList(customer);

                    LOGGER.error("Customer List = {} jsc = {} ", customerList,
                            JavaSparkContext.fromSparkContext(SparkContext.getOrCreate()));

                    CassandraTableScanJavaRDD<CassandraRow> 
      //Throwing master url not found
      customerGenderDetails = javaFunctions(
                            JavaSparkContext.fromSparkContext(SparkContext.getOrCreate()))
                                    .cassandraTable("capstone", "customer").where("customerid = 11111");
                    String gender = null;
                    if (customerGenderDetails.count() > 0) {
                        gender = customerGenderDetails.first().getString("gender");
                    }

                    LOGGER.info("GENDER = {} ", gender);

                    // JavaRDD<Customer> newRDD =
                    // JavaSparkContext.fromSparkContext(SparkContext.getOrCreate())
                    // .parallelize(customerList);

                    JavaRDD<Customer> newRDD = ssc.sparkContext().parallelize(customerList);

                    LOGGER.info("newRDD = {} ", newRDD);

                    javaFunctions(newRDD).writerBuilder("capstone", "customer", mapToRow(Customer.class)).saveToCassandra();
                    LOGGER.error("SAVED TO CASSANDRA");
                } catch (Exception e) {
                    LOGGER.error("Exception occured while parsing = {} ", e.getMessage());
                    throw e;
                }

                }
            };

            public static void main(String[] args) throws InterruptedException {

                ssc = new JavaStreamingContext(JavaSparkContext.fromSparkContext(SparkContext.getOrCreate(conf)),
                        new Duration(2000));

                Set<String> topics = Collections.singleton("customer_topic");

                JavaPairInputDStream<String, String> directKafkaStream = KafkaUtils.createDirectStream(ssc, String.class,
                        String.class, StringDecoder.class, StringDecoder.class, kafkaParams, topics);

                VoidFunction<JavaPairRDD<String, String>> iterateFunc = new VoidFunction<JavaPairRDD<String, String>>() {

                    private static final long serialVersionUID = 1L;

                    @Override
                    public void call(JavaPairRDD<String, String> arg0) throws Exception {
                        if (!arg0.isEmpty())
                            arg0.foreach(mapFunc);
                    }
                };

                directKafkaStream.foreachRDD(iterateFunc);

                ssc.start();
                ssc.awaitTermination();

            }

        }
  1. Если я использую JavaSparkContext.fromSparkContext(SparkContext.getOrCreate()) для получения экземпляра SparkContext, я получаю, что основной URL-адрес не найден.
  2. Если я использую ssc.sparkContext(), я получаю NPE. Даже если я использую static для искрового контекста, он вызывает NPE.

В режиме local[*] он работает нормально, но при работе выше с пряжей он выдает вышеуказанные исключения.

spark-submit --class SparkStreamingCustomerJob --master yarn --deploy-mode cluster spark-cassandra-demo-0.0.1-SNAPSHOT-jar-with-dependencies.jar

Спасибо.


person cody123    schedule 20.12.2017    source источник
comment
Так как проблема возникает только при работе на пряже. Вы должны предоставить больше информации, чтобы мы могли вам помочь, например, HADOOP_CONF_DIR и YARN_CONF_DIR, содержимое xmls, настройки пряжи и то, как вы вызвали отправку/оболочку spark.   -  person Ire    schedule 20.12.2017
comment
HADOOP_HOME=/opt/cloudera/parcels/CDH/lib/hadoop YARN_HOME=/opt/cloudera/parcels/CDH/lib/hadoop-yarn   -  person cody123    schedule 21.12.2017