Тестване на канали 2.0 с pytest-asyncio, получаване на RuntimeError: цикълът на събития е затворен

Опитвам се да тествам нов channels 2.0 с pytest-asyncio (0.8.0). Ако поставя различни твърдения в една и съща функция като:

import json
import pytest
from concurrent.futures._base import TimeoutError
from channels.testing import WebsocketCommunicator
from someapp.consumers import MyConsumer


@pytest.mark.django_db
@pytest.mark.asyncio
async def setup_database_and_websocket():
    path = 'foo'
    communicator = WebsocketCommunicator(MyConsumer, path)
    connected, subprotocol = await communicator.connect()
    assert connected
    return communicator


@pytest.mark.django_db
@pytest.mark.asyncio
async def test_1_and_2():
    communicator = await setup_database_and_websocket()
    sent = json.dumps({"message": 'abc'})
    await communicator.send_to(text_data=sent)
    with pytest.raises(TimeoutError):
        await communicator.receive_from()
    await communicator.send_input({
        "type": "websocket.disconnect",
        "code": 1000,
    })

    communicator = await setup_database_and_websocket()
    sent = json.dumps({"message": 1})
    await communicator.send_to(text_data=sent)
    with pytest.raises(TimeoutError):
        await communicator.receive_from()
    await communicator.send_input({
        "type": "websocket.disconnect",
        "code": 1000,
    })

тогава не получавам грешка. Но ако отделя тестови случаи като:

@pytest.mark.django_db
@pytest.mark.asyncio
async def test_1():
    communicator = await setup_database_and_websocket()
    sent = json.dumps({"message": 'abc'})
    await communicator.send_to(text_data=sent)
    with pytest.raises(TimeoutError):
        await communicator.receive_from()
    await communicator.send_input({
        "type": "websocket.disconnect",
        "code": 1000,
    })


@pytest.mark.django_db
@pytest.mark.asyncio
async def test_2():
    communicator = await setup_database_and_websocket()
    sent = json.dumps({"message": 1})
    await communicator.send_to(text_data=sent)
    with pytest.raises(TimeoutError):
        await communicator.receive_from()
    await communicator.send_input({
        "type": "websocket.disconnect",
        "code": 1000,
    })

тогава получавам следната грешка при второто receive_form извикване:

with pytest.raises(TimeoutError):
>           await communicator.receive_from()

someapp/tests/test_consumers_async.py:106: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
../../../.virtualenvs/some_env/lib/python3.6/site-packages/channels/testing/websocket.py:71: in receive_from
response = await self.receive_output(timeout)
../../../.virtualenvs/some_env/lib/python3.6/site-packages/asgiref/testing.py:66: in receive_output
self.future.result()
../../../.virtualenvs/some_env/lib/python3.6/site-packages/channels/consumer.py:54: in __call__
await await_many_dispatch([receive, self.channel_receive], self.dispatch)
../../../.virtualenvs/some_env/lib/python3.6/site-packages/channels/utils.py:48: in await_many_dispatch
await dispatch(result)
../../../.virtualenvs/some_env/lib/python3.6/site-packages/asgiref/sync.py:95: in __call__
return await asyncio.wait_for(future, timeout=None)
/usr/lib/python3.6/asyncio/tasks.py:339: in wait_for
return (yield from fut)
/usr/lib/python3.6/concurrent/futures/thread.py:56: in run
result = self.fn(*self.args, **self.kwargs)
../../../.virtualenvs/some_env/lib/python3.6/site-packages/channels/db.py:13: in thread_handler
return super().thread_handler(loop, *args, **kwargs)
../../../.virtualenvs/some_env/lib/python3.6/site-packages/asgiref/sync.py:110: in thread_handler
return self.func(*args, **kwargs)
../../../.virtualenvs/some_env/lib/python3.6/site-packages/channels/consumer.py:99: in dispatch
handler(message)
../../../.virtualenvs/some_env/lib/python3.6/site-packages/channels/generic/websocket.py:19: in websocket_connect
self.connect()
someapp/consumers.py:22: in connect
self.group_name, self.channel_name)
../../../.virtualenvs/some_env/lib/python3.6/site-packages/asgiref/sync.py:49: in __call__
return call_result.result()
/usr/lib/python3.6/concurrent/futures/_base.py:432: in result
return self.__get_result()
/usr/lib/python3.6/concurrent/futures/_base.py:384: in __get_result
raise self._exception
../../../.virtualenvs/some_env/lib/python3.6/site-packages/asgiref/sync.py:63: in main_wrap
result = await self.awaitable(*args, **kwargs)
../../../.virtualenvs/some_env/lib/python3.6/site-packages/channels_redis/core.py:290: in group_add
await connection.expire(group_key, self.group_expiry)
../../../.virtualenvs/some_env/lib/python3.6/site-packages/aioredis/commands/__init__.py:152: in __exit__
self._release_callback(conn)
../../../.virtualenvs/some_env/lib/python3.6/site-packages/aioredis/pool.py:361: in release
conn.close()
../../../.virtualenvs/some_env/lib/python3.6/site-packages/aioredis/connection.py:352: in close
self._do_close(ConnectionForcedCloseError())
../../../.virtualenvs/some_env/lib/python3.6/site-packages/aioredis/connection.py:359: in _do_close
self._writer.transport.close()
/usr/lib/python3.6/asyncio/selector_events.py:621: in close
self._loop.call_soon(self._call_connection_lost, None)
/usr/lib/python3.6/asyncio/base_events.py:574: in call_soon
self._check_closed()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <_UnixSelectorEventLoop running=False closed=True debug=False>

