Как применить функцию к каждой строке в SparkR?

У меня есть файл в формате CSV, который содержит таблицу со столбцами «id», «timestamp», «action», «value» и «location». Я хочу применить функцию к каждой строке таблицы, и я уже написал код в R следующим образом:

user <- read.csv(file_path,sep = ";")
num <- nrow(user)
curLocation <- "1"
for(i in 1:num) {
    row <- user[i,]
    if(user$action != "power")
        curLocation <- row$value
    user[i,"location"] <- curLocation
}

Скрипт R отлично работает, и теперь я хочу применить его к SparkR. Однако я не мог получить доступ к i-й строке непосредственно в SparkR и не смог найти никакой функции для управления каждой строкой в ​​документация по SparkR.

Какой метод следует использовать, чтобы добиться того же эффекта, что и в R-скрипте?

Кроме того, по совету @chateaur я попытался использовать функцию dapply следующим образом:

curLocation <- "1"
schema <- structType(structField("Sequence","integer"), structField("ID","integer"), structField("Timestamp","timestamp"), structField("Action","string"), structField("Value","string"), structField("Location","string"))
setLocation <- function(row, curLoc) {
    if(row$Action != "power|battery|level"){
        curLoc <- row$Value
    }
    row$Location <- curLoc
}
bw <- dapply(user, function(row) { setLocation(row, curLocation)}, schema)
head(bw)

Затем я получил ошибку: сообщение об ошибке

Я просмотрел предупреждающее сообщение условие имеет длину> 1, и будет использоваться только первый элемент, и я нашел что-то https://stackoverflow.com/a/29969702/4942713. Это заставило меня задуматься, представляет ли параметр row в функции dapply весь раздел моего фрейма данных вместо одной строки< /сильный>? Может быть, функция dapply не является желательным решением?

Позже я попытался изменить функцию, как посоветовал @chateaur. Вместо dapply я использовал dapplyCollect, что избавляет меня от необходимости указывать схему. Оно работает!

changeLocation <- function(partitionnedDf) {
    nrows <- nrow(partitionnedDf)
    curLocation <- "1"
    for(i in 1:nrows){
        row <- partitionnedDf[i,]
        if(row$action != "power") {
            curLocation <- row$value
        }
    partitionnedDf[i,"location"] <- curLocation
    }
    partitionnedDf
}

bw <- dapplyCollect(user, changeLocation)

person Scorpion775    schedule 13.02.2017    source источник
comment
Вы можете использовать sparklyr (тот же синтаксис, что и dplyr)   -  person Mostafa    schedule 13.02.2017
comment
@DimitriPetrenko Что делать, если мне нужно использовать SparkR? Может ли SparkR добиться такого эффекта?   -  person Scorpion775    schedule 14.02.2017


Ответы (1)


Скорпион775,

Вы должны поделиться своим кодом sparkR. Не забывайте, что в R и sparkR данные обрабатываются по-разному.

Откуда: http://spark.apache.org/docs/latest/sparkr.html< / а>,

df <- read.df(csvPath, "csv", header = "true", inferSchema = "true", na.strings = "NA")

Затем вы можете посмотреть функцию dapply здесь: https://spark.apache.org/docs/2.1.0/api/R/dapply.html

Вот рабочий пример:

changeLocation <- function(partitionnedDf) {
    nrows <- nrow(partitionnedDf)
    curLocation <- as.integer(1)

    # Loop over each row of the partitionned data frame
    for(i in 1:nrows){
        row <- partitionnedDf[i,]

        if(row[1] != "power") {
            curLocation <- row[2]
        }
        partitionnedDf[i,3] <- curLocation
    }

    # Return modified data frame
    partitionnedDf
}

# Load data
df <- read.df("data.csv", "csv", header="false", inferSchema = "true")

head(collect(df))

# Define schema of dataframe
schema <- structType(structField("action", "string"), structField("value", "integer"),
                     structField("location", "integer"))

# Change location of each row                    
df2 <- dapply(df, changeLocation, schema)

head(df2)
person chateaur    schedule 13.02.2017
comment
Я взглянул на функцию dapply и обнаружил, что она используется для применения функции к каждому разделу SparkDataFrame. Насколько я понимаю, понятие partition не имеет ничего общего с row. Меня беспокоит то, что я не знаю, как написать функцию для применения к SparkDataFrame. В настоящее время я знаю только, как реализовать функцию, которую я хочу, в R, но не в SparkR. Не могли бы вы дать мне несколько советов? - person Scorpion775; 14.02.2017
comment
Я не эксперт по искрам, но я думаю, что разделы — это данные, разделенные для распределения по кластеру. Не могли бы вы попробовать приведенный выше пример и сказать мне, соответствует ли он вашим потребностям? - person chateaur; 14.02.2017
comment
Спасибо за совет. Я пытался следовать вашей инструкции, но получил ошибку, как показано в вопросе. - person Scorpion775; 15.02.2017
comment
Я отредактировал свой пост, попробуйте и оставьте отзыв :) Моя предыдущая ошибка заключалась в том, что я думал, что в функции dapply у нас есть строки. Фактически у нас есть фрейм данных. Я считаю, что искра обрежет фрейм данных, отправит каждую часть на другой узел и применит функцию (здесь changeLocation). Если бы кто-нибудь мог подтвердить? - person chateaur; 16.02.2017
comment
Это работает до тех пор, пока вместо этого я использую функцию dapplyCollect. В этом случае мне не нужно указывать схему. - person Scorpion775; 17.02.2017