1
1
from __future__ import annotations
2
2
3
3
import abc
4
- import asyncio
4
+ from asyncio import (
5
+ FIRST_COMPLETED ,
6
+ Event ,
7
+ Queue ,
8
+ Task ,
9
+ create_task ,
10
+ gather ,
11
+ get_running_loop ,
12
+ wait ,
13
+ )
5
14
from collections import Counter
6
15
from collections .abc import Iterator
7
16
from contextlib import AsyncExitStack
@@ -41,6 +50,7 @@ class Layout:
41
50
"root" ,
42
51
"_event_handlers" ,
43
52
"_rendering_queue" ,
53
+ "_render_tasks" ,
44
54
"_root_life_cycle_state_id" ,
45
55
"_model_states_by_life_cycle_state_id" ,
46
56
)
@@ -58,6 +68,7 @@ def __init__(self, root: ComponentType) -> None:
58
68
async def __aenter__ (self ) -> Layout :
59
69
# create attributes here to avoid access before entering context manager
60
70
self ._event_handlers : EventHandlerDict = {}
71
+ self ._render_tasks : set [Task [LayoutUpdateMessage ]] = set ()
61
72
62
73
self ._rendering_queue : _ThreadSafeQueue [_LifeCycleStateId ] = _ThreadSafeQueue ()
63
74
root_model_state = _new_root_model_state (self .root , self ._rendering_queue .put )
@@ -72,6 +83,7 @@ async def __aenter__(self) -> Layout:
72
83
async def __aexit__ (self , * exc : Any ) -> None :
73
84
root_csid = self ._root_life_cycle_state_id
74
85
root_model_state = self ._model_states_by_life_cycle_state_id [root_csid ]
86
+ await gather (* self ._render_tasks , return_exceptions = True )
75
87
await self ._unmount_model_states ([root_model_state ])
76
88
77
89
# delete attributes here to avoid access after exiting context manager
@@ -102,21 +114,35 @@ async def deliver(self, event: LayoutEventMessage) -> None:
102
114
async def render (self ) -> LayoutUpdateMessage :
103
115
"""Await the next available render. This will block until a component is updated"""
104
116
while True :
105
- model_state_id = await self ._rendering_queue .get ()
106
- try :
107
- model_state = self ._model_states_by_life_cycle_state_id [model_state_id ]
108
- except KeyError :
109
- logger .debug (
110
- "Did not render component with model state ID "
111
- f"{ model_state_id !r} - component already unmounted"
112
- )
117
+ render_completed = (
118
+ create_task (wait (self ._render_tasks , return_when = FIRST_COMPLETED ))
119
+ if self ._render_tasks
120
+ else get_running_loop ().create_future ()
121
+ )
122
+ await wait (
123
+ (create_task (self ._rendering_queue .ready ()), render_completed ),
124
+ return_when = FIRST_COMPLETED ,
125
+ )
126
+ if render_completed .done ():
127
+ done , _ = await render_completed
128
+ update_task : Task [LayoutUpdateMessage ] = done .pop ()
129
+ self ._render_tasks .remove (update_task )
130
+ return update_task .result ()
113
131
else :
114
- update = await self ._create_layout_update (model_state )
115
- if REACTPY_CHECK_VDOM_SPEC .current :
116
- root_id = self ._root_life_cycle_state_id
117
- root_model = self ._model_states_by_life_cycle_state_id [root_id ]
118
- validate_vdom_json (root_model .model .current )
119
- return update
132
+ model_state_id = await self ._rendering_queue .get ()
133
+ try :
134
+ model_state = self ._model_states_by_life_cycle_state_id [
135
+ model_state_id
136
+ ]
137
+ except KeyError :
138
+ logger .debug (
139
+ "Did not render component with model state ID "
140
+ f"{ model_state_id !r} - component already unmounted"
141
+ )
142
+ else :
143
+ self ._render_tasks .add (
144
+ create_task (self ._create_layout_update (model_state ))
145
+ )
120
146
121
147
async def _create_layout_update (
122
148
self , old_state : _ModelState
@@ -127,6 +153,9 @@ async def _create_layout_update(
127
153
async with AsyncExitStack () as exit_stack :
128
154
await self ._render_component (exit_stack , old_state , new_state , component )
129
155
156
+ if REACTPY_CHECK_VDOM_SPEC .current :
157
+ validate_vdom_json (new_state .model .current )
158
+
130
159
return {
131
160
"type" : "layout-update" ,
132
161
"path" : new_state .patch_path ,
@@ -540,6 +569,7 @@ class _ModelState:
540
569
__slots__ = (
541
570
"__weakref__" ,
542
571
"_parent_ref" ,
572
+ "_render_semaphore" ,
543
573
"children_by_key" ,
544
574
"index" ,
545
575
"key" ,
@@ -651,24 +681,27 @@ class _LifeCycleState(NamedTuple):
651
681
652
682
653
683
class _ThreadSafeQueue (Generic [_Type ]):
654
- __slots__ = "_loop" , "_queue" , "_pending"
655
-
656
684
def __init__ (self ) -> None :
657
- self ._loop = asyncio . get_running_loop ()
658
- self ._queue : asyncio . Queue [_Type ] = asyncio . Queue ()
685
+ self ._loop = get_running_loop ()
686
+ self ._queue : Queue [_Type ] = Queue ()
659
687
self ._pending : set [_Type ] = set ()
688
+ self ._ready = Event ()
660
689
661
690
def put (self , value : _Type ) -> None :
662
691
if value not in self ._pending :
663
692
self ._pending .add (value )
664
693
self ._loop .call_soon_threadsafe (self ._queue .put_nowait , value )
694
+ self ._ready .set ()
695
+
696
+ async def ready (self ) -> None :
697
+ """Return when the next value is available"""
698
+ await self ._ready .wait ()
665
699
666
700
async def get (self ) -> _Type :
667
- while True :
668
- value = await self ._queue .get ()
669
- if value in self ._pending :
670
- break
701
+ value = await self ._queue .get ()
671
702
self ._pending .remove (value )
703
+ if not self ._pending :
704
+ self ._ready .clear ()
672
705
return value
673
706
674
707
0 commit comments