Skip to content

feat(idempotency): adding redis as idempotency backend #1914

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions aws_lambda_powertools/utilities/connections/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
from aws_lambda_powertools.utilities.connections.redis import (
RedisCluster,
RedisStandalone,
)

__all__ = ["RedisStandalone", "RedisCluster"]
7 changes: 7 additions & 0 deletions aws_lambda_powertools/utilities/connections/base_sync.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
from abc import ABC, abstractmethod


class BaseConnectionSync(ABC):
    """Interface for synchronous connection builders.

    Concrete subclasses must implement ``init_connection``.
    """

    @abstractmethod
    def init_connection(self):
        """Create the underlying connection; must be overridden by subclasses."""
        raise NotImplementedError  # pragma: no cover
4 changes: 4 additions & 0 deletions aws_lambda_powertools/utilities/connections/exceptions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
class RedisConnectionError(Exception):
    """Raised when a connection to Redis cannot be established."""
133 changes: 133 additions & 0 deletions aws_lambda_powertools/utilities/connections/redis.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
import logging
from typing import Optional, Type, Union

import redis

from .base_sync import BaseConnectionSync
from .exceptions import RedisConnectionError

logger = logging.getLogger(__name__)


class RedisConnection(BaseConnectionSync):
    """Build a redis-py client on first use and cache it for later calls.

    The concrete client class (standalone or cluster) is injected through
    ``client``; connection settings come either from ``url`` or from the
    individual host/port/credential parameters.
    """

    def __init__(
        self,
        client: Type[Union[redis.Redis, redis.RedisCluster]],
        host: Optional[str] = None,
        port: Optional[int] = None,
        username: Optional[str] = None,
        password: Optional[str] = None,
        db_index: Optional[int] = None,
        url: Optional[str] = None,
        **extra_options,
    ) -> None:
        """
        Store the connection settings; no client is created here

        Parameters
        ----------
        client: Type[Union[redis.Redis, redis.RedisCluster]]
            Client class used to build the connection
        host: str, optional
            Host name of the Redis instance/cluster
        port: int, optional
            Port of the Redis instance/cluster
        username: str, optional
            Username, when ACLs are in use
        password: str, optional
            Password used to authenticate
        db_index: int, optional
            Index of the Redis database
        url: str, optional
            Connection URL; when set it takes precedence over host/port/credentials
        extra_options:
            Additional keyword arguments forwarded to the client
        """
        self.url = url
        self.host = host
        self.port = port
        self.username = username
        self.password = password
        self.db_index = db_index
        # Copy so later mutation never touches the caller's kwargs.
        self.extra_options: dict = dict(extra_options)
        self._connection = None
        self._client = client

    def init_connection(self):
        """
        Return the cached client, creating it on the first call

        Raises
        ------
        RedisConnectionError
            When the client raises ``redis.exceptions.ConnectionError`` while being built
        """
        # Explicit None check: the cache decision must not depend on the
        # client's truthiness.
        if self._connection is not None:
            return self._connection

        logger.info(f"Trying to connect to Redis: {self.host}")

        # decode_responses is forced on for both branches so replies are str and
        # field names/values can be compared with plain strings. It is merged
        # last so that a caller-supplied value cannot collide with it (which
        # previously raised TypeError for a duplicated keyword).
        options = {**self.extra_options, "decode_responses": True}

        try:
            if self.url:
                # The URL is deliberately not logged: it may embed credentials.
                logger.debug("Using URL format to connect to Redis")
                self._connection = self._client.from_url(url=self.url, **options)
            else:
                logger.debug(f"Using other parameters to connect to Redis: {self.host}")
                candidates = {
                    "host": self.host,
                    "port": self.port,
                    "username": self.username,
                    "password": self.password,
                    "db": self.db_index,
                }
                # Forward only what was actually provided so the client's own
                # defaults apply. In particular redis-py's cluster client
                # rejects any "db" keyword, even db=None.
                provided = {key: value for key, value in candidates.items() if value is not None}
                self._connection = self._client(**provided, **options)
        except redis.exceptions.ConnectionError as exc:
            # NOTE(review): building a standalone client is normally lazy, so
            # network failures may only surface on the first command - confirm
            # whether an explicit ping is wanted here.
            logger.debug(f"Cannot connect to Redis: {self.host}")
            raise RedisConnectionError("Could not connect to Redis", exc) from exc

        return self._connection


