Как нужно обрабатывать CancelledError на типичных веб-серверах aiohttp?

У меня есть приложение веб-сервера aiohttp с такими обработчиками, как:

async def handler(request):
    async with request.app["db"].acquire() as db:
        row = await query(db)

    return aiohttp.web.json_response(row)

Где app["db"] - это какой-то объединенный ресурс (aiopg, aioredis, теперь не имеет значения). Это отлично работало до сегодняшнего дня. Ни с того ни с сего все клиенты начали отключаться по таймауту, а журналы приложений заполнились трассировками вроде

[2017-05-21 17:58:24,254] ERROR    [aiohttp.server] Error handling request
Traceback (most recent call last):
  File "/virtualenv/lib/python3.6/site-packages/aiohttp/web_server.py", line 61, in handle_request
    resp = yield from self._handler(request)
  File "/virtualenv/lib/python3.6/site-packages/aiohttp/web.py", line 249, in _handle
    resp = yield from handler(request)
  File "/visio-longer/visio_longer/views/communicate/__init__.py", line 81, in legacy_communicate
    async with request.app["db"].acquire() as db:
  File "/virtualenv/src/aiopg/aiopg/utils.py", line 140, in __aenter__
    self._conn = yield from self._coro
  File "/virtualenv/src/aiopg/aiopg/sa/engine.py", line 162, in _acquire
    raw = yield from self._pool.acquire()
  File "/virtualenv/src/aiopg/aiopg/utils.py", line 67, in __iter__
    resp = yield from self._coro
  File "/virtualenv/src/aiopg/aiopg/pool.py", line 168, in _acquire
    with (yield from self._cond):
  File "/usr/lib/python3.6/asyncio/locks.py", line 67, in __iter__
    yield from self.acquire()
  File "/usr/lib/python3.6/asyncio/locks.py", line 176, in acquire
    yield from fut
concurrent.futures._base.CancelledError

Ключевым моментом здесь является получение CancelledError (отключение клиента по таймауту) при получении соединения с базой данных из пула:

  File "/visio-longer/visio_longer/views/communicate/__init__.py", line 81, in legacy_communicate
    async with request.app["db"].acquire() as db:

У меня была запущена сопрограмма, которая выводила состояние пула (size и freesize) каждые 5 секунд, и на данный момент в пуле было много свободных подключений!

Многочасовые исследования привели к теории, что получение CancelledError при выполнении диспетчера контекста пула __atexit__ прерывало процесс возврата соединения к пулу, что приводило к неисправности пула. Я нашел фиксацию, которая исправляла это поведение в asyncpg, aioredis содержит аналогичный код , Я также сделал неудобное исправление для aiopg. Ничего из этого не помогло - я все еще получал одинаковые ошибки от aioredis и aiopg.

Ситуация разрешилась заменой

async def handler(request):
    async with request.app["db"].acquire() as db:
        row = await query(db)

    return aiohttp.web.json_response(row)

путем обертывания каждого фрагмента кода, использующего пулы соединений, asyncio.shield:

async def handler(request):
    async def process():
        async with request.app["db"].acquire() as db:
            row = await query(db)

    return aiohttp.web.json_response(asyncio.shield(process(row)))

Таким образом, прерванные запросы по-прежнему обрабатывались до конца (включая возврат полученных ресурсов в пул).

Так должно быть? Теперь мой код выглядит ужасно, и нет никакой гарантии, что в следующий раз я не забуду обернуть свои пулы asyncio.shield. Как правильно решить эту проблему (очевидно, библиотеки сами не могут это исправить)?


person themylogin    schedule 21.05.2017    source источник
comment
Я уверен, что вы найдете vorpus.org/blog/ control-c-handle-in-python-and-trio интересно.   -  person Quentin Pradet    schedule 24.05.2017


Ответы (1)


Я не понимаю, чем это отличается от любого другого исключения. CanceledError - это очередная ошибка. Все ваши функции выхода из менеджеров контекста запускаются, поэтому у них должна быть возможность высвободить ресурсы. Кажется, что каким-то образом будущее, связанное с блокировкой в ​​пуле соединений, отменяется. Это кажется проблематичным и, похоже, не должно быть связано с отменой клиентского соединения. Я предполагаю, что существует сопрограмма, соответствующая выполняемому запросу, и эта сопрограмма должна быть отменена при потере соединения. Этот вызов Task.cancel вызовет CanceledError внутри сопрограммы. Если предположить, что CanceledError в конечном итоге поднимается из сопрограммы, результатом Task будет CanceledError. Однако я не понимаю, как будущее, связанное с блокировками, отменяется.

Если вы ожидаете чего-то в функции выхода для диспетчера контекста, или в блоке finally, или в каком-либо другом коде очистки, вам может потребоваться больше внимания. Например, если вам нужна блокировка в функции выхода для возврата некоторого ресурса, тогда вам может понадобиться цикл while, который ловит CanceledError (и, в конечном итоге, повторно его поднимает), или использовать async.shield. Я бы сделал это только для конкретного кода в обработчике функции выхода / очистки, а не для всего вашего запроса. Также подумайте, может ли использование обратного вызова для возврата вашего ресурса (через loop.call_soon) быть лучшим выбором, чем ожидание в побочном коде очистки. Если код очистки не ожидает, то новый CanceledError не может быть поднят в нем. Функции выхода из контекстного менеджера и блоки finally могут быть запущены CanceledError в другом месте, даже если они не ожидают, но это не отличается от любого другого исключения

person Sam Hartman    schedule 21.05.2017
comment
Я описывал ситуацию, когда CancelledError происходил внутри функции выхода из диспетчера контекста. Это помешало бы ему нормально завершить работу и оставить пул в согласованном состоянии. - person themylogin; 21.05.2017
comment
Как это происходит для функции выхода из соединения с базой данных? Я как бы ожидал, что возвращение соединения с пулом ничего не будет ждать. Обновив ответ, добавив некоторые мысли о функциях выхода, хотя - person Sam Hartman; 21.05.2017
comment
Все функции возврата в пул из библиотек. Я использовал переменную условия блокировки await для уведомления сопрограмм, ожидающих, что ресурс будет доступен в пустом пуле. Экранирование этого кода не помогло (возможно, вы можете взглянуть на github.com/themylogin/aiopg/commit/ и думаете, чего не хватает?) - person themylogin; 22.05.2017
comment
Это вряд ли похоже на MCVE. Stackoverflow не предназначен для бесплатной отладки больших сложных систем. :-) - person Sam Hartman; 22.05.2017
comment
Попытка получить MCVE привела к пониманию того, что не только aexit, но и aenter тоже следует охранять. Спасибо за Ваш интерес! - person themylogin; 22.05.2017