forked from aws-powertools/powertools-lambda-python
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathcomparators.py
149 lines (112 loc) · 6.53 KB
/
comparators.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
from __future__ import annotations
from datetime import datetime, tzinfo
from typing import Any, Dict, Optional
from dateutil.tz import gettz
from aws_lambda_powertools.utilities.feature_flags.schema import HOUR_MIN_SEPARATOR, ModuloRangeValues, TimeValues
def _get_now_from_timezone(timezone: Optional[tzinfo]) -> datetime:
    """
    Return the current moment as a timezone-aware datetime.

    When no timezone is supplied, UTC (as resolved by gettz) is used instead.
    Callers are expected to have validated the timezone name already, so the
    tzinfo handed in here is assumed to be usable as-is.
    """
    if timezone is None:
        # No explicit timezone: fall back to UTC.
        return datetime.now(gettz("UTC"))
    return datetime.now(timezone)
def compare_days_of_week(context_value: Any, condition_value: Dict) -> bool:
    """Comparator that checks whether today is one of the configured weekdays

    Parameters
    ----------
    context_value : Any
        not used by this comparator; accepted so the signature matches the other comparators
    condition_value : Dict
        condition settings; the TimeValues.TIMEZONE key holds the timezone name (defaults to "UTC")
        and the TimeValues.DAYS key holds the allowed day names (defaults to an empty list)

    Returns
    -------
    bool
        Whether the current weekday, evaluated in the configured timezone, is in the allowed days
    """
    # The weekday name is looked up by index instead of strftime("%A"): %A yields the *locale's*
    # day name, so under a non-English LC_TIME it would never match the configured values.
    # Configured days are expected to be upper-case English names, which is what the
    # strftime-based lookup produced under the default C locale.
    weekday_names = ("MONDAY", "TUESDAY", "WEDNESDAY", "THURSDAY", "FRIDAY", "SATURDAY", "SUNDAY")

    timezone_name = condition_value.get(TimeValues.TIMEZONE.value, "UTC")
    current_time = _get_now_from_timezone(gettz(timezone_name))

    # datetime.weekday() returns 0 for Monday through 6 for Sunday.
    current_day = weekday_names[current_time.weekday()]

    days = condition_value.get(TimeValues.DAYS.value, [])
    return current_day in days
def compare_datetime_range(context_value: Any, condition_value: Dict) -> bool:
    """Comparator that checks whether the current time lies within a START/END datetime window

    Parameters
    ----------
    context_value : Any
        not used by this comparator
    condition_value : Dict
        condition settings; TimeValues.TIMEZONE holds the timezone name (defaults to "UTC"),
        TimeValues.START and TimeValues.END hold ISO-format datetime strings

    Returns
    -------
    bool
        Whether START <= now <= END, with all three values expressed in the configured timezone
    """
    tz = gettz(condition_value.get(TimeValues.TIMEZONE.value, "UTC"))
    now: datetime = _get_now_from_timezone(tz)

    raw_start = condition_value.get(TimeValues.START.value, "")
    raw_end = condition_value.get(TimeValues.END.value, "")

    # The configured boundaries carry no timezone of their own, so both are stamped with the
    # timezone used for `now`; that puts all three timestamps on the same footing.
    window_start = datetime.fromisoformat(raw_start).replace(tzinfo=tz)
    window_end = datetime.fromisoformat(raw_end).replace(tzinfo=tz)

    if now < window_start:
        return False
    return now <= window_end
