Я новичок в pyspark и пытаюсь понять, как работает PageRank. Я использую Spark 1.6 в Jupyter на Cloudera. Скриншоты моих вершин и ребер (а также схемы) находятся по этим ссылкам: verticesRDD и edgeRDD
У меня есть следующий код:
#import relevant libraries for Graph Frames
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.functions import desc
from graphframes import *
#Read the csv files
verticesRDD = sqlContext.read.format("com.databricks.spark.csv").options(header='true', inferschema='true').load("filepath/station.csv")
edgesRDD = sqlContext.read.format("com.databricks.spark.csv").options(header='true', inferschema='true').load("filepath/trip.csv")
#Renaming the id columns to enable GraphFrame
verticesRDD = verticesRDD.withColumnRenamed("station_ID", "id")
edgesRDD = edgesRDD.withColumnRenamed("Trip ID", "id")
edgesRDD = edgesRDD.withColumnRenamed("Start Station", "src")
edgesRDD = edgesRDD.withColumnRenamed("End Station", "dst")
#Register as temporary tables for running the analysis
verticesRDD.registerTempTable("verticesRDD")
edgesRDD.registerTempTable("edgesRDD")
#Note: whether i register the RDDs as temp tables or not, i get the same results... so im not sure if this step is really needed
#Make the GraphFrame
g = GraphFrame(verticesRDD, edgesRDD)
Теперь, когда я запускаю функцию pageRank:
g.pageRank(resetProbability=0.15, maxIter=10)
Py4JJavaError: произошла ошибка при вызове o98.run .: org.apache.spark.SparkException: задание прервано из-за сбоя этапа: задача 0 на этапе 79.0 не удалась 1 раз, последний сбой: потерянная задача 0.0 на этапе 79.0 (TID 2637, localhost): scala.MatchError: [null, null, [913460,765,8 / 31/2015 23:26, Harry Bridges Plaza (Ferry Building), 50,8 / 31/2015 23:39, San Francisco Caltrain (Townsend на 4-м), 70 288, подписчик, 2139]] (класса org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema)
results = g.pageRank(resetProbability=0.15, maxIter=10, sourceId="id")
Py4JJavaError: ошибка при вызове o166.run .: org.graphframes.NoSuchVertexException: алгоритм GraphFrame с учетом идентификатора вершины, которого нет в Graph. Идентификатор идентификатора вершины не содержится в GraphFrame (v: [id: int, name: string, lat: double, long: double, dockcount: int, ориентир: строка, установка: строка], e: [src: string, dst: string , id: int, Duration: int, Start Date: string, Start Terminal: int, End Date: string, End Terminal: int, Bike #: int, Subscriber Type: string, Zip Code: string])
ranks = g.pageRank.resetProbability(0.15).maxIter(10).run()
AttributeError: объект 'функция' не имеет атрибута 'resetProbability'
ranks = g.pageRank(resetProbability=0.15, maxIter=10).run()
Py4JJavaError: произошла ошибка при вызове o188.run .: org.apache.spark.SparkException: задание прервано из-за сбоя этапа: задача 0 на этапе 90.0 не удалась 1 раз, последний сбой: потерянная задача 0.0 на этапе 90.0 (TID 2641, localhost): scala.MatchError: [null, null, [913460,765,8 / 31/2015 23:26, Harry Bridges Plaza (Ferry Building), 50,8 / 31/2015 23:39, San Francisco Caltrain (Townsend на 4-м), 70 288, подписчик, 2139]] (класса org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema)
Я читаю PageRank, но не понимаю, где я ошибаюсь .. любой помощь будет оценена