Ошибка добавления / удаления данных в коллекции Spark Cassandra Connector Java API

Я пытаюсь добавить значения в столбец с заданным типом через JAVA API.

Кажется, что соединитель игнорирует тип CollectionBehavior, который я устанавливаю, и всегда переопределяет предыдущую коллекцию.

Даже когда я использую CollectionRemove, удаляемое значение добавляется в коллекцию.

Я следую примеру, показанному на:

https://datastax-oss.atlassian.net/browse/SPARKC-340?page=com.atlassian.jira.plugin.system.issuetabpanels%3Achangehistory-tabpanel

Я использую:

  • искра-сердечник_2.11 2.2.0
  • искра-кассандра-коннектор_2.11 2.0.5
  • Кассандра 2.1.17

Может быть, эта функция не поддерживается в этих версиях?

Вот код реализации:

// CASSANDRA TABLE
CREATE TABLE test.profile (
    id text PRIMARY KEY,
    dates set<bigint>,
)

// ENTITY
public class ProfileRow {
    public static final Map<String, String> namesMap;
    static {
        namesMap = new HashMap<>();
        namesMap.put("id", "id");
        namesMap.put("dates", "dates");
    }
    private String id;
    private Set<Long> dates;
    public ProfileRow() {}
    public String getId() {
        return id;
    }
    public void setId(String id) {
        this.id = id;
    }
    public Set<Long> getDates() {
        return dates;
    }
    public void setDates(Set<Long> dates) {
        this.dates = dates;
    }
}


public void execute(JavaSparkContext context) {
    List<ProfileRow> elements = new LinkedList<>();
    ProfileRow profile = new ProfileRow();
    profile.setId("fGxTObQIXM");
    Set<Long> dates = new HashSet<>();
    dates.add(1l);
    profile.setDates(dates);
    elements.add(profile);
    JavaRDD<ProfileRow> rdd = context.parallelize(elements);

    RDDAndDStreamCommonJavaFunctions<T>.WriterBuilder wb = javaFunctions(rdd)
        .writerBuilder("test", "profile", mapToRow(ProfileRow.class, ProfileRow.namesMap));
    CollectionColumnName appendColumn = new CollectionColumnName("dates", Option.empty(), CollectionAppend$.MODULE$);
    scala.collection.Seq<ColumnRef> columnRefSeq = JavaApiHelper.toScalaSeq(Arrays.asList(appendColumn));
    SomeColumns columnSelector = SomeColumns$.MODULE$.apply(columnRefSeq);

    wb.withColumnSelector(columnSelector);
    wb.saveToCassandra();
}

Спасибо,

Шай


person Shai    schedule 30.05.2018    source источник


Ответы (1)


Я нашел ответ. Мне пришлось изменить 2 вещи:

  1. Добавьте столбец первичного ключа в селектор столбцов.
  2. WriterBuilder.withColumnSelector () создает новый экземпляр WriterBuilder, поэтому мне пришлось сохранить новый экземпляр.

:

RDDAndDStreamCommonJavaFunctions<T>.WriterBuilder wb = javaFunctions(rdd)
    .writerBuilder("test", "profile", mapToRow(ProfileRow.class, ProfileRow.namesMap));
ColumnName pkColumn = new ColumnName("id", Option.empty())
CollectionColumnName appendColumn = new CollectionColumnName("dates", Option.empty(), CollectionAppend$.MODULE$);
scala.collection.Seq<ColumnRef> columnRefSeq = JavaApiHelper.toScalaSeq(Arrays.asList(pkColumn, appendColumn));
SomeColumns columnSelector = SomeColumns$.MODULE$.apply(columnRefSeq);

wb = wb.withColumnSelector(columnSelector);
wb.saveToCassandra();
person Shai    schedule 31.05.2018