Skip to content

Commit bf4bdd6

Browse files
committed
refactor(schema-validator): initial change to classes
1 parent 3a71fdf commit bf4bdd6

File tree

3 files changed

+111
-55
lines changed

3 files changed

+111
-55
lines changed

aws_lambda_powertools/utilities/feature_flags/base.py

+6
Original file line numberDiff line numberDiff line change
@@ -22,3 +22,9 @@ def get_json_configuration(self) -> Dict[str, Any]:
2222
parsed JSON dictionary
2323
"""
2424
return NotImplemented # pragma: no cover
25+
26+
27+
class BaseValidator(ABC):
28+
@abstractmethod
29+
def validate(self):
30+
return NotImplemented # pragma: no cover
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
import logging
22
from enum import Enum
3-
from typing import Any, Dict, Optional
3+
from typing import Any, Dict, List, Optional
44

5+
from .base import BaseValidator
56
from .exceptions import ConfigurationError
67

78
logger = logging.getLogger(__name__)
@@ -28,74 +29,117 @@ class SchemaValidator:
2829
def __init__(self, schema: Dict[str, Any]):
2930
self.schema = schema
3031

31-
@staticmethod
32-
def _is_dict_and_non_empty(value: Optional[Dict]):
33-
return not value or isinstance(value, dict)
32+
def validate(self) -> None:
33+
if not isinstance(self.schema, dict):
34+
raise ConfigurationError(f"Schema must be a dictionary, schema={str(self.schema)}")
35+
36+
features = Features(schema=self.schema)
37+
features.validate()
38+
39+
40+
class Features(BaseValidator):
41+
def __init__(self, schema):
42+
self.schema = schema
43+
self.features: Optional[Dict[str, Dict]] = None
44+
45+
if isinstance(self.schema, dict):
46+
self.features = self.schema.get(FEATURES_KEY)
47+
48+
def validate(self):
49+
if not isinstance(self.features, dict):
50+
raise ConfigurationError(f"'features' key must be a dictionary, schema={self.schema}")
51+
52+
for name, feature in self.features.items():
53+
self.validate_feature(name, feature)
54+
rules = FeatureRules(feature=feature, feature_name=name)
55+
rules.validate()
3456

3557
@staticmethod
36-
def _validate_condition(rule_name: str, condition: Dict[str, str]) -> None:
37-
if not condition or not isinstance(condition, dict):
38-
raise ConfigurationError(f"invalid condition type, not a dictionary, rule_name={rule_name}")
58+
def validate_feature(name, feature):
59+
if not feature or not isinstance(feature, dict):
60+
raise ConfigurationError(f"Feature must be a non-empty dictionary, feature={name}")
3961

40-
action = condition.get(CONDITION_ACTION, "")
41-
if action not in [ACTION.EQUALS.value, ACTION.STARTSWITH.value, ACTION.ENDSWITH.value, ACTION.CONTAINS.value]:
42-
raise ConfigurationError(f"invalid action value, rule_name={rule_name}, action={action}")
62+
default_value = feature.get(FEATURE_DEFAULT_VAL_KEY)
63+
if default_value is None or not isinstance(default_value, bool):
64+
raise ConfigurationError(f"'feature_default_value' boolean key must be present, feature_name={name}")
4365

44-
key = condition.get(CONDITION_KEY, "")
45-
if not key or not isinstance(key, str):
46-
raise ConfigurationError(f"Invalid key value, key has to be a non empty string, rule_name={rule_name}")
4766

48-
value = condition.get(CONDITION_VALUE, "")
49-
if not value:
50-
raise ConfigurationError(f"Missing condition value, rule_name={rule_name}")
67+
class FeatureRules(BaseValidator):
68+
def __init__(self, feature: Dict[str, Any], feature_name: str):
69+
self.feature = feature
70+
self.feature_name = feature_name
71+
self.rules: Optional[List[Dict]] = self.feature.get(RULES_KEY)
72+
73+
def validate(self):
74+
if not isinstance(self.rules, list):
75+
raise ConfigurationError(f"Feature rules is not a list, feature_name={self.feature_name}")
76+
77+
if not self.rules:
78+
logger.debug("Rules are empty, ignoring validation")
79+
return
5180

52-
def _validate_rule(self, feature_name: str, rule: Dict[str, Any]) -> None:
81+
for rule in self.rules:
82+
self.validate_rule(rule, self.feature)
83+
conditions = FeatureRuleConditions(rule=rule, rule_name=rule.get(RULE_NAME_KEY))
84+
conditions.validate()
85+
86+
def validate_rule(self, rule, feature_name):
5387
if not rule or not isinstance(rule, dict):
54-
raise ConfigurationError(f"Feature rule is not a dictionary, feature_name={feature_name}")
88+
raise ConfigurationError(f"Feature rule must be a dictionary, feature_name={feature_name}")
89+
90+
self.validate_rule_name(rule, feature_name)
91+
self.validate_rule_default_value(rule)
5592

93+
@staticmethod
94+
def validate_rule_name(rule, feature_name):
5695
rule_name = rule.get(RULE_NAME_KEY)
5796
if not rule_name or rule_name is None or not isinstance(rule_name, str):
58-
raise ConfigurationError(f"Invalid rule_name, feature_name={feature_name}")
97+
raise ConfigurationError(
98+
f"'rule_name' key must be present and have a non-empty string, feature_name={feature_name}"
99+
)
59100

101+
@staticmethod
102+
def validate_rule_default_value(rule):
103+
rule_name = rule.get(RULE_NAME_KEY)
60104
rule_default_value = rule.get(RULE_DEFAULT_VALUE)
61105
if rule_default_value is None or not isinstance(rule_default_value, bool):
62-
raise ConfigurationError(f"Invalid rule_default_value, rule_name={rule_name}")
63-
64-
conditions = rule.get(CONDITIONS_KEY, {})
65-
if not conditions or not isinstance(conditions, list):
66-
raise ConfigurationError(f"Invalid condition, rule_name={rule_name}")
106+
raise ConfigurationError(f"'rule_default_value' key must have be bool, rule_name={rule_name}")
67107

68-
# validate conditions
69-
for condition in conditions:
70-
self._validate_condition(rule_name, condition)
71108

72-
def _validate_feature(self, name: str, feature: Dict[str, Any]) -> None:
73-
if not feature or not isinstance(feature, dict):
74-
# if self._is_dict_and_non_empty(feature):
75-
raise ConfigurationError(f"Invalid AWS AppConfig JSON schema detected, feature {name} is invalid")
109+
class FeatureRuleConditions(BaseValidator):
110+
def __init__(self, rule: Dict[str, Any], rule_name: str):
111+
self.conditions = rule.get(CONDITIONS_KEY, {})
112+
self.rule_name = rule_name
76113

77-
feature_default_value = feature.get(FEATURE_DEFAULT_VAL_KEY)
78-
if feature_default_value is None or not isinstance(feature_default_value, bool):
79-
raise ConfigurationError(f"Missing feature_default_value for feature, feature_name={name}")
114+
def validate(self):
115+
if not self.conditions or not isinstance(self.conditions, list):
116+
raise ConfigurationError(f"Invalid condition, rule_name={self.rule_name}")
80117

81-
# validate rules
82-
rules = feature.get(RULES_KEY, [])
83-
if not rules:
84-
return
118+
for condition in self.conditions:
119+
self._validate_condition(rule_name=self.rule_name, condition=condition)
85120

86-
if not isinstance(rules, list):
87-
raise ConfigurationError(f"Feature rules is not a list, feature_name={name}")
121+
def _validate_condition(self, rule_name: str, condition: Dict[str, str]) -> None:
122+
if not condition or not isinstance(condition, dict):
123+
raise ConfigurationError(f"invalid condition type, not a dictionary, rule_name={rule_name}")
88124

89-
for rule in rules:
90-
self._validate_rule(name, rule)
125+
self._validate_condition_action(condition=condition, rule_name=rule_name)
126+
self._validate_condition_key(condition=condition, rule_name=rule_name)
127+
self._validate_condition_value(condition=condition, rule_name=rule_name)
91128

92-
def validate(self) -> None:
93-
if not self._is_dict_and_non_empty(self.schema):
94-
raise ConfigurationError(f"Schema must be a dictionary, schema={str(self.schema)}")
129+
@staticmethod
130+
def _validate_condition_action(condition: Dict[str, Any], rule_name: str):
131+
action = condition.get(CONDITION_ACTION, "")
132+
if action not in ACTION.__members__:
133+
raise ConfigurationError(f"invalid action value, rule_name={rule_name}, action={action}")
95134

96-
features: Optional[Dict[str, Dict]] = self.schema.get(FEATURES_KEY)
97-
if not isinstance(features, dict):
98-
raise ConfigurationError(f"'features' key must be present, schema={self.schema}")
135+
@staticmethod
136+
def _validate_condition_key(condition: Dict[str, Any], rule_name: str):
137+
key = condition.get(CONDITION_KEY, "")
138+
if not key or not isinstance(key, str):
139+
raise ConfigurationError(f"Invalid key value, key has to be a non empty string, rule_name={rule_name}")
99140

100-
for name, feature in features.items():
101-
self._validate_feature(name, feature)
141+
@staticmethod
142+
def _validate_condition_value(condition: Dict[str, Any], rule_name: str):
143+
value = condition.get(CONDITION_VALUE, "")
144+
if not value:
145+
raise ConfigurationError(f"Missing condition value, rule_name={rule_name}")

tests/functional/feature_toggles/test_schema_validation.py

+11-5
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
RULE_DEFAULT_VALUE,
1515
RULE_NAME_KEY,
1616
RULES_KEY,
17+
FeatureRules,
1718
SchemaValidator,
1819
)
1920

@@ -315,11 +316,16 @@ def test_validate_condition_missing_condition_value():
315316

316317
def test_validate_rule_invalid_rule_name():
317318
# GIVEN a rule_name not in the rule dict
318-
validator = SchemaValidator(EMPTY_SCHEMA)
319-
rule_name = "invalid_rule_name"
320-
rule = {"missing": ""}
319+
feature = {
320+
"feature_default_value": True,
321+
"rules": [
322+
{"invalid_rule_name": "tenant id equals 345345435"},
323+
],
324+
}
325+
rule = feature[RULES_KEY][0]
321326

322327
# WHEN calling _validate_rule
323328
# THEN raise ConfigurationError
324-
with pytest.raises(ConfigurationError, match="Invalid rule_name"):
325-
validator._validate_rule(rule_name, rule)
329+
with pytest.raises(ConfigurationError, match="'rule_name' key must be present*"):
330+
rules_validator = FeatureRules(feature=feature, feature_name="my_feature")
331+
rules_validator.validate_rule_name(rule=rule, feature_name="my_feature")

0 commit comments

Comments
 (0)