Проблема производительности Postgresql с HikariCP

Я пытаюсь загрузить большие данные в одну таблицу на сервере PostgreSQL (всего 40 миллионов строк) небольшими партиями (6000 строк в каждом csv). Я подумал, что HikariCP идеально подойдет для этой цели.

Это пропускная способность, которую я получаю при вставке данных с использованием Java 8 (1.8.0_65), драйвера JDBC Postgres 9.4.1211 и HikariCP 2.4.3.

6000 строк за 4 минуты 42 секунды.

Что я делаю неправильно и как увеличить скорость вставки?

Еще пару слов о моей настройке:

  • Программа работает на моем ноутбуке за корпоративной сетью.
  • Сервер Postgres 9.4 — это Amazon RDS с db.m4.large и SSD на 50 ГБ.
  • Для таблицы еще не создан явный индекс или первичный ключ.
  • Программа вставляет каждую строку асинхронно с большим пулом потоков для хранения запросов, как показано ниже:

    private static ExecutorService executorService = new ThreadPoolExecutor(5, 1000, 30L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(100000));
    

Конфигурация источника данных:

        private DataSource getDataSource() {
                if (datasource == null) {
                    LOG.info("Establishing dataSource");
                    HikariConfig config = new HikariConfig();
                    config.setJdbcUrl(url);
                    config.setUsername(userName);
                    config.setPassword(password);
                    config.setMaximumPoolSize(600);// M4.large 648 connections tops
                    config.setAutoCommit(true); //I tried autoCommit=false and manually committed every 1000 rows but it only increased 2 minute and half for 6000 rows
                    config.addDataSourceProperty("dataSourceClassName","org.postgresql.ds.PGSimpleDataSource");
                    config.addDataSourceProperty("dataSource.logWriter", new PrintWriter(System.out));
                    config.addDataSourceProperty("cachePrepStmts", "true");
                    config.addDataSourceProperty("prepStmtCacheSize", "1000");
                    config.addDataSourceProperty("prepStmtCacheSqlLimit", "2048");
                    config.setConnectionTimeout(1000);

                    datasource = new HikariDataSource(config);
                }
                return datasource;
            }

Здесь я читаю исходные данные:

    private void readMetadata(String inputMetadata, String source) {
            BufferedReader br = null;
            FileReader fr = null;
            try {
                br = new BufferedReader(new FileReader(inputMetadata));
                String sCurrentLine = br.readLine();// skip header;
                if (!sCurrentLine.startsWith("xxx") && !sCurrentLine.startsWith("yyy")) {
                    callAsyncInsert(sCurrentLine, source);
                }
                while ((sCurrentLine = br.readLine()) != null) {
                    callAsyncInsert(sCurrentLine, source);
                }
            } catch (IOException e) {
                LOG.error(ExceptionUtils.getStackTrace(e));
            } finally {
                try {
                    if (br != null)
                        br.close();

                    if (fr != null)
                        fr.close();

                } catch (IOException ex) {
                    LOG.error(ExceptionUtils.getStackTrace(ex));
                }
            }
    }

Я вставляю данные асинхронно (или пытаюсь сделать это с помощью jdbc!):

            private void callAsyncInsert(final String line, String source) {
                    Future<?> future = executorService.submit(new Runnable() {
                        public void run() {
                            try {
                                dataLoader.insertRow(line, source);
                            } catch (SQLException e) {
                                LOG.error(ExceptionUtils.getStackTrace(e));
                                try {
                                    errorBufferedWriter.write(line);
                                    errorBufferedWriter.newLine();
                                    errorBufferedWriter.flush();
                                } catch (IOException e1) {
                                    LOG.error(ExceptionUtils.getStackTrace(e1));
                                }
                            }
                        }
                    });
                    try {
                        if (future.get() != null) {
                            LOG.info("$$$$$$$$" + future.get().getClass().getName());
                        }
                    } catch (InterruptedException e) {
                        LOG.error(ExceptionUtils.getStackTrace(e));
                    } catch (ExecutionException e) {
                        LOG.error(ExceptionUtils.getStackTrace(e));
                    }
                }

Мой DataLoader.insertRow ниже:

            public void insertRow(String row, String source) throws SQLException {
                    String[] splits = getRowStrings(row);
                    Connection conn = null;
                    PreparedStatement preparedStatement = null;
                    try {
                        if (splits.length == 15) {
                            String ... = splits[0];
                            //blah blah blah

                            String insertTableSQL = "insert into xyz(...) values (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?) ";
                            conn = getConnection();
                            preparedStatement = conn.prepareStatement(insertTableSQL);
                            preparedStatement.setString(1, column1);
                            //blah blah blah
                            preparedStatement.executeUpdate();
                            counter.incrementAndGet();
                            //if (counter.get() % 1000 == 0) {
                                //conn.commit();
                            //}
                        } else {
                            LOG.error("Invalid row:" + row);
                        }
                    } finally {
                        /*if (conn != null) {
                            conn.close();   //Do preparedStatement.close(); rather connection.close
                        }*/
                        if (preparedStatement != null) {
                            preparedStatement.close();
                        }
                    }
                }

