Skip to content

feat(Idempotency): add feature for manipulating idempotent responses #4037

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
3 changes: 3 additions & 0 deletions aws_lambda_powertools/utilities/idempotency/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
Utility for adding idempotency to lambda functions
"""

from aws_lambda_powertools.utilities.idempotency.config import IdempotentHookData, IdempotentHookFunction
from aws_lambda_powertools.utilities.idempotency.persistence.base import (
BasePersistenceLayer,
)
Expand All @@ -17,4 +18,6 @@
"idempotent",
"idempotent_function",
"IdempotencyConfig",
"IdempotentHookData",
"IdempotentHookFunction",
)
10 changes: 9 additions & 1 deletion aws_lambda_powertools/utilities/idempotency/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,10 @@
from copy import deepcopy
from typing import Any, Callable, Dict, Optional, Tuple

from aws_lambda_powertools.utilities.idempotency.config import IdempotencyConfig
from aws_lambda_powertools.utilities.idempotency.config import (
IdempotencyConfig,
IdempotentHookData,
)
from aws_lambda_powertools.utilities.idempotency.exceptions import (
IdempotencyAlreadyInProgressError,
IdempotencyInconsistentStateError,
Expand Down Expand Up @@ -227,6 +230,11 @@ def _handle_for_status(self, data_record: DataRecord) -> Optional[Any]:
)
response_dict: Optional[dict] = data_record.response_json_as_dict()
if response_dict is not None:
if self.config.response_hook is not None:
return self.config.response_hook(
self.output_serializer.from_dict(response_dict),
IdempotentHookData(data_record),
)
return self.output_serializer.from_dict(response_dict)
return None

Expand Down
43 changes: 42 additions & 1 deletion aws_lambda_powertools/utilities/idempotency/config.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,45 @@
from typing import Dict, Optional
from typing import Any, Dict, Optional

from aws_lambda_powertools.shared.types import Protocol
from aws_lambda_powertools.utilities.idempotency.persistence.datarecord import DataRecord
from aws_lambda_powertools.utilities.typing import LambdaContext


class IdempotentHookData:
"""
Idempotent Hook Data

