-
Notifications
You must be signed in to change notification settings - Fork 421
/
Copy path_registry.py
81 lines (67 loc) · 2.52 KB
/
_registry.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
from __future__ import annotations
import logging
from typing import TYPE_CHECKING, Any
if TYPE_CHECKING:
from collections.abc import Callable
logger = logging.getLogger(__name__)


class ResolverRegistry:
    """Registry mapping ``"<type_name>.<field_name>"`` keys to resolver entries.

    Each entry stored in ``resolvers`` is a dictionary with three keys:
    ``"func"`` (the registered callable), ``"raise_on_error"`` and
    ``"aggregate"`` (the flags given at registration time).
    """

    def __init__(self) -> None:
        # Keyed by "<type_name>.<field_name>"; "*" as type name acts as a wildcard on lookup.
        self.resolvers: dict[str, dict[str, Any]] = {}

    def register(
        self,
        type_name: str = "*",
        field_name: str | None = None,
        raise_on_error: bool = False,
        aggregate: bool = True,
    ) -> Callable:
        """Return a decorator that registers a resolver for ``type_name.field_name``.

        Registering the same type/field pair twice replaces the earlier entry.

        Parameters
        ----------
        type_name : str
            Type name. Defaults to ``"*"``, which ``find_resolver`` uses as the
            fallback for any type.
        field_name : str
            Field name. If left as ``None`` the key is built from the text ``"None"``.
        raise_on_error: bool
            A flag indicating whether to raise an error when processing batches
            with failed items. Defaults to False, which means errors are handled
            without raising exceptions. It is only stored here, next to the resolver.
        aggregate: bool
            A flag indicating whether the batch items should be processed at once or individually.
            If True (default), the batch resolver will process all items in the batch as a single event.
            If False, the batch resolver will process each item in the batch individually.
            It is only stored here, next to the resolver.

        Return
        ----------
        Callable
            A decorator that stores the decorated callable and returns it unchanged.
        """

        def _register(func) -> Callable:
            # Not every callable has ``__name__`` (e.g. ``functools.partial`` objects or
            # instances defining ``__call__``); a debug message must never block registration.
            func_name = getattr(func, "__name__", repr(func))
            logger.debug("Adding resolver `%s` for field `%s.%s`", func_name, type_name, field_name)
            self.resolvers[f"{type_name}.{field_name}"] = {
                "func": func,
                "raise_on_error": raise_on_error,
                "aggregate": aggregate,
            }
            return func

        return _register

    def find_resolver(self, type_name: str, field_name: str) -> dict | None:
        """Find resolver based on type_name and field_name

        An exact ``type_name.field_name`` entry wins; otherwise the wildcard
        ``*.field_name`` entry is used when one exists.

        Parameters
        ----------
        type_name : str
            Type name
        field_name : str
            Field name

        Return
        ----------
        Optional[Dict]
            The stored entry (``"func"``, ``"raise_on_error"``, ``"aggregate"``),
            or ``None`` when neither the exact nor the wildcard key is registered.
        """
        logger.debug("Looking for resolver for type=%s, field=%s.", type_name, field_name)
        wildcard_entry = self.resolvers.get(f"*.{field_name}")
        return self.resolvers.get(f"{type_name}.{field_name}", wildcard_entry)

    def merge(self, other_registry: ResolverRegistry) -> None:
        """Update current registry with incoming registry

        Entries from ``other_registry`` replace entries in this registry that
        share the same key; ``other_registry`` itself is not modified.

        Parameters
        ----------
        other_registry : ResolverRegistry
            Registry to merge from
        """
        # Pass the mapping directly instead of unpacking it into keyword arguments.
        self.resolvers.update(other_registry.resolvers)