У меня есть приложение веб-сервера aiohttp с такими обработчиками, как:
async def handler(request):
async with request.app["db"].acquire() as db:
row = await query(db)
return aiohttp.web.json_response(row)
Где app["db"]
- это какой-то объединенный ресурс (aiopg
, aioredis
, теперь не имеет значения). Это отлично работало до сегодняшнего дня. Ни с того ни с сего все клиенты начали отключаться по таймауту, а журналы приложений заполнились трассировками вроде
[2017-05-21 17:58:24,254] ERROR [aiohttp.server] Error handling request
Traceback (most recent call last):
File "/virtualenv/lib/python3.6/site-packages/aiohttp/web_server.py", line 61, in handle_request
resp = yield from self._handler(request)
File "/virtualenv/lib/python3.6/site-packages/aiohttp/web.py", line 249, in _handle
resp = yield from handler(request)
File "/visio-longer/visio_longer/views/communicate/__init__.py", line 81, in legacy_communicate
async with request.app["db"].acquire() as db:
File "/virtualenv/src/aiopg/aiopg/utils.py", line 140, in __aenter__
self._conn = yield from self._coro
File "/virtualenv/src/aiopg/aiopg/sa/engine.py", line 162, in _acquire
raw = yield from self._pool.acquire()
File "/virtualenv/src/aiopg/aiopg/utils.py", line 67, in __iter__
resp = yield from self._coro
File "/virtualenv/src/aiopg/aiopg/pool.py", line 168, in _acquire
with (yield from self._cond):
File "/usr/lib/python3.6/asyncio/locks.py", line 67, in __iter__
yield from self.acquire()
File "/usr/lib/python3.6/asyncio/locks.py", line 176, in acquire
yield from fut
concurrent.futures._base.CancelledError
Ключевым моментом здесь является получение CancelledError
(отключение клиента по таймауту) при получении соединения с базой данных из пула:
File "/visio-longer/visio_longer/views/communicate/__init__.py", line 81, in legacy_communicate
async with request.app["db"].acquire() as db:
У меня была запущена сопрограмма, которая выводила состояние пула (size
и freesize
) каждые 5 секунд, и на данный момент в пуле было много свободных подключений!
Многочасовые исследования привели к теории, что получение CancelledError
при выполнении диспетчера контекста пула __atexit__
прерывало процесс возврата соединения к пулу, что приводило к неисправности пула. Я нашел фиксацию, которая исправляла это поведение в asyncpg
, aioredis
содержит аналогичный код , Я также сделал неудобное исправление для aiopg
. Ничего из этого не помогло - я все еще получал одинаковые ошибки от aioredis
и aiopg
.
Ситуация разрешилась заменой
async def handler(request):
async with request.app["db"].acquire() as db:
row = await query(db)
return aiohttp.web.json_response(row)
путем обертывания каждого фрагмента кода, использующего пулы соединений, asyncio.shield
:
async def handler(request):
async def process():
async with request.app["db"].acquire() as db:
row = await query(db)
return aiohttp.web.json_response(asyncio.shield(process(row)))
Таким образом, прерванные запросы по-прежнему обрабатывались до конца (включая возврат полученных ресурсов в пул).
Так должно быть? Теперь мой код выглядит ужасно, и нет никакой гарантии, что в следующий раз я не забуду обернуть свои пулы asyncio.shield
. Как правильно решить эту проблему (очевидно, библиотеки сами не могут это исправить)?