Skip to content

Threaded dispatcher loop #23

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
31 changes: 23 additions & 8 deletions src/django_idom/websocket_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import json
import logging
from dataclasses import dataclass
from threading import Thread
from typing import Any, Awaitable, Callable, Optional
from urllib.parse import parse_qsl

Expand All @@ -18,6 +19,11 @@
_logger = logging.getLogger(__name__)


def start_background_loop(loop: asyncio.AbstractEventLoop) -> None:
asyncio.set_event_loop(loop)
loop.run_forever()


@dataclass
class WebsocketConnection:
scope: dict
Expand All @@ -42,17 +48,26 @@ async def connect(self) -> None:
elif user is None:
_logger.warning("IDOM websocket is missing AuthMiddlewareStack!")

self._idom_dispatcher_future = asyncio.ensure_future(self._run_dispatch_loop())
self._recv_queue_loop = asyncio.new_event_loop()
t = Thread(
target=start_background_loop, args=[self._recv_queue_loop], daemon=True
)
t.start()
asyncio.run_coroutine_threadsafe(
self._run_dispatch_loop(), self._recv_queue_loop
)

async def disconnect(self, code: int) -> None:
if self._idom_dispatcher_future.done():
await self._idom_dispatcher_future
else:
self._idom_dispatcher_future.cancel()
self._recv_queue_loop.stop()
await super().disconnect(code)

async def receive_json(self, content: Any, **kwargs: Any) -> None:
await self._idom_recv_queue.put(LayoutEvent(**content))
asyncio.run_coroutine_threadsafe(
self._recv_queue_put(content, **kwargs), self._recv_queue_loop
)

async def _recv_queue_put(self, content: Any, **kwargs: Any):
await self._recv_queue.put(LayoutEvent(**content))

async def _run_dispatch_loop(self):
view_id = self.scope["url_route"]["kwargs"]["view_id"]
Expand All @@ -78,12 +93,12 @@ async def _run_dispatch_loop(self):
)
return

self._idom_recv_queue = recv_queue = asyncio.Queue()
self._recv_queue = asyncio.Queue()
try:
await dispatch_single_view(
Layout(component_instance),
self.send_json,
recv_queue.get,
self._recv_queue.get,
)
except Exception:
await self.close()
Expand Down