# test_redis_layer.py
# Tests for the Redis idempotency persistence layer, run against an in-memory MockRedis client.
import copy
import time as t
import pytest
from aws_lambda_powertools.utilities.idempotency import RedisCachePersistenceLayer
from aws_lambda_powertools.utilities.idempotency.exceptions import (
IdempotencyAlreadyInProgressError,
IdempotencyItemAlreadyExistsError,
IdempotencyItemNotFoundError,
IdempotencyRedisClientConfigError,
)
from aws_lambda_powertools.utilities.idempotency.idempotency import (
idempotent,
idempotent_function,
)
@pytest.fixture
def lambda_context():
    """Return a minimal stand-in for the AWS Lambda context object."""

    class LambdaContext:
        def __init__(self):
            # Populate the instance attributes in a fixed order.
            attributes = {
                "function_name": "test-func",
                "memory_limit_in_mb": 128,
                "invoked_function_arn": "arn:aws:lambda:eu-west-1:809313241234:function:test-func",
                "aws_request_id": "52fdfc07-2182-154f-163f-5f0f9a621d72",
            }
            for attr_name, attr_value in attributes.items():
                setattr(self, attr_name, attr_value)

        def get_remaining_time_in_millis(self) -> int:
            # Always reports one second of remaining execution time.
            return 1000

    context = LambdaContext()
    return context
class MockRedis:
    """Minimal in-memory stand-in for a Redis client.

    Hashes are kept in ``self.cache`` (key -> mapping) and absolute expiry
    timestamps in ``self.expire_dict`` (key -> epoch seconds).
    """

    def __init__(self, decode_responses, cache: "dict | None" = None, **kwargs):
        """Create the mock client.

        Args:
            decode_responses: Value reported back by ``get_connection_kwargs``.
            cache: Optional dict used as backing storage. It is used as-is,
                even when empty, so the caller can observe writes through it.
            **kwargs: Accepted and ignored (e.g. ``host`` / ``port``).
        """
        # `cache or {}` would silently swap an empty caller-supplied dict for a
        # private one; only fall back when no dict was passed at all.
        self.cache = cache if cache is not None else {}
        self.expire_dict = {}
        self.decode_responses = decode_responses
        self.acl = {}
        self.username = ""

    def hset(self, name, mapping):
        """Store ``mapping`` under ``name``, replacing any previous value and clearing its expiry."""
        self.expire_dict.pop(name, None)
        self.cache[name] = mapping

    def from_url(self, url: str):
        """Do nothing; placeholder that accepts a URL."""

    # Original author's note: not covered by test yet.
    def expire(self, name, time):
        """Record that ``name`` expires ``time`` seconds from now."""
        self.expire_dict[name] = t.time() + time

    def hgetall(self, name):
        """Return the mapping stored under ``name``, or ``{}`` if it is missing or expired."""
        expires_at = self.expire_dict.get(name)
        if expires_at is not None and expires_at < t.time():
            # Lazily evict the expired key together with its stale expiry entry.
            self.cache.pop(name, None)
            self.expire_dict.pop(name, None)
        return self.cache.get(name, {})

    def get_connection_kwargs(self):
        """Return the connection options this mock exposes (only ``decode_responses``)."""
        return {"decode_responses": self.decode_responses}

    def auth(self, username, **kwargs):
        """Remember ``username``; any other credentials are ignored."""
        self.username = username

    def delete(self, name):
        """Remove ``name`` and its expiry entry; a missing key is not an error."""
        self.cache.pop(name, None)
        self.expire_dict.pop(name, None)
@pytest.fixture
def persistence_store_standalone_redis():
    """Build a RedisCachePersistenceLayer on top of a MockRedis client."""
    # Original author's note: you will need to handle the connection yourself
    # to pass the password again and avoid AuthenticationError at redis queries.
    connection_options = {
        "host": "localhost",
        "port": "63005",
        "decode_responses": True,
    }
    return RedisCachePersistenceLayer(connection=MockRedis(**connection_options))
# test basic
def test_idempotent_function_and_lambda_handler_redis_basic(
    persistence_store_standalone_redis: RedisCachePersistenceLayer,
    lambda_context,
):
    """Both decorators run the wrapped callable and hand back its return value."""
    # GIVEN a store, an event payload and the value both handlers return
    store = persistence_store_standalone_redis
    payload = {"data": "value"}
    expected_result = {"message": "Foo"}

    @idempotent_function(persistence_store=store, data_keyword_argument="record")
    def record_handler(record):
        return expected_result

    @idempotent(persistence_store=store)
    def lambda_handler(event, context):
        return expected_result

    # WHEN the decorated function is called, followed by the decorated handler
    function_outcome = record_handler(record=payload)
    handler_outcome = lambda_handler(payload, lambda_context)

    # THEN each call yields the expected result
    assert function_outcome == expected_result
    assert handler_outcome == expected_result