class RedisStandalone(RedisConnection):
    """Connection builder bound to the ``redis.Redis`` client class."""

    def __init__(
        self,
        host: Optional[str] = None,
        port: Optional[int] = None,
        username: Optional[str] = None,
        password: Optional[str] = None,
        db_index: Optional[int] = None,
        url: Optional[str] = None,
        **extra_options,
    ) -> None:
        """
        Initialize the Redis standalone client

        Parameters
        ----------
        host: str
            Name of the host to connect to the Redis instance
        port: int
            Number of the port to connect to the Redis instance
        username: str
            Name of the user to connect with, in case ACLs are in use
            See: https://redis.io/docs/management/security/acl/
        password: str
            Password to connect to the Redis instance
        db_index: int
            Index of the Redis database
            See: https://redis.io/commands/select/
        url: str
            URL the Redis client object is configured from
            See: https://redis.readthedocs.io/en/latest/connections.html#redis.Redis.from_url
        extra_options:
            Any further keyword arguments, handed to the parent class unchanged
        """
        super().__init__(
            client=redis.Redis,
            host=host,
            port=port,
            username=username,
            password=password,
            db_index=db_index,
            url=url,
            **extra_options,
        )


class RedisCluster(RedisConnection):
    """Connection builder bound to the ``redis.cluster.RedisCluster`` client class."""

    def __init__(
        self,
        host: Optional[str] = None,
        port: Optional[int] = None,
        username: Optional[str] = None,
        password: Optional[str] = None,
        db_index: Optional[int] = None,
        url: Optional[str] = None,
        **extra_options,
    ) -> None:
        """
        Initialize the Redis cluster client

        Parameters
        ----------
        host: str
            Name of the host to connect to the Redis cluster
        port: int
            Number of the port to connect to the Redis cluster
        username: str
            Name of the user to connect with, in case ACLs are in use
            See: https://redis.io/docs/management/security/acl/
        password: str
            Password to connect to the Redis cluster
        db_index: int
            Index of the Redis database
            See: https://redis.io/commands/select/
            NOTE(review): cluster mode may not accept a database index - confirm
        url: str
            URL the Redis client object is configured from
            See: https://redis.readthedocs.io/en/latest/connections.html#redis.Redis.from_url
        extra_options:
            Any further keyword arguments, handed to the parent class unchanged
        """
        super().__init__(
            client=redis.cluster.RedisCluster,
            host=host,
            port=port,
            username=username,
            password=password,
            db_index=db_index,
            url=url,
            **extra_options,
        )
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If I understand correctly, the only difference between the two classes is the first parameter (redis.cluster.RedisCluster vs redis.Redis).

If that's the only difference, wouldn't it make sense to have a single class with a flag for standalone vs. cluster instead of two different classes?

12 changes: 11 additions & 1 deletion aws_lambda_powertools/utilities/idempotency/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,17 @@
from aws_lambda_powertools.utilities.idempotency.persistence.dynamodb import (
DynamoDBPersistenceLayer,
)
from aws_lambda_powertools.utilities.idempotency.persistence.redis import (
RedisCachePersistenceLayer,
)

from .idempotency import IdempotencyConfig, idempotent, idempotent_function

__all__ = ("DynamoDBPersistenceLayer", "BasePersistenceLayer", "idempotent", "idempotent_function", "IdempotencyConfig")
__all__ = (
"DynamoDBPersistenceLayer",
"BasePersistenceLayer",
"idempotent",
"idempotent_function",
"IdempotencyConfig",
"RedisCachePersistenceLayer",
)
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ class DataRecord:

