Как мне собирать данные из sparklyr в блестящем реактивном контексте?

Я не могу собирать данные (скажем, для построения графика) из sparklyr (на удаленном искровом кластере) в контексте блестящего реактивного. Я могу запустить весь код вручную (скажем, в rstudio или в сеансе консоли r), и если я запускаю тот же фрагмент кода для сбора данных в блестящем, но вне реактивного контекста, он работает нормально.

Вот файл ui.R:

library(shiny)
library(shinythemes)

shinyUI(fluidPage(theme=shinytheme("darkly"),

titlePanel(HTML("Spark test")),

sidebarLayout(
  sidebarPanel(
      selectInput("symbol",h4("Select symbol"),choices=c("")),
      actionButton("doclick","Plot")
  ),

mainPanel(
    tags$head(tags$style("#testplot{height:85vh !important;}")),
   plotOutput("testplot")
)
  )
))

Файл server.R выглядит следующим образом:

library(shiny)
library(dplyr)

server<-function(input, output, session) {

  source("sparktest.R")
  sc<<-connectToSpark()
  tdf<<-loadData()
  symlist<<-as.character(tdf %>% distinct(Sym) %>% pull()) 

  updateSelectInput(session,"symbol",choices=sort(as.character(symlist)))
  plotsym<-reactiveVal(0)

  #plotdf<-tdf %>% filter(Sym=="AA") %>% 
  #  group_by(pdate) %>% summarise(dct=n()) %>% select(pdate,dct) %>%
  #  collect

  observeEvent(input$doclick,
  {
     plotsym(input$symbol)      
     output$testplot <- renderPlot({
      #plotTest2(plotdf)
      plotTest(tdf,plotsym())
    })
   })


 on.exit(spark_disconnect_all())
}

Наконец, соответствующий раздел файла sparktest.R:

plotTest<-function(df,symbol) {
  plotdf<-df %>% filter(Sym==symbol) %>% 
  group_by(pdate) %>% summarise(dct=n()) %>% select(pdate,dct) %>%
  collect

  p1<-plotdf %>% ggplot(aes(x=pdate))+
  geom_line(aes(y=dct))
  ylab("Day count")
  p1    
}

plotTest2<-function(df) {
  p1<-df %>% ggplot(aes(x=pdate))+
  geom_line(aes(y=dct))
  ylab("Day count")
  p1    
}

Подключение к Spark в порядке, а пользовательский интерфейс предоставляет простой селектор для биржевого символа. Если выбрано, искровые данные фильтруются для этого символа и создается простой график ежедневных подсчетов. Все нормально работает вручную, но процесс завершается сбоем при запуске через блестящую. Сбой происходит в части "сбора" конвейера в plotTest. Вот трассировка:

Warning: Error in writeBin: invalid connection
Stack trace (innermost first):
126: writeBin
125: core_invoke_method
124: invoke_method.spark_shell_connection
123: invoke_method
122: invoke.shell_jobj
121: invoke
120: spark_version
119: create_hive_context.spark_shell_connection
118: create_hive_context
117: hive_context
116: invoke
115: .local
114: dbSendQuery
113: dbSendQuery
112: db_collect.DBIConnection
111: db_collect
110: collect.tbl_sql
109: collect
108: function_list[[k]]
107: withVisible
106: freduce
105: _fseq
104: eval
103: eval
102: withVisible
101: %>%
100: plotTest
 99: renderPlot [/home/rubedo/ShinyApps/sparktest/server.R#36]
 89: <reactive:plotObj>
 78: plotObj
 77: origRenderFunc
 76: output$testplot
  1: runApp

Тот же результат возникает, если я пытаюсь сделать что-то вроде «собрать», будь то «тянуть» или «как_tibble»... все, что пытается уменьшить искровую таблицу до простого неленивого набора данных.

В серверном коде, если я вместо этого использую закомментированный раздел, где я выполняю фильтрацию вне наблюдаемого события, а затем делаю график для отфильтрованных данных в наблюдаемом событии, все в порядке. Но как только я пытаюсь выполнить эту фильтрацию и последующий «сбор» в реактивном контексте, это не удается.

Есть ли принципиальная несовместимость между попыткой собрать эту коллекцию в реактивном контексте? Или есть другой метод, который нужно использовать? Я отмечаю, что пытался выполнить прямой sql через dbGetQuery, и это также не удалось. Все, что пытается перевести ленивый объект в неленивый в реактивном контексте, для меня терпит неудачу.

Любая помощь приветствуется.


person organguy    schedule 13.06.2018    source источник
comment
Скорее всего, проблема с подключением к искре не в порядке. on.exit запустится сразу, так что у вас не будет соединения. Этот вопрос stackoverflow связан и должен показать вам, как это сделать: заголовок stackoverflow.com/questions/23276491/   -  person Fredrik Isaksson    schedule 15.06.2018
comment
Связанные R Shiny и Spark: как освободить ресурсы Spark?   -  person Alper t. Turker    schedule 16.06.2018
comment
@FredrikIsaksson, спасибо. Замена вызова on.exit другим подходом решила проблему.   -  person organguy    schedule 17.06.2018