Skip to content

Commit 9ff7372

Browse files
committed
Threaded dispatcher loop
1 parent 71968ee commit 9ff7372

File tree

1 file changed

+43
-35
lines changed

1 file changed

+43
-35
lines changed

src/django_idom/websocket_consumer.py

+43-35
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,11 @@
22
import asyncio
33
import json
44
import logging
5+
from threading import Thread
56
from typing import Any
67
from urllib.parse import parse_qsl
78

9+
from asgiref.sync import async_to_sync
810
from channels.generic.websocket import AsyncJsonWebsocketConsumer
911
from idom.core.dispatcher import dispatch_single_view
1012
from idom.core.layout import Layout, LayoutEvent
@@ -23,46 +25,52 @@ def __init__(self, *args: Any, **kwargs: Any) -> None:
2325

2426
async def connect(self) -> None:
2527
await super().connect()
26-
self._idom_dispatcher_future = asyncio.ensure_future(self._run_dispatch_loop())
28+
self._idom_dispatcher_thread: Thread = Thread(target=self._run_dispatch_loop)
29+
self._idom_dispatcher_thread.daemon = True
30+
self._idom_dispatcher_thread.start()
2731

2832
async def disconnect(self, code: int) -> None:
29-
if self._idom_dispatcher_future.done():
30-
await self._idom_dispatcher_future
31-
else:
32-
self._idom_dispatcher_future.cancel()
33+
await self._idom_recv_queue.put(None)
34+
self._idom_dispatcher_thread.join()
3335
await super().disconnect(code)
3436

3537
async def receive_json(self, content: Any, **kwargs: Any) -> None:
3638
await self._idom_recv_queue.put(LayoutEvent(**content))
3739

40+
@async_to_sync
3841
async def _run_dispatch_loop(self):
39-
view_id = self.scope["url_route"]["kwargs"]["view_id"]
40-
41-
try:
42-
component_constructor = IDOM_REGISTERED_COMPONENTS[view_id]
43-
except KeyError:
44-
_logger.warning(f"Uknown IDOM view ID {view_id!r}")
45-
return
46-
47-
query_dict = dict(parse_qsl(self.scope["query_string"].decode()))
48-
component_kwargs = json.loads(query_dict.get("kwargs", "{}"))
49-
50-
try:
51-
component_instance = component_constructor(**component_kwargs)
52-
except Exception:
53-
_logger.exception(
54-
f"Failed to construct component {component_constructor} "
55-
f"with parameters {component_kwargs}"
56-
)
57-
return
58-
59-
self._idom_recv_queue = recv_queue = asyncio.Queue()
60-
try:
61-
await dispatch_single_view(
62-
Layout(component_instance),
63-
self.send_json,
64-
recv_queue.get,
65-
)
66-
except Exception:
67-
await self.close()
68-
raise
42+
while True:
43+
view_id = self.scope["url_route"]["kwargs"]["view_id"]
44+
45+
try:
46+
component_constructor = IDOM_REGISTERED_COMPONENTS[view_id]
47+
except KeyError:
48+
_logger.warning(f"Uknown IDOM view ID {view_id!r}")
49+
return
50+
51+
query_dict = dict(parse_qsl(self.scope["query_string"].decode()))
52+
component_kwargs = json.loads(query_dict.get("kwargs", "{}"))
53+
54+
try:
55+
component_instance = component_constructor(**component_kwargs)
56+
except Exception:
57+
_logger.exception(
58+
f"Failed to construct component {component_constructor} "
59+
f"with parameters {component_kwargs}"
60+
)
61+
return
62+
63+
self._idom_recv_queue = recv_queue = asyncio.Queue()
64+
try:
65+
queue_item = recv_queue.get()
66+
if queue_item is None:
67+
break
68+
69+
await dispatch_single_view(
70+
Layout(component_instance),
71+
self.send_json,
72+
lambda: queue_item,
73+
)
74+
except Exception:
75+
await self.close()
76+
raise

0 commit comments

Comments
 (0)