Изявления за избор на Java API на Spark Datastax

Използвам урок тук в този Github, за да стартирам spark на cassandra с помощта на проект на java maven: https://github.com/datastax/spark-cassandra-connector.

Разбрах как да използвам директни CQL изрази, тъй като преди това съм задавал въпрос за това тук: Запитване на данни в Cassandra чрез Spark в проект на Java Maven

Сега обаче се опитвам да използвам Java API на datastax от страх, че оригиналният ми код в първоначалния ми въпрос няма да работи за Datastax версията на Spark и Cassandra. По някаква странна причина не ми позволява да използвам .where, въпреки че в документацията е посочено, че мога да използвам точно този израз. Ето моят код:

import org.apache.commons.lang3.StringUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;

import java.io.Serializable;

import static com.datastax.spark.connector.CassandraJavaUtil.*;


public class App implements Serializable
{

    // firstly, we define a bean class
    public static class Person implements Serializable {
        private Integer id;
        private String fname;
        private String lname;
        private String role;

        // Remember to declare no-args constructor
        public Person() { }

        public Integer getId() { return id; }
        public void setId(Integer id) { this.id = id; }

        public String getfname() { return fname; }
        public void setfname(String fname) { this.fname = fname; }

        public String getlname() { return lname; }
        public void setlname(String lname) { this.lname = lname; }

        public String getrole() { return role; }
        public void setrole(String role) { this.role = role; }

        // other methods, constructors, etc.
    }

    private transient SparkConf conf;
    private App(SparkConf conf) {
        this.conf = conf;
    }


    private void run() {
        JavaSparkContext sc = new JavaSparkContext(conf);
        createSchema(sc);


        sc.stop();
    }

    private void createSchema(JavaSparkContext sc) {

        JavaRDD<String> rdd = javaFunctions(sc).cassandraTable("tester", "empbyrole", Person.class)
                .where("role=?", "IT Engineer").map(new Function<Person, String>() {
                    @Override
                    public String call(Person person) throws Exception {
                        return person.toString();
                    }
                });
        System.out.println("Data as Person beans: \n" + StringUtils.join("\n", rdd.toArray()));
       }



    public static void main( String[] args )
    {
        if (args.length != 2) {
            System.err.println("Syntax: com.datastax.spark.demo.JavaDemo <Spark Master URL> <Cassandra contact point>");
            System.exit(1);
        }

        SparkConf conf = new SparkConf();
        conf.setAppName("Java API demo");
        conf.setMaster(args[0]);
        conf.set("spark.cassandra.connection.host", args[1]);

        App app = new App(conf);
        app.run();
    }
}

ето я грешката:

14/09/23 13:46:53 ERROR executor.Executor: Exception in task ID 0
java.io.IOException: Exception during preparation of SELECT "role", "id", "fname", "lname" FROM "tester"."empbyrole" WHERE token("role") > -5709068081826432029 AND token("role") <= -5491279024053142424 AND role=? ALLOW FILTERING: role cannot be restricted by more than one relation if it includes an Equal
    at com.datastax.spark.connector.rdd.CassandraRDD.createStatement(CassandraRDD.scala:310)
    at com.datastax.spark.connector.rdd.CassandraRDD.com$datastax$spark$connector$rdd$CassandraRDD$$fetchTokenRange(CassandraRDD.scala:317)
    at com.datastax.spark.connector.rdd.CassandraRDD$$anonfun$13.apply(CassandraRDD.scala:338)
    at com.datastax.spark.connector.rdd.CassandraRDD$$anonfun$13.apply(CassandraRDD.scala:338)
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
    at com.datastax.spark.connector.util.CountingIterator.hasNext(CountingIterator.scala:10)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
    at scala.collection.AbstractIterator.to(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
    at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
    at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
    at org.apache.spark.rdd.RDD$$anonfun$4.apply(RDD.scala:608)
    at org.apache.spark.rdd.RDD$$anonfun$4.apply(RDD.scala:608)
    at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:884)
    at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:884)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:109)
    at org.apache.spark.scheduler.Task.run(Task.scala:53)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:205)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
Caused by: com.datastax.driver.core.exceptions.InvalidQueryException: role cannot be restricted by more than one relation if it includes an Equal
    at com.datastax.driver.core.exceptions.InvalidQueryException.copy(InvalidQueryException.java:35)
    at com.datastax.driver.core.DefaultResultSetFuture.extractCauseFromExecutionException(DefaultResultSetFuture.java:256)
    at com.datastax.driver.core.AbstractSession.prepare(AbstractSession.java:91)
    at com.datastax.spark.connector.cql.PreparedStatementCache$.prepareStatement(PreparedStatementCache.scala:45)
    at com.datastax.spark.connector.cql.SessionProxy.invoke(SessionProxy.scala:28)
    at com.sun.proxy.$Proxy8.prepare(Unknown Source)
    at com.datastax.spark.connector.rdd.CassandraRDD.createStatement(CassandraRDD.scala:293)
    ... 27 more