def _check_closed(self):
if self._closed:
>           raise RuntimeError('Event loop is closed')
E           RuntimeError: Event loop is closed

/usr/lib/python3.6/asyncio/base_events.py:357: RuntimeError

Също така, ако го направя (както в https://channels.readthedocs.io/en/latest/topics/testing.html):

await communicator.disconnect()

вместо:

await communicator.send_input({
    "type": "websocket.disconnect",
    "code": 1000,
})

след това се повдига следната грешка:

>       await communicator.disconnect()

someapp/tests/test_consumers_async.py:96: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
../../../.virtualenvs/some_env/lib/python3.6/site-packages/channels/testing/websocket.py:100: in disconnect
    await self.future
../../../.virtualenvs/some_env/lib/python3.6/site-packages/channels/consumer.py:54: in __call__
    await await_many_dispatch([receive, self.channel_receive], self.dispatch)
../../../.virtualenvs/some_env/lib/python3.6/site-packages/channels/utils.py:48: in await_many_dispatch
    await dispatch(result)
../../../.virtualenvs/some_env/lib/python3.6/site-packages/asgiref/sync.py:95: in __call__
    return await asyncio.wait_for(future, timeout=None)
/usr/lib/python3.6/asyncio/tasks.py:339: in wait_for
    return (yield from fut)
/usr/lib/python3.6/concurrent/futures/thread.py:56: in run
    result = self.fn(*self.args, **self.kwargs)
../../../.virtualenvs/some_env/lib/python3.6/site-packages/channels/db.py:13: in thread_handler
    return super().thread_handler(loop, *args, **kwargs)
../../../.virtualenvs/some_env/lib/python3.6/site-packages/asgiref/sync.py:110: in thread_handler
    return self.func(*args, **kwargs)
../../../.virtualenvs/some_env/lib/python3.6/site-packages/channels/consumer.py:99: in dispatch
    handler(message)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <someapp.consumers.ChatConsumer object at 0x7f38fcc55240>
message = {'code': 1000, 'type': 'websocket.disconnect'}

    def websocket_disconnect(self, message):
        """
            Called when a WebSocket connection is closed. Base level so you don't
            need to call super() all the time.
            """
        # TODO: group leaving
>       self.disconnect(message["code"])
E       TypeError: disconnect() takes 1 positional argument but 2 were given

Какво трябва да направя, за да отделя тези тестови случаи в съответните отделни тестови функции?


Редактиране: грешката при прекъсване на връзката е тривиална - забравих да добавя този позиционен аргумент (code) в метода на подклас:

def disconnect(self, code):
    AsyncToSync(self.channel_layer.group_discard)('foo', self.channel_name)

Редактиране 2: беше грешка, свързана с канала redis - корекцията на грешка е в най-новите channels и asgiref главни клонове в github.


person kradem    schedule 14.02.2018    source източник
comment
Предполагам, че може би това може да е глобалната връзка или цикълът на глобалните събития, можете да опитате pytestmark = pytest.mark.asyncio(forbid_global_loop=True) и се чудя дали има нещо подобно в djangodb   -  person ZhouQuan    schedule 16.02.2018
comment
Без подобрения; Страхувам се, че това е отхвърлено (Параметърът forbid_global_loop е премахнат.).   -  person kradem    schedule 19.02.2018


Отговори (1)


това помага ли (Въпреки това не го тествах.)

@pytest.fixture
async def communicator(db):
    path = 'foo'
    communicator = WebsocketCommunicator(MyConsumer, path)
    connected, subprotocol = await communicator.connect()
    assert connected
    return communicator


@pytest.mark.asyncio
async def test_1(communicator):
    sent = json.dumps({"message": 'abc'})
    await communicator.send_to(text_data=sent)
    with pytest.raises(TimeoutError):
        await communicator.receive_from()
    await communicator.send_input({
        "type": "websocket.disconnect",
        "code": 1000,
    })


@pytest.mark.asyncio
async def test_2(communicator):
    sent = json.dumps({"message": 1})
    await communicator.send_to(text_data=sent)
    with pytest.raises(TimeoutError):
        await communicator.receive_from()
    await communicator.send_input({
        "type": "websocket.disconnect",
        "code": 1000,
    })
person yofee    schedule 16.02.2018
comment
Донякъде ми е позволено да отделям тестови случаи с този подход (леко модифициран), но тестовият ми код не работи както трябва. Може би трябва да играя с event_loop приспособление, за да реша това. Ще коментирам тук, ако и когато свърша... - person kradem; 17.02.2018