def compare_time_range(context_value: Any, condition_value: Dict) -> bool:
    """Comparator that checks whether the current time of day lies within a daily START/END window

    Parameters
    ----------
    context_value : Any
        not used by this comparator
    condition_value : Dict
        condition settings; TimeValues.TIMEZONE holds the timezone name (defaults to "UTC"),
        TimeValues.START and TimeValues.END hold "hour<HOUR_MIN_SEPARATOR>minute" strings

    Returns
    -------
    bool
        Whether the current time, in the configured timezone, is inside the window.
        A window whose END is earlier in the day than its START is treated as wrapping
        around midnight.

    Raises
    ------
    ValueError
        If START or END is missing or cannot be split into an integer hour and minute
    """
    timezone_name = condition_value.get(TimeValues.TIMEZONE.value, "UTC")
    current_time: datetime = _get_now_from_timezone(gettz(timezone_name))

    start_hour, start_min = condition_value.get(TimeValues.START.value, "").split(HOUR_MIN_SEPARATOR)
    end_hour, end_min = condition_value.get(TimeValues.END.value, "").split(HOUR_MIN_SEPARATOR)

    start = (int(start_hour), int(start_min))
    end = (int(end_hour), int(end_min))

    # replace() only swaps hour and minute; seconds and microseconds stay those of current_time.
    # The comparison is therefore minute-granular and both boundary minutes are inclusive.
    start_time = current_time.replace(hour=start[0], minute=start[1])
    end_time = current_time.replace(hour=end[0], minute=end[1])

    # Compare (hour, minute) pairs rather than hours alone, so that a window such as
    # 10:30 -> 10:15 (same hour, earlier minute) is also recognised as crossing midnight.
    if end < start:
        # The window crosses a day's boundary, so current_time only has to be on
        # **either** side of it:
        #
        #        START (20:00)         00:00          END (04:00)
        #   ───────────┬─────────────────┬─────────────────┬───────────▶
        #              │   either here   │    or here      │
        #              └─────────────────┴─────────────────┘
        return (start_time <= current_time) or (current_time <= end_time)

    # In normal circumstances, **both** conditions must hold.
    return start_time <= current_time <= end_time
def compare_modulo_range(context_value: int, condition_value: Dict) -> bool:
    """
    Tell whether the remainder of context_value divided by the condition's base lies in the
    inclusive range [start, end], i.e. start <= context_value % base <= end.

    Base, start and end are read from condition_value and each defaults to 1 when absent.
    """
    divisor = condition_value.get(ModuloRangeValues.BASE.value, 1)
    lower_bound = condition_value.get(ModuloRangeValues.START.value, 1)
    upper_bound = condition_value.get(ModuloRangeValues.END.value, 1)

    remainder = context_value % divisor
    if remainder < lower_bound:
        return False
    return remainder <= upper_bound
def compare_any_in_list(context_value: list, condition_value: list) -> bool:
    """Comparator for ANY_IN_VALUE action

    Parameters
    ----------
    context_value : list
        user-defined context for flag evaluation
    condition_value : list
        schema value available for condition being evaluated

    Returns
    -------
    bool
        Whether at least one item of context_value is present in condition_value

    Raises
    ------
    ValueError
        If context_value is not a list
    """
    if isinstance(context_value, list):
        # Stop at the first context item found in the condition values.
        for item in context_value:
            if item in condition_value:
                return True
        return False

    raise ValueError("Context provided must be a list. Unable to compare ANY_IN_VALUE action.")
def compare_all_in_list(context_value: list, condition_value: list) -> bool:
    """Comparator for ALL_IN_VALUE action

    Parameters
    ----------
    context_value : list
        user-defined context for flag evaluation
    condition_value : list
        schema value available for condition being evaluated

    Returns
    -------
    bool
        Whether every item of context_value is present in condition_value

    Raises
    ------
    ValueError
        If context_value is not a list
    """
    if isinstance(context_value, list):
        # Stop at the first context item missing from the condition values.
        for item in context_value:
            if item not in condition_value:
                return False
        return True

    raise ValueError("Context provided must be a list. Unable to compare ALL_IN_VALUE action.")
def compare_none_in_list(context_value: list, condition_value: list) -> bool:
    """Comparator for NONE_IN_VALUE action

    Parameters
    ----------
    context_value : list
        user-defined context for flag evaluation
    condition_value : list
        schema value available for condition being evaluated

    Returns
    -------
    bool
        Whether **no** item of context_value is present in condition_value

    Raises
    ------
    ValueError
        If context_value is not a list
    """
    if isinstance(context_value, list):
        # Stop at the first context item that does appear in the condition values.
        for item in context_value:
            if item in condition_value:
                return False
        return True

    raise ValueError("Context provided must be a list. Unable to compare NONE_IN_VALUE action.")