-
Notifications
You must be signed in to change notification settings - Fork 421
/
Copy pathtest_idempotency_redis.py
214 lines (166 loc) · 8.26 KB
/
test_idempotency_redis.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
import json
from time import sleep
import pytest
from tests.e2e.utils import data_fetcher
from tests.e2e.utils.data_fetcher.common import GetLambdaResponseOptions, get_lambda_response_in_parallel
@pytest.fixture
def ttl_cache_expiration_handler_fn_arn(infrastructure: dict) -> str:
    """Look up ``TtlCacheExpirationHandlerArn`` in the ``infrastructure`` mapping.

    Returns an empty string when the key is not present.
    """
    arn = infrastructure.get("TtlCacheExpirationHandlerArn", "")
    return arn
@pytest.fixture
def ttl_cache_timeout_handler_fn_arn(infrastructure: dict) -> str:
    """Look up ``TtlCacheTimeoutHandlerArn`` in the ``infrastructure`` mapping.

    Returns an empty string when the key is not present.
    """
    arn = infrastructure.get("TtlCacheTimeoutHandlerArn", "")
    return arn
@pytest.fixture
def parallel_execution_handler_fn_arn(infrastructure: dict) -> str:
    """Look up ``ParallelExecutionHandlerArn`` in the ``infrastructure`` mapping.

    Returns an empty string when the key is not present.
    """
    arn = infrastructure.get("ParallelExecutionHandlerArn", "")
    return arn
@pytest.fixture
def function_thread_safety_handler_fn_arn(infrastructure: dict) -> str:
    """Look up ``FunctionThreadSafetyHandlerArn`` in the ``infrastructure`` mapping.

    Returns an empty string when the key is not present.
    """
    arn = infrastructure.get("FunctionThreadSafetyHandlerArn", "")
    return arn
@pytest.fixture
def optional_idempotency_key_fn_arn(infrastructure: dict) -> str:
    """Look up ``OptionalIdempotencyKeyHandlerArn`` in the ``infrastructure`` mapping.

    Returns an empty string when the key is not present.
    """
    arn = infrastructure.get("OptionalIdempotencyKeyHandlerArn", "")
    return arn
@pytest.fixture
def response_hook_handler_fn_arn(infrastructure: dict) -> str:
    """Look up ``ResponseHookArn`` in the ``infrastructure`` mapping.

    Returns an empty string when the key is not present.
    """
    arn = infrastructure.get("ResponseHookArn", "")
    return arn
@pytest.mark.xdist_group(name="idempotency-redis")
def test_ttl_caching_expiration_idempotency(ttl_cache_expiration_handler_fn_arn: str):
    """Check that a repeated payload yields the same response until the TTL expires.

    The handler is invoked three times with the same payload: twice back to
    back, then once more after an 8 second pause. The first two responses must
    match and the third must differ from the second.
    """
    # GIVEN a single payload reused for every invocation
    payload = json.dumps({"message": "Powertools for AWS Lambda (Python) - TTL 5s"})

    def invoke() -> str:
        # Invoke the handler and return its response payload decoded as text.
        response, _ = data_fetcher.get_lambda_response(
            lambda_arn=ttl_cache_expiration_handler_fn_arn,
            payload=payload,
        )
        return response["Payload"].read().decode("utf-8")

    # WHEN the handler runs twice in a row ...
    first_response = invoke()
    # ... the second run is expected to return the same response as the first
    second_response = invoke()

    # ... and once more after waiting 8s so the ttl expires and a new value is produced
    sleep(8)
    third_response = invoke()

    # THEN
    assert first_response == second_response
    assert third_response != second_response
@pytest.mark.xdist_group(name="idempotency-redis")
def test_ttl_caching_timeout_idempotency(ttl_cache_timeout_handler_fn_arn: str):
    """Check that an invocation succeeds after a previous one timed out.

    The first invocation asks the handler to sleep for 12 and is expected to
    report a timeout; the second asks for no sleep and is expected to echo its
    payload back.
    """
    # GIVEN two payloads that differ only in their "sleep" value
    message = "Powertools for AWS Lambda (Python) - TTL 1s"
    payload_timeout_execution = json.dumps({"sleep": 12, "message": message}, sort_keys=True)
    payload_working_execution = json.dumps({"sleep": 0, "message": message}, sort_keys=True)

    # WHEN
    # the first call is made with errors tolerated, because it should fail due to timeout
    timed_out, _ = data_fetcher.get_lambda_response(
        lambda_arn=ttl_cache_timeout_handler_fn_arn,
        payload=payload_timeout_execution,
        raise_on_error=False,
    )
    timed_out_body = timed_out["Payload"].read().decode("utf-8")

    # and the second call should work and return the payload
    working, _ = data_fetcher.get_lambda_response(
        lambda_arn=ttl_cache_timeout_handler_fn_arn,
        payload=payload_working_execution,
    )
    working_body = working["Payload"].read().decode("utf-8")

    # THEN
    assert "Task timed out after" in timed_out_body
    assert payload_working_execution == working_body
