Фоновая задача Tornado с использованием cx_Oracle

Я запускаю сервер Bokeh, используя базовый Tornado.

Мне нужно, чтобы сервер обновлял некоторые данные в какой-то момент. Это делается путем выборки строк из базы данных Oracle с помощью Cx_Oracle.

Благодаря PeriodicCallback Tornado проверяет каждые 30 секунд, если новые данные должны быть загружены:

server.start() 
from tornado.ioloop import PeriodicCallback
pcallback = PeriodicCallback(db_obj.reload_data_async, 10 * 1e3)
pcallback.start()
server.io_loop.start()

Где db_obj - это экземпляр класса, который заботится о функциях, связанных с БД (подключение, выборка,...).

По сути, вот как выглядит функция reload_data_async:

executor = concurrent.futures.ThreadPoolExecutor(4)

# methods of the db_obj class ...

@gen.coroutine
def reload_data_async(self):
  # ... first, some code to check if the data should be reloaded ...
  # ...
  if data_should_be_reloaded:
    new_data = yield executor.submit(self.fetch_data)

def fetch_data(self):
   """ fetch new data in the DB """
   cursor = cx.Cursor(self.db_connection) 
   cursor.execute("some SQL select request that takes time (select * from ...)")

   rows = cursor.fetchall()
   # some more processing thereafter 
   # ...

В принципе, это работает. Но когда я пытаюсь прочитать данные во время их загрузки в fetch_data (нажав для отображения в графическом интерфейсе), программа вылетает из-за состояния гонки (я полагаю?): она обращается к данным, пока они извлекаются одновременно.

Я только что обнаружил, что tornado.concurrent.futures не являются потокобезопасными:

tornado.concurrent.Future похож на concurrent.futures.Future, но не является потокобезопасным (и, следовательно, быстрее для использования с однопоточными циклами событий).

В общем, я думаю, что мне следует создать новый поток, чтобы позаботиться об операциях CX_Oracle. Могу ли я сделать это с помощью Tornado и продолжать использовать функцию PerodicCallback? Как преобразовать асинхронную операцию в потокобезопасную? Как это сделать?

PS: я использую Python 2.7

Спасибо


person carmellose    schedule 09.05.2018    source источник
comment
Что вы пытаетесь прочитать? Допустим, вы сохраняете извлеченные данные в списке (или что-то еще, что угодно), а затем пытаетесь прочитать этот список в другом потоке (основном потоке). Я не думаю, что это вызовет сбой.   -  person Sraw    schedule 09.05.2018
comment
@ Как хорошо, он вылетает во время fetchall и воспроизводим в 100% случаев. По сути, я делаю select * from ..., поэтому я пытаюсь прочитать кучу или строки, столбцы которых являются числами и строками.   -  person carmellose    schedule 09.05.2018


Ответы (1)


Решил!

@Sraw прав: это не должно вызывать сбоев.

Объяснение: fetch_data() использует cx Oracle Connection объект (self.db_connection), который НЕ является потокобезопасным по умолчанию. Установка для параметра threaded значения True оборачивает общее соединение мьютексом, как описано в документации Cx Oracle:

Ожидается, что параметр threaded будет логическим выражением, указывающим, следует ли Oracle оборачивать доступ к соединениям с помощью мьютекса. Выполнение этого в однопоточных приложениях приводит к снижению производительности примерно на 10-15%, поэтому по умолчанию используется значение False.

Итак, я в своем коде только что изменил следующее, и теперь он работает без сбоев, когда пользователь пытается получить доступ к данным во время их обновления:

# inside the connect method of the db_obj class
self.db_connection = cx.connect('connection string', threaded=True) # False by default
person carmellose    schedule 09.05.2018