Contains data relevant to the current Idempotent record which matches the current request.
All IdempotentHook functions will be passed this data as well as the current Response.
"""

def __init__(self, data_record: DataRecord) -> None:
self._idempotency_key = data_record.idempotency_key
self._status = data_record.status
self._expiry_timestamp = data_record.expiry_timestamp

@property
def idempotency_key(self) -> str:
return self._idempotency_key

@property
def status(self) -> str:
return self._status

@property
def expiry_timestamp(self) -> Optional[int]:
return self._expiry_timestamp


class IdempotentHookFunction(Protocol):
"""
The IdempotentHookFunction.
This class defines the calling signature for IdempotentHookFunction callbacks.
"""

def __call__(self, response: Any, idempotent_data: IdempotentHookData) -> Any: ...


class IdempotencyConfig:
def __init__(
self,
Expand All @@ -15,6 +52,7 @@ def __init__(
local_cache_max_items: int = 256,
hash_function: str = "md5",
lambda_context: Optional[LambdaContext] = None,
response_hook: Optional[IdempotentHookFunction] = None,
):
"""
Initialize the base persistence layer
Expand All @@ -37,6 +75,8 @@ def __init__(
Function to use for calculating hashes, by default md5.
lambda_context: LambdaContext, optional
Lambda Context containing information about the invocation, function and execution environment.
response_hook: IdempotentHookFunction, optional
Hook function to be called when an idempotent response is returned from the idempotent store.
"""
self.event_key_jmespath = event_key_jmespath
self.payload_validation_jmespath = payload_validation_jmespath
Expand All @@ -47,6 +87,7 @@ def __init__(
self.local_cache_max_items = local_cache_max_items
self.hash_function = hash_function
self.lambda_context: Optional[LambdaContext] = lambda_context
self.response_hook: Optional[IdempotentHookFunction] = response_hook

def register_lambda_context(self, lambda_context: LambdaContext):
"""Captures the Lambda context, to calculate the remaining time before the invocation times out"""
Expand Down
39 changes: 28 additions & 11 deletions docs/utilities/idempotency.md
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,8 @@ We currently support Amazon DynamoDB and Redis as a storage layer. The following
If you're not [changing the default configuration for the DynamoDB persistence layer](#dynamodbpersistencelayer), this is the expected default configuration:

| Configuration | Value | Notes |
| ------------------ | ------------ | ----------------------------------------------------------------------------------- |
| Partition key | `id` |
| ------------------ | ------------ |-------------------------------------------------------------------------------------|
| Partition key | `id` | |
| TTL attribute name | `expiration` | This can only be configured after your table is created if you're using AWS Console |

???+ tip "Tip: You can share a single state table for all functions"
Expand Down Expand Up @@ -699,15 +699,16 @@ For advanced configurations, such as setting up SSL certificates or customizing

Idempotent decorator can be further configured with **`IdempotencyConfig`** as seen in the previous example. These are the available options for further configuration

| Parameter | Default | Description |
| ------------------------------- | ------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| **event_key_jmespath** | `""` | JMESPath expression to extract the idempotency key from the event record using [built-in functions](./jmespath_functions.md#built-in-jmespath-functions){target="_blank"} |
| **payload_validation_jmespath** | `""` | JMESPath expression to validate whether certain parameters have changed in the event while the event payload |
| **raise_on_no_idempotency_key** | `False` | Raise exception if no idempotency key was found in the request |
| **expires_after_seconds** | 3600 | The number of seconds to wait before a record is expired |
| **use_local_cache** | `False` | Whether to locally cache idempotency results |
| **local_cache_max_items** | 256 | Max number of items to store in local cache |
| **hash_function** | `md5` | Function to use for calculating hashes, as provided by [hashlib](https://docs.python.org/3/library/hashlib.html){target="_blank" rel="nofollow"} in the standard library. |
| Parameter | Default | Description |
|---------------------------------|---------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| **event_key_jmespath** | `""` | JMESPath expression to extract the idempotency key from the event record using [built-in functions](./jmespath_functions.md#built-in-jmespath-functions){target="_blank"} |
| **payload_validation_jmespath** | `""` | JMESPath expression to validate whether certain parameters have changed in the event while the event payload |
| **raise_on_no_idempotency_key** | `False` | Raise exception if no idempotency key was found in the request |
| **expires_after_seconds** | 3600 | The number of seconds to wait before a record is expired |
| **use_local_cache** | `False` | Whether to locally cache idempotency results |
| **local_cache_max_items** | 256 | Max number of items to store in local cache |
| **hash_function** | `md5` | Function to use for calculating hashes, as provided by [hashlib](https://docs.python.org/3/library/hashlib.html){target="_blank" rel="nofollow"} in the standard library. |
| **response_hook** | `None` | Function to use for processing the stored Idempotent response. This function hook is called when an already returned response is found. See [Modifying The Idempotent Response](idempotency.md#modifying-the-idempotent-repsonse) |

### Handling concurrent executions with the same payload

Expand Down Expand Up @@ -909,6 +910,22 @@ You can create your own persistent store from scratch by inheriting the `BasePer

For example, the `_put_record` method needs to raise an exception if a non-expired record already exists in the data store with a matching key.

### Modifying the Idempotent Repsonse

The IdempotentConfig allows you to specify a _**response_hook**_ which is a function that will be called when an idempotent response is loaded from the PersistenceStore.

You can provide the response_hook using _**IdempotentConfig**_.

=== "Using an Idempotent Response Hook"

```python hl_lines="10-15 19"
--8<-- "examples/idempotency/src/working_with_response_hook.py"
```

???+ info "Info: Using custom de-serialization?"

The response_hook is called after the custom de-serialization so the payload you process will be the de-serialized version.

## Compatibility with other utilities

### Batch
Expand Down
27 changes: 27 additions & 0 deletions examples/idempotency/src/working_with_response_hook.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
from typing import Dict

from aws_lambda_powertools.utilities.idempotency import (
DynamoDBPersistenceLayer,
IdempotencyConfig,
IdempotentHookData,
idempotent,
)
from aws_lambda_powertools.utilities.typing import LambdaContext


def my_response_hook(response: Dict, idempotent_data: IdempotentHookData) -> Dict:
# How to add a field to the response
response["is_idempotent_response"] = True

# Must return the response here
return response


persistence_layer = DynamoDBPersistenceLayer(table_name="IdempotencyTable")

config = IdempotencyConfig(event_key_jmespath="body", response_hook=my_response_hook)


@idempotent(persistence_store=persistence_layer, config=config)
def lambda_handler(event: dict, context: LambdaContext) -> dict:
return event
156 changes: 122 additions & 34 deletions tests/functional/idempotency/test_idempotency.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import copy
import datetime
import warnings
from unittest.mock import MagicMock
from typing import Any
from unittest.mock import MagicMock, Mock

import jmespath
import pytest
Expand All @@ -26,6 +27,7 @@
IdempotencyHandler,
_prepare_data,
)
from aws_lambda_powertools.utilities.idempotency.config import IdempotentHookData
from aws_lambda_powertools.utilities.idempotency.exceptions import (
IdempotencyAlreadyInProgressError,
IdempotencyInconsistentStateError,
Expand Down Expand Up @@ -240,6 +242,39 @@ def lambda_handler(event, context):
stubber.deactivate()


@pytest.mark.parametrize("idempotency_config", [{"use_local_cache": False}, {"use_local_cache": True}], indirect=True)
def test_idempotent_lambda_expired(
idempotency_config: IdempotencyConfig,
persistence_store: DynamoDBPersistenceLayer,
lambda_apigw_event,
lambda_response,
expected_params_update_item,
expected_params_put_item,
lambda_context,
):
"""
Test idempotent decorator when lambda is called with an event it successfully handled already, but outside of the
expiry window
"""

stubber = stub.Stubber(persistence_store.client)

ddb_response = {}

stubber.add_response("put_item", ddb_response, expected_params_put_item)
stubber.add_response("update_item", ddb_response, expected_params_update_item)
stubber.activate()

@idempotent(config=idempotency_config, persistence_store=persistence_store)
def lambda_handler(event, context):
return lambda_response

lambda_handler(lambda_apigw_event, lambda_context)

stubber.assert_no_pending_responses()
stubber.deactivate()


@pytest.mark.parametrize("idempotency_config", [{"use_local_cache": True}], indirect=True)
def test_idempotent_lambda_first_execution_cached(
idempotency_config: IdempotencyConfig,
Expand Down Expand Up @@ -324,39 +359,6 @@ def lambda_handler(event, context):
stubber.deactivate()


@pytest.mark.parametrize("idempotency_config", [{"use_local_cache": False}, {"use_local_cache": True}], indirect=True)
def test_idempotent_lambda_expired(
idempotency_config: IdempotencyConfig,
persistence_store: DynamoDBPersistenceLayer,
lambda_apigw_event,
lambda_response,
expected_params_update_item,
expected_params_put_item,
lambda_context,
):
"""
Test idempotent decorator when lambda is called with an event it successfully handled already, but outside of the
expiry window
"""

stubber = stub.Stubber(persistence_store.client)

ddb_response = {}

stubber.add_response("put_item", ddb_response, expected_params_put_item)
stubber.add_response("update_item", ddb_response, expected_params_update_item)
stubber.activate()

@idempotent(config=idempotency_config, persistence_store=persistence_store)
def lambda_handler(event, context):
return lambda_response

lambda_handler(lambda_apigw_event, lambda_context)

stubber.assert_no_pending_responses()
stubber.deactivate()


@pytest.mark.parametrize("idempotency_config", [{"use_local_cache": False}, {"use_local_cache": True}], indirect=True)
def test_idempotent_lambda_exception(
idempotency_config: IdempotencyConfig,
Expand Down Expand Up @@ -1986,3 +1988,89 @@ def lambda_handler(event, context):

# THEN we should not cache a transaction that failed validation
assert cache_spy.call_count == 0


@pytest.mark.parametrize("idempotency_config", [{"use_local_cache": False}, {"use_local_cache": True}], indirect=True)
def test_responsehook_lambda_first_execution(
idempotency_config: IdempotencyConfig,
persistence_store: DynamoDBPersistenceLayer,
lambda_apigw_event,
expected_params_update_item,
expected_params_put_item,
lambda_response,
lambda_context,
):
"""
Test response_hook is not called for the idempotent decorator when lambda is executed
with an event with a previously unknown event key
"""

idempotent_response_hook = Mock()

stubber = stub.Stubber(persistence_store.client)
ddb_response = {}

stubber.add_response("put_item", ddb_response, expected_params_put_item)
stubber.add_response("update_item", ddb_response, expected_params_update_item)
stubber.activate()

idempotency_config.response_hook = idempotent_response_hook

@idempotent(config=idempotency_config, persistence_store=persistence_store)
def lambda_handler(event, context):
return lambda_response

lambda_handler(lambda_apigw_event, lambda_context)

stubber.assert_no_pending_responses()
stubber.deactivate()

assert not idempotent_response_hook.called


@pytest.mark.parametrize("idempotency_config", [{"use_local_cache": False}, {"use_local_cache": True}], indirect=True)
def test_idempotent_lambda_already_completed_response_hook_is_called(
idempotency_config: IdempotencyConfig,
persistence_store: DynamoDBPersistenceLayer,
lambda_apigw_event,
timestamp_future,
hashed_idempotency_key,
serialized_lambda_response,
deserialized_lambda_response,
lambda_context,
):
"""
Test idempotent decorator where event with matching event key has already been successfully processed
"""

def idempotent_response_hook(response: Any, idempotent_data: IdempotentHookData) -> Any:
"""Modify the response provided by adding a new key"""
response["idempotent_response"] = True

return response

idempotency_config.response_hook = idempotent_response_hook

stubber = stub.Stubber(persistence_store.client)
ddb_response = {
"Item": {
"id": {"S": hashed_idempotency_key},
"expiration": {"N": timestamp_future},
"data": {"S": serialized_lambda_response},
"status": {"S": "COMPLETED"},
},
}
stubber.add_client_error("put_item", "ConditionalCheckFailedException", modeled_fields=ddb_response)
stubber.activate()

@idempotent(config=idempotency_config, persistence_store=persistence_store)
def lambda_handler(event, context):
raise Exception

lambda_resp = lambda_handler(lambda_apigw_event, lambda_context)

# Then idempotent_response value will be added to the response
assert lambda_resp["idempotent_response"]

stubber.assert_no_pending_responses()
stubber.deactivate()