5
5
import asyncio
6
6
import contextlib
7
7
import logging
8
- from concurrent .futures import Future
9
8
from datetime import timedelta
10
9
from threading import Thread
11
10
from typing import Any , MutableMapping , Sequence
@@ -41,12 +40,12 @@ class ReactpyAsyncWebsocketConsumer(AsyncJsonWebsocketConsumer):
41
40
42
41
async def connect (self ) -> None :
43
42
"""The browser has connected."""
44
- from reactpy_django .config import REACTPY_AUTH_BACKEND , REACTPY_BACKHAUL_THREAD
43
+ from reactpy_django .config import REACTPY_AUTH_BACKEND
45
44
46
45
await super ().connect ()
47
46
48
47
# Authenticate the user, if possible
49
- user : Any = self .scope .get ("user" )
48
+ user = self .scope .get ("user" )
50
49
if user and user .is_authenticated :
51
50
try :
52
51
await login (self .scope , user , backend = REACTPY_AUTH_BACKEND )
@@ -77,33 +76,36 @@ async def connect(self) -> None:
77
76
)
78
77
79
78
# Start the component dispatcher
80
- self .dispatcher : Future | asyncio .Task
81
- self .threaded = REACTPY_BACKHAUL_THREAD
82
- if self .threaded :
83
- if not backhaul_thread .is_alive ():
84
- await asyncio .to_thread (
85
- _logger .debug , "Starting ReactPy backhaul thread."
86
- )
87
- backhaul_thread .start ()
88
- self .dispatcher = asyncio .run_coroutine_threadsafe (
89
- self .run_dispatcher (), backhaul_loop
90
- )
91
- else :
92
- self .dispatcher = asyncio .create_task (self .run_dispatcher ())
79
+ self .recv_queue : asyncio .Queue = asyncio .Queue ()
80
+ self .dispatcher = asyncio .create_task (self .run_dispatcher ())
93
81
94
82
async def disconnect (self , code : int ) -> None :
95
83
"""The browser has disconnected."""
96
84
self .dispatcher .cancel ()
85
+ await self .dispatcher
97
86
await super ().disconnect (code )
98
87
99
88
async def receive_json (self , content : Any , ** _ ) -> None :
100
89
"""Receive a message from the browser. Typically, messages are event signals."""
101
- if self .threaded :
102
- asyncio .run_coroutine_threadsafe (
103
- self .recv_queue .put (content ), backhaul_loop
104
- )
105
- else :
106
- await self .recv_queue .put (content )
90
+ await self .recv_queue .put (content )
91
+
92
+ async def dispatch (self , message ):
93
+ """Override the Django Channels dispatch method to allow running the ASGI
94
+ dispatcher in a thread."""
95
+ from reactpy_django .config import REACTPY_BACKHAUL_THREAD
96
+
97
+ if REACTPY_BACKHAUL_THREAD :
98
+ if not backhaul_thread .is_alive ():
99
+ await asyncio .to_thread (
100
+ _logger .debug , "Starting ReactPy backhaul thread."
101
+ )
102
+ backhaul_thread .start ()
103
+
104
+ return asyncio .run_coroutine_threadsafe (
105
+ super ().dispatch (message ), backhaul_loop
106
+ ).result ()
107
+
108
+ return await super ().dispatch (message )
107
109
108
110
async def run_dispatcher (self ):
109
111
"""Runs the main loop that performs component rendering tasks."""
@@ -117,7 +119,6 @@ async def run_dispatcher(self):
117
119
dotted_path = scope ["url_route" ]["kwargs" ]["dotted_path" ]
118
120
uuid = scope ["url_route" ]["kwargs" ]["uuid" ]
119
121
search = scope ["query_string" ].decode ()
120
- self .recv_queue : asyncio .Queue = asyncio .Queue ()
121
122
connection = Connection ( # For `use_connection`
122
123
scope = scope ,
123
124
location = Location (
0 commit comments