Ошибки в PageRank GraphFrames

Я новичок в pyspark и пытаюсь понять, как работает PageRank. Я использую Spark 1.6 в Jupyter на Cloudera. Скриншоты моих вершин и ребер (а также схемы) находятся по этим ссылкам: verticesRDD и edgeRDD

У меня есть следующий код:

#import relevant libraries for Graph Frames
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.functions import desc
from graphframes import *

#Read the csv files 
verticesRDD = sqlContext.read.format("com.databricks.spark.csv").options(header='true', inferschema='true').load("filepath/station.csv")
edgesRDD = sqlContext.read.format("com.databricks.spark.csv").options(header='true', inferschema='true').load("filepath/trip.csv")

#Renaming the id columns to enable GraphFrame 
verticesRDD = verticesRDD.withColumnRenamed("station_ID", "id")
edgesRDD = edgesRDD.withColumnRenamed("Trip ID", "id")
edgesRDD = edgesRDD.withColumnRenamed("Start Station", "src")
edgesRDD = edgesRDD.withColumnRenamed("End Station", "dst")

#Register as temporary tables for running the analysis
verticesRDD.registerTempTable("verticesRDD")
edgesRDD.registerTempTable("edgesRDD")
#Note: whether i register the RDDs as temp tables or not, i get the same results... so im not sure if this step is really needed

#Make the GraphFrame
g = GraphFrame(verticesRDD, edgesRDD)

Теперь, когда я запускаю функцию pageRank:

g.pageRank(resetProbability=0.15, maxIter=10)

Py4JJavaError: произошла ошибка при вызове o98.run .: org.apache.spark.SparkException: задание прервано из-за сбоя этапа: задача 0 на этапе 79.0 не удалась 1 раз, последний сбой: потерянная задача 0.0 на этапе 79.0 (TID 2637, localhost): scala.MatchError: [null, null, [913460,765,8 / 31/2015 23:26, Harry Bridges Plaza (Ferry Building), 50,8 / 31/2015 23:39, San Francisco Caltrain (Townsend на 4-м), 70 288, подписчик, 2139]] (класса org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema)

results = g.pageRank(resetProbability=0.15, maxIter=10, sourceId="id")

Py4JJavaError: ошибка при вызове o166.run .: org.graphframes.NoSuchVertexException: алгоритм GraphFrame с учетом идентификатора вершины, которого нет в Graph. Идентификатор идентификатора вершины не содержится в GraphFrame (v: [id: int, name: string, lat: double, long: double, dockcount: int, ориентир: строка, установка: строка], e: [src: string, dst: string , id: int, Duration: int, Start Date: string, Start Terminal: int, End Date: string, End Terminal: int, Bike #: int, Subscriber Type: string, Zip Code: string])

ranks = g.pageRank.resetProbability(0.15).maxIter(10).run()

AttributeError: объект 'функция' не имеет атрибута 'resetProbability'

ranks = g.pageRank(resetProbability=0.15, maxIter=10).run()

Py4JJavaError: произошла ошибка при вызове o188.run .: org.apache.spark.SparkException: задание прервано из-за сбоя этапа: задача 0 на этапе 90.0 не удалась 1 раз, последний сбой: потерянная задача 0.0 на этапе 90.0 (TID 2641, localhost): scala.MatchError: [null, null, [913460,765,8 / 31/2015 23:26, Harry Bridges Plaza (Ferry Building), 50,8 / 31/2015 23:39, San Francisco Caltrain (Townsend на 4-м), 70 288, подписчик, 2139]] (класса org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema)

Я читаю PageRank, но не понимаю, где я ошибаюсь .. любой помощь будет оценена


person vikram    schedule 25.05.2018    source источник


Ответы (1)


Проблема заключалась в том, как я определял свои вершины. Я переименовал «station_id» в «id», хотя на самом деле это должно было быть «name». Итак, эта строка

verticesRDD = verticesRDD.withColumnRenamed("station_ID", "id")

должно быть

verticesRDD = verticesRDD.withColumnRenamed("name", "id")

pageRank правильно работает с этим изменением!

person Vikram Devatha    schedule 30.05.2018