Skip to content

Commit 0eeeb6b

Browse files
authored
Create rule W3663 to validate lmbd permission account (#3523)
* Create rule W3663 to validate lmbd permission account
1 parent c6b8148 commit 0eeeb6b

File tree

2 files changed

+292
-0
lines changed

2 files changed

+292
-0
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
"""
2+
Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
SPDX-License-Identifier: MIT-0
4+
"""
5+
6+
from __future__ import annotations
7+
8+
from typing import Any
9+
10+
import regex as re
11+
12+
from cfnlint.helpers import ensure_list, is_function
13+
from cfnlint.jsonschema import ValidationError, ValidationResult
14+
from cfnlint.jsonschema.protocols import Validator
15+
from cfnlint.rules.jsonschema import CfnLintKeyword
16+
17+
18+
class PermissionSourceAccount(CfnLintKeyword):
19+
20+
id = "W3663"
21+
shortdesc = "Validate SourceAccount is required property"
22+
description = (
23+
"When configuration a Lambda permission with a SourceArn "
24+
"that doesn't have an AccountId you should also specify "
25+
"the SourceAccount"
26+
)
27+
source_url = "https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-lambda-permission.html#cfn-lambda-permission-sourceaccount"
28+
tags = ["resources", "lambda", "permission"]
29+
30+
def __init__(self):
31+
super().__init__(
32+
keywords=["Resources/AWS::Lambda::Permission/Properties"],
33+
)
34+
35+
def _validate_sub_has_account_id(self, validator: Validator, value: Any) -> bool:
36+
value = ensure_list(value)
37+
38+
if isinstance(value[0], str):
39+
if re.search(r":(\d{12}|\${AWS::AccountId}):", value[0]):
40+
return True
41+
42+
return False
43+
return True
44+
45+
def _validate_is_gettatt_to_bucket(self, validator: Validator, value: Any) -> bool:
46+
value = ensure_list(value)[0].split(".")[0]
47+
48+
resource = validator.context.resources[value]
49+
if resource.type == "AWS::S3::Bucket":
50+
return True
51+
return False
52+
53+
def validate(
54+
self, validator: Validator, _: Any, instance: Any, schema: dict[str, Any]
55+
) -> ValidationResult:
56+
if not isinstance(instance, dict):
57+
return
58+
59+
for scenario in validator.cfn.get_object_without_conditions(
60+
instance, ["SourceArn", "SourceAccount"]
61+
):
62+
if scenario.get("Scenario"):
63+
scenario_validator = validator.evolve(
64+
context=validator.context.evolve(
65+
conditions=validator.context.conditions.evolve(
66+
status=scenario.get("Scenario")
67+
)
68+
)
69+
)
70+
else:
71+
scenario_validator = validator.evolve()
72+
73+
source_arn = scenario.get("Object").get("SourceArn")
74+
source_account = scenario.get("Object").get("SourceAccount")
75+
if not source_arn:
76+
continue
77+
78+
if isinstance(source_arn, str):
79+
if re.search(r":\d{12}:", source_arn):
80+
continue
81+
82+
fn_k, fn_v = is_function(source_arn)
83+
if fn_k is not None:
84+
if fn_k == "Fn::Sub":
85+
if self._validate_sub_has_account_id(scenario_validator, fn_v):
86+
continue
87+
elif fn_k == "Fn::GetAtt":
88+
if not self._validate_is_gettatt_to_bucket(
89+
scenario_validator, fn_v
90+
):
91+
continue
92+
else:
93+
continue
94+
95+
if not source_account:
96+
yield ValidationError(
97+
"'SourceAccount' is a required property",
98+
validator="required",
99+
)
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,193 @@
1+
"""
2+
Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
SPDX-License-Identifier: MIT-0
4+
"""
5+
6+
import pytest
7+
8+
from cfnlint.jsonschema import ValidationError
9+
from cfnlint.rules.resources.lmbd.PermissionSourceAccount import PermissionSourceAccount
10+
11+
12+
@pytest.fixture(scope="module")
13+
def rule():
14+
rule = PermissionSourceAccount()
15+
yield rule
16+
17+
18+
@pytest.fixture
19+
def template():
20+
return {
21+
"AWSTemplateFormatVersion": "2010-09-09",
22+
"Conditions": {
23+
"IsUsEast1": {"Fn::Equals": [{"Ref": "AWS::Region"}, "us-east-1"]},
24+
},
25+
"Resources": {
26+
"Bucket": {"Type": "AWS::S3::Bucket"},
27+
"SQS": {"Type": "AWS::SQS::Queue"},
28+
},
29+
}
30+
31+
32+
@pytest.mark.parametrize(
33+
"instance,expected",
34+
[
35+
(
36+
{
37+
"SourceArn": "arn:aws:s3:::bucket_name",
38+
"SourceAccount": "123456789012",
39+
},
40+
[],
41+
),
42+
(
43+
{},
44+
[],
45+
),
46+
(
47+
[],
48+
[],
49+
),
50+
(
51+
{
52+
"SourceArn": [],
53+
},
54+
[],
55+
),
56+
(
57+
{
58+
"SourceArn": "arn:aws:s3:::bucket_name",
59+
},
60+
[
61+
ValidationError(
62+
"'SourceAccount' is a required property",
63+
validator="required",
64+
)
65+
],
66+
),
67+
(
68+
{
69+
"SourceArn": "arn:aws:sqs:us-east-1:123456789012:queue",
70+
},
71+
[],
72+
),
73+
(
74+
{
75+
"SourceArn": {
76+
"Fn::Sub": (
77+
"arn:${AWS::Partition}:sqs:"
78+
"${AWS::Region}:${AWS::AccountId}:queue"
79+
)
80+
},
81+
},
82+
[],
83+
),
84+
(
85+
{
86+
"SourceArn": {"Fn::Sub": "arn:${AWS::Partition}:s3:::bucket"},
87+
},
88+
[
89+
ValidationError(
90+
"'SourceAccount' is a required property",
91+
validator="required",
92+
)
93+
],
94+
),
95+
(
96+
{
97+
"SourceArn": {"Fn::Sub": [[], {}]},
98+
},
99+
[],
100+
),
101+
(
102+
{
103+
"SourceArn": {"Fn::Sub": "arn:${AWS::Partition}:s3:::bucket"},
104+
"SourceAccount": {"Ref": "AWS::AccountId"},
105+
},
106+
[],
107+
),
108+
(
109+
{
110+
"SourceArn": {"Fn::GetAtt": ["Bucket", "Arn"]},
111+
"SourceAccount": {"Ref": "AWS::AccountId"},
112+
},
113+
[],
114+
),
115+
(
116+
{
117+
"SourceArn": {"Fn::GetAtt": ["Bucket", "Arn"]},
118+
},
119+
[
120+
ValidationError(
121+
"'SourceAccount' is a required property",
122+
validator="required",
123+
)
124+
],
125+
),
126+
(
127+
{
128+
"SourceArn": {"Fn::GetAtt": ["SQS", "Arn"]},
129+
"SourceAccount": {"Ref": "AWS::AccountId"},
130+
},
131+
[],
132+
),
133+
(
134+
{
135+
"SourceArn": {"Fn::GetAtt": ["SQS", "Arn"]},
136+
},
137+
[],
138+
),
139+
(
140+
{
141+
"SourceArn": {"Ref": "Foo"},
142+
},
143+
[],
144+
),
145+
(
146+
{
147+
"SourceArn": {
148+
"Fn::If": [
149+
"IsUsEast1",
150+
{"Fn::GetAtt": ["Bucket", "Arn"]},
151+
{"Fn::GetAtt": ["SQS", "Arn"]},
152+
]
153+
},
154+
"SourceAccount": {
155+
"Fn::If": [
156+
"IsUsEast1",
157+
{"Ref": "AWS::AccountId"},
158+
{"Ref": "AWS::NoValue"},
159+
]
160+
},
161+
},
162+
[],
163+
),
164+
(
165+
{
166+
"SourceArn": {
167+
"Fn::If": [
168+
"IsUsEast1",
169+
{"Fn::GetAtt": ["Bucket", "Arn"]},
170+
{"Fn::GetAtt": ["SQS", "Arn"]},
171+
]
172+
},
173+
"SourceAccount": {
174+
"Fn::If": [
175+
"IsUsEast1",
176+
{"Ref": "AWS::NoValue"},
177+
{"Ref": "AWS::AccountId"},
178+
]
179+
},
180+
},
181+
[
182+
ValidationError(
183+
"'SourceAccount' is a required property",
184+
validator="required",
185+
)
186+
],
187+
),
188+
],
189+
)
190+
def test_validate(instance, expected, rule, validator):
191+
errs = list(rule.validate(validator, "", instance, {}))
192+
193+
assert errs == expected, f"Expected {expected} got {errs}"

0 commit comments

Comments
 (0)