Apache Spark разстояние между две точки с помощта на squaredDistance

Имам RDD колекции от вектори, където всеки вектор представлява точка с x и y координати. Например файлът е както следва:

1.1 1.2
6.1 4.8
0.1 0.1
9.0 9.0
9.1 9.1
0.4 2.1

чета го:

  def parseVector(line: String): Vector[Double] = {
    DenseVector(line.split(' ')).map(_.toDouble)
  }

  val lines = sc.textFile(inputFile)
  val points = lines.map(parseVector).cache()

Освен това имам епсилон:

  val eps = 2.0

За всяка точка искам да намеря нейните съседи, които са в рамките на епсилон разстояние. Правя го:

points.foreach(point =>
  // squaredDistance(point, ?) what should I write here?
)

Как мога да обвържа всички точки и за всяка точка да намеря нейните съседи? Вероятно използвате функцията map?


person Bob    schedule 25.10.2014    source източник


Отговори (4)


Бихте могли да направите нещо като:

val distanceBetweenPoints = points.cartesian(points)
    .filter{case (x,y) => (x!=y)} // remove the (x,x) diagonal
    .map{case (x,y) => ((x,y),distance(x,y))}
val pointsWithinEps = distanceBetweenPoints.filter{case ((x,y),distance) => distance <= eps)}

Можете също така да комбинирате изчислението на разстоянието във филтъра, ако след това не ви интересува разстоянието между точките.

person maasg    schedule 25.10.2014
comment
Благодаря ти много! Аз съм много нов в Spark и се опитвам да внедря DBSCAN алгоритъм за клъстериране с помощта на spark. Това беше първата стъпка. Следващата стъпка е да направите цикъл на всички точки и да направите цикъл на всички точки и да изпълните горепосочената процедура за всяка точка. В алгоритъма трябва да съхранявам посетените точки - трябва ли да ги съхранявам в отделен масив или да използвам някакъв указател в Tuple, за да определя дали е бил посетен? - person Bob; 25.10.2014
comment
Имам и други въпроси относно изпълнението. Мога ли да получа вашата консултация, където да задам и други въпроси. Например чрез Skype. - person Bob; 25.10.2014
comment
Възможно ли е индивидуална консултация? Бих искал да ви покажа кода и да разбера какво мислите за него. - person Bob; 26.10.2014
comment
@Bob, ако зададете въпросите си тук, всички ще се възползват от това и може да получите повече от едно мнение/гледна точка за вашия подход. - person maasg; 26.10.2014
comment
Добре. Благодаря! Тогава ще пусна друг въпрос. - person Bob; 26.10.2014
comment
Моля, разгледайте този въпрос: stackoverflow.com/ въпроси/26577032/ - person Bob; 26.10.2014

Дори ако този отговор все още е приет, поставям тук като бележка, че приетото решение, което е основно същото, предложено в github repo, което е предложено, не е наистина мащабируемо поради декартова операция, която има O(n^2) като сложност и с огромни набори от данни, това определено е проблем.

Има друго решение, т.е. друго изпълнение на алгоритъма DBSCAN върху Spark, което може да се намери тук https://github.com/alitouka/spark_dbscan. Това решение предлага различен подход, който разделя набора от данни RDD в „кутии“. По този начин близките точки могат да бъдат само тези в същата кутия на разглежданата точка и тези, които са по-малко от епсилон далеч от границите на съседните дялове. По този начин сложността се свежда до O(m^2), където m е n/k, като k е броят на дяловете. Освен това се извършват други оптимизации (ако имате нужда от допълнителни подробности, можете да прочетете кода, да се свържете с автора или да ме попитате).

Предишните реализации имат някои ограничения: поддържат се само евклидови и манхатънски мерки и само набор от данни с много малко измерения може да бъде обработен успешно. За да преодолея тези проблеми, създадох този клон, който има за цел да премахне всички тези проблеми: https://github.com/speedymrk9/spark_dbscan/tree/distance-measure-independent. Сега изглежда, че работи добре и всички проблеми са решени, въпреки че продължавам да го тествам, за да съм напълно сигурен, че няма недостатъци, преди да направя заявка за изтегляне.

person mgaido    schedule 03.07.2015

Можете да използвате SparkAI библиотека и да направите нещо като:

import org.aizook.scala.clustering.Spark_DBSCAN.DBSCAN val cluster:Dbscan = new Dbscan(3,5,data) cluster.predict((2000,(48.3,33.1)))

с

`val data: RDD(Long,(Double, Double)
eps = 3
minPts = 5`
person David    schedule 10.11.2014
comment
Можете ли да дадете пример за входен файл? Получавам изключение в основната нишка java.lang.UnsupportedOperationException: изключение за празна колекция. - person Bob; 12.11.2014
comment
@Bob Това беше, защото (48.3,33.1) не пасва на клъстер и трябва да се класифицира като шум. По-нататък предоставих допълнителна информация - person David; 20.11.2014

@Bob Това беше, защото (48.3,33.1) не пасва на клъстер и трябва да се класифицира като шум. Направих актуализация на библиотеката SparkAI и тя трябва да върне -1 всеки път, когато прогнозата отговаря на шума

import org.aizook.scala.clustering.Spark_DBSCAN.Dbscan
val eps = 2
val minPts = 2
val data = sc.textFile("data.txt").map(_.split(" ")).map(p => (p(0).trim.toDouble, p(1).trim.toDouble)).zipWithUniqueId().map(x => (x._2,x._1)).cache;
val cluster:Dbscan = new Dbscan(eps,minPts,data)
cluster.predict((data.count+1,(9.0,10.0)))  // Should return 1 for cluster 1
cluster.predict((data.count+2,(2.0,2.0)))   // Should return 0 for cluster 0
cluster.predict((data.count+3,(15.0,23.0))) // Should return -1 for noise

с data.txt, съдържащ извадката от данни, която сте изпратили:

1.1 1.2
6.1 4.8
0.1 0.1
9.0 9.0
9.1 9.1
0.4 2.1
person David    schedule 15.11.2014