From 9ff73728df00c3d6d7461605735f783166692524 Mon Sep 17 00:00:00 2001 From: Archmonger <16909269+Archmonger@users.noreply.github.com> Date: Wed, 27 Oct 2021 14:50:28 -0700 Subject: [PATCH 01/15] Threaded dispatcher loop --- src/django_idom/websocket_consumer.py | 78 +++++++++++++++------------ 1 file changed, 43 insertions(+), 35 deletions(-) diff --git a/src/django_idom/websocket_consumer.py b/src/django_idom/websocket_consumer.py index 31f1aa38..ff7aecd9 100644 --- a/src/django_idom/websocket_consumer.py +++ b/src/django_idom/websocket_consumer.py @@ -2,9 +2,11 @@ import asyncio import json import logging +from threading import Thread from typing import Any from urllib.parse import parse_qsl +from asgiref.sync import async_to_sync from channels.generic.websocket import AsyncJsonWebsocketConsumer from idom.core.dispatcher import dispatch_single_view from idom.core.layout import Layout, LayoutEvent @@ -23,46 +25,52 @@ def __init__(self, *args: Any, **kwargs: Any) -> None: async def connect(self) -> None: await super().connect() - self._idom_dispatcher_future = asyncio.ensure_future(self._run_dispatch_loop()) + self._idom_dispatcher_thread: Thread = Thread(target=self._run_dispatch_loop) + self._idom_dispatcher_thread.daemon = True + self._idom_dispatcher_thread.start() async def disconnect(self, code: int) -> None: - if self._idom_dispatcher_future.done(): - await self._idom_dispatcher_future - else: - self._idom_dispatcher_future.cancel() + await self._idom_recv_queue.put(None) + self._idom_dispatcher_thread.join() await super().disconnect(code) async def receive_json(self, content: Any, **kwargs: Any) -> None: await self._idom_recv_queue.put(LayoutEvent(**content)) + @async_to_sync async def _run_dispatch_loop(self): - view_id = self.scope["url_route"]["kwargs"]["view_id"] - - try: - component_constructor = IDOM_REGISTERED_COMPONENTS[view_id] - except KeyError: - _logger.warning(f"Uknown IDOM view ID {view_id!r}") - return - - query_dict = dict(parse_qsl(self.scope["query_string"].decode())) - component_kwargs = json.loads(query_dict.get("kwargs", "{}")) - - try: - component_instance = component_constructor(**component_kwargs) - except Exception: - _logger.exception( - f"Failed to construct component {component_constructor} " - f"with parameters {component_kwargs}" - ) - return - - self._idom_recv_queue = recv_queue = asyncio.Queue() - try: - await dispatch_single_view( - Layout(component_instance), - self.send_json, - recv_queue.get, - ) - except Exception: - await self.close() - raise + while True: + view_id = self.scope["url_route"]["kwargs"]["view_id"] + + try: + component_constructor = IDOM_REGISTERED_COMPONENTS[view_id] + except KeyError: + _logger.warning(f"Uknown IDOM view ID {view_id!r}") + return + + query_dict = dict(parse_qsl(self.scope["query_string"].decode())) + component_kwargs = json.loads(query_dict.get("kwargs", "{}")) + + try: + component_instance = component_constructor(**component_kwargs) + except Exception: + _logger.exception( + f"Failed to construct component {component_constructor} " + f"with parameters {component_kwargs}" + ) + return + + self._idom_recv_queue = recv_queue = asyncio.Queue() + try: + queue_item = recv_queue.get() + if queue_item is None: + break + + await dispatch_single_view( + Layout(component_instance), + self.send_json, + lambda: queue_item, + ) + except Exception: + await self.close() + raise From 65ccaa6533e70d57d6ae4203c24a9984e64caa23 Mon Sep 17 00:00:00 2001 From: Archmonger <16909269+Archmonger@users.noreply.github.com> Date: Wed, 27 Oct 2021 16:43:11 -0700 Subject: [PATCH 02/15] Revert "Threaded dispatcher loop" This reverts commit 9ff73728df00c3d6d7461605735f783166692524. --- src/django_idom/websocket_consumer.py | 78 ++++++++++++--------------- 1 file changed, 35 insertions(+), 43 deletions(-) diff --git a/src/django_idom/websocket_consumer.py b/src/django_idom/websocket_consumer.py index ff7aecd9..31f1aa38 100644 --- a/src/django_idom/websocket_consumer.py +++ b/src/django_idom/websocket_consumer.py @@ -2,11 +2,9 @@ import asyncio import json import logging -from threading import Thread from typing import Any from urllib.parse import parse_qsl -from asgiref.sync import async_to_sync from channels.generic.websocket import AsyncJsonWebsocketConsumer from idom.core.dispatcher import dispatch_single_view from idom.core.layout import Layout, LayoutEvent @@ -25,52 +23,46 @@ def __init__(self, *args: Any, **kwargs: Any) -> None: async def connect(self) -> None: await super().connect() - self._idom_dispatcher_thread: Thread = Thread(target=self._run_dispatch_loop) - self._idom_dispatcher_thread.daemon = True - self._idom_dispatcher_thread.start() + self._idom_dispatcher_future = asyncio.ensure_future(self._run_dispatch_loop()) async def disconnect(self, code: int) -> None: - await self._idom_recv_queue.put(None) - self._idom_dispatcher_thread.join() + if self._idom_dispatcher_future.done(): + await self._idom_dispatcher_future + else: + self._idom_dispatcher_future.cancel() await super().disconnect(code) async def receive_json(self, content: Any, **kwargs: Any) -> None: await self._idom_recv_queue.put(LayoutEvent(**content)) - @async_to_sync async def _run_dispatch_loop(self): - while True: - view_id = self.scope["url_route"]["kwargs"]["view_id"] - - try: - component_constructor = IDOM_REGISTERED_COMPONENTS[view_id] - except KeyError: - _logger.warning(f"Uknown IDOM view ID {view_id!r}") - return - - query_dict = dict(parse_qsl(self.scope["query_string"].decode())) - component_kwargs = json.loads(query_dict.get("kwargs", "{}")) - - try: - component_instance = component_constructor(**component_kwargs) - except Exception: - _logger.exception( - f"Failed to construct component {component_constructor} " - f"with parameters {component_kwargs}" - ) - return - - self._idom_recv_queue = recv_queue = asyncio.Queue() - try: - queue_item = recv_queue.get() - if queue_item is None: - break - - await dispatch_single_view( - Layout(component_instance), - self.send_json, - lambda: queue_item, - ) - except Exception: - await self.close() - raise + view_id = self.scope["url_route"]["kwargs"]["view_id"] + + try: + component_constructor = IDOM_REGISTERED_COMPONENTS[view_id] + except KeyError: + _logger.warning(f"Uknown IDOM view ID {view_id!r}") + return + + query_dict = dict(parse_qsl(self.scope["query_string"].decode())) + component_kwargs = json.loads(query_dict.get("kwargs", "{}")) + + try: + component_instance = component_constructor(**component_kwargs) + except Exception: + _logger.exception( + f"Failed to construct component {component_constructor} " + f"with parameters {component_kwargs}" + ) + return + + self._idom_recv_queue = recv_queue = asyncio.Queue() + try: + await dispatch_single_view( + Layout(component_instance), + self.send_json, + recv_queue.get, + ) + except Exception: + await self.close() + raise From 2cf8e2036699d98f997ddfe4a8fc4127e3294fc1 Mon Sep 17 00:00:00 2001 From: Archmonger <16909269+Archmonger@users.noreply.github.com> Date: Wed, 27 Oct 2021 16:58:12 -0700 Subject: [PATCH 03/15] threading asyncio run --- src/django_idom/websocket_consumer.py | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/src/django_idom/websocket_consumer.py b/src/django_idom/websocket_consumer.py index 31f1aa38..a98c0042 100644 --- a/src/django_idom/websocket_consumer.py +++ b/src/django_idom/websocket_consumer.py @@ -2,6 +2,7 @@ import asyncio import json import logging +from threading import Thread from typing import Any from urllib.parse import parse_qsl @@ -23,13 +24,17 @@ def __init__(self, *args: Any, **kwargs: Any) -> None: async def connect(self) -> None: await super().connect() - self._idom_dispatcher_future = asyncio.ensure_future(self._run_dispatch_loop()) + self._idom_dispatcher_future = Thread( + target=asyncio.run, + args=(self._run_dispatch_loop(),), + ) + self._idom_dispatcher_future.start() async def disconnect(self, code: int) -> None: - if self._idom_dispatcher_future.done(): - await self._idom_dispatcher_future - else: - self._idom_dispatcher_future.cancel() + # if self._idom_dispatcher_future.done(): + # await self._idom_dispatcher_future + # else: + # self._idom_dispatcher_future.cancel() await super().disconnect(code) async def receive_json(self, content: Any, **kwargs: Any) -> None: @@ -63,6 +68,7 @@ async def _run_dispatch_loop(self): self.send_json, recv_queue.get, ) + print("complete") except Exception: await self.close() raise From 698e4edfadf2d08a1169d4e6ff7b6ecff4f83195 Mon Sep 17 00:00:00 2001 From: Archmonger <16909269+Archmonger@users.noreply.github.com> Date: Wed, 27 Oct 2021 18:53:58 -0700 Subject: [PATCH 04/15] working implementation (thread-safe queue --- requirements/pkg-deps.txt | 1 + src/django_idom/websocket_consumer.py | 12 +++++++----- 2 files changed, 8 insertions(+), 5 deletions(-) diff --git a/requirements/pkg-deps.txt b/requirements/pkg-deps.txt index dbbaf6b6..eb6dbc21 100644 --- a/requirements/pkg-deps.txt +++ b/requirements/pkg-deps.txt @@ -1,2 +1,3 @@ channels<4.0.0 # Django websocket features idom >=0.33.0, <0.34.0 +janus < 1.0.0 \ No newline at end of file diff --git a/src/django_idom/websocket_consumer.py b/src/django_idom/websocket_consumer.py index a98c0042..fd7bd95a 100644 --- a/src/django_idom/websocket_consumer.py +++ b/src/django_idom/websocket_consumer.py @@ -6,6 +6,7 @@ from typing import Any from urllib.parse import parse_qsl +import janus from channels.generic.websocket import AsyncJsonWebsocketConsumer from idom.core.dispatcher import dispatch_single_view from idom.core.layout import Layout, LayoutEvent @@ -24,11 +25,12 @@ def __init__(self, *args: Any, **kwargs: Any) -> None: async def connect(self) -> None: await super().connect() - self._idom_dispatcher_future = Thread( + self._idom_dispatcher_thread = Thread( target=asyncio.run, args=(self._run_dispatch_loop(),), ) - self._idom_dispatcher_future.start() + self._idom_dispatcher_thread.daemon = True + self._idom_dispatcher_thread.start() async def disconnect(self, code: int) -> None: # if self._idom_dispatcher_future.done(): @@ -61,14 +63,14 @@ async def _run_dispatch_loop(self): ) return - self._idom_recv_queue = recv_queue = asyncio.Queue() + # Thread-safe queue + self._idom_recv_queue = janus.Queue().async_q try: await dispatch_single_view( Layout(component_instance), self.send_json, - recv_queue.get, + self._idom_recv_queue.get, ) - print("complete") except Exception: await self.close() raise From 2ee570eb2fe9fc4604d719a193ddc299bae27093 Mon Sep 17 00:00:00 2001 From: Archmonger <16909269+Archmonger@users.noreply.github.com> Date: Wed, 27 Oct 2021 19:05:43 -0700 Subject: [PATCH 05/15] close out thread before disconnect --- src/django_idom/websocket_consumer.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/src/django_idom/websocket_consumer.py b/src/django_idom/websocket_consumer.py index fd7bd95a..1938bf7b 100644 --- a/src/django_idom/websocket_consumer.py +++ b/src/django_idom/websocket_consumer.py @@ -33,11 +33,8 @@ async def connect(self) -> None: self._idom_dispatcher_thread.start() async def disconnect(self, code: int) -> None: - # if self._idom_dispatcher_future.done(): - # await self._idom_dispatcher_future - # else: - # self._idom_dispatcher_future.cancel() await super().disconnect(code) + self._idom_dispatcher_thread.join(timeout=0.05) async def receive_json(self, content: Any, **kwargs: Any) -> None: await self._idom_recv_queue.put(LayoutEvent(**content)) From b8a59d8f99055d054aac6279a82f8a9387643244 Mon Sep 17 00:00:00 2001 From: Archmonger <16909269+Archmonger@users.noreply.github.com> Date: Wed, 27 Oct 2021 19:22:03 -0700 Subject: [PATCH 06/15] do not block on Thread.join() --- src/django_idom/websocket_consumer.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/django_idom/websocket_consumer.py b/src/django_idom/websocket_consumer.py index 1938bf7b..fef2dbb7 100644 --- a/src/django_idom/websocket_consumer.py +++ b/src/django_idom/websocket_consumer.py @@ -34,7 +34,7 @@ async def connect(self) -> None: async def disconnect(self, code: int) -> None: await super().disconnect(code) - self._idom_dispatcher_thread.join(timeout=0.05) + self._idom_dispatcher_thread.join(timeout=0) async def receive_json(self, content: Any, **kwargs: Any) -> None: await self._idom_recv_queue.put(LayoutEvent(**content)) From 0cc0915e26fd2f24841084608621c51e76148d83 Mon Sep 17 00:00:00 2001 From: Archmonger <16909269+Archmonger@users.noreply.github.com> Date: Wed, 27 Oct 2021 23:01:23 -0700 Subject: [PATCH 07/15] remove useless super --- src/django_idom/websocket_consumer.py | 1 - 1 file changed, 1 deletion(-) diff --git a/src/django_idom/websocket_consumer.py b/src/django_idom/websocket_consumer.py index fef2dbb7..32defc54 100644 --- a/src/django_idom/websocket_consumer.py +++ b/src/django_idom/websocket_consumer.py @@ -33,7 +33,6 @@ async def connect(self) -> None: self._idom_dispatcher_thread.start() async def disconnect(self, code: int) -> None: - await super().disconnect(code) self._idom_dispatcher_thread.join(timeout=0) async def receive_json(self, content: Any, **kwargs: Any) -> None: From 94d1892461c59b473c2aae31a18e3f5db9abbdc6 Mon Sep 17 00:00:00 2001 From: Archmonger <16909269+Archmonger@users.noreply.github.com> Date: Thu, 28 Oct 2021 01:45:43 -0700 Subject: [PATCH 08/15] add daemon as constructor arg --- src/django_idom/websocket_consumer.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/django_idom/websocket_consumer.py b/src/django_idom/websocket_consumer.py index 32defc54..3edee9d6 100644 --- a/src/django_idom/websocket_consumer.py +++ b/src/django_idom/websocket_consumer.py @@ -28,8 +28,8 @@ async def connect(self) -> None: self._idom_dispatcher_thread = Thread( target=asyncio.run, args=(self._run_dispatch_loop(),), + daemon=True, ) - self._idom_dispatcher_thread.daemon = True self._idom_dispatcher_thread.start() async def disconnect(self, code: int) -> None: From 13989c60663e3ee032b37b1b8b6e9b4846063063 Mon Sep 17 00:00:00 2001 From: Archmonger <16909269+Archmonger@users.noreply.github.com> Date: Sat, 30 Oct 2021 00:11:31 -0700 Subject: [PATCH 09/15] close thread via exception --- src/django_idom/websocket_consumer.py | 25 ++++++++++++++++--------- 1 file changed, 16 insertions(+), 9 deletions(-) diff --git a/src/django_idom/websocket_consumer.py b/src/django_idom/websocket_consumer.py index 3edee9d6..42dacab8 100644 --- a/src/django_idom/websocket_consumer.py +++ b/src/django_idom/websocket_consumer.py @@ -2,7 +2,7 @@ import asyncio import json import logging -from threading import Thread +import threading from typing import Any from urllib.parse import parse_qsl @@ -25,18 +25,25 @@ def __init__(self, *args: Any, **kwargs: Any) -> None: async def connect(self) -> None: await super().connect() - self._idom_dispatcher_thread = Thread( + # Thread-safe queue + self._recv_queue = janus.Queue().async_q + + # Run render as thread + self._disconnected = threading.Event() + self._dispatcher_thread = threading.Thread( target=asyncio.run, args=(self._run_dispatch_loop(),), daemon=True, ) - self._idom_dispatcher_thread.start() + self._dispatcher_thread.start() async def disconnect(self, code: int) -> None: - self._idom_dispatcher_thread.join(timeout=0) + self._disconnected.set() + await self._recv_queue.put(None) + self._dispatcher_thread.join(timeout=0) async def receive_json(self, content: Any, **kwargs: Any) -> None: - await self._idom_recv_queue.put(LayoutEvent(**content)) + await self._recv_queue.put(LayoutEvent(**content)) async def _run_dispatch_loop(self): view_id = self.scope["url_route"]["kwargs"]["view_id"] @@ -59,14 +66,14 @@ async def _run_dispatch_loop(self): ) return - # Thread-safe queue - self._idom_recv_queue = janus.Queue().async_q try: await dispatch_single_view( Layout(component_instance), self.send_json, - self._idom_recv_queue.get, + self._recv_queue.get, ) except Exception: await self.close() - raise + self._disconnected.wait() + if not self._disconnected.is_set(): + raise From 3db5aa4367aa7c792ceec2fe0e8014425a6ed194 Mon Sep 17 00:00:00 2001 From: Archmonger <16909269+Archmonger@users.noreply.github.com> Date: Sat, 30 Oct 2021 00:16:26 -0700 Subject: [PATCH 10/15] mvoe recv_queue back to dispatch loop --- src/django_idom/websocket_consumer.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/django_idom/websocket_consumer.py b/src/django_idom/websocket_consumer.py index 42dacab8..94334549 100644 --- a/src/django_idom/websocket_consumer.py +++ b/src/django_idom/websocket_consumer.py @@ -25,8 +25,6 @@ def __init__(self, *args: Any, **kwargs: Any) -> None: async def connect(self) -> None: await super().connect() - # Thread-safe queue - self._recv_queue = janus.Queue().async_q # Run render as thread self._disconnected = threading.Event() @@ -66,6 +64,9 @@ async def _run_dispatch_loop(self): ) return + # Thread-safe queue + self._recv_queue = janus.Queue().async_q + try: await dispatch_single_view( Layout(component_instance), From 03ee090e6d20f6a6d609308c7252b6481ed3f2b3 Mon Sep 17 00:00:00 2001 From: Archmonger <16909269+Archmonger@users.noreply.github.com> Date: Sat, 30 Oct 2021 00:59:14 -0700 Subject: [PATCH 11/15] unbork exception logic --- src/django_idom/websocket_consumer.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/django_idom/websocket_consumer.py b/src/django_idom/websocket_consumer.py index 94334549..ced5fa73 100644 --- a/src/django_idom/websocket_consumer.py +++ b/src/django_idom/websocket_consumer.py @@ -74,7 +74,8 @@ async def _run_dispatch_loop(self): self._recv_queue.get, ) except Exception: - await self.close() self._disconnected.wait() if not self._disconnected.is_set(): + await self.close() raise + await self.close() From 866030182dea92d98360fd4487bb2a124796831b Mon Sep 17 00:00:00 2001 From: Archmonger <16909269+Archmonger@users.noreply.github.com> Date: Wed, 22 Dec 2021 04:43:58 -0800 Subject: [PATCH 12/15] reset websocket consumer to main --- src/django_idom/websocket_consumer.py | 47 ++++++++++++++------------- 1 file changed, 24 insertions(+), 23 deletions(-) diff --git a/src/django_idom/websocket_consumer.py b/src/django_idom/websocket_consumer.py index 8d2ded7d..349a8366 100644 --- a/src/django_idom/websocket_consumer.py +++ b/src/django_idom/websocket_consumer.py @@ -2,11 +2,12 @@ import asyncio import json import logging -import threading -from typing import Any +from dataclasses import dataclass +from typing import Any, Awaitable, Callable, Optional from urllib.parse import parse_qsl -import janus +from channels.auth import login +from channels.db import database_sync_to_async as convert_to_async from channels.generic.websocket import AsyncJsonWebsocketConsumer from idom.core.dispatcher import dispatch_single_view from idom.core.layout import Layout, LayoutEvent @@ -31,22 +32,27 @@ class IdomAsyncWebsocketConsumer(AsyncJsonWebsocketConsumer): async def connect(self) -> None: await super().connect() - # Run render as thread - self._disconnected = threading.Event() - self._dispatcher_thread = threading.Thread( - target=asyncio.run, - args=(self._run_dispatch_loop(),), - daemon=True, - ) - self._dispatcher_thread.start() + user = self.scope.get("user") + if user and user.is_authenticated: + try: + await login(self.scope, user) + await convert_to_async(self.scope["session"].save)() + except Exception: + _logger.exception("IDOM websocket authentication has failed!") + elif user is None: + _logger.warning("IDOM websocket is missing AuthMiddlewareStack!") + + self._idom_dispatcher_future = asyncio.ensure_future(self._run_dispatch_loop()) async def disconnect(self, code: int) -> None: - self._disconnected.set() - await self._recv_queue.put(None) - self._dispatcher_thread.join(timeout=0) + if self._idom_dispatcher_future.done(): + await self._idom_dispatcher_future + else: + self._idom_dispatcher_future.cancel() + await super().disconnect(code) async def receive_json(self, content: Any, **kwargs: Any) -> None: - await self._recv_queue.put(LayoutEvent(**content)) + await self._idom_recv_queue.put(LayoutEvent(**content)) async def _run_dispatch_loop(self): view_id = self.scope["url_route"]["kwargs"]["view_id"] @@ -72,18 +78,13 @@ async def _run_dispatch_loop(self): ) return - # Thread-safe queue - self._recv_queue = janus.Queue().async_q - + self._idom_recv_queue = recv_queue = asyncio.Queue() try: await dispatch_single_view( Layout(component_instance), self.send_json, - self._recv_queue.get, + recv_queue.get, ) except Exception: - self._disconnected.wait() - if not self._disconnected.is_set(): - await self.close() - raise await self.close() + raise From fb229daa48ccd63f835658b39b1f27d58e7efdff Mon Sep 17 00:00:00 2001 From: Archmonger <16909269+Archmonger@users.noreply.github.com> Date: Wed, 22 Dec 2021 04:44:29 -0800 Subject: [PATCH 13/15] var cleanup --- src/django_idom/websocket_consumer.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/src/django_idom/websocket_consumer.py b/src/django_idom/websocket_consumer.py index 349a8366..1216fb53 100644 --- a/src/django_idom/websocket_consumer.py +++ b/src/django_idom/websocket_consumer.py @@ -42,17 +42,17 @@ async def connect(self) -> None: elif user is None: _logger.warning("IDOM websocket is missing AuthMiddlewareStack!") - self._idom_dispatcher_future = asyncio.ensure_future(self._run_dispatch_loop()) + self._dispatcher_future = asyncio.ensure_future(self._run_dispatch_loop()) async def disconnect(self, code: int) -> None: - if self._idom_dispatcher_future.done(): - await self._idom_dispatcher_future + if self._dispatcher_future.done(): + await self._dispatcher_future else: - self._idom_dispatcher_future.cancel() + self._dispatcher_future.cancel() await super().disconnect(code) async def receive_json(self, content: Any, **kwargs: Any) -> None: - await self._idom_recv_queue.put(LayoutEvent(**content)) + await self._recv_queue.put(LayoutEvent(**content)) async def _run_dispatch_loop(self): view_id = self.scope["url_route"]["kwargs"]["view_id"] @@ -78,7 +78,7 @@ async def _run_dispatch_loop(self): ) return - self._idom_recv_queue = recv_queue = asyncio.Queue() + self._recv_queue = recv_queue = asyncio.Queue() try: await dispatch_single_view( Layout(component_instance), From d0d2e995dd91091f61477de5390e3e00f6a80964 Mon Sep 17 00:00:00 2001 From: Archmonger <16909269+Archmonger@users.noreply.github.com> Date: Wed, 22 Dec 2021 05:03:28 -0800 Subject: [PATCH 14/15] new implementation --- src/django_idom/websocket_consumer.py | 29 ++++++++++++++++++++------- 1 file changed, 22 insertions(+), 7 deletions(-) diff --git a/src/django_idom/websocket_consumer.py b/src/django_idom/websocket_consumer.py index 1216fb53..c5cb3af5 100644 --- a/src/django_idom/websocket_consumer.py +++ b/src/django_idom/websocket_consumer.py @@ -3,6 +3,7 @@ import json import logging from dataclasses import dataclass +from threading import Thread from typing import Any, Awaitable, Callable, Optional from urllib.parse import parse_qsl @@ -18,6 +19,11 @@ _logger = logging.getLogger(__name__) +def start_background_loop(loop: asyncio.AbstractEventLoop) -> None: + asyncio.set_event_loop(loop) + loop.run_forever() + + @dataclass class WebsocketConnection: scope: dict @@ -42,16 +48,25 @@ async def connect(self) -> None: elif user is None: _logger.warning("IDOM websocket is missing AuthMiddlewareStack!") - self._dispatcher_future = asyncio.ensure_future(self._run_dispatch_loop()) + self._recv_queue_loop = asyncio.new_event_loop() + t = Thread( + target=start_background_loop, args=[self._recv_queue_loop], daemon=True + ) + t.start() + asyncio.run_coroutine_threadsafe( + self._run_dispatch_loop(), self._recv_queue_loop + ) async def disconnect(self, code: int) -> None: - if self._dispatcher_future.done(): - await self._dispatcher_future - else: - self._dispatcher_future.cancel() + self._recv_queue_loop.stop() await super().disconnect(code) async def receive_json(self, content: Any, **kwargs: Any) -> None: + asyncio.run_coroutine_threadsafe( + self._recv_queue_put(content, **kwargs), self._recv_queue_loop + ) + + async def _recv_queue_put(self, content: Any, **kwargs: Any): await self._recv_queue.put(LayoutEvent(**content)) async def _run_dispatch_loop(self): @@ -78,12 +93,12 @@ async def _run_dispatch_loop(self): ) return - self._recv_queue = recv_queue = asyncio.Queue() + self._recv_queue = asyncio.Queue() try: await dispatch_single_view( Layout(component_instance), self.send_json, - recv_queue.get, + self._recv_queue.get, ) except Exception: await self.close() From cee61743cfc9bcb4b46c1d900dfd208cde37b043 Mon Sep 17 00:00:00 2001 From: Archmonger <16909269+Archmonger@users.noreply.github.com> Date: Wed, 22 Dec 2021 05:11:11 -0800 Subject: [PATCH 15/15] remove janus --- requirements/pkg-deps.txt | 1 - 1 file changed, 1 deletion(-) diff --git a/requirements/pkg-deps.txt b/requirements/pkg-deps.txt index eb6dbc21..dbbaf6b6 100644 --- a/requirements/pkg-deps.txt +++ b/requirements/pkg-deps.txt @@ -1,3 +1,2 @@ channels<4.0.0 # Django websocket features idom >=0.33.0, <0.34.0 -janus < 1.0.0 \ No newline at end of file