Я пытаюсь добавить значения в столбец с заданным типом через JAVA API.
Кажется, что соединитель игнорирует тип CollectionBehavior, который я устанавливаю, и всегда переопределяет предыдущую коллекцию.
Даже когда я использую CollectionRemove, удаляемое значение добавляется в коллекцию.
Я следую примеру, показанному на:
Я использую:
- искра-сердечник_2.11 2.2.0
- искра-кассандра-коннектор_2.11 2.0.5
- Кассандра 2.1.17
Может быть, эта функция не поддерживается в этих версиях?
Вот код реализации:
// CASSANDRA TABLE
CREATE TABLE test.profile (
id text PRIMARY KEY,
dates set<bigint>,
)
// ENTITY
public class ProfileRow {
public static final Map<String, String> namesMap;
static {
namesMap = new HashMap<>();
namesMap.put("id", "id");
namesMap.put("dates", "dates");
}
private String id;
private Set<Long> dates;
public ProfileRow() {}
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public Set<Long> getDates() {
return dates;
}
public void setDates(Set<Long> dates) {
this.dates = dates;
}
}
public void execute(JavaSparkContext context) {
List<ProfileRow> elements = new LinkedList<>();
ProfileRow profile = new ProfileRow();
profile.setId("fGxTObQIXM");
Set<Long> dates = new HashSet<>();
dates.add(1l);
profile.setDates(dates);
elements.add(profile);
JavaRDD<ProfileRow> rdd = context.parallelize(elements);
RDDAndDStreamCommonJavaFunctions<T>.WriterBuilder wb = javaFunctions(rdd)
.writerBuilder("test", "profile", mapToRow(ProfileRow.class, ProfileRow.namesMap));
CollectionColumnName appendColumn = new CollectionColumnName("dates", Option.empty(), CollectionAppend$.MODULE$);
scala.collection.Seq<ColumnRef> columnRefSeq = JavaApiHelper.toScalaSeq(Arrays.asList(appendColumn));
SomeColumns columnSelector = SomeColumns$.MODULE$.apply(columnRefSeq);
wb.withColumnSelector(columnSelector);
wb.saveToCassandra();
}
Спасибо,
Шай