forked from aws-powertools/powertools-lambda-python
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathusing_redis_client_with_local_certs.py
38 lines (29 loc) · 1.23 KB
/
using_redis_client_with_local_certs.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
from __future__ import annotations
from typing import Any
from redis import Redis
from aws_lambda_powertools.shared.functions import abs_lambda_path
from aws_lambda_powertools.utilities import parameters
from aws_lambda_powertools.utilities.idempotency import IdempotencyConfig, idempotent
from aws_lambda_powertools.utilities.idempotency.persistence.redis import (
RedisCachePersistenceLayer,
)
# Connection details are read from the "redis_info" secret; transform="json"
# asks the parameters utility to hand the payload back parsed, and the result
# is used below as a plain dict.
redis_values: dict[str, Any] = parameters.get_secret("redis_info", transform="json")  # (1)!

redis_client = Redis(
    # Endpoint and credentials. Host and port fall back to localhost:6379 when
    # the secret lacks those keys; a missing password is passed as None.
    host=redis_values.get("REDIS_HOST", "localhost"),
    port=redis_values.get("REDIS_PORT", 6379),
    password=redis_values.get("REDIS_PASSWORD"),
    # TLS settings. Certificate, key and CA files are looked up in the "certs"
    # folder under whatever abs_lambda_path() returns
    # (presumably the function's deployment root -- confirm against the helper).
    ssl=True,
    ssl_certfile=f"{abs_lambda_path()}/certs/redis_user.crt",  # (2)!
    ssl_keyfile=f"{abs_lambda_path()}/certs/redis_user_private.key",  # (3)!
    ssl_ca_certs=f"{abs_lambda_path()}/certs/redis_ca.pem",  # (4)!
    # Socket and response handling.
    socket_timeout=10.0,
    retry_on_timeout=True,
    decode_responses=True,
)

# Idempotency settings: expires_after_seconds is 120, i.e. 2 minutes.
config = IdempotencyConfig(expires_after_seconds=2 * 60)

# Persistence layer built on the Redis client configured above.
persistence_layer = RedisCachePersistenceLayer(client=redis_client)
@idempotent(config=config, persistence_store=persistence_layer)
def lambda_handler(event, context):
    """Return a fixed greeting payload.

    The function is wrapped by ``idempotent`` using the module-level
    ``config`` and ``persistence_layer``. The body itself reads neither
    ``event`` nor ``context``.

    Returns:
        A dict with the single key ``"message"`` set to ``"Hello"``.
    """
    response = {"message": "Hello"}
    return response