forked from aws-powertools/powertools-lambda-python
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtest_idempotency_dynamodb.py
133 lines (99 loc) · 5.16 KB
/
test_idempotency_dynamodb.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
import json
from time import sleep
import pytest
from tests.e2e.utils import data_fetcher
from tests.e2e.utils.functions import execute_lambdas_in_parallel
@pytest.fixture
def ttl_cache_expiration_handler_fn_arn(infrastructure: dict) -> str:
return infrastructure.get("TtlCacheExpirationHandlerArn", "")
@pytest.fixture
def ttl_cache_timeout_handler_fn_arn(infrastructure: dict) -> str:
return infrastructure.get("TtlCacheTimeoutHandlerArn", "")
@pytest.fixture
def parallel_execution_handler_fn_arn(infrastructure: dict) -> str:
return infrastructure.get("ParallelExecutionHandlerArn", "")
@pytest.fixture
def function_thread_safety_handler_fn_arn(infrastructure: dict) -> str:
return infrastructure.get("FunctionThreadSafetyHandlerArn", "")
@pytest.fixture
def idempotency_table_name(infrastructure: dict) -> str:
return infrastructure.get("DynamoDBTable", "")
@pytest.mark.xdist_group(name="idempotency")
def test_ttl_caching_expiration_idempotency(ttl_cache_expiration_handler_fn_arn: str):
# GIVEN
payload = json.dumps({"message": "Lambda Powertools - TTL 5s"})
# WHEN
# first execution
first_execution, _ = data_fetcher.get_lambda_response(
lambda_arn=ttl_cache_expiration_handler_fn_arn, payload=payload
)
first_execution_response = first_execution["Payload"].read().decode("utf-8")
# the second execution should return the same response as the first execution
second_execution, _ = data_fetcher.get_lambda_response(
lambda_arn=ttl_cache_expiration_handler_fn_arn, payload=payload
)
second_execution_response = second_execution["Payload"].read().decode("utf-8")
# wait 8s to expire ttl and execute again, this should return a new response value
sleep(8)
third_execution, _ = data_fetcher.get_lambda_response(
lambda_arn=ttl_cache_expiration_handler_fn_arn, payload=payload
)
third_execution_response = third_execution["Payload"].read().decode("utf-8")
# THEN
assert first_execution_response == second_execution_response
assert third_execution_response != second_execution_response
@pytest.mark.xdist_group(name="idempotency")
def test_ttl_caching_timeout_idempotency(ttl_cache_timeout_handler_fn_arn: str):
# GIVEN
payload_timeout_execution = json.dumps({"sleep": 5, "message": "Lambda Powertools - TTL 1s"})
payload_working_execution = json.dumps({"sleep": 0, "message": "Lambda Powertools - TTL 1s"})
# WHEN
# first call should fail due to timeout
execution_with_timeout, _ = data_fetcher.get_lambda_response(
lambda_arn=ttl_cache_timeout_handler_fn_arn, payload=payload_timeout_execution
)
execution_with_timeout_response = execution_with_timeout["Payload"].read().decode("utf-8")
# the second call should work and return the payload
execution_working, _ = data_fetcher.get_lambda_response(
lambda_arn=ttl_cache_timeout_handler_fn_arn, payload=payload_working_execution
)
execution_working_response = execution_working["Payload"].read().decode("utf-8")
# THEN
assert "Task timed out after" in execution_with_timeout_response
assert payload_working_execution == execution_working_response
@pytest.mark.xdist_group(name="idempotency")
def test_parallel_execution_idempotency(parallel_execution_handler_fn_arn: str):
# GIVEN
arguments = json.dumps({"message": "Lambda Powertools - Parallel execution"})
# WHEN
# executing Lambdas in parallel
lambdas_arn = [parallel_execution_handler_fn_arn, parallel_execution_handler_fn_arn]
execution_result_list = execute_lambdas_in_parallel("data_fetcher.get_lambda_response", lambdas_arn, arguments)
timeout_execution_response = execution_result_list[0][0]["Payload"].read().decode("utf-8")
error_idempotency_execution_response = execution_result_list[1][0]["Payload"].read().decode("utf-8")
# THEN
assert "Execution already in progress with idempotency key" in error_idempotency_execution_response
assert "Task timed out after" in timeout_execution_response
@pytest.mark.xdist_group(name="idempotency")
def test_idempotent_function_thread_safety(function_thread_safety_handler_fn_arn: str):
# GIVEN
payload = json.dumps({"message": "Lambda Powertools - Idempotent function thread safety check"})
# WHEN
# first execution
first_execution, _ = data_fetcher.get_lambda_response(
lambda_arn=function_thread_safety_handler_fn_arn, payload=payload
)
first_execution_response = first_execution["Payload"].read().decode("utf-8")
# the second execution should return the same response as the first execution
second_execution, _ = data_fetcher.get_lambda_response(
lambda_arn=function_thread_safety_handler_fn_arn, payload=payload
)
second_execution_response = second_execution["Payload"].read().decode("utf-8")
# THEN
# Function threads finished without exception AND
# first and second execution is the same
for function_thread in json.loads(first_execution_response):
assert function_thread["state"] == "FINISHED"
assert function_thread["exception"] is None
assert function_thread["output"] is not None
assert first_execution_response == second_execution_response