forked from aws-powertools/powertools-lambda-python
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtest_custom_resource.py
130 lines (103 loc) · 5.41 KB
/
test_custom_resource.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
import pytest
from pydantic import BaseModel, Field
from aws_lambda_powertools.utilities.parser import ValidationError, event_parser
from aws_lambda_powertools.utilities.parser.models import (
CloudFormationCustomResourceCreateModel,
CloudFormationCustomResourceDeleteModel,
CloudFormationCustomResourceUpdateModel,
)
from aws_lambda_powertools.utilities.typing import LambdaContext
from tests.functional.utils import load_event
@event_parser(model=CloudFormationCustomResourceCreateModel)
def handle_create_custom_resource(event: CloudFormationCustomResourceCreateModel, _: LambdaContext):
assert event.request_type == "Create"
assert event.request_id == "xxxxx-d2a0-4dfb-ab1f-xxxxxx"
assert event.service_token == "arn:aws:lambda:us-east-1:xxx:function:xxxx-CrbuiltinfunctionidProvi-2vKAalSppmKe"
assert (
str(event.response_url)
== "https://cloudformation-custom-resource-response-useast1.s3.amazonaws.com/7F%7Cb1f50fdfc25f3b"
)
assert event.stack_id == "arn:aws:cloudformation:us-east-1:xxxx:stack/xxxx/271845b0-f2e8-11ed-90ac-0eeb25b8ae21"
assert event.logical_resource_id == "xxxxxxxxx"
assert event.resource_type == "Custom::MyType"
assert event.resource_properties == {
"ServiceToken": "arn:aws:lambda:us-east-1:xxxxx:function:xxxxx",
"MyProps": "ss",
}
@event_parser(model=CloudFormationCustomResourceUpdateModel)
def handle_update_custom_resource(event: CloudFormationCustomResourceUpdateModel, _: LambdaContext):
assert event.request_type == "Update"
assert event.request_id == "xxxxx-d2a0-4dfb-ab1f-xxxxxx"
assert event.service_token == "arn:aws:lambda:us-east-1:xxx:function:xxxx-CrbuiltinfunctionidProvi-2vKAalSppmKe"
assert (
str(event.response_url)
== "https://cloudformation-custom-resource-response-useast1.s3.amazonaws.com/7F%7Cb1f50fdfc25f3b"
)
assert event.stack_id == "arn:aws:cloudformation:us-east-1:xxxx:stack/xxxx/271845b0-f2e8-11ed-90ac-0eeb25b8ae21"
assert event.logical_resource_id == "xxxxxxxxx"
assert event.resource_type == "Custom::MyType"
assert event.resource_properties == {
"ServiceToken": "arn:aws:lambda:us-east-1:xxxxx:function:xxxxx",
"MyProps": "new",
}
assert event.old_resource_properties == {
"ServiceToken": "arn:aws:lambda:us-east-1:xxxxx:function:xxxxx-xxxx-xxx",
"MyProps": "old",
}
@event_parser(model=CloudFormationCustomResourceDeleteModel)
def handle_delete_custom_resource(event: CloudFormationCustomResourceDeleteModel, _: LambdaContext):
assert event.request_type == "Delete"
assert event.request_id == "xxxxx-d2a0-4dfb-ab1f-xxxxxx"
assert event.service_token == "arn:aws:lambda:us-east-1:xxx:function:xxxx-CrbuiltinfunctionidProvi-2vKAalSppmKe"
assert (
str(event.response_url)
== "https://cloudformation-custom-resource-response-useast1.s3.amazonaws.com/7F%7Cb1f50fdfc25f3b"
)
assert event.stack_id == "arn:aws:cloudformation:us-east-1:xxxx:stack/xxxx/271845b0-f2e8-11ed-90ac-0eeb25b8ae21"
assert event.logical_resource_id == "xxxxxxxxx"
assert event.resource_type == "Custom::MyType"
assert event.resource_properties == {
"ServiceToken": "arn:aws:lambda:us-east-1:xxxxx:function:xxxxx",
"MyProps": "ss",
}
def test_create_trigger_event():
event_dict = load_event("customResourceCreate.json")
handle_create_custom_resource(event_dict, LambdaContext())
def test_update_trigger_event():
event_dict = load_event("customResourceUpdate.json")
handle_update_custom_resource(event_dict, LambdaContext())
def test_delete_trigger_event():
event_dict = load_event("customResourceDelete.json")
handle_delete_custom_resource(event_dict, LambdaContext())
def test_validate_create_event_does_not_conform_with_model():
event = {"invalid": "event"}
with pytest.raises(ValidationError):
handle_create_custom_resource(event, LambdaContext())
def test_validate_update_event_does_not_conform_with_model():
event = {"invalid": "event"}
with pytest.raises(ValidationError):
handle_update_custom_resource(event, LambdaContext())
def test_validate_delete_event_does_not_conform_with_model():
event = {"invalid": "event"}
with pytest.raises(ValidationError):
handle_delete_custom_resource(event, LambdaContext())
class MyModel(BaseModel):
MyProps: str
class MyCustomResource(CloudFormationCustomResourceCreateModel):
resource_properties: MyModel = Field(..., alias="ResourceProperties")
@event_parser(model=MyCustomResource)
def handle_create_custom_resource_extended_model(event: MyCustomResource, _: LambdaContext):
assert event.request_type == "Create"
assert event.request_id == "xxxxx-d2a0-4dfb-ab1f-xxxxxx"
assert event.service_token == "arn:aws:lambda:us-east-1:xxx:function:xxxx-CrbuiltinfunctionidProvi-2vKAalSppmKe"
assert (
str(event.response_url)
== "https://cloudformation-custom-resource-response-useast1.s3.amazonaws.com/7F%7Cb1f50fdfc25f3b"
)
assert event.stack_id == "arn:aws:cloudformation:us-east-1:xxxx:stack/xxxx/271845b0-f2e8-11ed-90ac-0eeb25b8ae21"
assert event.logical_resource_id == "xxxxxxxxx"
assert event.resource_type == "Custom::MyType"
assert event.resource_properties.MyProps == "ss"
def test_create_trigger_event_custom_model():
event_dict = load_event("customResourceCreate.json")
handle_create_custom_resource_extended_model(event_dict, LambdaContext())