forked from reactive-python/reactpy-django
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathhooks.py
233 lines (190 loc) · 6.65 KB
/
hooks.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
from __future__ import annotations
import asyncio
import logging
from typing import (
Any,
Awaitable,
Callable,
DefaultDict,
Sequence,
Union,
cast,
overload,
)
from channels.db import database_sync_to_async as _database_sync_to_async
from idom import use_callback, use_ref
from idom.backend.types import Location
from idom.core.hooks import Context, create_context, use_context, use_effect, use_state
from django_idom.types import (
IdomWebsocket,
Mutation,
Query,
QueryOptions,
_Params,
_Result,
)
from django_idom.utils import _generate_obj_name
_logger = logging.getLogger(__name__)
database_sync_to_async = cast(
Callable[..., Callable[..., Awaitable[Any]]],
_database_sync_to_async,
)
WebsocketContext: Context[IdomWebsocket | None] = create_context(None)
_REFETCH_CALLBACKS: DefaultDict[
Callable[..., Any], set[Callable[[], None]]
] = DefaultDict(set)
def use_location() -> Location:
"""Get the current route as a `Location` object"""
# TODO: Use the browser's current page, rather than the WS route
scope = use_scope()
search = scope["query_string"].decode()
return Location(
scope["path"], f"?{search}" if (search and (search != "undefined")) else ""
)
def use_origin() -> str | None:
"""Get the current origin as a string. If the browser did not send an origin header,
this will be None."""
scope = use_scope()
try:
return next(
(
header[1].decode("utf-8")
for header in scope["headers"]
if header[0] == b"origin"
),
None,
)
except Exception:
return None
def use_scope() -> dict[str, Any]:
"""Get the current ASGI scope dictionary"""
return use_websocket().scope
def use_websocket() -> IdomWebsocket:
"""Get the current IdomWebsocket object"""
websocket = use_context(WebsocketContext)
if websocket is None:
raise RuntimeError("No websocket. Are you running with a Django server?")
return websocket
@overload
def use_query(
options: QueryOptions,
query: Callable[_Params, _Result | None],
/,
*args: _Params.args,
**kwargs: _Params.kwargs,
) -> Query[_Result | None]:
...
@overload
def use_query(
query: Callable[_Params, _Result | None],
/,
*args: _Params.args,
**kwargs: _Params.kwargs,
) -> Query[_Result | None]:
...
def use_query(
*args: Any,
**kwargs: Any,
) -> Query[_Result | None]:
"""Hook to fetch a Django ORM query.
Args:
options: An optional `QueryOptions` object that can modify how the query is executed.
query: A callable that returns a Django `Model` or `QuerySet`.
*args: Positional arguments to pass into `query`.
Keyword Args:
**kwargs: Keyword arguments to pass into `query`."""
if isinstance(args[0], QueryOptions):
query_options = args[0]
query = args[1]
args = args[2:]
else:
query_options = QueryOptions()
query = args[0]
args = args[1:]
query_ref = use_ref(query)
if query_ref.current is not query:
raise ValueError(f"Query function changed from {query_ref.current} to {query}.")
should_execute, set_should_execute = use_state(True)
data, set_data = use_state(cast(Union[_Result, None], None))
loading, set_loading = use_state(True)
error, set_error = use_state(cast(Union[Exception, None], None))
@use_callback
def refetch() -> None:
set_should_execute(True)
set_loading(True)
set_error(None)
@use_effect(dependencies=[])
def add_refetch_callback() -> Callable[[], None]:
# By tracking callbacks globally, any usage of the query function will be re-run
# if the user has told a mutation to refetch it.
_REFETCH_CALLBACKS[query].add(refetch)
return lambda: _REFETCH_CALLBACKS[query].remove(refetch)
@use_effect(dependencies=None)
@database_sync_to_async
def execute_query() -> None:
if not should_execute:
return
try:
# Run the initial query
new_data = query(*args, **kwargs)
if query_options.postprocessor:
new_data = query_options.postprocessor(
new_data, **query_options.postprocessor_kwargs
)
except Exception as e:
set_data(None)
set_loading(False)
set_error(e)
_logger.exception(
f"Failed to execute query: {_generate_obj_name(query) or query}"
)
return
finally:
set_should_execute(False)
set_data(new_data)
set_loading(False)
set_error(None)
return Query(data, loading, error, refetch)
def use_mutation(
mutate: Callable[_Params, bool | None],
refetch: Callable[..., Any] | Sequence[Callable[..., Any]] | None = None,
) -> Mutation[_Params]:
"""Hook to create, update, or delete Django ORM objects.
Args:
mutate: A callable that performs Django ORM create, update, or delete
functionality. If this function returns `False`, then your `refetch`
function will not be used.
refetch: A `query` function (used by the `use_query` hook) or a sequence of `query`
functions that will be called if the mutation succeeds. This is useful for
refetching data after a mutation has been performed.
"""
loading, set_loading = use_state(False)
error, set_error = use_state(cast(Union[Exception, None], None))
@use_callback
def call(*args: _Params.args, **kwargs: _Params.kwargs) -> None:
set_loading(True)
@database_sync_to_async
def execute_mutation() -> None:
try:
should_refetch = mutate(*args, **kwargs)
except Exception as e:
set_loading(False)
set_error(e)
_logger.exception(
f"Failed to execute mutation: {_generate_obj_name(mutate) or mutate}"
)
else:
set_loading(False)
set_error(None)
# `refetch` will execute unless explicitly told not to
# or if `refetch` was not defined.
if should_refetch is not False and refetch:
for query in (refetch,) if callable(refetch) else refetch:
for callback in _REFETCH_CALLBACKS.get(query) or ():
callback()
asyncio.ensure_future(execute_mutation())
@use_callback
def reset() -> None:
set_loading(False)
set_error(None)
return Mutation(call, loading, error, reset)