Caused by: com.datastax.driver.core.exceptions.InvalidQueryException: role cannot be restricted by more than one relation if it includes an Equal
    at com.datastax.driver.core.Responses$Error.asException(Responses.java:97)
    at com.datastax.driver.core.SessionManager$1.apply(SessionManager.java:156)
    at com.datastax.driver.core.SessionManager$1.apply(SessionManager.java:131)
    at com.google.common.util.concurrent.Futures$1.apply(Futures.java:711)
    at com.google.common.util.concurrent.Futures$ChainingListenableFuture.run(Futures.java:849)
    ... 3 more
14/09/23 13:46:53 WARN scheduler.TaskSetManager: Lost TID 0 (task 0.0:0)
14/09/23 13:46:53 WARN scheduler.TaskSetManager: Loss was due to java.io.IOException
java.io.IOException: Exception during preparation of SELECT "role", "id", "fname", "lname" FROM "tester"."empbyrole" WHERE token("role") > -5709068081826432029 AND token("role") <= -5491279024053142424 AND role=? ALLOW FILTERING: role cannot be restricted by more than one relation if it includes an Equal
    at com.datastax.spark.connector.rdd.CassandraRDD.createStatement(CassandraRDD.scala:310)
    at com.datastax.spark.connector.rdd.CassandraRDD.com$datastax$spark$connector$rdd$CassandraRDD$$fetchTokenRange(CassandraRDD.scala:317)
    at com.datastax.spark.connector.rdd.CassandraRDD$$anonfun$13.apply(CassandraRDD.scala:338)
    at com.datastax.spark.connector.rdd.CassandraRDD$$anonfun$13.apply(CassandraRDD.scala:338)
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
    at com.datastax.spark.connector.util.CountingIterator.hasNext(CountingIterator.scala:10)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
    at scala.collection.AbstractIterator.to(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
    at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
    at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
    at org.apache.spark.rdd.RDD$$anonfun$4.apply(RDD.scala:608)
    at org.apache.spark.rdd.RDD$$anonfun$4.apply(RDD.scala:608)
    at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:884)
    at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:884)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:109)
    at org.apache.spark.scheduler.Task.run(Task.scala:53)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:205)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
14/09/23 13:46:53 ERROR scheduler.TaskSetManager: Task 0.0:0 failed 1 times; aborting job
14/09/23 13:46:53 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool 
14/09/23 13:46:53 INFO scheduler.DAGScheduler: Failed to run toArray at App.java:65
Exception in thread "main" org.apache.spark.SparkException: Job aborted: Task 0.0:0 failed 1 times (most recent failure: Exception failure: java.io.IOException: Exception during preparation of SELECT "role", "id", "fname", "lname" FROM "tester"."empbyrole" WHERE token("role") > -5709068081826432029 AND token("role") <= -5491279024053142424 AND role=? ALLOW FILTERING: role cannot be restricted by more than one relation if it includes an Equal)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1020)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1018)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$abortStage(DAGScheduler.scala:1018)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:604)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:604)
    at scala.Option.foreach(Option.scala:236)
    at org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:604)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$$anon$2$$anonfun$receive$1.applyOrElse(DAGScheduler.scala:190)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
    at akka.actor.ActorCell.invoke(ActorCell.scala:456)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
    at akka.dispatch.Mailbox.run(Mailbox.scala:219)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
14/09/23 13:46:53 INFO cql.CassandraConnector: Disconnected from Cassandra cluster: Test Cluster

Знам, че грешката ми е точно в този раздел:

JavaRDD<String> rdd = javaFunctions(sc).cassandraTable("tester", "empbyrole", Person.class)
                .where("role=?", "IT Engineer").map(new Function<Person, String>() {
                    @Override
                    public String call(Person person) throws Exception {
                        return person.toString();
                    }
                });

Когато премахна .where(), работи. Но специално в github се казва, че трябва да можете да изпълнявате съответно .where и .map функции. Някой има ли някаква аргументация за това? или решение? Благодаря.

редактиране получавам грешката да изчезне, когато вместо това използвам този израз:

