diff --git a/src/django_idom/websocket_consumer.py b/src/django_idom/websocket_consumer.py index 349a8366..c5cb3af5 100644 --- a/src/django_idom/websocket_consumer.py +++ b/src/django_idom/websocket_consumer.py @@ -3,6 +3,7 @@ import json import logging from dataclasses import dataclass +from threading import Thread from typing import Any, Awaitable, Callable, Optional from urllib.parse import parse_qsl @@ -18,6 +19,11 @@ _logger = logging.getLogger(__name__) +def start_background_loop(loop: asyncio.AbstractEventLoop) -> None: + asyncio.set_event_loop(loop) + loop.run_forever() + + @dataclass class WebsocketConnection: scope: dict @@ -42,17 +48,26 @@ async def connect(self) -> None: elif user is None: _logger.warning("IDOM websocket is missing AuthMiddlewareStack!") - self._idom_dispatcher_future = asyncio.ensure_future(self._run_dispatch_loop()) + self._recv_queue_loop = asyncio.new_event_loop() + t = Thread( + target=start_background_loop, args=[self._recv_queue_loop], daemon=True + ) + t.start() + asyncio.run_coroutine_threadsafe( + self._run_dispatch_loop(), self._recv_queue_loop + ) async def disconnect(self, code: int) -> None: - if self._idom_dispatcher_future.done(): - await self._idom_dispatcher_future - else: - self._idom_dispatcher_future.cancel() + self._recv_queue_loop.stop() await super().disconnect(code) async def receive_json(self, content: Any, **kwargs: Any) -> None: - await self._idom_recv_queue.put(LayoutEvent(**content)) + asyncio.run_coroutine_threadsafe( + self._recv_queue_put(content, **kwargs), self._recv_queue_loop + ) + + async def _recv_queue_put(self, content: Any, **kwargs: Any): + await self._recv_queue.put(LayoutEvent(**content)) async def _run_dispatch_loop(self): view_id = self.scope["url_route"]["kwargs"]["view_id"] @@ -78,12 +93,12 @@ async def _run_dispatch_loop(self): ) return - self._idom_recv_queue = recv_queue = asyncio.Queue() + self._recv_queue = asyncio.Queue() try: await dispatch_single_view( Layout(component_instance), self.send_json, - recv_queue.get, + self._recv_queue.get, ) except Exception: await self.close()