Тестирование каналов 2.0 с помощью pytest-asyncio с получением RuntimeError: цикл событий закрыт

Я пытаюсь протестировать новый channels 2.0 с pytest-asyncio (0.8.0). Если я помещаю разные утверждения в одну и ту же функцию, например:

import json
import pytest
from concurrent.futures._base import TimeoutError
from channels.testing import WebsocketCommunicator
from someapp.consumers import MyConsumer


@pytest.mark.django_db
@pytest.mark.asyncio
async def setup_database_and_websocket():
    path = 'foo'
    communicator = WebsocketCommunicator(MyConsumer, path)
    connected, subprotocol = await communicator.connect()
    assert connected
    return communicator


@pytest.mark.django_db
@pytest.mark.asyncio
async def test_1_and_2():
    communicator = await setup_database_and_websocket()
    sent = json.dumps({"message": 'abc'})
    await communicator.send_to(text_data=sent)
    with pytest.raises(TimeoutError):
        await communicator.receive_from()
    await communicator.send_input({
        "type": "websocket.disconnect",
        "code": 1000,
    })

    communicator = await setup_database_and_websocket()
    sent = json.dumps({"message": 1})
    await communicator.send_to(text_data=sent)
    with pytest.raises(TimeoutError):
        await communicator.receive_from()
    await communicator.send_input({
        "type": "websocket.disconnect",
        "code": 1000,
    })

тогда я не получаю ошибку. Но если я разделю тестовые случаи, например:

@pytest.mark.django_db
@pytest.mark.asyncio
async def test_1():
    communicator = await setup_database_and_websocket()
    sent = json.dumps({"message": 'abc'})
    await communicator.send_to(text_data=sent)
    with pytest.raises(TimeoutError):
        await communicator.receive_from()
    await communicator.send_input({
        "type": "websocket.disconnect",
        "code": 1000,
    })


@pytest.mark.django_db
@pytest.mark.asyncio
async def test_2():
    communicator = await setup_database_and_websocket()
    sent = json.dumps({"message": 1})
    await communicator.send_to(text_data=sent)
    with pytest.raises(TimeoutError):
        await communicator.receive_from()
    await communicator.send_input({
        "type": "websocket.disconnect",
        "code": 1000,
    })

то я получаю следующую ошибку при втором вызове receive_form:

with pytest.raises(TimeoutError):
>           await communicator.receive_from()

someapp/tests/test_consumers_async.py:106: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
../../../.virtualenvs/some_env/lib/python3.6/site-packages/channels/testing/websocket.py:71: in receive_from
response = await self.receive_output(timeout)
../../../.virtualenvs/some_env/lib/python3.6/site-packages/asgiref/testing.py:66: in receive_output
self.future.result()
../../../.virtualenvs/some_env/lib/python3.6/site-packages/channels/consumer.py:54: in __call__
await await_many_dispatch([receive, self.channel_receive], self.dispatch)
../../../.virtualenvs/some_env/lib/python3.6/site-packages/channels/utils.py:48: in await_many_dispatch
await dispatch(result)
../../../.virtualenvs/some_env/lib/python3.6/site-packages/asgiref/sync.py:95: in __call__
return await asyncio.wait_for(future, timeout=None)
/usr/lib/python3.6/asyncio/tasks.py:339: in wait_for
return (yield from fut)
/usr/lib/python3.6/concurrent/futures/thread.py:56: in run
result = self.fn(*self.args, **self.kwargs)
../../../.virtualenvs/some_env/lib/python3.6/site-packages/channels/db.py:13: in thread_handler
return super().thread_handler(loop, *args, **kwargs)
../../../.virtualenvs/some_env/lib/python3.6/site-packages/asgiref/sync.py:110: in thread_handler
return self.func(*args, **kwargs)
../../../.virtualenvs/some_env/lib/python3.6/site-packages/channels/consumer.py:99: in dispatch
handler(message)
../../../.virtualenvs/some_env/lib/python3.6/site-packages/channels/generic/websocket.py:19: in websocket_connect
self.connect()
someapp/consumers.py:22: in connect
self.group_name, self.channel_name)
../../../.virtualenvs/some_env/lib/python3.6/site-packages/asgiref/sync.py:49: in __call__
return call_result.result()
/usr/lib/python3.6/concurrent/futures/_base.py:432: in result
return self.__get_result()
/usr/lib/python3.6/concurrent/futures/_base.py:384: in __get_result
raise self._exception
../../../.virtualenvs/some_env/lib/python3.6/site-packages/asgiref/sync.py:63: in main_wrap
result = await self.awaitable(*args, **kwargs)
../../../.virtualenvs/some_env/lib/python3.6/site-packages/channels_redis/core.py:290: in group_add
await connection.expire(group_key, self.group_expiry)
../../../.virtualenvs/some_env/lib/python3.6/site-packages/aioredis/commands/__init__.py:152: in __exit__
self._release_callback(conn)
../../../.virtualenvs/some_env/lib/python3.6/site-packages/aioredis/pool.py:361: in release
conn.close()
../../../.virtualenvs/some_env/lib/python3.6/site-packages/aioredis/connection.py:352: in close
self._do_close(ConnectionForcedCloseError())
../../../.virtualenvs/some_env/lib/python3.6/site-packages/aioredis/connection.py:359: in _do_close
self._writer.transport.close()
/usr/lib/python3.6/asyncio/selector_events.py:621: in close
self._loop.call_soon(self._call_connection_lost, None)
/usr/lib/python3.6/asyncio/base_events.py:574: in call_soon
self._check_closed()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <_UnixSelectorEventLoop running=False closed=True debug=False>

