У меня проблема, когда мне нужно один раз извлекать N-таблицы из базы данных реплики PostgreSQL и ежедневно все строки, которые редактируются в течение 45 дней. Реплика настроена на работу в качестве горячей резервной реплики, где обновления основной базы данных приводят к тому, что мои соединения/транзакции уничтожаются, а DatabaseError выдается.
Я попытался использовать именованный курсор на стороне сервера с размером итерации, равным 100 000, но проблема остается. Я также изменил уровень транзакции на REPEATABLE READ.
Мне нужно записать результаты таблицы SELECT * FROM в файлы Apache Avro и переместить их в облачное хранилище. Из-за нехватки места на диске эти файлы необходимо перемещать и удалять между итерациями, поэтому это требует дополнительного времени для открытия соединения.
Любые предложения, как избежать:
ERROR 2019-02-01 15:51:25,317: DatabaseError occurred: Traceback (most recent call last):
File "main.py", line 202, in export_data
rows = cur.fetchmany(itersize)
File "/home/userA/data-export/lib/python2.7/site-packages/psycopg2/extras.py", line 93, in fetchmany
res = super(DictCursorBase, self).fetchmany(size)
TransactionRollbackError: terminating connection due to conflict with recovery
DETAIL: User query might have needed to see row versions that must be removed.
HINT: In a moment you should be able to reconnect to the database and repeat your command.
server closed the connection unexpectedly
This probably means the server terminated abnormally
before or while processing the request.
Я также пытался выполнить эту работу с помощью Apache Sqoop, но в конечном итоге возникла та же проблема.
with connection(connection_params=connection_params) as c:
c.set_isolation_level(ISOLATION_LEVEL_REPEATABLE_READ)
with cursor(connection=c, cursorname='{brand}_{database}_{tablename}'.format(brand=brand_name,database=db, tablename=table_name)) as cur:
try:
cur.itersize = 100000
cur.execute(sql)
results = []
while True:
rows = cur.fetchmany(100000)
if not rows:
break
for row in rows:
results.append(return_record(columns=list(row.keys()),row=row, cast_types=True))
outputpath = './exports/file/test.avro'
if writer(schemafile=schema_file, outputpath=outputpath, data=results):
logging.info('Write finished, moving file {} to GCS'.format(outputpath))
upload_failed += 1 if not upload_file(storage_config=config, filepath=outputpath) else upload_failed
else:
write_failed += 1
counter += 1
del results[:]
except ValueError:
logging.error('ValueError occurred: %s', traceback.format_exc())
cli.report(traceback.format_exc())
except (Exception, DatabaseError):
logging.error('DatabaseError occurred: %s', traceback.format_exc())
cli.report(traceback.format_exc())