Skip to content

Commit 2d13a3c

Browse files
Adding e2e tests
1 parent 8d6aec5 commit 2d13a3c

10 files changed

+397
-2
lines changed

tests/e2e/idempotency_redis/__init__.py

Whitespace-only changes.
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
import pytest
2+
3+
from tests.e2e.idempotency_redis.infrastructure import IdempotencyRedisServerlessStack
4+
5+
6+
@pytest.fixture(autouse=True, scope="package")
7+
def infrastructure():
8+
"""Setup and teardown logic for E2E test infrastructure
9+
10+
Yields
11+
------
12+
Dict[str, str]
13+
CloudFormation Outputs from deployed infrastructure
14+
"""
15+
stack = IdempotencyRedisServerlessStack()
16+
try:
17+
yield stack.deploy()
18+
finally:
19+
stack.delete()
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
import os
2+
import time
3+
from concurrent.futures import ThreadPoolExecutor, as_completed
4+
from threading import current_thread
5+
6+
from aws_lambda_powertools.utilities.idempotency import (
7+
RedisCachePersistenceLayer,
8+
idempotent_function,
9+
)
10+
11+
REDIS_HOST = os.getenv("RedisEndpoint", "")
12+
persistence_layer = RedisCachePersistenceLayer(host=REDIS_HOST, port=6379, ssl=True)
13+
threads_count = 2
14+
15+
16+
@idempotent_function(persistence_store=persistence_layer, data_keyword_argument="record")
17+
def record_handler(record):
18+
time_now = time.time()
19+
return {"thread_name": current_thread().name, "time": str(time_now)}
20+
21+
22+
def lambda_handler(event, context):
23+
with ThreadPoolExecutor(max_workers=threads_count) as executor:
24+
futures = [executor.submit(record_handler, **{"record": i}) for i in range(threads_count)]
25+
26+
return [
27+
{"state": future._state, "exception": future.exception(), "output": future.result()}
28+
for future in as_completed(futures)
29+
]
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
import os
2+
import uuid
3+
4+
from aws_lambda_powertools.utilities.idempotency import (
5+
IdempotencyConfig,
6+
RedisCachePersistenceLayer,
7+
idempotent,
8+
)
9+
10+
REDIS_HOST = os.getenv("RedisEndpoint", "")
11+
persistence_layer = RedisCachePersistenceLayer(host=REDIS_HOST, port=6379, ssl=True)
12+
config = IdempotencyConfig(event_key_jmespath='headers."X-Idempotency-Key"', use_local_cache=True)
13+
14+
15+
@idempotent(config=config, persistence_store=persistence_layer)
16+
def lambda_handler(event, context):
17+
return {"request": str(uuid.uuid4())}
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
import os
2+
import time
3+
4+
from aws_lambda_powertools.utilities.idempotency import (
5+
RedisCachePersistenceLayer,
6+
idempotent,
7+
)
8+
9+
REDIS_HOST = os.getenv("RedisEndpoint", "")
10+
persistence_layer = RedisCachePersistenceLayer(host=REDIS_HOST, port=6379, ssl=True)
11+
12+
13+
@idempotent(persistence_store=persistence_layer)
14+
def lambda_handler(event, context):
15+
time.sleep(5)
16+
17+
return event
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
import os
2+
import time
3+
4+
from aws_lambda_powertools.utilities.idempotency import (
5+
IdempotencyConfig,
6+
RedisCachePersistenceLayer,
7+
idempotent,
8+
)
9+
10+
REDIS_HOST = os.getenv("RedisEndpoint", "")
11+
persistence_layer = RedisCachePersistenceLayer(host=REDIS_HOST, port=6379, ssl=True)
12+
config = IdempotencyConfig(expires_after_seconds=5)
13+
14+
15+
@idempotent(config=config, persistence_store=persistence_layer)
16+
def lambda_handler(event, context):
17+
time_now = time.time()
18+
19+
return {"time": str(time_now)}
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
import os
2+
import time
3+
4+
from aws_lambda_powertools.utilities.idempotency import (
5+
IdempotencyConfig,
6+
RedisCachePersistenceLayer,
7+
idempotent,
8+
)
9+
10+
REDIS_HOST = os.getenv("RedisEndpoint", "")
11+
persistence_layer = RedisCachePersistenceLayer(host=REDIS_HOST, port=6379, ssl=True)
12+
config = IdempotencyConfig(expires_after_seconds=1)
13+
14+
15+
@idempotent(config=config, persistence_store=persistence_layer)
16+
def lambda_handler(event, context):
17+
sleep_time: int = event.get("sleep") or 0
18+
time.sleep(sleep_time)
19+
20+
return event
Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
import time
2+
from typing import Tuple
3+
4+
from aws_cdk import aws_ec2 as ec2
5+
from aws_cdk.aws_ec2 import (
6+
SecurityGroup,
7+
SubnetType,
8+
Vpc,
9+
)
10+
from aws_cdk.aws_elasticache import (
11+
CfnServerlessCache,
12+
)
13+
14+
from tests.e2e.utils.data_builder import build_random_value
15+
from tests.e2e.utils.infrastructure import BaseInfrastructure
16+
17+
18+
class IdempotencyRedisServerlessStack(BaseInfrastructure):
19+
def create_resources(self) -> None:
20+
service_name = build_random_value(10)
21+
22+
vpc_stack: Vpc = self._create_vpc(service_name, "172.150.0.0/16")
23+
security_groups: Tuple = self._create_security_groups(vpc_stack)
24+
redis_cluster: CfnServerlessCache = self._create_redis_cache(service_name, vpc_stack, security_groups[0])
25+
26+
env_vars = {"RedisEndpoint": f"{str(redis_cluster.attr_endpoint_address)}"}
27+
28+
self.create_lambda_functions(
29+
function_props={
30+
"environment": env_vars,
31+
"vpc": vpc_stack,
32+
"security_groups": [security_groups[1]],
33+
},
34+
)
35+
36+
def _create_vpc(self, service_name: str, cidr: str) -> Vpc:
37+
vpc_stack: Vpc = Vpc(
38+
self.stack,
39+
"VPC-ServerlessCache",
40+
nat_gateways=1,
41+
vpc_name=f"VPC-ServerlessCache-{service_name}",
42+
ip_addresses=ec2.IpAddresses.cidr(cidr),
43+
subnet_configuration=[
44+
ec2.SubnetConfiguration(name="public", subnet_type=SubnetType.PUBLIC, cidr_mask=24),
45+
ec2.SubnetConfiguration(name="private", subnet_type=SubnetType.PRIVATE_WITH_EGRESS, cidr_mask=24),
46+
],
47+
max_azs=2,
48+
)
49+
50+
return vpc_stack
51+
52+
def _create_security_groups(self, vpc_stack: Vpc) -> Tuple[SecurityGroup, SecurityGroup]:
53+
# Create a security group for the ElastiCache cluster
54+
cache_security_group: SecurityGroup = SecurityGroup(self.stack, "ElastiCacheSecurityGroup", vpc=vpc_stack)
55+
cache_security_group.add_ingress_rule(
56+
peer=ec2.Peer.ipv4(vpc_stack.vpc_cidr_block),
57+
connection=ec2.Port.tcp(6379),
58+
description="Allow inbound traffic from VPC",
59+
)
60+
61+
lambda_security_group = SecurityGroup(
62+
self.stack,
63+
"LambdaSecurityGroup",
64+
vpc=vpc_stack,
65+
allow_all_ipv6_outbound=True,
66+
allow_all_outbound=True,
67+
)
68+
69+
return cache_security_group, lambda_security_group
70+
71+
def _create_redis_cache(
72+
self,
73+
service_name: str,
74+
vpc_stack: Vpc,
75+
cache_security_group: SecurityGroup,
76+
) -> CfnServerlessCache:
77+
cache_cluster = CfnServerlessCache(
78+
self.stack,
79+
"ElastiCacheCluster",
80+
engine="redis",
81+
security_group_ids=[cache_security_group.security_group_id],
82+
subnet_ids=[subnet.subnet_id for subnet in vpc_stack.private_subnets],
83+
serverless_cache_name=f"Cache-{service_name}",
84+
)
85+
86+
# Just to make sure the Cluster will be ready before the Stack is complete
87+
while cache_cluster.attr_status == "CREATING":
88+
print("Waiting for ElastiCache serverless to be created...")
89+
time.sleep(5)
90+
91+
return cache_cluster

0 commit comments

Comments
 (0)