Skip to content

Commit fbdd7fa

Browse files
authored
Context condiition logic (#3532)
* Add condition logic to context * Add W1028 to validate code paths that will never get used
1 parent 0eeeb6b commit fbdd7fa

File tree

19 files changed

+846
-118
lines changed

19 files changed

+846
-118
lines changed

src/cfnlint/context/_conditions.py

-70
This file was deleted.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
__all__ = ["Conditions", "Unsatisfiable"]
2+
3+
from cfnlint.context.conditions._conditions import Conditions
4+
from cfnlint.context.conditions.exceptions import Unsatisfiable
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,131 @@
1+
"""
2+
Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
SPDX-License-Identifier: MIT-0
4+
"""
5+
6+
from __future__ import annotations
7+
8+
from dataclasses import dataclass, field
9+
from typing import Any
10+
11+
from sympy import And, Not, Or
12+
from sympy.logic.boolalg import BooleanFunction
13+
14+
from cfnlint.conditions._utils import get_hash
15+
from cfnlint.context.conditions._equals import Equal
16+
from cfnlint.helpers import FUNCTION_CONDITIONS, is_function
17+
18+
19+
@dataclass(frozen=True)
20+
class Condition:
21+
instance: Any = field(init=True)
22+
status: bool | None = field(init=True, default=None)
23+
hash: str = field(init=False)
24+
25+
fn_equals: Equal | None = field(init=True, default=None)
26+
condition: list["Condition"] | "Condition" | None = field(init=True, default=None)
27+
cnf: BooleanFunction = field(init=True, default_factory=BooleanFunction)
28+
29+
def __post_init__(self):
30+
object.__setattr__(self, "hash", get_hash(self.instance))
31+
32+
@classmethod
33+
def create_from_instance(
34+
cls, instance: Any, all_conditions: dict[str, Any]
35+
) -> "Condition":
36+
fn_k, fn_v = is_function(instance)
37+
if fn_k is None:
38+
raise ValueError("Condition value must be an object of length 1")
39+
if fn_k in FUNCTION_CONDITIONS:
40+
if not isinstance(fn_v, list):
41+
raise ValueError(f"{fn_v!r} value should be an array")
42+
if fn_k == "Fn::Equals":
43+
equal = Equal.create_from_instance(fn_v)
44+
return cls(instance=instance, fn_equals=equal, cnf=equal.cnf)
45+
46+
condition = []
47+
for v in fn_v:
48+
condition.append(Condition.create_from_instance(v, all_conditions))
49+
50+
cnf = None
51+
if fn_k == "Fn::And":
52+
cnf = And(*[c.cnf for c in condition])
53+
elif fn_k == "Fn::Or":
54+
cnf = Or(*[c.cnf for c in condition])
55+
elif fn_k == "Fn::Not":
56+
if len(condition) != 1:
57+
raise ValueError(
58+
f"Fn::Not expects only one condition, got {len(condition)}"
59+
)
60+
cnf = Not(condition[0].cnf)
61+
62+
return cls(instance=instance, condition=condition, cnf=cnf)
63+
64+
if fn_k == "Condition":
65+
if not isinstance(fn_v, str):
66+
raise ValueError(f"Condition value {fn_v!r} must be a string")
67+
sub_condition = all_conditions.get(fn_v)
68+
try:
69+
c = Condition.create_from_instance(sub_condition, all_conditions)
70+
except Exception:
71+
c = Condition.create_from_instance(
72+
{"Fn::Equals": [None, None]}, all_conditions
73+
)
74+
return cls(instance=instance, condition=c, cnf=c.cnf)
75+
76+
raise ValueError(f"Unknown key {fn_k!r} in condition")
77+
78+
def evolve(self, status: bool | None) -> "Condition":
79+
cls = self.__class__
80+
81+
if self.status is not None:
82+
if status != self.status:
83+
raise ValueError(f"Resetting status to {status} from {self.status}")
84+
85+
return cls(
86+
instance=self.instance,
87+
status=status,
88+
cnf=self.cnf,
89+
)
90+
91+
@property
92+
def is_region(self) -> bool:
93+
"""Returns True or False if the condition is based on region
94+
95+
Args: None
96+
97+
Returns:
98+
bool
99+
Returns True or False if the condition is based on region
100+
"""
101+
if self.fn_equals:
102+
return self.fn_equals.is_region
103+
if isinstance(self.condition, list):
104+
for c in self.condition:
105+
if c.is_region:
106+
return True
107+
return False
108+
if self.condition:
109+
return self.condition.is_region
110+
return False
111+
112+
@property
113+
def equals(self) -> list[Equal]:
114+
"""Returns a Sequence of the Equals that make up the Condition
115+
116+
Args: None
117+
118+
Returns:
119+
Sequence[EqualParameter] | Sequence[Equal] | None:
120+
The Equal that are part of the condition
121+
"""
122+
if self.fn_equals:
123+
return [self.fn_equals]
124+
if isinstance(self.condition, list):
125+
equals = []
126+
for c in self.condition:
127+
equals.extend(c.equals)
128+
return equals
129+
if self.condition:
130+
return self.condition.equals
131+
return []
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,116 @@
1+
"""
2+
Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
SPDX-License-Identifier: MIT-0
4+
"""
5+
6+
from __future__ import annotations
7+
8+
from dataclasses import dataclass, field
9+
from typing import TYPE_CHECKING, Any
10+
11+
from sympy import Not, Or
12+
from sympy.logic.boolalg import BooleanFunction
13+
from sympy.logic.inference import satisfiable
14+
15+
from cfnlint.conditions._utils import get_hash
16+
from cfnlint.context.conditions._condition import Condition
17+
from cfnlint.context.conditions.exceptions import Unsatisfiable
18+
19+
if TYPE_CHECKING:
20+
from cfnlint.context.context import Parameter
21+
22+
23+
@dataclass(frozen=True)
24+
class Conditions:
25+
# Template level condition management
26+
conditions: dict[str, Condition] = field(init=True, default_factory=dict)
27+
cnf: BooleanFunction | None = field(init=True, default=None)
28+
29+
@classmethod
30+
def create_from_instance(
31+
cls, conditions: Any, parameters: dict[str, "Parameter"]
32+
) -> "Conditions":
33+
obj: dict[str, Condition] = {}
34+
if not isinstance(conditions, dict):
35+
raise ValueError("Conditions must be a object")
36+
for k, v in conditions.items():
37+
try:
38+
obj[k] = Condition.create_from_instance(v, conditions)
39+
except ValueError:
40+
# this is a default condition so we can keep the name but it will
41+
# not associate with another condition and will always be true/false
42+
obj[k] = Condition.create_from_instance(
43+
{"Fn::Equals": [None, None]}, conditions
44+
)
45+
46+
cnf = None
47+
for p_k, p_v in parameters.items():
48+
49+
if not p_v.allowed_values:
50+
continue
51+
allowed_values = p_v.allowed_values.copy()
52+
equals_cnfs = []
53+
for _, c_v in obj.items():
54+
for i in c_v.equals:
55+
if i.right.hash == get_hash({"Ref": p_k}):
56+
if not isinstance(i.left.instance, str):
57+
continue
58+
equals_cnfs.append(i.cnf)
59+
if i.left.instance in allowed_values:
60+
allowed_values.remove(i.left.instance)
61+
62+
if not allowed_values:
63+
if cnf is None:
64+
cnf = Or(*equals_cnfs)
65+
else:
66+
cnf = cnf & Or(*equals_cnfs)
67+
68+
return cls(conditions=obj, cnf=cnf)
69+
70+
def evolve(self, status: dict[str, bool]) -> "Conditions":
71+
cls = self.__class__
72+
73+
conditions: dict[str, Condition] = {}
74+
cnf = self.cnf
75+
for condition, value in self.conditions.items():
76+
s = status.get(condition, value.status)
77+
try:
78+
conditions[condition] = value.evolve(status=s)
79+
if s is not None:
80+
if cnf:
81+
cnf = (
82+
cnf & conditions[condition].cnf
83+
if s
84+
else cnf & Not(conditions[condition].cnf)
85+
)
86+
else:
87+
cnf = (
88+
conditions[condition].cnf
89+
if s
90+
else Not(conditions[condition].cnf)
91+
)
92+
except ValueError as e:
93+
raise Unsatisfiable(
94+
new_status=status,
95+
current_status=self.status,
96+
) from e
97+
98+
if not satisfiable(cnf):
99+
raise Unsatisfiable(
100+
new_status=status,
101+
current_status=self.status,
102+
)
103+
104+
return cls(
105+
conditions=conditions,
106+
cnf=cnf,
107+
)
108+
109+
@property
110+
def status(self) -> dict[str, bool]:
111+
obj = {}
112+
for name, c in self.conditions.items():
113+
if c.status is not None:
114+
obj[name] = c.status
115+
116+
return obj

0 commit comments

Comments
 (0)