JavaRDD<String> rdd = javaFunctions(sc).cassandraTable("tester", "empbyrole", Person.class)
                .where("id=?", "1").map(new Function<Person, String>() {
                    @Override
                    public String call(Person person) throws Exception {
                        return person.toString();
                    }
                });

Нямам представа защо тази опция работи, но не и останалите ми варианти. Ето инструкциите, които изпълних в моя cql, за да знаете как изглежда моето ключово пространство:

    session.execute("DROP KEYSPACE IF EXISTS tester");
    session.execute("CREATE KEYSPACE tester WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 3}");
    session.execute("CREATE TABLE tester.emp (id INT PRIMARY KEY, fname TEXT, lname TEXT, role TEXT)");
    session.execute("CREATE TABLE tester.empByRole (id INT, fname TEXT, lname TEXT, role TEXT, PRIMARY KEY (role,id))");
    session.execute("CREATE TABLE tester.dept (id INT PRIMARY KEY, dname TEXT)");       

    session.execute(
              "INSERT INTO tester.emp (id, fname, lname, role) " +
              "VALUES (" +
                  "0001," +
                  "'Angel'," +
                  "'Pay'," +
                  "'IT Engineer'" +
                  ");");
    session.execute(
              "INSERT INTO tester.emp (id, fname, lname, role) " +
              "VALUES (" +
                  "0002," +
                  "'John'," +
                  "'Doe'," +
                  "'IT Engineer'" +
                  ");");
    session.execute(
              "INSERT INTO tester.emp (id, fname, lname, role) " +
              "VALUES (" +
                  "0003," +
                  "'Jane'," +
                  "'Doe'," +
                  "'IT Analyst'" +
                  ");");
    session.execute(
          "INSERT INTO tester.empByRole (id, fname, lname, role) " +
          "VALUES (" +
              "0001," +
              "'Angel'," +
              "'Pay'," +
              "'IT Engineer'" +
              ");");
    session.execute(
              "INSERT INTO tester.empByRole (id, fname, lname, role) " +
              "VALUES (" +
                  "0002," +
                  "'John'," +
                  "'Doe'," +
                  "'IT Engineer'" +
                  ");");
    session.execute(
              "INSERT INTO tester.empByRole (id, fname, lname, role) " +
              "VALUES (" +
                  "0003," +
                  "'Jane'," +
                  "'Doe'," +
                  "'IT Analyst'" +
                  ");");
        session.execute(
              "INSERT INTO tester.dept (id, dname) " +
              "VALUES (" +
                  "1553," +
                  "'Commerce'" +
                  ");");

person angyxpoo    schedule 23.09.2014    source източник


Отговори (2)


Методът where добавя ALLOW FILTERING към вашата заявка под кориците. Това не е магически куршум, тъй като все още не поддържа произволни полета като предикати на заявка. По принцип полето трябва да бъде или индексирано, или колона за групиране. Ако това не е практично за вашия модел на данни, можете просто да използвате метода filter в RDD. Недостатъкът е, че филтърът се извършва в Spark, а не в Cassandra.

Така че полето id работи, защото се поддържа в клауза CQL WHERE, докато аз приемам, че ролята е просто обикновено поле. Моля, обърнете внимание, че НЕ ви предлагам да индексирате полето си или да го промените на колона за групиране, тъй като не знам вашия модел на данни.

person rs_atl    schedule 24.09.2014
comment
Това е полезно, но защо не мога да използвам тази заявка с spark .where("role=?", "IT Engineer") в таблицата empByRole, където role е моят първичен ключ? - person angyxpoo; 24.09.2014

Има ограничение в Spark Cassandra Connector, че методът where няма да работи върху ключове за разделяне. Във вашата таблица empByRole ролята е ключ за разделяне, оттук и грешката. Трябва да работи правилно върху колони за клъстериране или индексирани колони (вторични индекси).

Това се проследява като проблем 37 в проекта GitHub и работата е продължава.

На страницата с документи на Java API показаните примери използват .where("name=?", "Anna"). Предполагам, че това име не е ключ за разделяне, но примерът може да е по-ясен за това.

person BrianC    schedule 24.09.2014
comment
Благодаря ви за отговора. Надяваме се, че този проблем скоро ще бъде решен. - person angyxpoo; 24.09.2014
comment
@angyxpoo страницата с документи е актуализирана, за да бъде малко по-ясно кога можете да използвате къде: github.com/datastax/spark-cassandra-connector/blob/master/doc/ - person BrianC; 29.09.2014
comment
Благодаря @BrianC Това определено е ново допълнение. Отивам да го проверя. - person angyxpoo; 30.09.2014