Я не могу собирать данные (скажем, для построения графика) из sparklyr (на удаленном искровом кластере) в контексте блестящего реактивного. Я могу запустить весь код вручную (скажем, в rstudio или в сеансе консоли r), и если я запускаю тот же фрагмент кода для сбора данных в блестящем, но вне реактивного контекста, он работает нормально.
Вот файл ui.R:
library(shiny)
library(shinythemes)
shinyUI(fluidPage(theme=shinytheme("darkly"),
titlePanel(HTML("Spark test")),
sidebarLayout(
sidebarPanel(
selectInput("symbol",h4("Select symbol"),choices=c("")),
actionButton("doclick","Plot")
),
mainPanel(
tags$head(tags$style("#testplot{height:85vh !important;}")),
plotOutput("testplot")
)
)
))
Файл server.R выглядит следующим образом:
library(shiny)
library(dplyr)
server<-function(input, output, session) {
source("sparktest.R")
sc<<-connectToSpark()
tdf<<-loadData()
symlist<<-as.character(tdf %>% distinct(Sym) %>% pull())
updateSelectInput(session,"symbol",choices=sort(as.character(symlist)))
plotsym<-reactiveVal(0)
#plotdf<-tdf %>% filter(Sym=="AA") %>%
# group_by(pdate) %>% summarise(dct=n()) %>% select(pdate,dct) %>%
# collect
observeEvent(input$doclick,
{
plotsym(input$symbol)
output$testplot <- renderPlot({
#plotTest2(plotdf)
plotTest(tdf,plotsym())
})
})
on.exit(spark_disconnect_all())
}
Наконец, соответствующий раздел файла sparktest.R:
plotTest<-function(df,symbol) {
plotdf<-df %>% filter(Sym==symbol) %>%
group_by(pdate) %>% summarise(dct=n()) %>% select(pdate,dct) %>%
collect
p1<-plotdf %>% ggplot(aes(x=pdate))+
geom_line(aes(y=dct))
ylab("Day count")
p1
}
plotTest2<-function(df) {
p1<-df %>% ggplot(aes(x=pdate))+
geom_line(aes(y=dct))
ylab("Day count")
p1
}
Подключение к Spark в порядке, а пользовательский интерфейс предоставляет простой селектор для биржевого символа. Если выбрано, искровые данные фильтруются для этого символа и создается простой график ежедневных подсчетов. Все нормально работает вручную, но процесс завершается сбоем при запуске через блестящую. Сбой происходит в части "сбора" конвейера в plotTest. Вот трассировка:
Warning: Error in writeBin: invalid connection
Stack trace (innermost first):
126: writeBin
125: core_invoke_method
124: invoke_method.spark_shell_connection
123: invoke_method
122: invoke.shell_jobj
121: invoke
120: spark_version
119: create_hive_context.spark_shell_connection
118: create_hive_context
117: hive_context
116: invoke
115: .local
114: dbSendQuery
113: dbSendQuery
112: db_collect.DBIConnection
111: db_collect
110: collect.tbl_sql
109: collect
108: function_list[[k]]
107: withVisible
106: freduce
105: _fseq
104: eval
103: eval
102: withVisible
101: %>%
100: plotTest
99: renderPlot [/home/rubedo/ShinyApps/sparktest/server.R#36]
89: <reactive:plotObj>
78: plotObj
77: origRenderFunc
76: output$testplot
1: runApp
Тот же результат возникает, если я пытаюсь сделать что-то вроде «собрать», будь то «тянуть» или «как_tibble»... все, что пытается уменьшить искровую таблицу до простого неленивого набора данных.
В серверном коде, если я вместо этого использую закомментированный раздел, где я выполняю фильтрацию вне наблюдаемого события, а затем делаю график для отфильтрованных данных в наблюдаемом событии, все в порядке. Но как только я пытаюсь выполнить эту фильтрацию и последующий «сбор» в реактивном контексте, это не удается.
Есть ли принципиальная несовместимость между попыткой собрать эту коллекцию в реактивном контексте? Или есть другой метод, который нужно использовать? Я отмечаю, что пытался выполнить прямой sql через dbGetQuery, и это также не удалось. Все, что пытается перевести ленивый объект в неленивый в реактивном контексте, для меня терпит неудачу.
Любая помощь приветствуется.
on.exit
запустится сразу, так что у вас не будет соединения. Этот вопрос stackoverflow связан и должен показать вам, как это сделать: заголовок stackoverflow.com/questions/23276491/ - person Fredrik Isaksson   schedule 15.06.2018