Skip to content

Commit ce8e060

Browse files
committed
refactor dispatchers
Simplify them greatly by making them normal functions. This avoids a lot of indirection and complexity caused by inheritance. In addition this fixes some impropper usages of TaskGroups which was causing problems in the SharedClientStateServer implementations. This refactor also allowed us to get rid of the clunky HasAsyncResources util class. In the end it just added confusion.
1 parent 13fca66 commit ce8e060

19 files changed

+445
-645
lines changed

docs/source/core-concepts.rst

+13-19
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ ever be removed from the model. Then you'll just need to call and await a
6363

6464
.. testcode::
6565

66-
async with idom.Layout(ClickCount()) as layout:
66+
with idom.Layout(ClickCount()) as layout:
6767
patch = await layout.render()
6868

6969
The layout also handles the triggering of event handlers. Normally these are
@@ -88,7 +88,7 @@ which we can re-render and see what changed:
8888

8989
return idom.html.button({"onClick": handler}, [f"Click count: {count}"])
9090

91-
async with idom.Layout(ClickCount()) as layout:
91+
with idom.Layout(ClickCount()) as layout:
9292
patch_1 = await layout.render()
9393

9494
fake_event = LayoutEvent(target=static_handler.target, data=[{}])
@@ -111,20 +111,20 @@ which we can re-render and see what changed:
111111
Layout Dispatcher
112112
-----------------
113113

114-
An :class:`~idom.core.dispatcher.AbstractDispatcher` implementation is a relatively thin layer
115-
of logic around a :class:`~idom.core.layout.Layout` which drives the triggering of
116-
events and layout updates by scheduling an asynchronous loop that will run forever -
117-
effectively animating the model. To execute the loop, the dispatcher's
118-
:meth:`~idom.core.dispatcher.AbstractDispatcher.run` method accepts two callbacks. One is a
119-
"send" callback to which the dispatcher passes updates, while the other is "receive"
120-
callback that's called by the dispatcher to events it should execute.
114+
A "dispatcher" implementation is a relatively thin layer of logic around a
115+
:class:`~idom.core.layout.Layout` which drives the triggering of events and updates by
116+
scheduling an asynchronous loop that will run forever - effectively animating the model.
117+
The simplest dispatcher is :func:`~idom.core.dispatcher.dispatch_single_view` which
118+
accepts three arguments. The first is a :class:`~idom.core.layout.Layout`, the second is
119+
a "send" callback to which the dispatcher passes updates, and the third is a "receive"
120+
callback that's called by the dispatcher to collect events it should execute.
121121

122122
.. testcode::
123123

124124
import asyncio
125125

126-
from idom.core import SingleViewDispatcher, EventHandler
127126
from idom.core.layout import LayoutEvent
127+
from idom.core.dispatch import dispatch_single_view
128128

129129

130130
sent_patches = []
@@ -148,20 +148,14 @@ callback that's called by the dispatcher to events it should execute.
148148
return event
149149

150150

151-
async with SingleViewDispatcher(idom.Layout(ClickCount())) as dispatcher:
152-
context = None # see note below
153-
await dispatcher.run(send, recv, context)
154-
151+
await dispatch_single_view(idom.Layout(ClickCount()), send, recv)
155152
assert len(sent_patches) == 5
156153

157154

158155
.. note::
159156

160-
``context`` is information that's specific to the
161-
:class:`~idom.core.dispatcher.AbstractDispatcher` implementation. In the case of
162-
the :class:`~idom.core.dispatcher.SingleViewDispatcher` it doesn't require any
163-
context. On the other hand the :class:`~idom.core.dispatcher.SharedViewDispatcher`
164-
requires a client ID as its piece of contextual information.
157+
The :func:`~idom.core.dispatcher.create_shared_view_dispatcher`, while more complex
158+
in its usage, allows multiple clients to share one synchronized view.
165159

166160

167161
Layout Server

requirements/pkg-deps.txt

-1
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
typing-extensions >=3.7.4
22
mypy-extensions >=0.4.3
33
anyio >=2.0
4-
async_generator >=1.10; python_version<"3.7"
54
async_exit_stack >=1.0.1; python_version<"3.7"
65
jsonpatch >=1.26
76
typer >=0.3.2

