Параллельные операции data.table

У меня есть объект data.table в моем скрипте, и мне нужно преобразовать отдельный столбец даты и времени в объект POSIXct. Я использую fastPOSIXct() для преобразования в POSIXct. Я нахожу некоторые узкие места с data.table операциями для этого. Основные проблемы заключаются в объединении строк перед преобразованием в POSIXct и самом преобразовании в POSIXct. Я использую stri_c(), чтобы сделать paste0() быстрее. Есть ли способ распараллелить это вычисление, чтобы ускорить его? Читаемый файл csv представляет собой большой файл размером около 2 ГБ.

структура data.table

  index = match(file, csv_files)
  print(paste0("Starting File #", index, ". Running Active Future Filter."))
  csv1 = fread(file = file)
  # create row for date in Date format
  csv1[, date := as.Date(integer())]
  csv1[, trade_date := as.character(trade_date)]
  csv1[, date := as.Date(trade_date[1], "%Y%m%d"), by = trade_date]
  csv1[, active_exp := character()]
  csv1[, active_exp := get.active.future(date = date[1]), by = date]
  nrow(csv1)
  csv1 = csv1[csv1$active_exp == as.character(csv1$contract_delivery_date),]
  # Add POSIXct
  # keep only essential columns
  csv1 = csv1[,-c(3, 6, 9, 23, 11:21)]
  print(paste0("Running POSIX"))
  # establish date-time POSIX for tick
  csv1[, date_time_char := as.character()]
  # creates date time character type to be converted to POSIXct
  csv1[, date_time_char := stri_c(date[1], trade_time[1], sep = " "), by = list(date, trade_time)]
  
  # convert date time character type to POSIXct
  csv1[, date_time := fastPOSIXct(date_time_char[1]), by = date_time_char]
  # time shift for date when time is between 17:00 and 00:00
  print("Making time adjustment")
  csv1[,adj_date_time := time.shift(date_time[1]), by = date_time]
  # convert date
  csv1[, date := as.Date(adj_date_time, tz = "UTC")]
  # overwrite current loaded file

person Pceam    schedule 28.11.2020    source источник
comment
Добро пожаловать! Пожалуйста, сделайте ваши примеры воспроизводимыми. Кроме того, что касается вопроса, я не уверен, что это какие-то узкие места с операциями data.table, чтобы сделать это, поскольку преобразование из строк в POSIXct происходит медленно.   -  person Cole    schedule 29.11.2020
comment
@Cole приносит извинения за то, что не может включить данные, они оплачены и не могут быть размещены на общедоступном форуме. Я могу отправить str() или любую другую информацию.   -  person Pceam    schedule 29.11.2020
comment
@jangorecki это полезно, спасибо!   -  person Pceam    schedule 29.11.2020
comment
Воспроизводимость не означает совместное использование личных данных. Это означает, что нужно потратить время на предоставление набора данных, который может смоделировать проблему.   -  person Cole    schedule 29.11.2020
comment
@Cole опубликовал ответ, используя ваш код и общий вывод данных.   -  person Pceam    schedule 30.11.2020
comment
привет, тебе действительно нужен by при вставке и использовании fastPOSIXct?   -  person chinsoon12    schedule 30.11.2020
comment
@ chinsoon12 Я думал то же самое, но я подумал, что это может быть связано с производительностью. Если у вас много повторяющихся дат, может быть быстрее сгруппировать по строке, а затем выполнить преобразование для одной строки, а не для всех.   -  person Cole    schedule 30.11.2020
comment
это похоже на данные тикера будущих транзакций SPX, поэтому я подозреваю, что trade_time сокращается до миллисекунд   -  person chinsoon12    schedule 01.12.2020


Ответы (2)


Это должно помочь (не проверено, потому что пример не воспроизводим):

index = match(file, csv_files)

csv1 = fread(file = file)
# create row for date in Date format
csv1[, trade_date := as.character(trade_date)]
csv1[,
     c("date", "active_exp") := {
       date1 = as.Date(.BY[[1L]], "%Y%m%d")
       active_exp = get.active.future(date1)
       .(date1, active_exp)},
     by = trade_date]

csv1 = csv1[active_exp == contract_delivery_date), -c(3, 6, 9, 23, 11:21)]

# creates date time character type to be converted to POSIXct
csv1[, 
     c("date_time", "adj_date_time", "date") = {
       date_time = fastPOSIXct(stri_c(.BY[[1L]]), .BY[[2L]], sep = " ")
       adj_date_time = time.shift(date_time)
       date = as.Date(adj_date_time, tz = "UTC")
     }
     , by = .(date, trade_time)
]

Было несколько мест, где группировка производилась на более или менее одних и тех же данных. Каждый раз, когда мы группируем, это означает, что мы вызываем forder() и должны группировать данные. Вместо этого мы можем просто попытаться сделать все сразу.

