multidplyr и group_by() и filter()

У меня есть следующий кадр данных, и я намерен найти все идентификаторы, которые имеют разное ИСПОЛЬЗОВАНИЕ, но один и тот же ТИП.

ID <- rep(1:4, each=3)
USAGE <- c("private","private","private","private",
"taxi","private","taxi","taxi","taxi","taxi","private","taxi")
TYPE <- c("VW","VW","VW","VW","MER","VW","VW","VW","VW","VW","VW","VW")
df <- data.frame(ID,USAGE,TYPE)

Если я побегу

df %>% group_by(ID, TYPE) %>% filter(n_distinct(USAGE)>1)

Я получаю желаемый результат. Но мой исходный фрейм данных имеет > 2 млн строк. Поэтому я хотел бы использовать все свои ядра для выполнения этой операции.

Я пробовал этот код с multidplyr:

f1 <- partition(df, ID)
f2 <- f1 %>% group_by(ID, TYPE) %>% filter(n_distinct(USAGE)>1)
f3 <- collect(f2)

Но затем появляется следующее сообщение:

Warning message: group_indices_.grouped_df ignores extra arguments

после

f1 <- partition(df, ID)

а также

Error in checkForRemoteErrors(lapply(cl, recvResult)) : 
  4 nodes produced errors; first error: Evaluation error: object 'f1' not found.

после

f2 <- f1%>% group_by(ID, TYPE) %>% filter(f1, n_distinct(USAGE)>1)

Каким будет правильный способ реализовать всю операцию в multidplyr? Большое спасибо.


person Justas Mundeikis    schedule 30.07.2017    source источник


Ответы (1)


Вы должны включить все группирующие переменные в свой вызов partition(). Таким образом, каждое ядро ​​​​имеет все данные, необходимые для выполнения расчета для данной группы.

library(tidyverse)
library(multidplyr)

fast <- df %>%
  partition(ID, TYPE) %>%
  group_by(ID, TYPE) %>%
  filter(n_distinct(USAGE) > 1) %>%
  collect()

Проверка

Вы все равно получите предупреждение о group_indices, но результаты будут такими же, как и у исходного метода dplyr.

slow <- df %>%
  group_by(ID, TYPE) %>%
  filter(n_distinct(USAGE) > 1)

fast == slow
       ID USAGE TYPE
#[1,] TRUE  TRUE TRUE
#[2,] TRUE  TRUE TRUE
#[3,] TRUE  TRUE TRUE

Бенчмаркинг

Теперь большой вопрос: это быстрее? Определение cluster позволяет нам гарантировать, что мы используем все ядра.

library(microbenchmark)
library(parallel)

cluster <- create_cluster(cores = detectCores())

fast_func <- function(df) {
  df %>%
    partition(ID, TYPE, cluster = cluster) %>%
    group_by(ID, TYPE) %>%
    filter(n_distinct(USAGE) > 1) %>%
    collect()
}

slow_func <- function(df) {
  slow <- df %>%
    group_by(ID, TYPE) %>%
    filter(n_distinct(USAGE) > 1)
}

microbenchmark(fast_func(df), slow_func(df))
# Unit: milliseconds
# expr       min        lq      mean    median        uq       max neval cld
# fast_func(df) 41.360358 47.529695 55.806609 50.529851 61.459433 133.53045   100   b
# slow_func(df)  4.717761  6.974897  9.333049  7.796686  8.468594  49.51916   100  a 

В этом случае использование параллельной обработки фактически медленнее. Медиана выполнения для fast_func занимает 56 миллисекунд вместо 9. Это связано с накладными расходами, связанными с управлением потоком данных между кластерами. Но вы сказали, что ваши данные содержат миллионы строк, так что давайте попробуем.

# Embiggen the data
df <- df[rep(seq_len(nrow(df)), each=2000000),] %>% tbl_df()

microbenchmark(fast_func(df), slow_func(df))
# Unit: seconds
# expr       min        lq      mean    median        uq       max neval cld
# fast_func(df) 43.067089 43.781144 50.754600 49.440864 55.308355 65.499095    10   b
# slow_func(df)  1.741674  2.550008  3.529607  3.246665  3.983452  7.214484    10  a 

С гигантским набором данных fast_func все еще медленнее! Бывают случаи, когда параллельная работа сэкономит огромное количество времени, но простой сгруппированный фильтр не обязательно является одним из них.

person Andrew Brēza    schedule 15.08.2017