Какую переменную выбрать для увеличения меток узлов в алгоритме обнаружения сообщества

Я работаю над алгоритмом обнаружения сообщества, который использует концепцию распространения меток на узлы. У меня проблема с выбором истинного типа для переменной Label_counter.

у нас есть алгоритм с именем LPA(label propagation algorithm), который распространяет метки на узлы посредством итераций. воспринимайте ярлыки как свойство узла. начальная метка для каждого узла - это идентификатор узла, и в итерациях узлы обновляют свою новую метку на основе самой частой метки среди своих соседей. алгоритм, над которым я работаю, похож на LPA. сначала каждый узел имеет начальную метку, равную 0, а затем узлы получают новые метки. когда узлы обновляются и получают новые метки, в зависимости от некоторых условий Label_counter следует увеличивать на единицу, чтобы использовать это значение в качестве метки для других узлов. например label = 1 или label = 2 и так далее. например, у нас есть набор данных клуба zachary karate, в котором 34 узла, а в наборе данных 2 сообщества. начальное состояние такое:

 (1,0)
 (2,0)
   .
   .
   .
 (34,0)

первое число - это идентификатор узла, второе - метка. по мере того, как узлы получают новую метку, Label_counter приращения, а другие узлы в следующих итерациях получают новую метку и снова Label_counter приращения.

 (1,1)
 (2,1)
 (3,1)
   .
   .
   .
 (33,3)
 (34,3)

узлы с одинаковой меткой принадлежат одному сообществу.

у меня есть проблема: поскольку узлы в RDD и переменные распределены по машинам (каждая машина имеет копию переменных), а исполнители не имеют связи друг с другом, если исполнитель обновляет Label_counter, другие исполнители не будут проинформированы о новых значение Label_counter и, возможно, узлы получат неправильные метки. Верно ли использовать Accumulator в качестве счетчика меток в этом случае, потому что аккумуляторы являются общими переменными на разных машинах, или есть другие способы решения этой проблемы ???


person Hamid Roghani    schedule 30.08.2019    source источник


Ответы (1)


В Spark всегда сложно вычислить индексные значения, потому что они зависят от вещей, которые есть не во всех разделах. Могу предложить следующую идею.

  1. Вычислить количество раз, когда условие выполняется для каждой секции.
  2. Вычислите накопленное приращение для каждого раздела, чтобы мы знали начальное приращение для каждого раздела.
  3. Увеличивайте значения раздела на основе этого начального приращения

Вот как может выглядеть код. Позвольте мне начать с настройки нескольких вещей.

// Let's define some condition
def condition(node : Long) = node % 10 == 1

// step 0, generate the data
val rdd = spark.range(34)
    .select('id+1).repartition(10).rdd
    .map(r => (r.getAs[Long](0), 0))
    .sortBy(_._1).cache()
rdd.collect
Array[(Long, Int)] = Array((1,0), (2,0), (3,0), (4,0), (5,0), (6,0), (7,0), (8,0),
 (9,0), (10,0), (11,0), (12,0), (13,0), (14,0), (15,0), (16,0), (17,0), (18,0),
 (19,0), (20,0), (21,0), (22,0), (23,0), (24,0), (25,0), (26,0), (27,0), (28,0),
 (29,0), (30,0), (31,0), (32,0), (33,0), (34,0))

Затем суть решения:

// step 1 and 2
val partIncrInit = rdd
    // to each partition, we associate the number of times we need to increment
    .mapPartitionsWithIndex{ case (i,p) =>
        Iterator(i -> p.map(_._1).count(condition))
    }
    .collect.sorted // sort by partition index
    .map(_._2) // we don't need the index anymore
    .scanLeft(0)(_+_) // cumulated sum

// step 3, we increment each partition based on this initial increment.
val result = rdd
    .mapPartitionsWithIndex{ case (i, p) =>
        var incr = 0
        p.map{ case (node, value) =>
            if(condition(node))
                incr+=1
            (node, partIncrInit(i) + value + incr) 
        }
    }
result.collect

Array[(Long, Int)] = Array((1,1), (2,1), (3,1), (4,1), (5,1), (6,1), (7,1), (8,1),
 (9,1), (10,1), (11,2), (12,2), (13,2), (14,2), (15,2), (16,2), (17,2), (18,2),
 (19,2), (20,2), (21,3), (22,3), (23,3), (24,3), (25,3), (26,3), (27,3), (28,3),
 (29,3), (30,3), (31,4), (32,4), (33,4), (34,4))
person Oli    schedule 02.09.2019
comment
Спасибо за ваш ответ. Я понимаю, о чем вы, это не то, что я искал. но это дало мне новую идею. я должен проверить это на моем коде. спасибо за время, которое вы потратили на помощь - person Hamid Roghani; 06.09.2019
comment
Могу я спросить, что вы искали? - person Oli; 06.09.2019
comment
я имею в виду, что приращение label_counter не зависит от количества условий, выполненных на одном разделе, например, возможно, label_counter не обновляется для некоторой итерации, а после этого увеличивается, и нам нужен label_counter из предыдущей итерации. Я имею в виду, что это не похоже на то, что узлы, которые находятся в одном разделе, получают одинаковую метку, это итерационный алгоритм, который, например, узел обновляет свою метку, и мы должны распространять эту метку на своих соседей. нам нужно это значение каждый раз, чтобы передавать другим узлам. - person Hamid Roghani; 06.09.2019
comment
например, может быть, один узел находится в одном разделе, а другой сосед - в другом разделе, и в зависимости от некоторых условий эти два узла должны иметь одинаковую метку. Мне нужно, чтобы это значение было готово каждый раз для передачи узлам. label_counter не зависит от раздела. мы делаем некоторые вычисления и обновляем label_counter и распространяем его на других соседей. label_counter должен быть готов предоставить каждому узлу в любом разделе, что условие для него истинно - person Hamid Roghani; 06.09.2019