Итак, если я правильно понимаю, вы хотите иметь список смежности, в котором окончательный RDD состоит из пар ключ-значение, где ключ является узлом, а значение - списком его ребер. Возможно, вы имели в виду то, что показано ниже? Хотя я считаю, что если вы хотите, чтобы «g» отображалось в вашем окончательном RDD, было бы разумно иметь его в вашем списке ребер как ( «g», «»), поскольку вы хотите передать, что у него нет ребер.
Чтобы присоединиться, нам нужно преобразовать список узлов в пару RDD, поэтому сначала мы распараллеливаем, чтобы создать RDD, а затем сопоставляем фиктивное значение, чтобы у нас были пары ключ-значение.
Теперь мы можем соединить два СДР друг с другом, и результатом будут только те ключи, которые существуют в обоих СДР, в данном случае «a» и «f». Наконец, мы удаляем фиктивное значение, которое мы добавили к узлам RDD и groupByKey, чтобы сгруппировать наши значения вместе.
nodes = ['a', 'b', 'c', 'd', 'e', 'f', 'g']
edges = [('a','b'), ('a','e'), ('f', 'a'), ('k','l')]
nodesRDD = sc.parallelize(nodes).map(lambda n: (n, ''))
edgesRDD = sc.parallelize(edges)
joinedRDD = nodesRDD.join(edgesRDD).map(lambda tup: (tup[0], tup[1][1]))
groupedRDD = joinedRDD.groupByKey()
groupedRDD.map(lambda x : (x[0], list(x[1]))).collect()
Out[146]: [('f', ['a']), ('a', ['b', 'e'])]
Счет аналогичен, но теперь нас не интересуют фактические значения узлов, а только их количество:
nodes = ['a', 'b', 'c', 'd', 'e', 'f', 'g']
edges = [('a','b'), ('a','e'), ('f', 'a'), ('k','l')]
nodesRDD = sc.parallelize(nodes).map(lambda n: (n, 0))
edgesRDD = sc.parallelize(edges).map(lambda tup: (tup[0], 1))
joinedRDD = nodesRDD.join(edgesRDD).map(lambda tup: (tup[0], tup[1][1]))
reducedRDD = joinedRDD.reduceByKey(lambda a, b: a + b)
reducedRDD.collect()
Выход[159]: [('f', 1), ('a', 2)]
person
esap120
schedule
15.04.2016