-
Notifications
You must be signed in to change notification settings - Fork 421
feat(idempotency): adding redis as idempotency backend #1914
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
3d5062c
aed821a
ffcf16d
b5c5791
6902e73
504d258
0d1e3e9
fce91e5
3126885
f9c1875
81c141e
5d6b0d0
656096f
038926c
ca9b16d
b8bcfe4
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,6 @@ | ||
from aws_lambda_powertools.utilities.connections.redis import ( | ||
RedisCluster, | ||
RedisStandalone, | ||
) | ||
|
||
__all__ = ["RedisStandalone", "RedisCluster"] |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,7 @@ | ||
from abc import ABC, abstractmethod | ||
|
||
|
||
class BaseConnectionSync(ABC): | ||
@abstractmethod | ||
def init_connection(self): | ||
raise NotImplementedError() # pragma: no cover |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,4 @@ | ||
class RedisConnectionError(Exception): | ||
""" | ||
Redis connection error | ||
""" |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,133 @@ | ||
import logging | ||
from typing import Optional, Type, Union | ||
|
||
import redis | ||
|
||
from .base_sync import BaseConnectionSync | ||
from .exceptions import RedisConnectionError | ||
|
||
logger = logging.getLogger(__name__) | ||
|
||
|
||
class RedisConnection(BaseConnectionSync): | ||
def __init__( | ||
self, | ||
client: Type[Union[redis.Redis, redis.RedisCluster]], | ||
host: Optional[str] = None, | ||
port: Optional[int] = None, | ||
username: Optional[str] = None, | ||
password: Optional[str] = None, | ||
db_index: Optional[int] = None, | ||
url: Optional[str] = None, | ||
**extra_options, | ||
) -> None: | ||
self.extra_options: dict = {} | ||
|
||
self.url = url | ||
self.host = host | ||
self.port = port | ||
self.username = username | ||
self.password = password | ||
self.db_index = db_index | ||
self.extra_options.update(**extra_options) | ||
self._connection = None | ||
self._client = client | ||
|
||
def init_connection(self): | ||
""" | ||
Connection is cached, so returning this | ||
""" | ||
if self._connection: | ||
return self._connection | ||
|
||
logger.info(f"Trying to connect to Redis: {self.host}") | ||
|
||
try: | ||
if self.url: | ||
logger.debug(f"Using URL format to connect to Redis: {self.host}") | ||
self._connection = self._client.from_url(url=self.url) | ||
else: | ||
logger.debug(f"Using other parameters to connect to Redis: {self.host}") | ||
self._connection = self._client( | ||
host=self.host, | ||
port=self.port, | ||
username=self.username, | ||
password=self.password, | ||
db=self.db_index, | ||
decode_responses=True, | ||
**self.extra_options, | ||
) | ||
except redis.exceptions.ConnectionError as exc: | ||
logger.debug(f"Cannot connect in Redis: {self.host}") | ||
raise RedisConnectionError("Could not to connect to Redis", exc) from exc | ||
|
||
return self._connection | ||
|
||
|
||
class RedisStandalone(RedisConnection): | ||
def __init__( | ||
self, | ||
host: Optional[str] = None, | ||
port: Optional[int] = None, | ||
username: Optional[str] = None, | ||
password: Optional[str] = None, | ||
db_index: Optional[int] = None, | ||
url: Optional[str] = None, | ||
**extra_options, | ||
) -> None: | ||
""" | ||
Initialize the Redis standalone client | ||
Parameters | ||
---------- | ||
host: str | ||
Name of the host to connect to Redis instance/cluster | ||
port: int | ||
Number of the port to connect to Redis instance/cluster | ||
username: str | ||
Name of the username to connect to Redis instance/cluster in case of using ACL | ||
See: https://redis.io/docs/management/security/acl/ | ||
password: str | ||
Passwod to connect to Redis instance/cluster | ||
db_index: int | ||
Index of Redis database | ||
See: https://redis.io/commands/select/ | ||
url: str | ||
Redis client object configured from the given URL | ||
See: https://redis.readthedocs.io/en/latest/connections.html#redis.Redis.from_url | ||
""" | ||
super().__init__(redis.Redis, host, port, username, password, db_index, url, **extra_options) | ||
|
||
|
||
class RedisCluster(RedisConnection): | ||
def __init__( | ||
self, | ||
host: Optional[str] = None, | ||
port: Optional[int] = None, | ||
username: Optional[str] = None, | ||
password: Optional[str] = None, | ||
db_index: Optional[int] = None, | ||
url: Optional[str] = None, | ||
**extra_options, | ||
) -> None: | ||
""" | ||
Initialize the Redis standalone client | ||
Parameters | ||
---------- | ||
host: str | ||
Name of the host to connect to Redis instance/cluster | ||
port: int | ||
Number of the port to connect to Redis instance/cluster | ||
username: str | ||
Name of the username to connect to Redis instance/cluster in case of using ACL | ||
See: https://redis.io/docs/management/security/acl/ | ||
password: str | ||
Passwod to connect to Redis instance/cluster | ||
db_index: int | ||
Index of Redis database | ||
See: https://redis.io/commands/select/ | ||
url: str | ||
Redis client object configured from the given URL | ||
See: https://redis.readthedocs.io/en/latest/connections.html#redis.Redis.from_url | ||
""" | ||
|
||
super().__init__(redis.cluster.RedisCluster, host, port, username, password, db_index, url, **extra_options) | ||
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -37,7 +37,7 @@ class DataRecord: | |
|
||
def __init__( | ||
self, | ||
idempotency_key: str, | ||
idempotency_key: str = "", | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Are we sure we want this? It's probably a breaking change to make this optional. |
||
status: str = "", | ||
expiry_timestamp: Optional[int] = None, | ||
in_progress_expiry_timestamp: Optional[int] = None, | ||
|
@@ -116,6 +116,7 @@ class BasePersistenceLayer(ABC): | |
def __init__(self): | ||
"""Initialize the defaults""" | ||
self.function_name = "" | ||
self.backend = "" | ||
self.configured = False | ||
self.event_key_jmespath: Optional[str] = None | ||
self.event_key_compiled_jmespath = None | ||
|
@@ -262,9 +263,12 @@ def _get_expiry_timestamp(self) -> int: | |
unix timestamp of expiry date for idempotency record | ||
|
||
""" | ||
now = datetime.datetime.now() | ||
period = datetime.timedelta(seconds=self.expires_after_seconds) | ||
return int((now + period).timestamp()) | ||
if self.backend == "redis": | ||
return self.expires_after_seconds | ||
else: | ||
now = datetime.datetime.now() | ||
period = datetime.timedelta(seconds=self.expires_after_seconds) | ||
return int((now + period).timestamp()) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Use tell don't ask here. Instead of storing the type of backend, just delegate the |
||
|
||
def _save_to_cache(self, data_record: DataRecord): | ||
""" | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -26,7 +26,7 @@ class DynamoDBPersistenceLayer(BasePersistenceLayer): | |
def __init__( | ||
self, | ||
table_name: str, | ||
key_attr: str = "id", | ||
key_attr: Optional[str] = "id", | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Probably unecessary |
||
static_pk_value: Optional[str] = None, | ||
sort_key_attr: Optional[str] = None, | ||
expiry_attr: str = "expiration", | ||
|
@@ -59,6 +59,8 @@ def __init__( | |
DynamoDB attribute name for status, by default "status" | ||
data_attr: str, optional | ||
DynamoDB attribute name for response data, by default "data" | ||
validation_key_attr: str, optional | ||
DynamoDB attribute name for hashed representation of the parts of the event used for validation | ||
boto_config: botocore.config.Config, optional | ||
Botocore configuration to pass during client initialization | ||
boto3_session : boto3.session.Session, optional | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,141 @@ | ||
import datetime | ||
import logging | ||
from typing import Any, Dict, Union | ||
|
||
import redis | ||
|
||
from aws_lambda_powertools.utilities.idempotency import BasePersistenceLayer | ||
from aws_lambda_powertools.utilities.idempotency.exceptions import ( | ||
IdempotencyItemAlreadyExistsError, | ||
IdempotencyItemNotFoundError, | ||
) | ||
from aws_lambda_powertools.utilities.idempotency.persistence.base import ( | ||
STATUS_CONSTANTS, | ||
DataRecord, | ||
) | ||
|
||
logger = logging.getLogger(__name__) | ||
|
||
|
||
class RedisCachePersistenceLayer(BasePersistenceLayer): | ||
def __init__( | ||
self, | ||
connection, | ||
in_progress_expiry_attr: str = "in_progress_expiration", | ||
status_attr: str = "status", | ||
data_attr: str = "data", | ||
validation_key_attr: str = "validation", | ||
): | ||
""" | ||
Initialize the Redis Persistence Layer | ||
Parameters | ||
---------- | ||
in_progress_expiry_attr: str, optional | ||
Redis hash attribute name for in-progress expiry timestamp, by default "in_progress_expiration" | ||
status_attr: str, optional | ||
Redis hash attribute name for status, by default "status" | ||
data_attr: str, optional | ||
Redis hash attribute name for response data, by default "data" | ||
validation_key_attr: str, optional | ||
Redis hash attribute name for hashed representation of the parts of the event used for validation | ||
""" | ||
|
||
# Initialize connection with Redis | ||
self._connection: Union[redis.Redis, redis.RedisCluster] = connection.init_connection() | ||
|
||
self.in_progress_expiry_attr = in_progress_expiry_attr | ||
self.status_attr = status_attr | ||
self.data_attr = data_attr | ||
self.validation_key_attr = validation_key_attr | ||
super(RedisCachePersistenceLayer, self).__init__() | ||
|
||
def _item_to_data_record(self, item: Dict[str, Any]) -> DataRecord: | ||
return DataRecord( | ||
status=item[self.status_attr], | ||
in_progress_expiry_timestamp=item.get(self.in_progress_expiry_attr), | ||
response_data=item.get(self.data_attr), | ||
payload_hash=item.get(self.validation_key_attr), | ||
) | ||
|
||
def _get_record(self, idempotency_key) -> DataRecord: | ||
# See: https://redis.io/commands/hgetall/ | ||
response = self._connection.hgetall(idempotency_key) | ||
|
||
try: | ||
item = response | ||
except KeyError: | ||
raise IdempotencyItemNotFoundError | ||
return self._item_to_data_record(item) | ||
|
||
def _put_record(self, data_record: DataRecord) -> None: | ||
item: Dict[str, Any] = {} | ||
|
||
# Redis works with hset to support hashing keys with multiple attributes | ||
# See: https://redis.io/commands/hset/ | ||
item = { | ||
"name": data_record.idempotency_key, | ||
"mapping": { | ||
self.status_attr: data_record.status, | ||
}, | ||
} | ||
|
||
if data_record.in_progress_expiry_timestamp is not None: | ||
item["mapping"][self.in_progress_expiry_attr] = data_record.in_progress_expiry_timestamp | ||
|
||
if self.payload_validation_enabled: | ||
item["mapping"][self.validation_key_attr] = data_record.payload_hash | ||
|
||
now = datetime.datetime.now() | ||
try: | ||
# | LOCKED | RETRY if status = "INPROGRESS" | RETRY | ||
# |----------------|-------------------------------------------------------|-------------> .... (time) | ||
# | Lambda Idempotency Record | ||
# | Timeout Timeout | ||
# | (in_progress_expiry) (expiry) | ||
|
||
# Conditions to successfully save a record: | ||
|
||
# The idempotency key does not exist: | ||
# - first time that this invocation key is used | ||
# - previous invocation with the same key was deleted due to TTL | ||
idempotency_record = self._connection.hgetall(data_record.idempotency_key) | ||
if len(idempotency_record) > 0: | ||
# record already exists. | ||
|
||
# status is completed, so raise exception because it exists and still valid | ||
if idempotency_record[self.status_attr] == STATUS_CONSTANTS["COMPLETED"]: | ||
raise | ||
|
||
# checking if in_progress_expiry_attr exists | ||
# if in_progress_expiry_attr exist, must be lower than now | ||
if self.in_progress_expiry_attr in idempotency_record and int( | ||
idempotency_record[self.in_progress_expiry_attr] | ||
) > int(now.timestamp() * 1000): | ||
raise | ||
|
||
logger.debug(f"Putting record on Redis for idempotency key: {data_record.idempotency_key}") | ||
self._connection.hset(**item) | ||
# hset type must set expiration after adding the record | ||
# Need to review this to get ttl in seconds | ||
self._connection.expire(name=data_record.idempotency_key, time=self.expires_after_seconds) | ||
except Exception: | ||
logger.debug(f"Failed to put record for already existing idempotency key: {data_record.idempotency_key}") | ||
raise IdempotencyItemAlreadyExistsError | ||
|
||
def _update_record(self, data_record: DataRecord) -> None: | ||
item: Dict[str, Any] = {} | ||
|
||
item = { | ||
"name": data_record.idempotency_key, | ||
"mapping": { | ||
self.data_attr: data_record.response_data, | ||
self.status_attr: data_record.status, | ||
}, | ||
} | ||
logger.debug(f"Updating record for idempotency key: {data_record.idempotency_key}") | ||
self._connection.hset(**item) | ||
|
||
def _delete_record(self, data_record: DataRecord) -> None: | ||
logger.debug(f"Deleting record for idempotency key: {data_record.idempotency_key}") | ||
# See: https://redis.io/commands/del/ | ||
self._connection.delete(data_record.idempotency_key) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If I understand correctly, the only difference between the two classes is the first parameter (
redis.cluster.RedisCluster
vsredis.Redis
).It that's the only difference, wouldn't it make sense to have a single class with a flag for
standalone
vscluster
instead of 2 different classes?