@pytest.mark.xdist_group(name="idempotency-redis")
def test_parallel_execution_idempotency(parallel_execution_handler_fn_arn: str):
    """Check the outcome of two concurrent invocations carrying the same payload.

    One response is expected to report a timeout and the other an
    "already in progress" idempotency error.
    """
    # GIVEN two identical invocation requests that tolerate errors
    payload = json.dumps({"message": "Powertools for AWS Lambda (Python) - Parallel execution"})
    invocation_options = [
        GetLambdaResponseOptions(lambda_arn=parallel_execution_handler_fn_arn, payload=payload, raise_on_error=False)
        for _ in range(2)
    ]

    # WHEN executing Lambdas in parallel
    results = get_lambda_response_in_parallel(invocation_options)
    # NOTE(review): the assertions below assume results come back in submission
    # order and that the first invocation is the one that holds the lock —
    # confirm against get_lambda_response_in_parallel.
    first_result = results[0]
    second_result = results[1]
    timeout_body = first_result[0]["Payload"].read().decode("utf-8")
    in_progress_body = second_result[0]["Payload"].read().decode("utf-8")

    # THEN
    assert "Execution already in progress with idempotency key" in in_progress_body
    assert "Task timed out after" in timeout_body
@pytest.mark.xdist_group(name="idempotency-redis")
def test_idempotent_function_thread_safety(function_thread_safety_handler_fn_arn: str):
    """Check that threaded idempotent calls finish cleanly and repeat identically.

    The handler is invoked twice with the same payload. Every entry of the
    first response must be in state ``FINISHED`` with no exception and a
    non-null output, and both responses must contain the same entries,
    regardless of their order.
    """
    # GIVEN
    payload = json.dumps({"message": "Powertools for AWS Lambda (Python) - Idempotent function thread safety check"})

    # WHEN
    # first execution
    first_execution, _ = data_fetcher.get_lambda_response(
        lambda_arn=function_thread_safety_handler_fn_arn,
        payload=payload,
    )
    first_execution_response = first_execution["Payload"].read().decode("utf-8")

    # the second execution should return the same response as the first execution
    second_execution, _ = data_fetcher.get_lambda_response(
        lambda_arn=function_thread_safety_handler_fn_arn,
        payload=payload,
    )
    second_execution_response = second_execution["Payload"].read().decode("utf-8")

    first_threads = json.loads(first_execution_response)
    second_threads = json.loads(second_execution_response)

    # THEN
    # Function threads finished without exception AND
    # first and second execution is the same
    for function_thread in first_threads:
        assert function_thread["state"] == "FINISHED"
        assert function_thread["exception"] is None
        assert function_thread["output"] is not None

    def canonical(entries: list) -> list:
        # Entries are JSON objects (unhashable), so serialise each one with
        # sorted keys and sort the strings to get an order-independent form.
        return sorted(json.dumps(entry, sort_keys=True) for entry in entries)

    # Compare the parsed elements regardless of their order in the array.
    # Calling set() on the raw response strings would only compare the sets of
    # characters they contain, which matches for almost any two JSON bodies.
    assert canonical(first_threads) == canonical(second_threads)
@pytest.mark.xdist_group(name="idempotency-redis")
def test_optional_idempotency_key(optional_idempotency_key_fn_arn: str):
    """Check that requests without an idempotency key are never treated as repeats.

    One request carries an ``X-Idempotency-Key`` header and two carry none;
    all three responses must be pairwise different.
    """
    # GIVEN two payloads where only one has the expected idempotency key
    payload = json.dumps({"headers": {"X-Idempotency-Key": "here"}})
    payload_without = json.dumps({"headers": {}})

    def invoke(body: str) -> str:
        # Invoke the handler with ``body`` and return the decoded response payload.
        response, _ = data_fetcher.get_lambda_response(lambda_arn=optional_idempotency_key_fn_arn, payload=body)
        return response["Payload"].read().decode("utf-8")

    # WHEN
    # we make one request with an idempotency key
    keyed_response = invoke(payload)
    # and two others without the idempotency key
    first_unkeyed_response = invoke(payload_without)
    second_unkeyed_response = invoke(payload_without)

    # THEN
    # we should treat 2nd and 3rd requests with NULL idempotency key as non-idempotent transactions
    # that is, no cache, no calls to persistent store, etc.
    assert keyed_response != first_unkeyed_response
    assert keyed_response != second_unkeyed_response
    assert first_unkeyed_response != second_unkeyed_response
# Use the same xdist group as every other test in this module so they are all
# scheduled together when tests are distributed by group.
@pytest.mark.xdist_group(name="idempotency-redis")
def test_response_hook_idempotency(response_hook_handler_fn_arn: str):
    """Check that the response hook only affects a repeated invocation.

    The handler is invoked twice with the same payload. The first response
    must not contain ``x-response-hook``; the second must.
    """
    # GIVEN
    payload = json.dumps({"message": "Powertools for AWS Lambda (Python)"})

    # WHEN
    # first execution
    first_execution, _ = data_fetcher.get_lambda_response(
        lambda_arn=response_hook_handler_fn_arn,
        payload=payload,
    )
    first_execution_response = first_execution["Payload"].read().decode("utf-8")

    # the second execution should include response hook
    second_execution, _ = data_fetcher.get_lambda_response(
        lambda_arn=response_hook_handler_fn_arn,
        payload=payload,
    )
    second_execution_response = second_execution["Payload"].read().decode("utf-8")

    # THEN first execution should not trigger response hook
    # THEN second execution must trigger response hook
    assert "x-response-hook" not in first_execution_response
    assert "x-response-hook" in second_execution_response