Что делать с репликами горячего резерва и проблемой с курсором на стороне сервера

У меня проблема, когда мне нужно один раз извлекать N-таблицы из базы данных реплики PostgreSQL и ежедневно все строки, которые редактируются в течение 45 дней. Реплика настроена на работу в качестве горячей резервной реплики, где обновления основной базы данных приводят к тому, что мои соединения/транзакции уничтожаются, а DatabaseError выдается.

Я попытался использовать именованный курсор на стороне сервера с размером итерации, равным 100 000, но проблема остается. Я также изменил уровень транзакции на REPEATABLE READ.

Мне нужно записать результаты таблицы SELECT * FROM в файлы Apache Avro и переместить их в облачное хранилище. Из-за нехватки места на диске эти файлы необходимо перемещать и удалять между итерациями, поэтому это требует дополнительного времени для открытия соединения.

Любые предложения, как избежать:

ERROR 2019-02-01 15:51:25,317: DatabaseError occurred: Traceback (most recent call last):
  File "main.py", line 202, in export_data
    rows = cur.fetchmany(itersize)
  File "/home/userA/data-export/lib/python2.7/site-packages/psycopg2/extras.py", line 93, in fetchmany
    res = super(DictCursorBase, self).fetchmany(size)
TransactionRollbackError: terminating connection due to conflict with recovery
DETAIL:  User query might have needed to see row versions that must be removed.
HINT:  In a moment you should be able to reconnect to the database and repeat your command.
server closed the connection unexpectedly
    This probably means the server terminated abnormally
    before or while processing the request.

Я также пытался выполнить эту работу с помощью Apache Sqoop, но в конечном итоге возникла та же проблема.

with connection(connection_params=connection_params) as c:
    c.set_isolation_level(ISOLATION_LEVEL_REPEATABLE_READ)
    with cursor(connection=c, cursorname='{brand}_{database}_{tablename}'.format(brand=brand_name,database=db, tablename=table_name)) as cur:
        try:
            cur.itersize = 100000
            cur.execute(sql)
            results = []
            while True:
                rows = cur.fetchmany(100000)
                if not rows:
                   break
                for row in rows: 
          results.append(return_record(columns=list(row.keys()),row=row, cast_types=True))

            outputpath = './exports/file/test.avro'

            if writer(schemafile=schema_file, outputpath=outputpath, data=results):
              logging.info('Write finished, moving file {} to GCS'.format(outputpath))
              upload_failed += 1 if not upload_file(storage_config=config, filepath=outputpath) else upload_failed
            else:
              write_failed += 1
            counter += 1
            del results[:]
        except ValueError:
          logging.error('ValueError occurred: %s', traceback.format_exc()) 
          cli.report(traceback.format_exc())
        except (Exception, DatabaseError):
          logging.error('DatabaseError occurred: %s', traceback.format_exc())
          cli.report(traceback.format_exc())

person Joni Kämppä    schedule 01.02.2019    source источник


Ответы (1)


Эта ошибка не имеет ничего общего с вашим запросом, только с вашей конфигурацией репликации и длительностью запроса.

Конфликты репликации возникают, когда

  1. VACUUM удаляет старые версии строк на первичном сервере, которые могут понадобиться для выполнения длительного запроса на резервном сервере.

  2. ACCESS EXCLUSIVE блокирует первичный конфликт с запросами в резерве. Такие блокировки берутся ALTER TABLE, DROP TABLE, TRUNCATE, CREATE INDEX, CLUSTER и подобными, а также когда VACUUM усекает пустые страницы в конце таблицы.

Вы страдаете от первой проблемы.

Есть два средства:

  1. Установите hot_standby_feedback = on в режим ожидания. Тогда первичный сервер не будет удалять старые версии строк во время VACUUM, которые могут быть нужны резервному серверу. Недостатком является то, что это может привести к раздуванию таблицы на первичном сервере, если автоочистка занятых таблиц заблокирована.

  2. Установите max_standby_streaming_delay на значение, большее, чем ваш самый длинный запрос в резервной системе (или -1 для бесконечности). Затем конфликтующие изменения на первичном сервере передаются в резервный, но применение изменений задерживается. Это означает, что резерв может отставать. Эта техника также помогает при втором типе конфликта, упомянутом выше.

Вы должны сделать свой выбор, но не забывайте, что все методы имеют отрицательные стороны, которые могут быть неприемлемы.

person Laurenz Albe    schedule 01.02.2019
comment
Спасибо, Лоренц! Это подтвердило имеющиеся у меня варианты, которые немного ограничены, поскольку IT-OPS сопротивляется внесению каких-либо изменений в настройку базовой инфраструктуры. Но теперь я чувствую себя более уверенно, чтобы протолкнуть изменения :) - person Joni Kämppä; 04.02.2019