src/idom/core/__init__.py

-23
Original file line numberDiff line numberDiff line change
@@ -1,23 +0,0 @@
1-
from .component import AbstractComponent, Component, ComponentConstructor, component
2-
from .dispatcher import AbstractDispatcher, SharedViewDispatcher, SingleViewDispatcher
3-
from .events import EventHandler, Events, event
4-
from .layout import Layout
5-
from .vdom import vdom
6-
7-
8-
__all__ = [
9-
"AbstractComponent",
10-
"Layout",
11-
"AbstractDispatcher",
12-
"component",
13-
"Component",
14-
"EventHandler",
15-
"ComponentConstructor",
16-
"event",
17-
"Events",
18-
"hooks",
19-
"Layout",
20-
"vdom",
21-
"SharedViewDispatcher",
22-
"SingleViewDispatcher",
23-
]

src/idom/core/dispatcher.py

+123-110
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,23 @@
1-
import abc
2-
import asyncio
1+
from __future__ import annotations
2+
3+
import sys
4+
from asyncio import Future, Queue
5+
from asyncio.tasks import FIRST_COMPLETED, ensure_future, gather, wait
36
from logging import getLogger
4-
from typing import Any, AsyncIterator, Awaitable, Callable, Dict
7+
from typing import Any, AsyncIterator, Awaitable, Callable, List, Sequence, Tuple
8+
from weakref import WeakSet
59

610
from anyio import create_task_group
7-
from anyio.abc import TaskGroup
11+
12+
from idom.utils import Ref
813

914
from .layout import Layout, LayoutEvent, LayoutUpdate
10-
from .utils import HasAsyncResources, async_resource
15+
16+
17+
if sys.version_info >= (3, 7): # pragma: no cover
18+
from contextlib import asynccontextmanager # noqa
19+
else: # pragma: no cover
20+
from async_generator import asynccontextmanager
1121

1222

1323
logger = getLogger(__name__)
@@ -16,136 +26,139 @@
1626
RecvCoroutine = Callable[[], Awaitable[LayoutEvent]]
1727

1828

19-
class AbstractDispatcher(HasAsyncResources, abc.ABC):
20-
"""A base class for implementing :class:`~idom.core.layout.Layout` dispatchers."""
29+
async def dispatch_single_view(
30+
layout: Layout,
31+
send: SendCoroutine,
32+
recv: RecvCoroutine,
33+
) -> None:
34+
with layout:
35+
async with create_task_group() as task_group:
36+
task_group.start_soon(_single_outgoing_loop, layout, send)
37+
task_group.start_soon(_single_incoming_loop, layout, recv)
2138

22-
__slots__ = "_layout"
2339

24-
def __init__(self, layout: Layout) -> None:
25-
super().__init__()
26-
self._layout = layout
40+
_SharedDispatchFuture = Callable[[SendCoroutine, RecvCoroutine], Future]
2741

28-
async def start(self) -> None:
29-
await self.__aenter__()
3042

31-
async def stop(self) -> None:
32-
await self.task_group.cancel_scope.cancel()
33-
await self.__aexit__(None, None, None)
43+
@asynccontextmanager
44+
async def create_shared_view_dispatcher(
45+
layout: Layout, run_forever: bool = False
46+
) -> AsyncIterator[_SharedDispatchFuture]:
47+
with layout:
48+
(
49+
dispatch_shared_view,
50+
model_state,
51+
all_update_queues,
52+
) = await _make_shared_view_dispatcher(layout)
3453

35-
@async_resource
36-
async def layout(self) -> AsyncIterator[Layout]:
37-
async with self._layout as layout:
38-
yield layout
54+
dispatch_tasks: List[Future] = []
3955

40-
@async_resource
41-
async def task_group(self) -> AsyncIterator[TaskGroup]:
42-
async with create_task_group() as group:
43-
yield group
56+
def dispatch_shared_view_soon(
57+
send: SendCoroutine, recv: RecvCoroutine
58+
) -> Future:
59+
future = ensure_future(dispatch_shared_view(send, recv))
60+
dispatch_tasks.append(future)
61+
return future
4462

