График, построенный с использованием graphx, не транслируется должным образом

Я создал график с помощью graphx, и теперь мне нужно извлечь подграфы из исходного графика. users_graph — это RDD, в котором есть подграф, индексированный для пользователя. Проблема в том, что эти подграфы не вычисляются. Я получаю исключение java.lang.NullPointerException, когда пытаюсь работать с этими подграфами.

class VertexProperty(val id:Long) extends Serializable
case class User(val userId:Long, var offset:Int, val userCode:String, val Name:String, val Surname:String, val organizational_unit:String, val UME:String, val person_type:String, val SOD_HIGH:String, val SOD_MEDIUM:String, val SOD_LOW:String, val Under_mitigated:String) extends VertexProperty(userId)
case class Account(val accountId:Long, var offset:Int, val userCode:String, val userId:String, val account_creation_date:String, var disabled:String, var forcechangepwd:String, var pwdlife:String, var numberloginerror:String, var lastchangepwd:String, var lastlogin:String, var lastwronglogin:String, var state:String, var expire:String, var last_cert_time:String, var creation_date:String, var creation_user:String,var challenge_counter:String, var challenge_failed_attempt:String) extends VertexProperty(accountId) //Check if userCode is actually the code in this example.
case class Application(var applicationId:Long, var offset:Int, var Name:String, var Description:String, var Target:String, var Owner:String, var Ownercode:String, var Creation_date:String, var Creation_user:String) extends VertexProperty(applicationId)
case class Entitlement(val entitlementId:Long, var offset:Int, val Name:String, var Code:String, var Description:String, var Type:String, var Application:String, var Administrative:String, var Parent_ID:String, var Owner_code:String, var Scope_type:String, var Business_name:String, var Business_policy:String, var SOD_high:String, var SOD_medium:String, var SOD_low:String) extends VertexProperty(entitlementId)

/*
Some code for computing vertexRDD and edges
*/    

val graph: Graph[VertexProperty,String] = Graph(vertexRDD, edges, new VertexProperty(-1))
val triplets = graph.triplets
val temp = triplets.map(t => t.attr)
val distinct_users = temp.distinct.filter(t => t != "NULL")

var bcast_graph = sc.broadcast(graph)
val users_graph = distinct_users.map(du => du -> bcast_graph.value.subgraph(epred = t => t.attr == du))

person Anshit Chaudhary    schedule 07.06.2016    source источник
comment
Кроме того, я пробовал это, val user_graph = bcast_graph.value.subgraph(epred = t => t.attr == 273) // 273 — это идентификатор пользователя. И это дает мне желаемый результат. Я не знаю, почему я не получаю вывод при использовании функции карты.   -  person Anshit Chaudhary    schedule 07.06.2016
comment
Я не понимаю, почему вы вещаете. Graph, поскольку он состоит из RDDs, по своей природе распараллелен. Как вы думаете, что вы делаете с sc.broadcast(graph) ?   -  person David Griffin    schedule 07.06.2016
comment
Я хочу использовать построенный график в функции карты. Следовательно, мне нужно транслировать его перед использованием там. Я что-то упускаю?   -  person Anshit Chaudhary    schedule 07.06.2016
comment
Да, вы не можете транслировать Graph   -  person David Griffin    schedule 07.06.2016
comment
Я попытался удалить трансляцию для графика и запустил следующий код. val users_graph = different_users.map(du =› du -> graph.subgraph(epred = t =› t.attr == du)) . Это дает мне ошибку, хотя.   -  person Anshit Chaudhary    schedule 07.06.2016
comment
Да, я понял, что вы пытаетесь сделать. Смотрите мой более полный ответ через секунду или три   -  person David Griffin    schedule 07.06.2016


Ответы (1)


Короче говоря, вы не можете broadcast Graph, потому что он содержит RDD (на самом деле их пару). И вы не можете использовать Graphв функции map для RDD, потому что она состоит из RDDs.

Как я уже сказал, это длинная история, почему вы не можете сделать ни того, ни другого — и на самом деле это две стороны одной медали. Это та же проблема, с которой вы сталкиваетесь в любом случае.

Spark построен по принципу ведущий/ведомый. Именно в области памяти мастера определяются RDDs и связанные с ними мета-действия. Но код — ... внутри map(...) — выполняется на подчиненных устройствах (называемых исполнителями). Ваш код карты не может работать внутри исполнителей, когда вы каким-либо образом ссылаетесь на другой RDD -- и broadcast никогда не поможет, потому что ссылки RDD могут существовать только в мастере.

Что вы можете сделать вместо этого? У вас есть два варианта:

  1. Соберите необходимые данные с помощью collect() и либо broadcast эти данные, либо просто обратитесь к ним внутри своего map кода. collect() извлекает все данные в мастер, но, что наиболее важно для вашей проблемы, это означает, что теперь вы можете ссылаться на данные без использования ссылки RDD, поэтому вы можете отправить собранные данные своим исполнителям — либо с помощью broadcast, либо просто ссылаясь на них в вашем коде map(...) (Spark отправит копии данных вашим исполнителям). Какой из этих способов работает или будет работать, зависит от размера ваших данных, ожидаемой скорости и т. д.
  2. Используйте RDD.join() или RDD.cogroup() для одновременной работы с двумя Graphs.

Оба они усложняются тем фактом, что вы работаете со структурой более высокого порядка — GraphX ​​Graph. Вам придется поработать над отдельными Graph.vertices и Graph.edges RDDs отдельно, сделать свои collect() или join(), а затем перестроить окончательный Graph, сшив вместе соответствующие RDDs.

person David Griffin    schedule 07.06.2016
comment
Объясняет все! - person Anshit Chaudhary; 07.06.2016
comment
Я пробовал собирать EdgeTriplets, а затем транслировать их. Это дает мне ошибки, хотя. Пожалуйста, проверьте stackoverflow.com/questions /37710483/ - person Anshit Chaudhary; 08.06.2016