def __init__(
self,
idempotency_key: str,
idempotency_key: str = "",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are we sure we want this? It's probably a breaking change to make this optional.

status: str = "",
expiry_timestamp: Optional[int] = None,
in_progress_expiry_timestamp: Optional[int] = None,
Expand Down Expand Up @@ -116,6 +116,7 @@ class BasePersistenceLayer(ABC):
def __init__(self):
"""Initialize the defaults"""
self.function_name = ""
self.backend = ""
self.configured = False
self.event_key_jmespath: Optional[str] = None
self.event_key_compiled_jmespath = None
Expand Down Expand Up @@ -262,9 +263,12 @@ def _get_expiry_timestamp(self) -> int:
unix timestamp of expiry date for idempotency record

"""
now = datetime.datetime.now()
period = datetime.timedelta(seconds=self.expires_after_seconds)
return int((now + period).timestamp())
if self.backend == "redis":
return self.expires_after_seconds
else:
now = datetime.datetime.now()
period = datetime.timedelta(seconds=self.expires_after_seconds)
return int((now + period).timestamp())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use "tell, don't ask" here. Instead of storing the type of backend, just delegate get_expiry_timestamp to the different backends, and let them deal with the details.


def _save_to_cache(self, data_record: DataRecord):
"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ class DynamoDBPersistenceLayer(BasePersistenceLayer):
def __init__(
self,
table_name: str,
key_attr: str = "id",
key_attr: Optional[str] = "id",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably unnecessary

static_pk_value: Optional[str] = None,
sort_key_attr: Optional[str] = None,
expiry_attr: str = "expiration",
Expand Down Expand Up @@ -59,6 +59,8 @@ def __init__(
DynamoDB attribute name for status, by default "status"
data_attr: str, optional
DynamoDB attribute name for response data, by default "data"
validation_key_attr: str, optional
DynamoDB attribute name for hashed representation of the parts of the event used for validation
boto_config: botocore.config.Config, optional
Botocore configuration to pass during client initialization
boto3_session : boto3.session.Session, optional
Expand Down
141 changes: 141 additions & 0 deletions aws_lambda_powertools/utilities/idempotency/persistence/redis.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
import datetime
import logging
from typing import Any, Dict, Union

import redis

from aws_lambda_powertools.utilities.idempotency import BasePersistenceLayer
from aws_lambda_powertools.utilities.idempotency.exceptions import (
IdempotencyItemAlreadyExistsError,
IdempotencyItemNotFoundError,
)
from aws_lambda_powertools.utilities.idempotency.persistence.base import (
STATUS_CONSTANTS,
DataRecord,
)

logger = logging.getLogger(__name__)


class RedisCachePersistenceLayer(BasePersistenceLayer):
    """Idempotency persistence layer that keeps each record in a Redis hash.

    The idempotency key is the Redis key; status, response data, payload hash
    and in-progress expiry are stored as fields of that hash.
    """

    def __init__(
        self,
        connection,
        in_progress_expiry_attr: str = "in_progress_expiration",
        status_attr: str = "status",
        data_attr: str = "data",
        validation_key_attr: str = "validation",
    ):
        """
        Initialize the Redis Persistence Layer

        Parameters
        ----------
        connection
            Object exposing ``init_connection()``, whose return value is used as the Redis client
        in_progress_expiry_attr: str, optional
            Redis hash attribute name for in-progress expiry timestamp, by default "in_progress_expiration"
        status_attr: str, optional
            Redis hash attribute name for status, by default "status"
        data_attr: str, optional
            Redis hash attribute name for response data, by default "data"
        validation_key_attr: str, optional
            Redis hash attribute name for hashed representation of the parts of the event used for validation
        """

        # Initialize connection with Redis
        self._connection: Union[redis.Redis, redis.RedisCluster] = connection.init_connection()

        self.in_progress_expiry_attr = in_progress_expiry_attr
        self.status_attr = status_attr
        self.data_attr = data_attr
        self.validation_key_attr = validation_key_attr
        super(RedisCachePersistenceLayer, self).__init__()

    def _item_to_data_record(self, item: Dict[str, Any], idempotency_key: str = "") -> DataRecord:
        """
        Convert a Redis hash (as returned by HGETALL) into a DataRecord

        Parameters
        ----------
        item: Dict[str, Any]
            Field/value mapping read from Redis; must contain the status attribute
        idempotency_key: str, optional
            Key the hash was stored under, copied onto the record
        """
        in_progress_expiry = item.get(self.in_progress_expiry_attr)
        return DataRecord(
            idempotency_key=idempotency_key,
            status=item[self.status_attr],
            # Hash values read back from Redis are not integers; convert so the
            # timestamp can be compared numerically by callers.
            in_progress_expiry_timestamp=int(in_progress_expiry) if in_progress_expiry is not None else None,
            response_data=item.get(self.data_attr),
            payload_hash=item.get(self.validation_key_attr),
        )

    def _get_record(self, idempotency_key) -> DataRecord:
        """
        Fetch the record stored under ``idempotency_key``

        Raises
        ------
        IdempotencyItemNotFoundError
            When no hash exists for the key
        """
        # See: https://redis.io/commands/hgetall/
        item = self._connection.hgetall(idempotency_key)

        # HGETALL answers with an empty mapping for a missing key instead of
        # raising, so absence has to be detected explicitly.
        if not item:
            raise IdempotencyItemNotFoundError
        return self._item_to_data_record(item, idempotency_key)

    def _blocks_new_record(self, existing: Dict[str, Any], now_ms: int) -> bool:
        """Tell whether an existing hash must prevent a new record from being written."""
        # A completed record that is still stored is still valid.
        if existing.get(self.status_attr) == STATUS_CONSTANTS["COMPLETED"]:
            return True

        # An in-progress record blocks only while its in-progress expiry
        # (milliseconds) lies in the future.
        # NOTE(review): a record without in_progress_expiry_attr never blocks
        # here - confirm this is intended.
        in_progress_expiry = existing.get(self.in_progress_expiry_attr)
        return in_progress_expiry is not None and int(in_progress_expiry) > now_ms

    def _put_record(self, data_record: DataRecord) -> None:
        """
        Store a new record unless a still-valid one already exists

        Raises
        ------
        IdempotencyItemAlreadyExistsError
            When the key holds a completed record, or an in-progress record
            whose in-progress expiry has not passed yet
        """
        # Redis works with hset to support hashing keys with multiple attributes
        # See: https://redis.io/commands/hset/
        mapping: Dict[str, Any] = {self.status_attr: data_record.status}

        if data_record.in_progress_expiry_timestamp is not None:
            mapping[self.in_progress_expiry_attr] = data_record.in_progress_expiry_timestamp

        if self.payload_validation_enabled:
            mapping[self.validation_key_attr] = data_record.payload_hash

        now_ms = int(datetime.datetime.now().timestamp() * 1000)

        # |     LOCKED     |         RETRY if status = "INPROGRESS"                |     RETRY
        # |----------------|-------------------------------------------------------|-------------> .... (time)
        # |             Lambda                                              Idempotency Record
        # |             Timeout                                                 Timeout
        # |       (in_progress_expiry)                                          (expiry)

        # Conditions to successfully save a record:

        # The idempotency key does not exist:
        #    - first time that this invocation key is used
        #    - previous invocation with the same key was deleted due to TTL
        # NOTE(review): the read below and the write further down are separate
        # commands, so two concurrent callers can both pass the check.
        existing = self._connection.hgetall(data_record.idempotency_key)
        if existing and self._blocks_new_record(existing, now_ms):
            logger.debug(f"Failed to put record for already existing idempotency key: {data_record.idempotency_key}")
            raise IdempotencyItemAlreadyExistsError

        # Redis errors are deliberately not caught here: reporting them as
        # "item already exists" would hide the real failure from the caller.
        logger.debug(f"Putting record on Redis for idempotency key: {data_record.idempotency_key}")
        self._connection.hset(name=data_record.idempotency_key, mapping=mapping)
        # hset type must set expiration after adding the record
        # Need to review this to get ttl in seconds
        self._connection.expire(name=data_record.idempotency_key, time=self.expires_after_seconds)

    def _update_record(self, data_record: DataRecord) -> None:
        """Write the response data and status onto the record and renew its TTL."""
        mapping: Dict[str, Any] = {
            self.data_attr: data_record.response_data,
            self.status_attr: data_record.status,
        }
        logger.debug(f"Updating record for idempotency key: {data_record.idempotency_key}")
        self._connection.hset(name=data_record.idempotency_key, mapping=mapping)
        # If the key expired between put and update, hset recreates it without
        # any TTL; setting the expiry again keeps the record from living forever.
        self._connection.expire(name=data_record.idempotency_key, time=self.expires_after_seconds)

    def _delete_record(self, data_record: DataRecord) -> None:
        """Remove the record stored under the record's idempotency key."""
        logger.debug(f"Deleting record for idempotency key: {data_record.idempotency_key}")
        # See: https://redis.io/commands/del/
        self._connection.delete(data_record.idempotency_key)
Loading