PySpark - Времево припокриване за обект в RDD

Целта ми е да групирам обекти въз основа на времево припокриване.

Всеки обект в моя rdd съдържа start_time и end_time.

Вероятно правя това неефективно, но това, което планирам да направя, е да присвоя идентификатор на припокриване на всеки обект въз основа на това дали има припокриване във времето с някой от другите обекти. Имам логиката за времево припокриване надолу. Тогава се надявам да се групирам по това overlap_id.

Така че първо,

mapped_rdd = rdd.map(assign_overlap_id)
final_rdd = mapped_rdd.reduceByKey(combine_objects)

Сега идва въпросът ми. Как мога да напиша функцията assign_overlap_id?

def assign_overlap_id(x):
  ...
  ...
  return (overlap_id, x)

person archeezee    schedule 01.07.2015    source източник
comment
Как определяте припокриванията? Всякакъв вид, начало, край, вътрешен, равен?   -  person zero323    schedule 01.07.2015
comment
Да, търся всякакъв вид времево припокриване   -  person archeezee    schedule 01.07.2015
comment
Какво ще се случи, ако даден обект се припокрива с повече от един друг обект? И трите ще получат ли един и същ идентификатор на припокриване? След това този идентификатор на припокриване ще се слее с други идентификатори на припокриване със споделени припокриващи се обекти? Това бързо ще стане сложно. Или аз не разбирам?   -  person mattsilver    schedule 01.07.2015
comment
Да, точно така, всички обекти с всякакъв тип припокриване трябва да имат същия overlap_id. Overlap_id може да бъде уникален или всеки обект в RDD може да има един и същ overlap_id   -  person archeezee    schedule 02.07.2015


Отговори (1)


Наивно решение, използващо Spark SQL и Data Frames:

Скала:

import org.apache.spark.sql.functions.udf

case class Interval(start_time: Long, end_time: Long)

val rdd = sc.parallelize(
    Interval(0, 3) :: Interval(1, 4) ::
    Interval(2, 5) :: Interval(3, 4) ::
    Interval(5, 8) :: Interval(7, 10) :: Nil
)

val df = sqlContext.createDataFrame(rdd)

// Simple check if a given intervals overlap
def overlaps(start_first: Long, end_first: Long,
        start_second: Long, end_second: Long):Boolean = {
    (start_second > start_first & start_second < end_first) |
    (end_second > start_first & end_second < end_first) 
}

// Register udf and data frame aliases
// It look like Spark SQL doesn't support
// aliases in FROM clause [1] so we have to
// register df twice
sqlContext.udf.register("overlaps", overlaps)
df.registerTempTable("df1")
df.registerTempTable("df2")

// Join and filter
sqlContext.sql("""
     SELECT * FROM df1 JOIN df2
     WHERE overlaps(df1.start_time, df1.end_time, df2.start_time, df2.end_time)
""").show

И същото нещо с помощта на PySpark

from pyspark.sql.functions import udf
from pyspark.sql.types import BooleanType

rdd = sc.parallelize([
    (0, 3), (1, 4), 
    (2, 5), (3, 4),
    (5, 8), (7, 10)
])

df = sqlContext.createDataFrame(rdd, ('start_time', 'end_time'))

def overlaps(start_first, end_first, start_second, end_second):
    return ((start_first < start_second < end_first) or
        (start_first < end_second < end_first))

sqlContext.registerFunction('overlaps', overlaps, BooleanType())
df.registerTempTable("df1")
df.registerTempTable("df2")

sqlContext.sql("""
     SELECT * FROM df1 JOIN df2
     WHERE overlaps(df1.start_time, df1.end_time, df2.start_time, df2.end_time)
""").show()

Трансформации на ниско ниво с групиране по прозорец

Малко по-интелигентен подход е да се генерират двойки кандидати, като се използва прозорец с определена ширина. Ето едно доста опростено решение:

Скала:

// Generates list of "buckets" for a given interval
def genRange(interval: Interval) = interval match {
    case Interval(start_time, end_time) => {
      (start_time / 10L * 10L) to (((end_time / 10) + 1) * 10) by 1
    }
}


// For each interval generate pairs (bucket, interval)
val pairs = rdd.flatMap( (i: Interval) => genRange(i).map((r) => (r, i)))

// Join (in the worst case scenario it is still O(n^2)
// But in practice should be better than a naive
// Cartesian product
val candidates = pairs.
    join(pairs).
    map({
        case (k, (Interval(s1, e1), Interval(s2, e2))) => (s1, e1, s2, e2)
   }).distinct


// For each candidate pair check if there is overlap
candidates.filter { case (s1, e1, s2, e2) => overlaps(s1, e1, s2, e2) }

Python:

def genRange(start_time, end_time):
    return xrange(start_time / 10L * 10L, ((end_time / 10) + 1) * 10)

pairs = rdd.flatMap(lambda (s, e): ((r, (s, e)) for r in genRange(s, e)))
candidates = (pairs
    .join(pairs)
    .map(lambda (k, ((s1, e1), (s2, e2))): (s1, e1, s2, e2))
    .distinct())

candidates.filter(lambda (s1, e1, s2, e2): overlaps(s1, e1, s2, e2))

Въпреки че може да е достатъчно за някои масиви от данни за готово за производство решение, трябва да помислите за прилагане на някакъв най-съвременен алгоритъм като NCList.

  1. http://docs.datastax.com/en/datastax_enterprise/4.6/datastax_enterprise/spark/sparkSqlSupportedSyntax.html
person zero323    schedule 01.07.2015
comment
Това е страхотно - Благодаря ви много! Опитах втория, сега ще опитам първото ви sggestion с използване на кадри с данни и SparkSQL - person archeezee; 02.07.2015