1
1
from datetime import datetime , tzinfo
2
- from typing import Dict , Optional
2
+ from typing import Any , Dict , Optional
3
3
4
4
from dateutil .tz import gettz
5
5
6
- from .schema import HOUR_MIN_SEPARATOR , TimeValues
6
+ from .schema import HOUR_MIN_SEPARATOR , ModuloRangeValues , TimeValues
7
7
8
8
9
9
def _get_now_from_timezone (timezone : Optional [tzinfo ]) -> datetime :
@@ -16,23 +16,23 @@ def _get_now_from_timezone(timezone: Optional[tzinfo]) -> datetime:
16
16
return datetime .now (timezone )
17
17
18
18
19
- def compare_days_of_week (action : str , values : Dict ) -> bool :
20
- timezone_name = values .get (TimeValues .TIMEZONE .value , "UTC" )
19
+ def compare_days_of_week (context_value : Any , condition_value : Dict ) -> bool :
20
+ timezone_name = condition_value .get (TimeValues .TIMEZONE .value , "UTC" )
21
21
22
22
# %A = Weekday as locale’s full name.
23
23
current_day = _get_now_from_timezone (gettz (timezone_name )).strftime ("%A" ).upper ()
24
24
25
- days = values .get (TimeValues .DAYS .value , [])
25
+ days = condition_value .get (TimeValues .DAYS .value , [])
26
26
return current_day in days
27
27
28
28
29
- def compare_datetime_range (action : str , values : Dict ) -> bool :
30
- timezone_name = values .get (TimeValues .TIMEZONE .value , "UTC" )
29
+ def compare_datetime_range (context_value : Any , condition_value : Dict ) -> bool :
30
+ timezone_name = condition_value .get (TimeValues .TIMEZONE .value , "UTC" )
31
31
timezone = gettz (timezone_name )
32
32
current_time : datetime = _get_now_from_timezone (timezone )
33
33
34
- start_date_str = values .get (TimeValues .START .value , "" )
35
- end_date_str = values .get (TimeValues .END .value , "" )
34
+ start_date_str = condition_value .get (TimeValues .START .value , "" )
35
+ end_date_str = condition_value .get (TimeValues .END .value , "" )
36
36
37
37
# Since start_date and end_date doesn't include timezone information, we mark the timestamp
38
38
# with the same timezone as the current_time. This way all the 3 timestamps will be on
@@ -42,12 +42,12 @@ def compare_datetime_range(action: str, values: Dict) -> bool:
42
42
return start_date <= current_time <= end_date
43
43
44
44
45
- def compare_time_range (action : str , values : Dict ) -> bool :
46
- timezone_name = values .get (TimeValues .TIMEZONE .value , "UTC" )
45
+ def compare_time_range (context_value : Any , condition_value : Dict ) -> bool :
46
+ timezone_name = condition_value .get (TimeValues .TIMEZONE .value , "UTC" )
47
47
current_time : datetime = _get_now_from_timezone (gettz (timezone_name ))
48
48
49
- start_hour , start_min = values .get (TimeValues .START .value , "" ).split (HOUR_MIN_SEPARATOR )
50
- end_hour , end_min = values .get (TimeValues .END .value , "" ).split (HOUR_MIN_SEPARATOR )
49
+ start_hour , start_min = condition_value .get (TimeValues .START .value , "" ).split (HOUR_MIN_SEPARATOR )
50
+ end_hour , end_min = condition_value .get (TimeValues .END .value , "" ).split (HOUR_MIN_SEPARATOR )
51
51
52
52
start_time = current_time .replace (hour = int (start_hour ), minute = int (start_min ))
53
53
end_time = current_time .replace (hour = int (end_hour ), minute = int (end_min ))
@@ -71,3 +71,14 @@ def compare_time_range(action: str, values: Dict) -> bool:
71
71
else :
72
72
# In normal circumstances, we need to assert **both** conditions
73
73
return start_time <= current_time <= end_time
74
+
75
+
76
+ def compare_modulo_range (context_value : int , condition_value : Dict ) -> bool :
77
+ """
78
+ Returns for a given context 'a' and modulo condition 'b' -> b.start <= a % b.base <= b.end
79
+ """
80
+ base = condition_value .get (ModuloRangeValues .BASE .value , 1 )
81
+ start = condition_value .get (ModuloRangeValues .START .value , 1 )
82
+ end = condition_value .get (ModuloRangeValues .END .value , 1 )
83
+
84
+ return start <= context_value % base <= end
0 commit comments