WrappedArray из WrapedArray в массив java

У меня есть столбец с установленным типом, и я использую collect_set() API набора данных spark, который возвращает обернутый массив обернутого массива. Мне нужен один массив из всех значений вложенных обернутых массивов. Как я могу это сделать?

Например. Стол Кассандры:

Col1  
{1,2,3}
{1,5}

Я использую Spark Dataset API.
row.get(0) возвращает обернутый массив обернутого массива.


person rohanagarwal    schedule 26.07.2017    source источник


Ответы (2)


Предположим, у вас есть Dataset<Row> ds, у которого есть столбец value.

+-----------------------+
|value                  |
+-----------------------+
|[WrappedArray(1, 2, 3)]|
+-----------------------+

И у него есть схема ниже

root
 |-- value: array (nullable = true)
 |    |-- element: array (containsNull = true)
 |    |    |-- element: integer (containsNull = false)

Использование пользовательской функции

Определите UDF1, как показано ниже.

static UDF1<WrappedArray<WrappedArray<Integer>>, List<Integer>> getValue = new UDF1<WrappedArray<WrappedArray<Integer>>, List<Integer>>() {
      public List<Integer> call(WrappedArray<WrappedArray<Integer>> data) throws Exception {
        List<Integer> intList = new ArrayList<Integer>();
        for(int i=0; i<data.size(); i++){
            intList.addAll(JavaConversions.seqAsJavaList(data.apply(i)));
        }
        return intList;
    }
};

Зарегистрируйтесь и позвоните UDF1, как показано ниже

import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.callUDF;
import scala.collection.JavaConversions;

//register UDF
spark.udf().register("getValue", getValue, DataTypes.createArrayType(DataTypes.IntegerType));

//Call UDF
Dataset<Row> ds1  = ds.select(col("*"), callUDF("getValue", col("value")).as("udf-value"));
ds1.show();

Использование функции разнесения

import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.explode;

Dataset<Row> ds2 = ds.select(explode(col("value")).as("explode-value"));
ds2.show(false);
person abaghel    schedule 26.07.2017
comment
да, это можно сделать, я пробовал по-другому, я взорвал наборы, а затем агрегировал их с помощью collect_set(), так что был только один массив. Вы говорите мне взорвать результат collect_set(). В обоих случаях меня беспокоит, будет ли значительное падение производительности или нет? Вот почему я выбрал сглаживание. Также вы можете указать мне на некоторые учебные пособия, книги и т. Д. Для искры + java (а не scala) + API набора данных - person rohanagarwal; 26.07.2017
comment
Я отредактировал свой ответ, чтобы получить массив с использованием UDF. Надеюсь это поможет. - person abaghel; 17.08.2017

Если у вас есть кадр данных, вы можете использовать udf для выравнивания списка. Ниже приведен простой пример.

import spark.implicits._

import org.apache.spark.sql.functions._
//create a dummy data

val df = Seq(
  (1, List(1,2,3)),
  (1, List (5,7,9)),
  (2, List(4,5,6)),
  (2,List(7,8,9))
).toDF("id", "list")

val df1 = df.groupBy("id").agg(collect_set($"list").as("col1"))

df1.show(false)

Выход для df1:

+---+----------------------------------------------+
|id |col1                                          |
+---+----------------------------------------------+
|1  |[WrappedArray(1, 2, 3), WrappedArray(5, 7, 9)]|
|2  |[WrappedArray(7, 8, 9), WrappedArray(4, 5, 6)]|
+---+----------------------------------------------+


val testUDF = udf((list: Seq[Seq[Integer]]) => {list.flatten})


df1.withColumn("newCol", testUDF($"col1")).show(false)

Выход

+---+----------------------------------------------+------------------+
|id |col1                                          |newCol            |
+---+----------------------------------------------+------------------+
|1  |[WrappedArray(1, 2, 3), WrappedArray(5, 7, 9)]|[1, 2, 3, 5, 7, 9]|
|2  |[WrappedArray(7, 8, 9), WrappedArray(4, 5, 6)]|[7, 8, 9, 4, 5, 6]|
+---+----------------------------------------------+------------------+

Надеюсь, это поможет!

person koiralo    schedule 26.07.2017
comment
не могли бы вы опубликовать эквивалентный код Java для udf. Я видел эту функцию сглаживания в Seq‹Seq‹Integer››, но не мог использовать ее должным образом. - person rohanagarwal; 26.07.2017
comment
Надеюсь, это поможет stackoverflow.com/questions/35348058/ - person koiralo; 26.07.2017
comment
На самом деле мне нужна реализация для flatten, она не так проста, как list.flatten в Java, потому что Scala богаче. Документы для flatten - это одна строка, для меня это не имеет смысла :( - person rohanagarwal; 26.07.2017
comment
вы можете написать udf и прокрутить массив, а затем создать новый массив, который будет плоским. - person koiralo; 26.07.2017