From e55e2283d92fb881cbae8ad8c161dc75bdbddea1 Mon Sep 17 00:00:00 2001 From: Tom McCarthy Date: Fri, 19 Feb 2021 21:10:24 +0100 Subject: [PATCH 1/7] fix: correct behaviour to avoid caching "INPROGRESS" records --- .../utilities/idempotency/persistence/base.py | 11 ++++----- .../idempotency/test_idempotency.py | 23 ++++++++++--------- 2 files changed, 16 insertions(+), 18 deletions(-) diff --git a/aws_lambda_powertools/utilities/idempotency/persistence/base.py b/aws_lambda_powertools/utilities/idempotency/persistence/base.py index c9751b0ca12..c06c95664eb 100644 --- a/aws_lambda_powertools/utilities/idempotency/persistence/base.py +++ b/aws_lambda_powertools/utilities/idempotency/persistence/base.py @@ -243,7 +243,7 @@ def _save_to_cache(self, data_record: DataRecord): def _retrieve_from_cache(self, idempotency_key: str): cached_record = self._cache.get(idempotency_key) if cached_record: - if not cached_record.is_expired: + if all((not cached_record.is_expired, not cached_record.status == "INPROGRESS")): return cached_record logger.debug(f"Removing expired local cache record for idempotency key: {idempotency_key}") self._delete_from_cache(idempotency_key) @@ -305,11 +305,6 @@ def save_inprogress(self, event: Dict[str, Any]) -> None: self._put_record(data_record) - # This has to come after _put_record. If _put_record call raises ItemAlreadyExists we shouldn't populate the - # cache with an "INPROGRESS" record as we don't know the status in the data store at this point. - if self.use_local_cache: - self._save_to_cache(data_record) - def delete_record(self, event: Dict[str, Any], exception: Exception): """ Delete record from the persistence store @@ -365,7 +360,9 @@ def get_record(self, event: Dict[str, Any]) -> DataRecord: record = self._get_record(idempotency_key) - if self.use_local_cache: + # We can't cache "INPROGRESS" records as we have no way to reflect updates that can happen outside of the + # execution environment + if self.use_local_cache and not record.status == STATUS_CONSTANTS["INPROGRESS"]: self._save_to_cache(data_record=record) self._validate_payload(event, record) diff --git a/tests/functional/idempotency/test_idempotency.py b/tests/functional/idempotency/test_idempotency.py index e6e64e3b38b..643ffc19dd0 100644 --- a/tests/functional/idempotency/test_idempotency.py +++ b/tests/functional/idempotency/test_idempotency.py @@ -133,6 +133,12 @@ def test_idempotent_lambda_in_progress_with_cache( stubber.add_client_error("put_item", "ConditionalCheckFailedException") stubber.add_response("get_item", ddb_response, expected_params) + + stubber.add_client_error("put_item", "ConditionalCheckFailedException") + stubber.add_response("get_item", copy.deepcopy(ddb_response), copy.deepcopy(expected_params)) + + stubber.add_client_error("put_item", "ConditionalCheckFailedException") + stubber.add_response("get_item", copy.deepcopy(ddb_response), copy.deepcopy(expected_params)) stubber.activate() @idempotent(persistence_store=persistence_store) @@ -151,11 +157,8 @@ def lambda_handler(event, context): assert retrieve_from_cache_spy.call_count == 2 * loops retrieve_from_cache_spy.assert_called_with(idempotency_key=hashed_idempotency_key) - assert save_to_cache_spy.call_count == 1 - first_call_args_data_record = save_to_cache_spy.call_args_list[0].kwargs["data_record"] - assert first_call_args_data_record.idempotency_key == hashed_idempotency_key - assert first_call_args_data_record.status == "INPROGRESS" - assert persistence_store._cache.get(hashed_idempotency_key) + save_to_cache_spy.assert_not_called() + assert persistence_store._cache.get(hashed_idempotency_key) is None stubber.assert_no_pending_responses() stubber.deactivate() @@ -223,12 +226,10 @@ def lambda_handler(event, context): lambda_handler(lambda_apigw_event, {}) - assert retrieve_from_cache_spy.call_count == 1 - assert save_to_cache_spy.call_count == 2 - first_call_args, second_call_args = save_to_cache_spy.call_args_list - assert first_call_args.args[0].status == "INPROGRESS" - assert second_call_args.args[0].status == "COMPLETED" - assert persistence_store._cache.get(hashed_idempotency_key) + retrieve_from_cache_spy.assert_called_once() + save_to_cache_spy.assert_called_once() + assert save_to_cache_spy.call_args[0][0].status == "COMPLETED" + assert persistence_store._cache.get(hashed_idempotency_key).status == "COMPLETED" # This lambda call should not call AWS API lambda_handler(lambda_apigw_event, {}) From 197dc0352cb0230635da7ca2434b3d6b596b9c64 Mon Sep 17 00:00:00 2001 From: Tom McCarthy Date: Fri, 19 Feb 2021 21:48:40 +0100 Subject: [PATCH 2/7] docs: add beta flag to utility for initial release(s) --- docs/utilities/idempotency.md | 57 ++++++++++++++++++----------------- 1 file changed, 30 insertions(+), 27 deletions(-) diff --git a/docs/utilities/idempotency.md b/docs/utilities/idempotency.md index 1c6555088d9..6bc7457d603 100644 --- a/docs/utilities/idempotency.md +++ b/docs/utilities/idempotency.md @@ -3,8 +3,11 @@ title: Idempotency description: Utility --- -This utility provides a simple solution to convert your Lambda functions into idempotent operations which are safe to -retry. +!!! attention + **This utility is currently in beta**. Please open an [issue in GitHub](https://github.com/awslabs/aws-lambda-powertools-python/issues/new/choose) for any bugs or feature requests. + +The idempotency utility provides a simple solution to convert your Lambda functions into idempotent operations which +are safe to retry. ## Terminology @@ -31,31 +34,31 @@ storage layer, so you'll need to create a table first. > Example using AWS Serverless Application Model (SAM) === "template.yml" -```yaml -Resources: - HelloWorldFunction: - Type: AWS::Serverless::Function - Properties: - Runtime: python3.8 - ... - Policies: - - DynamoDBCrudPolicy: - TableName: !Ref IdempotencyTable - IdempotencyTable: - Type: AWS::DynamoDB::Table - Properties: - AttributeDefinitions: - - AttributeName: id - AttributeType: S - BillingMode: PAY_PER_REQUEST - KeySchema: - - AttributeName: id - KeyType: HASH - TableName: "IdempotencyTable" - TimeToLiveSpecification: - AttributeName: expiration - Enabled: true -``` + ```yaml + Resources: + HelloWorldFunction: + Type: AWS::Serverless::Function + Properties: + Runtime: python3.8 + ... + Policies: + - DynamoDBCrudPolicy: + TableName: !Ref IdempotencyTable + IdempotencyTable: + Type: AWS::DynamoDB::Table + Properties: + AttributeDefinitions: + - AttributeName: id + AttributeType: S + BillingMode: PAY_PER_REQUEST + KeySchema: + - AttributeName: id + KeyType: HASH + TableName: "IdempotencyTable" + TimeToLiveSpecification: + AttributeName: expiration + Enabled: true + ``` !!! warning When using this utility with DynamoDB, your lambda responses must always be smaller than 400kb. Larger items cannot From bdfae47b73694f3b5c9f6261ffcb68329111bd16 Mon Sep 17 00:00:00 2001 From: Tom McCarthy Date: Sat, 20 Feb 2021 21:10:57 +0100 Subject: [PATCH 3/7] chore: Change STATUS_CONSTANTS to MappingProxyType --- .../utilities/idempotency/persistence/base.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/aws_lambda_powertools/utilities/idempotency/persistence/base.py b/aws_lambda_powertools/utilities/idempotency/persistence/base.py index c06c95664eb..2ae51f0b000 100644 --- a/aws_lambda_powertools/utilities/idempotency/persistence/base.py +++ b/aws_lambda_powertools/utilities/idempotency/persistence/base.py @@ -7,6 +7,7 @@ import json import logging from abc import ABC, abstractmethod +from types import MappingProxyType from typing import Any, Dict import jmespath @@ -21,7 +22,7 @@ logger = logging.getLogger(__name__) -STATUS_CONSTANTS = {"INPROGRESS": "INPROGRESS", "COMPLETED": "COMPLETED", "EXPIRED": "EXPIRED"} +STATUS_CONSTANTS = MappingProxyType({"INPROGRESS": "INPROGRESS", "COMPLETED": "COMPLETED", "EXPIRED": "EXPIRED"}) class DataRecord: From 359151c9e56c3a7c505ccf15ee3f5bb04d202fea Mon Sep 17 00:00:00 2001 From: Tom McCarthy Date: Sat, 20 Feb 2021 21:11:44 +0100 Subject: [PATCH 4/7] chore: Fix docstrings --- .../utilities/idempotency/persistence/base.py | 23 +++++++++++++++++-- 1 file changed, 21 insertions(+), 2 deletions(-) diff --git a/aws_lambda_powertools/utilities/idempotency/persistence/base.py b/aws_lambda_powertools/utilities/idempotency/persistence/base.py index 2ae51f0b000..d6eb6533e9d 100644 --- a/aws_lambda_powertools/utilities/idempotency/persistence/base.py +++ b/aws_lambda_powertools/utilities/idempotency/persistence/base.py @@ -215,7 +215,7 @@ def _validate_payload(self, lambda_event: Dict[str, Any], data_record: DataRecor DataRecord instance Raises - ______ + ---------- IdempotencyValidationError Event payload doesn't match the stored record for the given idempotency key @@ -239,6 +239,25 @@ def _get_expiry_timestamp(self) -> int: return int((now + period).timestamp()) def _save_to_cache(self, data_record: DataRecord): + """ + Save data_record to local cache except when status is "INPROGRESS" + + NOTE: We can't cache "INPROGRESS" records as we have no way to reflect updates that can happen outside of the + execution environment + + Parameters + ---------- + data_record: DataRecord + DataRecord instance + + Returns + ------- + + """ + if not self.use_local_cache: + return + if data_record.status == STATUS_CONSTANTS["INPROGRESS"]: + return self._cache[data_record.idempotency_key] = data_record def _retrieve_from_cache(self, idempotency_key: str): @@ -254,7 +273,7 @@ def _delete_from_cache(self, idempotency_key: str): def save_success(self, event: Dict[str, Any], result: dict) -> None: """ - Save record of function's execution completing succesfully + Save record of function's execution completing successfully Parameters ---------- From 3876721edd2377ce5d64c466512b1f43da0bd813 Mon Sep 17 00:00:00 2001 From: Tom McCarthy Date: Sat, 20 Feb 2021 21:16:28 +0100 Subject: [PATCH 5/7] chore: readability improvements --- .../utilities/idempotency/persistence/base.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/aws_lambda_powertools/utilities/idempotency/persistence/base.py b/aws_lambda_powertools/utilities/idempotency/persistence/base.py index d6eb6533e9d..1e30bbb4fbf 100644 --- a/aws_lambda_powertools/utilities/idempotency/persistence/base.py +++ b/aws_lambda_powertools/utilities/idempotency/persistence/base.py @@ -82,8 +82,7 @@ def status(self) -> str: """ if self.is_expired: return STATUS_CONSTANTS["EXPIRED"] - - if self._status in STATUS_CONSTANTS.values(): + elif self._status in STATUS_CONSTANTS.values(): return self._status else: raise IdempotencyInvalidStatusError(self._status) @@ -222,7 +221,7 @@ def _validate_payload(self, lambda_event: Dict[str, Any], data_record: DataRecor """ if self.payload_validation_enabled: lambda_payload_hash = self._get_hashed_payload(lambda_event) - if not data_record.payload_hash == lambda_payload_hash: + if data_record.payload_hash != lambda_payload_hash: raise IdempotencyValidationError("Payload does not match stored record for this event key") def _get_expiry_timestamp(self) -> int: @@ -263,7 +262,7 @@ def _save_to_cache(self, data_record: DataRecord): def _retrieve_from_cache(self, idempotency_key: str): cached_record = self._cache.get(idempotency_key) if cached_record: - if all((not cached_record.is_expired, not cached_record.status == "INPROGRESS")): + if not cached_record.is_expired: return cached_record logger.debug(f"Removing expired local cache record for idempotency key: {idempotency_key}") self._delete_from_cache(idempotency_key) From a3cdef8705b2a40b973439089276361e4672373a Mon Sep 17 00:00:00 2001 From: Tom McCarthy Date: Sat, 20 Feb 2021 21:18:33 +0100 Subject: [PATCH 6/7] chore: move cache conditionals inside of cache methods --- .../utilities/idempotency/persistence/base.py | 32 ++++++++--------- .../idempotency/test_idempotency.py | 34 ++++++++++++++++++- 2 files changed, 47 insertions(+), 19 deletions(-) diff --git a/aws_lambda_powertools/utilities/idempotency/persistence/base.py b/aws_lambda_powertools/utilities/idempotency/persistence/base.py index 1e30bbb4fbf..c3183e0df84 100644 --- a/aws_lambda_powertools/utilities/idempotency/persistence/base.py +++ b/aws_lambda_powertools/utilities/idempotency/persistence/base.py @@ -260,6 +260,8 @@ def _save_to_cache(self, data_record: DataRecord): self._cache[data_record.idempotency_key] = data_record def _retrieve_from_cache(self, idempotency_key: str): + if not self.use_local_cache: + return cached_record = self._cache.get(idempotency_key) if cached_record: if not cached_record.is_expired: @@ -268,6 +270,8 @@ def _retrieve_from_cache(self, idempotency_key: str): self._delete_from_cache(idempotency_key) def _delete_from_cache(self, idempotency_key: str): + if not self.use_local_cache: + return del self._cache[idempotency_key] def save_success(self, event: Dict[str, Any], result: dict) -> None: @@ -296,8 +300,7 @@ def save_success(self, event: Dict[str, Any], result: dict) -> None: ) self._update_record(data_record=data_record) - if self.use_local_cache: - self._save_to_cache(data_record) + self._save_to_cache(data_record) def save_inprogress(self, event: Dict[str, Any]) -> None: """ @@ -317,10 +320,8 @@ def save_inprogress(self, event: Dict[str, Any]) -> None: logger.debug(f"Saving in progress record for idempotency key: {data_record.idempotency_key}") - if self.use_local_cache: - cached_record = self._retrieve_from_cache(idempotency_key=data_record.idempotency_key) - if cached_record: - raise IdempotencyItemAlreadyExistsError + if self._retrieve_from_cache(idempotency_key=data_record.idempotency_key): + raise IdempotencyItemAlreadyExistsError self._put_record(data_record) @@ -343,8 +344,7 @@ def delete_record(self, event: Dict[str, Any], exception: Exception): ) self._delete_record(data_record) - if self.use_local_cache: - self._delete_from_cache(data_record.idempotency_key) + self._delete_from_cache(data_record.idempotency_key) def get_record(self, event: Dict[str, Any]) -> DataRecord: """ @@ -370,19 +370,15 @@ def get_record(self, event: Dict[str, Any]) -> DataRecord: idempotency_key = self._get_hashed_idempotency_key(event) - if self.use_local_cache: - cached_record = self._retrieve_from_cache(idempotency_key=idempotency_key) - if cached_record: - logger.debug(f"Idempotency record found in cache with idempotency key: {idempotency_key}") - self._validate_payload(event, cached_record) - return cached_record + cached_record = self._retrieve_from_cache(idempotency_key=idempotency_key) + if cached_record: + logger.debug(f"Idempotency record found in cache with idempotency key: {idempotency_key}") + self._validate_payload(event, cached_record) + return cached_record record = self._get_record(idempotency_key) - # We can't cache "INPROGRESS" records as we have no way to reflect updates that can happen outside of the - # execution environment - if self.use_local_cache and not record.status == STATUS_CONSTANTS["INPROGRESS"]: - self._save_to_cache(data_record=record) + self._save_to_cache(data_record=record) self._validate_payload(event, record) return record diff --git a/tests/functional/idempotency/test_idempotency.py b/tests/functional/idempotency/test_idempotency.py index 643ffc19dd0..269ab6f9b33 100644 --- a/tests/functional/idempotency/test_idempotency.py +++ b/tests/functional/idempotency/test_idempotency.py @@ -157,7 +157,7 @@ def lambda_handler(event, context): assert retrieve_from_cache_spy.call_count == 2 * loops retrieve_from_cache_spy.assert_called_with(idempotency_key=hashed_idempotency_key) - save_to_cache_spy.assert_not_called() + save_to_cache_spy.assert_called() assert persistence_store._cache.get(hashed_idempotency_key) is None stubber.assert_no_pending_responses() @@ -595,3 +595,35 @@ def test_data_record_invalid_status_value(): _ = data_record.status assert e.value.args[0] == "UNSUPPORTED_STATUS" + + +@pytest.mark.parametrize("persistence_store", [{"use_local_cache": True}], indirect=True) +def test_in_progress_never_saved_to_cache(persistence_store): + # GIVEN a data record with status "INPROGRESS" + # and persistence_store has use_local_cache = True + data_record = DataRecord("key", status="INPROGRESS") + + # WHEN saving to local cache + persistence_store._save_to_cache(data_record) + + # THEN don't save to local cache + assert persistence_store._cache.get("key") is None + + +@pytest.mark.parametrize("persistence_store", [{"use_local_cache": False}], indirect=True) +def test_user_local_disabled(persistence_store): + # GIVEN a persistence_store with use_local_cache = False + + # WHEN calling any local cache options + data_record = DataRecord("key", status="COMPLETED") + try: + persistence_store._save_to_cache(data_record) + cache_value = persistence_store._retrieve_from_cache("key") + assert cache_value is None + persistence_store._delete_from_cache("key") + except AttributeError as e: + pytest.fail(f"AttributeError should not be raised: {e}") + + # THEN raise AttributeError + # AND don't have a _cache attribute + assert not hasattr("persistence_store", "_cache") From a7f81990cb82e60ccd96f8d2f971cb24796256f8 Mon Sep 17 00:00:00 2001 From: Tom McCarthy Date: Sat, 20 Feb 2021 21:19:07 +0100 Subject: [PATCH 7/7] chore: add test for unhandled types --- tests/unit/test_json_encoder.py | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/tests/unit/test_json_encoder.py b/tests/unit/test_json_encoder.py index 8d6a9f3944c..af8de4257a8 100644 --- a/tests/unit/test_json_encoder.py +++ b/tests/unit/test_json_encoder.py @@ -1,6 +1,8 @@ import decimal import json +import pytest + from aws_lambda_powertools.shared.json_encoder import Encoder @@ -12,3 +14,11 @@ def test_jsonencode_decimal(): def test_jsonencode_decimal_nan(): result = json.dumps({"val": decimal.Decimal("NaN")}, cls=Encoder) assert result == '{"val": NaN}' + + +def test_jsonencode_calls_default(): + class CustomClass: + pass + + with pytest.raises(TypeError): + json.dumps({"val": CustomClass()}, cls=Encoder)