45-
async def run(self, send: SendCoroutine, recv: RecvCoroutine, context: Any) -> None:
46-
"""Start an unending loop which will drive the layout.
63+
yield dispatch_shared_view_soon
4764

48-
This will call :meth:`AbstractLayout.render` and :meth:`Layout.dispatch`
49-
to render new models and execute events respectively.
50-
"""
51-
await self.task_group.spawn(self._outgoing_loop, send, context)
52-
await self.task_group.spawn(self._incoming_loop, recv, context)
53-
return None
65+
gathered_dispatch_tasks = gather(*dispatch_tasks, return_exceptions=True)
5466

55-
async def _outgoing_loop(self, send: SendCoroutine, context: Any) -> None:
56-
try:
57-
while True:
58-
await send(await self._outgoing(self.layout, context))
59-
except Exception:
60-
logger.info("Failed to send outgoing update", exc_info=True)
61-
raise
67+
while True:
68+
(
69+
update_future,
70+
dispatchers_completed_future,
71+
) = await _wait_until_first_complete(
72+
layout.render(),
73+
gathered_dispatch_tasks,
74+
)
75+
76+
if dispatchers_completed_future.done():
77+
update_future.cancel()
78+
break
79+
else:
80+
update: LayoutUpdate = update_future.result()
81+
82+
model_state.current = update.apply_to(model_state.current)
83+
# push updates to all dispatcher callbacks
84+
for queue in all_update_queues:
85+
queue.put_nowait(update)
86+
87+
88+
def ensure_shared_view_dispatcher_future(
89+
layout: Layout,
90+
) -> Tuple[Future, _SharedDispatchFuture]:
91+
dispatcher_future = Future()
92+
93+
async def dispatch_shared_view_forever():
94+
with layout:
95+
(
96+
dispatch_shared_view,
97+
model_state,
98+
all_update_queues,
99+
) = await _make_shared_view_dispatcher(layout)
100+
101+
dispatcher_future.set_result(dispatch_shared_view)
62102

63-
async def _incoming_loop(self, recv: RecvCoroutine, context: Any) -> None:
64-
try:
65103
while True:
66-
await self._incoming(self.layout, context, await recv())
67-
except Exception:
68-
logger.info("Failed to receive incoming event", exc_info=True)
69-
raise
70-
71-
@abc.abstractmethod
72-
async def _outgoing(self, layout: Layout, context: Any) -> Any:
73-
...
104+
update = await layout.render()
105+
model_state.current = update.apply_to(model_state.current)
106+
# push updates to all dispatcher callbacks
107+
for queue in all_update_queues:
108+
queue.put_nowait(update)
74109

75-
@abc.abstractmethod
76-
async def _incoming(self, layout: Layout, context: Any, message: Any) -> None:
77-
...
110+
async def dispatch(send: SendCoroutine, recv: RecvCoroutine) -> None:
111+
await (await dispatcher_future)(send, recv)
78112

113+
return ensure_future(dispatch_shared_view_forever()), dispatch
79114

80-
class SingleViewDispatcher(AbstractDispatcher):
81-
"""Each client of the dispatcher will get its own model.
82115

83-
..note::
84-
The ``context`` parameter of :meth:`SingleViewDispatcher.run` should just
85-
be ``None`` since it's not used.
86-
"""
116+
_SharedDispatchCoroutine = Callable[[SendCoroutine, RecvCoroutine], Awaitable[None]]
87117

88-
__slots__ = "_current_model_as_json"
89118

90-
def __init__(self, layout: Layout) -> None:
91-
super().__init__(layout)
92-
self._current_model_as_json = ""
119+
async def _make_shared_view_dispatcher(
120+
layout: Layout,
121+
) -> Tuple[_SharedDispatchCoroutine, Ref[Any], WeakSet[Queue[LayoutUpdate]]]:
122+
initial_update = await layout.render()
123+
model_state = Ref(initial_update.apply_to({}))
93124

94-
async def _outgoing(self, layout: Layout, context: Any) -> LayoutUpdate:
95-
return await layout.render()
125+
# We push updates to queues instead of pushing directly to send() callbacks in
126+
# order to isolate the render loop from any errors dispatch callbacks might
127+
# raise.
128+
all_update_queues: WeakSet[Queue[LayoutUpdate]] = WeakSet()
96129

