Я пытаюсь понять, можно ли запустить экземпляр asyncio.Server
, когда цикл событий уже выполняется методом run_forever
(из отдельного потока, конечно). Как я понимаю, сервер можно запустить либо по loop.run_until_complete(asyncio.start_server(...))
, либо по await asyncio.start_server(...)
, если цикл уже запущен. Первый способ для меня неприемлем, так как цикл уже запущен методом run_forever. Но я также не могу использовать выражение ожидания, так как я собираюсь запустить его из-за пределов «области цикла» (например, из основного метода, который нельзя пометить как асинхронный, верно?)
def loop_thread(loop):
asyncio.set_event_loop(loop)
try:
loop.run_forever()
finally:
loop.close()
print("loop clesed")
class SchedulerTestManager:
def __init__(self):
...
self.loop = asyncio.get_event_loop()
self.servers_loop_thread = threading.Thread(
target=loop_thread, args=(self.loop, ))
...
def start_test(self):
self.servers_loop_thread.start()
return self.servers_loop_thread
def add_router(self, router):
r = self.endpoint.add_router(router)
host = router.ConnectionParameters.Host
port = router.ConnectionParameters.Port
srv = TcpServer(host, port)
server_coro = asyncio.start_server(
self.handle_connection, self.host, self.port)
# does not work since add_router is not async
# self.server = await server_coro
# does not work, since the loop is already running
# self.server = self.loop.run_until_complete(server_coro)
return r
def maind():
st_manager = SchedulerTestManager()
thread = st_manager.start_test()
router = st_manager.add_router(router)
Конечно, самое простое решение — добавить все маршрутизаторы (серверы) перед запуском теста (запуском цикла). Но я хочу попробовать это реализовать, чтобы можно было добавить маршрутизатор, когда тест уже запущен. Я думал, что методы loop.call_soon
(call_soon_threadsafe
) могут мне помочь, но, похоже, они не могут запланировать сопрограмму, а просто простую функцию.
Надеюсь, что мое объяснение не очень запутанное. Заранее спасибо!