def _check_closed(self):
if self._closed:
>           raise RuntimeError('Event loop is closed')
E           RuntimeError: Event loop is closed

/usr/lib/python3.6/asyncio/base_events.py:357: RuntimeError

Также, если я это сделаю (как в https://channels.readthedocs.io/en/latest/topics/testing.html):

await communicator.disconnect()

вместо:

await communicator.send_input({
    "type": "websocket.disconnect",
    "code": 1000,
})

то возникает следующая ошибка:

>       await communicator.disconnect()

someapp/tests/test_consumers_async.py:96: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
../../../.virtualenvs/some_env/lib/python3.6/site-packages/channels/testing/websocket.py:100: in disconnect
    await self.future
../../../.virtualenvs/some_env/lib/python3.6/site-packages/channels/consumer.py:54: in __call__
    await await_many_dispatch([receive, self.channel_receive], self.dispatch)
../../../.virtualenvs/some_env/lib/python3.6/site-packages/channels/utils.py:48: in await_many_dispatch
    await dispatch(result)
../../../.virtualenvs/some_env/lib/python3.6/site-packages/asgiref/sync.py:95: in __call__
    return await asyncio.wait_for(future, timeout=None)
/usr/lib/python3.6/asyncio/tasks.py:339: in wait_for
    return (yield from fut)
/usr/lib/python3.6/concurrent/futures/thread.py:56: in run
    result = self.fn(*self.args, **self.kwargs)
../../../.virtualenvs/some_env/lib/python3.6/site-packages/channels/db.py:13: in thread_handler
    return super().thread_handler(loop, *args, **kwargs)
../../../.virtualenvs/some_env/lib/python3.6/site-packages/asgiref/sync.py:110: in thread_handler
    return self.func(*args, **kwargs)
../../../.virtualenvs/some_env/lib/python3.6/site-packages/channels/consumer.py:99: in dispatch
    handler(message)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <someapp.consumers.ChatConsumer object at 0x7f38fcc55240>
message = {'code': 1000, 'type': 'websocket.disconnect'}

    def websocket_disconnect(self, message):
        """
            Called when a WebSocket connection is closed. Base level so you don't
            need to call super() all the time.
            """
        # TODO: group leaving
>       self.disconnect(message["code"])
E       TypeError: disconnect() takes 1 positional argument but 2 were given

Что я должен сделать, чтобы разделить эти тестовые случаи в соответствующих отдельных тестовых функциях?


Изменить: ошибка отключения тривиальна - я забыл добавить этот позиционный аргумент (code) в метод подкласса:

def disconnect(self, code):
    AsyncToSync(self.channel_layer.group_discard)('foo', self.channel_name)

Edit2: это была ошибка, связанная с каналом Redis - исправление находится в последней версии channels и asgiref основных веток на github.


person kradem    schedule 14.02.2018    source источник
comment
Я думаю, может быть, это глобальное соединение или глобальный цикл событий, вы можете попробовать pytestmark = pytest.mark.asyncio(forbid_global_loop=True), и мне интересно, есть ли что-то подобное в djangodb   -  person ZhouQuan    schedule 16.02.2018
comment
Никаких улучшений; Боюсь, это устарело (параметр forbid_global_loop удален.).   -  person kradem    schedule 19.02.2018


Ответы (1)


Это помогает? (Хотя я не проверял.)

@pytest.fixture
async def communicator(db):
    path = 'foo'
    communicator = WebsocketCommunicator(MyConsumer, path)
    connected, subprotocol = await communicator.connect()
    assert connected
    return communicator


@pytest.mark.asyncio
async def test_1(communicator):
    sent = json.dumps({"message": 'abc'})
    await communicator.send_to(text_data=sent)
    with pytest.raises(TimeoutError):
        await communicator.receive_from()
    await communicator.send_input({
        "type": "websocket.disconnect",
        "code": 1000,
    })


@pytest.mark.asyncio
async def test_2(communicator):
    sent = json.dumps({"message": 1})
    await communicator.send_to(text_data=sent)
    with pytest.raises(TimeoutError):
        await communicator.receive_from()
    await communicator.send_input({
        "type": "websocket.disconnect",
        "code": 1000,
    })
person yofee    schedule 16.02.2018
comment
При таком подходе (с небольшими изменениями) мне вроде как разрешено разделять тестовые примеры, но мой тестовый код работает не так, как должен. Может быть, мне стоит поиграть с приспособлением event_loop, чтобы решить эту проблему. Я прокомментирую здесь, если и когда я закончу ... - person kradem; 17.02.2018