При мониторинге в pgAdmin4 я заметил несколько вещей:

  • максимальное количество транзакций в секунду было близко к 50.
  • Активная сессия базы данных была только одна, всего сессий было 15.
  • Слишком много блоков ввода-вывода (около 500, не уверен, что это должно вызывать беспокойство)

скриншот из pgAdmin


person bkrish    schedule 03.03.2017    source источник
comment
Уменьшите размер пула соединений и количество используемых потоков: большее количество соединений (и больше потоков) не обязательно приводит к повышению производительности, есть даже точка (которая, вероятно, намного ниже ваших текущих настроек), когда больше соединений (и больше потоков) потоков) фактически приведет к снижению производительности и пропускной способности. Кроме того, вы должны закрыть соединение в своем методе, который возвращает его в пул соединений для повторного использования.   -  person Mark Rotteveel    schedule 03.03.2017
comment
Кроме того, действительно ли вы проверили, связано ли узкое место с асинхронной вставкой, возможно, проблема связана с кодом, который вы не показываете (который вызывает callAsyncInsert).   -  person Mark Rotteveel    schedule 03.03.2017
comment
Спасибо за ответ:   -  person bkrish    schedule 04.03.2017
comment
Уменьшен пул соединений и количество потоков до 10. Кроме того, закрытое соединение после вставки (что закрывает объект ConnectionProxy). Там, где я вызываю callAsyncInsert, нет ничего сложного, просто прочитайте csv и передайте его в callAsyncInsert. После внесения этих изменений осталось 4 минуты 42 секунды. Какие-нибудь мысли?   -  person bkrish    schedule 04.03.2017
comment
Вы можете попробовать не делать это асинхронно и вместо этого использовать пакетную вставку? Покажите код, который вызывает callAsyncInsert? Также имейте в виду, что подключение с вашего ноутбука к базе данных, размещенной на AWS, может иметь небольшую задержку. Вы проверяли это на локальной базе данных?   -  person Mark Rotteveel    schedule 04.03.2017
comment
Я обновил вопрос с кодом, который вызывает callAsyncInsert. Я попробую с пакетной вставкой без асинхронности. Я не могу установить локальную базу данных, но моя конечная цель — выполнить этот код из EC2.   -  person bkrish    schedule 04.03.2017


Ответы (1)


Вы абсолютно точно хотите использовать пакетные вставки, когда инструкция подготавливается вне цикла и отключается автоматическая фиксация. В псевдокоде:

PreparedStatement stmt = conn.prepareStatement("insert into xyz(...) values (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)")
while ( <data> ) {
   stmt.setString(1, column1);
   //blah blah blah
   stmt.addBatch();
}
stmt.executeBatch();
conn.commit();

Даже один поток в одном соединении должен иметь возможность вставлять> 5000 строк в секунду.

ОБНОВЛЕНИЕ. Если вы хотите использовать многопоточность, количество подключений должно быть равно количеству ядер ЦП базы данных x1,5 или 2. Количество потоков обработки должно соответствовать этому, и каждый поток обработки должен обрабатывать один файл CSV с использованием шаблона выше. Однако вы можете обнаружить, что много одновременных вставок в одну и ту же таблицу создают слишком много конфликтов за блокировку в БД, и в этом случае вам нужно уменьшить количество потоков обработки, пока не будет найден оптимальный параллелизм.

Пул правильного размера и параллелизм должны быть выше, чтобы достичь > 20 000 строк в секунду.

Также обновите HikariCP до версии 2.6.0.

person brettw    schedule 04.03.2017
comment
Количество потоков для многопоточного импорта определяется не только количеством ЦП на сервере, но и количеством жестких дисков на этом сервере. - person a_horse_with_no_name; 04.03.2017
comment
@a_horse_with_no_name Хотя это и правда, с Amazon RDS узнать этот номер невозможно. - person brettw; 04.03.2017
comment
Ok. Я изменил программу в соответствии с предложением. Обновлен до 2.6.0. Добавлена ​​пакетная вставка и используется только соединение для загрузки данных. Теперь я вижу большую разницу с двумя разными типами набора данных. Набор данных № 1 составляет 500 тыс. строк в одном CSV-файле (точнее, 499951) — 00:02:08,670 минут. Набор данных № 2 составляет 498 КБ в 83 файлах CSV, где каждые 6 КБ строк занимают 00:02:09,674 минуты. Таким образом, я могу получить пропускную способность 3840 ish/sec. Я мог бы получить больше, если бы у меня не было тяжелой системы регистрации макросов, обработки ошибок и т. д. Но я доволен этим. Большое спасибо мистеру Вулриджу за этот фреймворк и Марку за помощь. - person bkrish; 06.03.2017