def test_idempotent_lambda_redis_no_decode():
    """A client reporting decode_responses=False is rejected by the persistence layer."""
    client_options = {
        "host": "localhost",
        "port": "63005",
        "decode_responses": False,
    }
    undecoded_client = MockRedis(**client_options)

    # Constructing the layer with such a client must raise a config error.
    with pytest.raises(IdempotencyRedisClientConfigError):
        RedisCachePersistenceLayer(connection=undecoded_client)
def test_idempotent_function_and_lambda_handler_redis_cache(
    persistence_store_standalone_redis: RedisCachePersistenceLayer,
    lambda_context,
):
    """Repeated payloads get the stored result; a new payload gets a fresh one."""
    store = persistence_store_standalone_redis
    payload = {"data": "value2"}
    # `current_response` is rebound below; both handlers read it at call time
    # through their closure, so rebinding changes what a real execution returns.
    current_response = {"message": "Foo"}
    first_response = copy.deepcopy(current_response)

    @idempotent_function(persistence_store=store, data_keyword_argument="record")
    def record_handler(record):
        return current_response

    @idempotent(persistence_store=store)
    def lambda_handler(event, context):
        return current_response

    # First round: both callables execute and return the initial response.
    function_first = record_handler(record=payload)
    handler_first = lambda_handler(payload, lambda_context)
    assert function_first == first_response
    assert handler_first == first_response

    # Second round: change what the handlers would return, then repeat the
    # same payload; the initial response must still come back.
    current_response = {"message": "Bar"}
    function_second = record_handler(record=payload)
    handler_second = lambda_handler(payload, lambda_context)
    assert function_second == first_response
    assert handler_second == first_response

    # Third round: a different payload must produce the updated response.
    fresh_payload = {"data": "value3"}
    function_third = record_handler(record=fresh_payload)
    handler_third = lambda_handler(fresh_payload, lambda_context)
    assert function_third == current_response
    assert handler_third == current_response
# test idempotency when a record is already in progress
def test_idempotent_lambda_redis_in_progress(
    persistence_store_standalone_redis: RedisCachePersistenceLayer,
    lambda_context,
):
    """
    The idempotent handler raises when a matching event is already marked as in progress
    """
    store = persistence_store_standalone_redis
    handler_response = {"foo": "bar"}

    @idempotent(persistence_store=store)
    def lambda_handler(event, context):
        return handler_response

    # Per the original note, this first invocation registers the context.
    lambda_handler({"data": "value4"}, lambda_context)

    # Mark a second, different event as in progress directly on the store.
    in_flight_event = {"data": "value7"}
    try:
        store.save_inprogress(in_flight_event, 1000)
    except IdempotencyItemAlreadyExistsError:
        # Deliberately tolerated: only the in-progress state matters below.
        pass

    # Invoking the handler with that event must be reported as already in progress.
    with pytest.raises(IdempotencyAlreadyInProgressError):
        lambda_handler(in_flight_event, lambda_context)
# test record removal
def test_idempotent_lambda_redis_delete(
    persistence_store_standalone_redis: RedisCachePersistenceLayer,
    lambda_context,
):
    """After its record is deleted, the same event makes the handler run again."""
    store = persistence_store_standalone_redis
    payload = {"data": "test_delete"}
    # Rebound further down; the handler reads it at call time through its closure.
    response = {"message": "Foo"}

    @idempotent(persistence_store=store)
    def lambda_handler(event, _):
        return response

    first_outcome = lambda_handler(payload, lambda_context)
    assert first_outcome == response

    # Remove the stored record, then change what a real execution would return.
    store.delete_record(payload, IdempotencyItemNotFoundError)
    response = {"message": "Foo2"}

    second_outcome = lambda_handler(payload, lambda_context)
    assert second_outcome == response
"""def test_idempotent_lambda_redis_credential(lambda_context):
redis_client = MockRedis(
host='localhost',
port='63005',
decode_responses=True,
)
pwd = "terriblePassword"
usr = "test_acl_denial"
redis_client.acl_setuser(username=usr, enabled=True, passwords="+"+pwd,keys='*',commands=['+hgetall','-set'])
redis_client.auth(password=pwd,username=usr)
@idempotent(persistence_store=RedisCachePersistenceLayer(connection=redis_client))
def lambda_handler(event, _):
return True
with pytest.raises(IdempotencyPersistenceLayerError):
handler_result = lambda_handler("test_Acl", lambda_context)"""