Правильное использование трансляции в Spark

Допустим, у меня есть RDD, например (используя pyspark):

 RDDstrings = sc.parallelize(['alpha','alpha4','veta','gamma','delta'])

и я хочу сравнить каждую строку с другими, используя функцию расстояния между строками. Таким образом, конечным результатом в этом случае будет матрица 5x5 с диагональю, которая будет иметь значение 1, поскольку строковое значение сравнивается с собственным значением на диагоналях.

Итак, моя идея заключалась в следующем:

  1. Я создал функцию StringDistance(string,alist), которая принимает на вход string для сравнения с alist строками.
  2. Трансляция RDDstrings:

    broadcastedRDDstrings = sc.broadcast(RDDstrings.collect())
    
  3. Сопоставьте начальный RDD:

     stringsDistances = RDDstrings.map(lambda string:StringDistance(string,broadcastedRDDstrings.value))
    

Итак, в этом преобразовании я сравниваю каждую строку из исходного RDD с теми же строками RDD, которые были переданы позже.

Результаты правильные. Но мой вопрос в том, является ли это лучшим способом реализации и является ли это правильным использованием вещания. Или я должен cache() начальный RDD?

Приветствуются любые отрицательные отзывы.


person Mpizos Dimitris    schedule 03.02.2016    source источник


Ответы (2)


Я бы предложил другой подход.

def patheticDistance(a, b):
    return len(a) - len(b)

rdd = sc.parallelize(["maritza", "alberto", "andres", "dakota", "miguel"]).sortBy(lambda x: x)

distances = (rdd.cartesian(rdd)
                .groupByKey()
                .map(lambda (x, y): [patheticDistance(x, a) for a in y])))

distances.collect()
# [[-1, 0, 0, -1, 0],
#  [-1, 0, 0, -1, 0],
#  [ 0, 1, 1,  0, 1],
#  [-1, 0, 0, -1, 0],
#  [ 0, 1, 1,  0, 1]]
person Alberto Bonsanto    schedule 03.02.2016

Вы можете использовать метод cartesian в RDD, например

rdd.cartesian(rdd)

person Marek Dudek    schedule 05.02.2016