97-
async def _incoming(self, layout: Layout, context: Any, event: LayoutEvent) -> None:
98-
await layout.dispatch(event)
130+
async def dispatch_shared_view(send: SendCoroutine, recv: RecvCoroutine) -> None:
131+
update_queue: Queue[LayoutUpdate] = Queue()
132+
async with create_task_group() as inner_task_group:
133+
all_update_queues.add(update_queue)
134+
await send(LayoutUpdate.create_from({}, model_state.current))
135+
inner_task_group.start_soon(_single_incoming_loop, layout, recv)
136+
inner_task_group.start_soon(_shared_outgoing_loop, send, update_queue)
99137
return None
100138

139+
return dispatch_shared_view, model_state, all_update_queues
101140

102-
class SharedViewDispatcher(SingleViewDispatcher):
103-
"""Each client of the dispatcher shares the same model.
104141

105-
The client's ID is indicated by the ``context`` argument of
106-
:meth:`SharedViewDispatcher.run`
107-
"""
142+
async def _single_outgoing_loop(layout: Layout, send: SendCoroutine) -> None:
143+
while True:
144+
await send(await layout.render())
108145

109-
__slots__ = "_update_queues", "_model_state"
110146

111-
def __init__(self, layout: Layout) -> None:
112-
super().__init__(layout)
113-
self._model_state: Any = {}
114-
self._update_queues: Dict[str, asyncio.Queue[LayoutUpdate]] = {}
147+
async def _single_incoming_loop(layout: Layout, recv: RecvCoroutine) -> None:
148+
while True:
149+
await layout.dispatch(await recv())
115150

116-
@async_resource
117-
async def task_group(self) -> AsyncIterator[TaskGroup]:
118-
async with create_task_group() as group:
119-
await group.spawn(self._render_loop)
120-
yield group
121151

122-
async def run(
123-
self, send: SendCoroutine, recv: RecvCoroutine, context: str, join: bool = False
124-
) -> None:
125-
await super().run(send, recv, context)
126-
if join:
127-
await self._join_event.wait()
152+
async def _shared_outgoing_loop(
153+
send: SendCoroutine, queue: Queue[LayoutUpdate]
154+
) -> None:
155+
while True:
156+
await send(await queue.get())
128157

129-
async def _render_loop(self) -> None:
130-
while True:
131-
update = await super()._outgoing(self.layout, None)
132-
self._model_state = update.apply_to(self._model_state)
133-
# append updates to all other contexts
134-
for queue in self._update_queues.values():
135-
await queue.put(update)
136-
137-
async def _outgoing_loop(self, send: SendCoroutine, context: Any) -> None:
138-
self._update_queues[context] = asyncio.Queue()
139-
await send(LayoutUpdate.create_from({}, self._model_state))
140-
await super()._outgoing_loop(send, context)
141-
142-
async def _outgoing(self, layout: Layout, context: str) -> LayoutUpdate:
143-
return await self._update_queues[context].get()
144-
145-
@async_resource
146-
async def _join_event(self) -> AsyncIterator[asyncio.Event]:
147-
event = asyncio.Event()
148-
try:
149-
yield event
150-
finally:
151-
event.set()
158+
159+
async def _wait_until_first_complete(
160+
*tasks: Awaitable[Any],
161+
) -> Sequence[Future]:
162+
futures = [ensure_future(t) for t in tasks]
163+
await wait(futures, return_when=FIRST_COMPLETED)
164+
return futures

src/idom/core/events.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -192,7 +192,7 @@ async def __call__(self, data: List[Any]) -> Any:
192192
if self._coro_handlers:
193193
async with create_task_group() as group:
194194
for handler in self._coro_handlers:
195-
await group.spawn(handler, *data)
195+
group.start_soon(handler, *data)
196196
for handler in self._func_handlers:
197197
handler(*data)
198198

src/idom/core/hooks.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -387,8 +387,8 @@ class LifeCycleHook:
387387

388388
def __init__(
389389
self,
390-
component: AbstractComponent,
391390
layout: idom.core.layout.Layout,
391+
component: AbstractComponent,
392392
) -> None:
393393
self.component = component
394394
self._layout = weakref.ref(layout)

0 commit comments

Comments
 (0)