Чтобы сделать это еще быстрее, я рекомендую freading только те столбцы, которые вам нужны. Связано, после импорта файла я бы сделал фильтр active_exp == contract_delivery_date.

Редактировать Одна вещь, на которую вы могли бы обратить внимание, это использование IDate и ITime для создания столбца POSIXct. Самой медленной частью, вероятно, будет stri_c / paste, за которой следует либо группировка, либо fasttime::fastPOSIXct. Вот как избежать вставки, хотя она использует некоторые Rcpp, чтобы помочь проанализировать столбец метки времени.

Rcpp::cppFunction("
IntegerVector to_time(std::vector< std::string > x) {
  //for format hh:mm:ss 
  int n = x.size();
  IntegerVector out(n);
  
  for (int i = 0; i < n; i++){;
    const std::string xi = x[i]; 
    const int hour = stoi(xi.substr(0, 2));
    const int minute = stoi(xi.substr(3, 2));
    const int second = stoi(xi.substr(6, 2));
    out[i] = hour * 60 * 60 + minute * 60 + second;
  }
  return(out);
}
")

csv1[,
     date_time := {
       date1 = as.IDate(as.character(trade_date), "%Y%m%d")
       time = as.ITime(to_time(trade_time))
       .(as.POSIXct(date1, time, tz = "UTC"))
     }]

При репликации небольшого набора данных миллион раз это занимает 3 секунды, а fasttime::fastPOSIXct() — 20 секунд. Опубликую бенчмаркинг ниже.

Вам также следует подумать об удалении части by = исходного сообщения. Если много повторяющихся дат, то да, группировка по группам иногда может быть более эффективной. Но если есть много уникальных дат и времени, вам, вероятно, лучше пропустить шаг группировки.

csv1 = csv1[rep(seq_len(.N), 1e6L)]

## for use in different use cases
date_col = as.Date(as.character(csv1$trade_date), "%Y%m%d")
IDate_col = as.IDate(date_col)
hour_col = as.ITime(to_time(csv1$trade_time))
bench::mark(
 as_char_to_date = as.Date(as.character(csv1$trade_date), "%Y%m%d")
 ,
 as.ITime(to_time(csv1$trade_time))
 ,
 as_date_to_idate = as.IDate(date_col)
 , as.POSIXct(IDate_col, hour_col, tz = "UTC")
 ,  use_fasttime = fasttime::fastPOSIXct(paste(date_col, csv1$trade_time), "UTC")
 , check = FALSE
)

## # A tibble: 5 x 13
##   expression                                       ## min   median `itr/sec`
##   <bch:expr>                                  <bch:tm> <bch:tm>     <dbl>
## 1 as_char_to_date                                3.03s    3.03s    0.330 
## 2 as.ITime(to_time(csv1$trade_time))             2.03s    2.03s    0.494 
## 3 as_date_to_idate                              13.2ms   13.5ms   23.1   
## 4 as.POSIXct(IDate_col, hour_col, tz = "UTC") 929.14ms 929.14ms    1.08  
## 5 use_fasttime                                  20.39s   20.39s    0.0490
person Cole    schedule 29.11.2020
comment
Это значительно ускорило процесс, но по-прежнему занимает (неоправданно) много времени. - person Pceam; 30.11.2020
comment
См. редактирование. Также обратите внимание, что Rcpp можно относительно легко сделать параллельно с OpenMP. Я протестировал его с 8 потоками, и он был примерно в 4 раза быстрее на шаге ITime примерно за 1,8 с для перехода от Date -> POSIXct против 18 с для Date -> fasttime. - person Cole; 30.11.2020
comment
Это прекрасно работает. Отредактировал одну из скобок date1 = as.IDate(as.character(trade_date, "%Y%m%d")) на date1 = as.IDate(as.character(trade_date), "%Y%m%d") - person Pceam; 02.12.2020
comment
Отредактировано. Вы можете чувствовать себя свободно, чтобы принять. Кроме того, я бы удалил ваш ответ, хотя, возможно, отредактировал ваш вопрос, включив в него набор данных. - person Cole; 02.12.2020

РЕДАКТИРОВАТЬ: Как было предложено @Cole, я использовал эту функцию С++, а также IDate и ITime, чтобы ускорить это. Я не проверял это, но могу сказать, что он работает примерно в 20 раз быстрее, чем предыдущий код. Код ниже.

for(file in csv_files){

  index = match(file, csv_files)
  csv1 = fread(file = file, drop = c(3, 6, 9, 23, 11:21))
  
  options(warn = -1)
  # get date and time and classify as POSIXct
  csv1[,
       c("date_time", "date", "time") := {
         
         time = as.ITime(to_time(trade_time))
         date1 = as.IDate(as.character(trade_date), "%Y%m%d")
         if(hour(time) >= 17){
            POSIX = as.POSIXct(date1, time, tz = "UTC") - days(1)
            .(POSIX, as.Date(POSIX), time)
         }else{
            POSIX = as.POSIXct(date1, time, tz = "UTC")
            .(POSIX, as.Date(POSIX), time)
         }
       }]
  options(warn = 0)
  # covert trade date to character format
  csv1[, trade_date := as.character(trade_date)]
  # run active_exp
  csv1[,
       active_exp := get.active.future(as.Date(date[1])),
       by = date]
  # filter only active future
  csv1 = csv1[active_exp == as.character(contract_delivery_date)]
  # write file
  file_path = paste0("C:/Users/ocean/Documents/CME Data/Active Daily Data CSVs/", index, ".csv")
  fwrite(csv1, file = file_path)
  
  print(paste0("Progress = ", round(index/length(csv_files), 3)*100, "%"))
}

СТАРЫЙ: Я использовал код, предоставленный @Cole, но все еще запускал его довольно долгое время. Я немного отредактировал код, чтобы исправить синтаксис, как показано ниже. файл = csv_files[1] index = match(файл, csv_files)

csv1 = fread(file = file, drop = c(3, 6, 9, 23, 11:21))
# create row for date in Date format
csv1[, trade_date := as.character(trade_date)]
csv1[,
     c("date", "active_exp") := {
       date1 = as.Date(.BY[[1L]], "%Y%m%d")
       active_exp = get.active.future(date1)
       .(date1, active_exp)},
     by = trade_date]

csv1 = csv1[active_exp == contract_delivery_date]

# creates date time character type to be converted to POSIXct
csv1[, 
     c("date_time", "adj_date_time", "date") := {
       date_time = fastPOSIXct(stri_c(.BY[[1L]], .BY[[2L]], sep = " "))
       adj_date_time = time.shift(date_time)
       date = as.Date(adj_date_time, tz = "UTC")
     }
     , by = .(date, trade_time)
]

Чтобы попытаться предложить воспроизводимый пример, dput() данных приведены ниже.

    structure(list(trade_date = c(20200115L, 20200115L, 20200115L, 
20200115L, 20200115L, 20200115L), trade_time = c("17:00:00", 
"17:00:00", "17:00:00", "17:00:00", "17:00:00", "17:00:00"), 
    trade_sequence_number = c(9028350L, 9028357L, 9028366L, 9028394L, 
    9028397L, 9028400L), session_indicator = c("E", "E", "E", 
    "E", "E", "E"), ticker_symbol = c("ES", "ES", "ES", "ES", 
    "ES", "ES"), future_option_index_indicator = c("F", "F", 
    "F", "F", "F", "F"), contract_delivery_date = c(2003L, 2003L, 
    2003L, 2003L, 2003L, 2003L), trade_quantity = c(176L, 0L, 
    3L, 2L, 4L, 10L), strike_price = c(0L, 0L, 0L, 0L, 0L, 0L
    ), trade_price = c(3287.75, 3287.75, 3288, 3288, 3288, 3288
    ), ask_bid_type = c(NA, NA, NA, NA, NA, NA), indicative_quote_type = c(NA, 
    NA, NA, NA, NA, NA), market_quote = c(NA, NA, NA, NA, NA, 
    NA), close_open_type = c("", "O", "", "", "", ""), valid_open_exception = c(NA, 
    NA, NA, NA, NA, NA), post_close = c(NA, NA, NA, NA, NA, NA
    ), cancel_code_type = c(NA, NA, NA, NA, NA, NA), insert_code_type = c(NA, 
    NA, NA, NA, NA, NA), fast_late_indicator = c(NA, NA, NA, 
    NA, NA, NA), cabinet_indicator = c(NA, NA, NA, NA, NA, NA
    ), book_indicator = c(NA, NA, NA, NA, NA, NA), entry_date = c(20200114L, 
    20200114L, 20200114L, 20200114L, 20200114L, 20200114L), exchange_code = c("XCME", 
    "XCME", "XCME", "XCME", "XCME", "XCME")), row.names = c(NA, 
-6L), class = c("data.table", "data.frame"), .internal.selfref = <pointer: 0x000001ef853c1ef0>)

Учитывая размер файла (2-4 ГБ для 38 файлов), мне просто нужно изучить С++ или позволить этому работать всю ночь?

person Pceam    schedule 29.11.2020
comment
Можете ли вы контролировать данные, представленные в csv? fread возьмет дату в формате yyyy-mm-dd и автоматически прочитает ее как формат IDate. - person Cole; 30.11.2020
comment
@Cole Я не могу контролировать данные, поскольку они извлекаются из базы данных с использованием запроса API cURL. - person Pceam; 30.11.2020