Потоковая передача результатов с помощью Blaze и SqlAlchemy

Я пытаюсь использовать Blaze/Odo для чтения большого (~ 70 миллионов строк) набора результатов из Redshift. По умолчанию SqlAlchemy попытается прочитать весь результат в память, прежде чем начать его обработку. Этого можно избежать с помощью execution_options(stream_results=True) в движке/сеансе или yield_per(sane_number) в запросе. При работе из Blaze SqlAchemy запросы генерируются незаметно, оставляя подход execution_options. К сожалению, следующие броски и ошибка.

from sqlalchemy import create_engine
from blaze import Data

redshift_params = (redshift_user, redshift_pass, redshift_endpoint, port, dbname)
engine_string = "redshift+psycopg2://%s:%s@%s:%d/%s" % redshift_params

engine = create_engine(engine_string,
       execution_options=dict(stream_results=True)
       )

db = Data(engine)

Исключение составляет:

...
/home/mahler/anaconda/lib/python2.7/site-packages/sqlalchemy/engine/result.pyc in __buffer_rows(self)
   1124             return
   1125         size = getattr(self, '_bufsize', 1)
-> 1126         self.__rowbuffer = collections.deque(self.cursor.fetchmany(size))
   1127         self._bufsize = self.size_growth.get(size, size)
   1128         if self._max_row_buffer is not None:

InternalError: (psycopg2.InternalError) opening multiple cursors from within the same client connection is not allowed.

Если я пропущу execution_options=dict(stream_results=True), то вышеприведенное сработает, но я сделаю что-то вроде

odo(db.mytable, 'mytable.bcolz')

закончится память для больших таблиц.

Использование execution_options(stream_results=True) работает с pandas.read_csv. Следующий код работает нормально, используя лишь умеренное количество памяти:

from sqlalchemy import create_engine
import pandas as pd

redshift_params = (redshift_user, redshift_pass, redshift_endpoint, port, dbname)
engine_string = "postgresql+psycopg2://%s:%s@%s:%d/%s" % redshift_params

engine = create_engine(engine_string,
       execution_options=dict(stream_results=True)
       )
compression='bz2'
res = pd.read_sql_query(queryString
            engine,
            chunksize=2**20)
for i, df in enumerate(res):
    df.to_csv('results-%s.csv.%s' % (i, compression), compression=compression)

Это полная трассировка стека:

...

Data(engine)

No handlers could be found for logger "sqlalchemy.pool.QueuePool"
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/home/mahler/anaconda/lib/python2.7/site-packages/blaze/interactive.py", line 122, in Data
    dshape = discover(data)
  File "/home/mahler/anaconda/lib/python2.7/site-packages/multipledispatch/dispatcher.py", line 164, in __call__
    return func(*args, **kwargs)
  File "/home/mahler/anaconda/lib/python2.7/site-packages/odo/backends/sql.py", line 242, in discover
    return discover(metadata)
  File "/home/mahler/anaconda/lib/python2.7/site-packages/multipledispatch/dispatcher.py", line 164, in __call__
    return func(*args, **kwargs)
  File "/home/mahler/anaconda/lib/python2.7/site-packages/odo/backends/sql.py", line 248, in discover
    metadata.reflect(views=metadata.bind.dialect.supports_views)
  File "/home/mahler/anaconda/lib/python2.7/site-packages/sqlalchemy/sql/schema.py", line 3623, in reflect
    bind.dialect.get_view_names(conn, schema)
  File "<string>", line 2, in get_view_names
  File "/home/mahler/anaconda/lib/python2.7/site-packages/sqlalchemy/engine/reflection.py", line 42, in cache
    return fn(self, con, *args, **kw)
  File "/home/mahler/anaconda/lib/python2.7/site-packages/sqlalchemy/dialects/postgresql/base.py", line 2347, in get_view_names
    for row in connection.execute(s)]
  File "/home/mahler/anaconda/lib/python2.7/site-packages/sqlalchemy/engine/result.py", line 713, in __iter__
    row = self.fetchone()
  File "/home/mahler/anaconda/lib/python2.7/site-packages/sqlalchemy/engine/result.py", line 1026, in fetchone
    self.cursor, self.context)
  File "/home/mahler/anaconda/lib/python2.7/site-packages/sqlalchemy/engine/base.py", line 1341, in _handle_dbapi_exception
    exc_info
  File "/home/mahler/anaconda/lib/python2.7/site-packages/sqlalchemy/util/compat.py", line 200, in raise_from_cause
    reraise(type(exception), exception, tb=exc_tb)
  File "/home/mahler/anaconda/lib/python2.7/site-packages/sqlalchemy/engine/result.py", line 1017, in fetchone
    row = self._fetchone_impl()
  File "/home/mahler/anaconda/lib/python2.7/site-packages/sqlalchemy/engine/result.py", line 1139, in _fetchone_impl
    self.__buffer_rows()
  File "/home/mahler/anaconda/lib/python2.7/site-packages/sqlalchemy/engine/result.py", line 1126, in __buffer_rows
    self.__rowbuffer = collections.deque(self.cursor.fetchmany(size))
sqlalchemy.exc.InternalError: (psycopg2.InternalError) opening multiple cursors from within the same client connection is not allowed.

person Daniel Mahler    schedule 10.02.2016    source источник
comment
В удаленном ответе пользователя Qbaza ниже говорится: Я заметил, что получаю ту же ошибку при использовании Pandas, и мой запрос возвращает 0 строк, поэтому убедитесь, что вы действительно получаете действительный результат из БД. Это была моя проблема.   -  person jeremycg    schedule 14.06.2017
comment
@jeremycg Тот же код возвращал результаты, если я не устанавливал stream_results=True   -  person Daniel Mahler    schedule 14.06.2017
comment
Да, это не окончательный ответ, но я добавил его в качестве комментария на случай, если у следующего гуглера возникнет та же проблема, и недостаточно представителей, чтобы увидеть удаленные ответы.   -  person jeremycg    schedule 14.06.2017