Расстояние Apache Spark между двумя точками с использованием SquaredDistance

У меня есть коллекция векторов RDD, где каждый вектор представляет собой точку с координатами x и y. Например, файл выглядит следующим образом:

1.1 1.2
6.1 4.8
0.1 0.1
9.0 9.0
9.1 9.1
0.4 2.1

Я читаю это:

  def parseVector(line: String): Vector[Double] = {
    DenseVector(line.split(' ')).map(_.toDouble)
  }

  val lines = sc.textFile(inputFile)
  val points = lines.map(parseVector).cache()

Кроме того, у меня есть эпсилон:

  val eps = 2.0

Для каждой точки я хочу найти ее соседей, которые находятся на расстоянии эпсилон. Я делаю:

points.foreach(point =>
  // squaredDistance(point, ?) what should I write here?
)

Как я могу зациклить все точки и для каждой точки найти своих соседей? Вероятно, используя функцию map?


person Bob    schedule 25.10.2014    source источник


Ответы (4)


Вы можете сделать что-то вроде:

val distanceBetweenPoints = points.cartesian(points)
    .filter{case (x,y) => (x!=y)} // remove the (x,x) diagonal
    .map{case (x,y) => ((x,y),distance(x,y))}
val pointsWithinEps = distanceBetweenPoints.filter{case ((x,y),distance) => distance <= eps)}

Вы также можете комбинировать расчет расстояния в фильтре, если впоследствии вам не нужно расстояние между точками.

person maasg    schedule 25.10.2014
comment
Большое спасибо! Я очень новичок в Spark и пытаюсь реализовать алгоритм кластеризации DBSCAN с помощью spark. Это был первый шаг. Следующий шаг — зациклить все точки — зациклить все точки и выполнить описанную выше процедуру для каждой точки. В алгоритме я должен хранить посещенные точки - должен ли я хранить их в отдельном массиве или использовать какой-либо указатель в кортеже, чтобы определить, был ли он посещен? - person Bob; 25.10.2014
comment
Также у меня есть другие вопросы по реализации. Могу ли я получить вашу консультацию, где я мог бы задать и другие вопросы. Например, через скайп. - person Bob; 25.10.2014
comment
Возможна индивидуальная консультация? Я хотел бы показать вам код и узнать, что вы думаете о нем. - person Bob; 26.10.2014
comment
@ Боб, если вы зададите свои вопросы здесь, все выиграют от этого, и вы можете получить более одного мнения / мнения о вашем подходе. - person maasg; 26.10.2014
comment
Хорошо. Спасибо! Тогда я опубликую еще один вопрос. - person Bob; 26.10.2014
comment
Пожалуйста, взгляните на этот вопрос: stackoverflow.com/ вопросы/26577032/ - person Bob; 26.10.2014

Даже если этот ответ еще не принят, я помещаю здесь в качестве уведомления о том, что принятое решение, которое в основном такое же, как и предложенное в репозитории github, которое было предложено, на самом деле не масштабируется из-за декартовой операции, которая имеет O(n^2) как сложность и с огромные наборы данных, это определенно проблема.

Есть другое решение, то есть другая реализация алгоритма DBSCAN над Spark, которую можно найти здесь https://github.com/alitouka/spark_dbscan. Это решение предлагает другой подход, который разбивает набор данных RDD на «блоки». Таким образом, близкими точками могут быть только те, которые находятся в том же ящике рассматриваемой точки, и те, которые меньше эпсилон вдали от границ смежных разбиений. Таким образом, сложность снижается до O(m^2), где m равно n/k, а k — количеству разделов. Кроме того, выполняются другие оптимизации (если вам нужны дополнительные подробности, вы можете прочитать код, связаться с автором или спросить меня).

Предыдущие реализации имеют некоторые ограничения: поддерживаются только евклидовы и манхэттенские меры, и можно успешно обрабатывать только наборы данных с очень небольшим количеством измерений. Чтобы преодолеть эти проблемы, я создал эту ветку, которая призвана устранить все эти проблемы: https://github.com/speedymrk9/spark_dbscan/tree/distance-measure-independent. Теперь, похоже, он работает нормально, и все проблемы решены, хотя я продолжаю тестировать его, чтобы убедиться, что в нем нет недостатков, прежде чем делать запрос на включение.

person mgaido    schedule 03.07.2015

Вы можете использовать библиотеку SparkAI и сделать что-то вроде:

import org.aizook.scala.clustering.Spark_DBSCAN.DBSCAN val cluster:Dbscan = new Dbscan(3,5,data) cluster.predict((2000,(48.3,33.1)))

с

`val data: RDD(Long,(Double, Double)
eps = 3
minPts = 5`
person David    schedule 10.11.2014
comment
Можете ли вы привести пример входного файла? Я получаю исключение в потоке main java.lang.UnsupportedOperationException: исключение пустой коллекции. - person Bob; 12.11.2014
comment
@Bob Это произошло потому, что (48.3,33.1) не соответствует кластеру и его следует классифицировать как шум. Я предоставил некоторую дополнительную информацию в дальнейшем - person David; 20.11.2014

@Bob Это произошло потому, что (48.3,33.1) не соответствует кластеру и его следует классифицировать как шум. Я зафиксировал обновление в библиотеке SparkAI, и оно должно возвращать -1 каждый раз, когда прогноз соответствует шуму.

import org.aizook.scala.clustering.Spark_DBSCAN.Dbscan
val eps = 2
val minPts = 2
val data = sc.textFile("data.txt").map(_.split(" ")).map(p => (p(0).trim.toDouble, p(1).trim.toDouble)).zipWithUniqueId().map(x => (x._2,x._1)).cache;
val cluster:Dbscan = new Dbscan(eps,minPts,data)
cluster.predict((data.count+1,(9.0,10.0)))  // Should return 1 for cluster 1
cluster.predict((data.count+2,(2.0,2.0)))   // Should return 0 for cluster 0
cluster.predict((data.count+3,(15.0,23.0))) // Should return -1 for noise

с data.txt, содержащим образец данных, который вы отправили:

1.1 1.2
6.1 4.8
0.1 0.1
9.0 9.0
9.1 9.1
0.4 2.1
person David    schedule 15.11.2014