Skip to content

feat(idempotency): support composite primary key in DynamoDBPersistenceLayer #740

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Nov 9, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ def __init__(
self,
table_name: str,
key_attr: str = "id",
key_attr_value: str = "powertools#idempotency",
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@heitorlessa, I created this as a Draft PR to get your early feedback on the interface. Once we agree on the ergonomics, I'm happy to update documentation and add examples.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@heitorlessa, I didn't see any feedback so I moved this from Draft to Ready. I would still like to update documentation and examples before merging this PR, and would love some feedback on the API before I take the time to do that.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because DynamoDB IAM resource conditions don't allow you to restrict row level access based on SK, where the function name is known, I'd suggest we change the primary key slightly to: idempotency#function-name.

If they want, for example, to restrict anyone from ever modifying idempotency tokens for X function that has a prod stage or something more specific to their business in the name they now can.

Here are the scenarios I understood by going through the PR, as C is new - please correct me if I misunderstood them.


Scenarios

A) No hash key set.: Use default hash key and value

from aws_lambda_powertools.utilities.idempotency import DynamoDBPersistenceLayer

# KEY_ATTR="id"
# KEY_ATTR_VALUE="test-func#ec534ff6b4746107270b412333b59f52"
persistence_layer = DynamoDBPersistenceLayer(table_name="IdempotencyTable")

B) Hash key set.: Use new hash key and default value

# KEY_ATTR="idempotency_key"
# KEY_ATTR_VALUE="test-func#ec534ff6b4746107270b412333b59f52"
persistence_layer = DynamoDBPersistenceLayer(table_name="IdempotencyTable", key_attr="idempotency_key")

C) Hash and Sort key set.: Use new hash and sort key, and default value as sort key

# KEY_ATTR="pk"
# SORT_KEY_ATTR="sk"
# KEY_ATTR_VALUE="idempotency#test-func"
# SORT_KEY_ATTR_VALUE="test-func#ec534ff6b4746107270b412333b59f52"
persistence_layer = DynamoDBPersistenceLayer(
    table_name="IdempotencyTable", 
    key_attr="idempotency_key",
    sort_key_attr="idempotency_key")

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@heitorlessa, re the scenarios: yes, C is new. I think there is some copy/pasta in the code, the key_attr and sort_key_attr should match the comments, as in:

# KEY_ATTR="pk"
# SORT_KEY_ATTR="sk"
# KEY_ATTR_VALUE="idempotency#test-func"
# SORT_KEY_ATTR_VALUE="test-func#ec534ff6b4746107270b412333b59f52"
persistence_layer = DynamoDBPersistenceLayer(
    table_name="IdempotencyTable", 
    key_attr="pk",       # <- changed to match comment
    sort_key_attr="sk")  # <- changed to match comment

There is also a related scenario in which the DynamoDBPersistenceLayer passes the desired value for key_attr_value, e.g.:

# KEY_ATTR="pk"
# SORT_KEY_ATTR="sk"
# KEY_ATTR_VALUE="known-static-partition-key-value"
# SORT_KEY_ATTR_VALUE="test-func#ec534ff6b4746107270b412333b59f52"
persistence_layer = DynamoDBPersistenceLayer(
    table_name="IdempotencyTable", 
    key_attr="pk",
    key_attr_value="known-static-partition-key-value",       
    sort_key_attr="sk")

sort_key_attr: Optional[str] = None,
expiry_attr: str = "expiration",
status_attr: str = "status",
data_attr: str = "data",
Expand All @@ -35,7 +37,12 @@ def __init__(
table_name: str
Name of the table to use for storing execution records
key_attr: str, optional
DynamoDB attribute name for key, by default "id"
DynamoDB attribute name for partition key, by default "id"
key_attr_value: str, optional
DynamoDB attribute value for partition key, by default "powertools#idempotency".
This will be used if the sort_key_attr is set.
sort_key_attr: str, optional
DynamoDB attribute name for the sort key
expiry_attr: str, optional
DynamoDB attribute name for expiry timestamp, by default "expiration"
status_attr: str, optional
Expand Down Expand Up @@ -68,6 +75,8 @@ def __init__(
self._table = None
self.table_name = table_name
self.key_attr = key_attr
self.key_attr_value = key_attr_value
self.sort_key_attr = sort_key_attr
self.expiry_attr = expiry_attr
self.status_attr = status_attr
self.data_attr = data_attr
Expand All @@ -93,6 +102,11 @@ def table(self, table):
"""
self._table = table

def _get_key(self, idempotency_key: str) -> dict:
if self.sort_key_attr:
return {self.key_attr: self.key_attr_value, self.sort_key_attr: idempotency_key}
return {self.key_attr: idempotency_key}

def _item_to_data_record(self, item: Dict[str, Any]) -> DataRecord:
"""
Translate raw item records from DynamoDB to DataRecord
Expand All @@ -117,7 +131,7 @@ def _item_to_data_record(self, item: Dict[str, Any]) -> DataRecord:
)

def _get_record(self, idempotency_key) -> DataRecord:
response = self.table.get_item(Key={self.key_attr: idempotency_key}, ConsistentRead=True)
response = self.table.get_item(Key=self._get_key(idempotency_key), ConsistentRead=True)

try:
item = response["Item"]
Expand All @@ -127,7 +141,7 @@ def _get_record(self, idempotency_key) -> DataRecord:

def _put_record(self, data_record: DataRecord) -> None:
item = {
self.key_attr: data_record.idempotency_key,
**self._get_key(data_record.idempotency_key),
self.expiry_attr: data_record.expiry_timestamp,
self.status_attr: data_record.status,
}
Expand Down Expand Up @@ -168,7 +182,7 @@ def _update_record(self, data_record: DataRecord):
expression_attr_names["#validation_key"] = self.validation_key_attr

kwargs = {
"Key": {self.key_attr: data_record.idempotency_key},
"Key": self._get_key(data_record.idempotency_key),
"UpdateExpression": update_expression,
"ExpressionAttributeValues": expression_attr_values,
"ExpressionAttributeNames": expression_attr_names,
Expand All @@ -178,4 +192,4 @@ def _update_record(self, data_record: DataRecord):

def _delete_record(self, data_record: DataRecord) -> None:
logger.debug(f"Deleting record for idempotency key: {data_record.idempotency_key}")
self.table.delete_item(Key={self.key_attr: data_record.idempotency_key})
self.table.delete_item(Key=self._get_key(data_record.idempotency_key))
72 changes: 49 additions & 23 deletions tests/functional/idempotency/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@
import hashlib
import json
from collections import namedtuple
from dataclasses import dataclass
from decimal import Decimal
from typing import Callable
from unittest import mock

import jmespath
Expand Down Expand Up @@ -80,25 +82,23 @@ def default_jmespath():


@pytest.fixture
def expected_params_update_item(serialized_lambda_response, hashed_idempotency_key):
return {
def expected_params_update_item(serialized_lambda_response):
return lambda key: {
"ExpressionAttributeNames": {"#expiry": "expiration", "#response_data": "data", "#status": "status"},
"ExpressionAttributeValues": {
":expiry": stub.ANY,
":response_data": serialized_lambda_response,
":status": "COMPLETED",
},
"Key": {"id": hashed_idempotency_key},
"Key": key,
"TableName": "TEST_TABLE",
"UpdateExpression": "SET #response_data = :response_data, " "#expiry = :expiry, #status = :status",
}


@pytest.fixture
def expected_params_update_item_with_validation(
serialized_lambda_response, hashed_idempotency_key, hashed_validation_key
):
return {
def expected_params_update_item_with_validation(serialized_lambda_response, hashed_validation_key):
return lambda key: {
"ExpressionAttributeNames": {
"#expiry": "expiration",
"#response_data": "data",
Expand All @@ -111,7 +111,7 @@ def expected_params_update_item_with_validation(
":status": "COMPLETED",
":validation_key": hashed_validation_key,
},
"Key": {"id": hashed_idempotency_key},
"Key": key,
"TableName": "TEST_TABLE",
"UpdateExpression": "SET #response_data = :response_data, "
"#expiry = :expiry, #status = :status, "
Expand All @@ -120,28 +120,23 @@ def expected_params_update_item_with_validation(


@pytest.fixture
def expected_params_put_item(hashed_idempotency_key):
return {
def expected_params_put_item():
return lambda key_attr, key: {
"ConditionExpression": "attribute_not_exists(#id) OR #now < :now",
"ExpressionAttributeNames": {"#id": "id", "#now": "expiration"},
"ExpressionAttributeNames": {"#id": key_attr, "#now": "expiration"},
"ExpressionAttributeValues": {":now": stub.ANY},
"Item": {"expiration": stub.ANY, "id": hashed_idempotency_key, "status": "INPROGRESS"},
"Item": {"expiration": stub.ANY, "status": "INPROGRESS", **key},
"TableName": "TEST_TABLE",
}


@pytest.fixture
def expected_params_put_item_with_validation(hashed_idempotency_key, hashed_validation_key):
return {
def expected_params_put_item_with_validation(hashed_validation_key):
return lambda key_attr, key: {
"ConditionExpression": "attribute_not_exists(#id) OR #now < :now",
"ExpressionAttributeNames": {"#id": "id", "#now": "expiration"},
"ExpressionAttributeNames": {"#id": key_attr, "#now": "expiration"},
"ExpressionAttributeValues": {":now": stub.ANY},
"Item": {
"expiration": stub.ANY,
"id": hashed_idempotency_key,
"status": "INPROGRESS",
"validation": hashed_validation_key,
},
"Item": {"expiration": stub.ANY, "status": "INPROGRESS", "validation": hashed_validation_key, **key},
"TableName": "TEST_TABLE",
}

Expand All @@ -166,9 +161,40 @@ def hashed_validation_key(lambda_apigw_event):
return hashlib.md5(serialize(lambda_apigw_event["requestContext"]).encode()).hexdigest()


@dataclass(eq=True, frozen=True)
class TestPersistenceStore:
persistence_layer: DynamoDBPersistenceLayer
key_attr: str
expected_key: Callable[[str], dict]
expected_key_values: Callable[[str], dict]


@pytest.fixture(params=[{}, {"key_attr": "PK", "key_attr_value": "powertools#idempotency", "sort_key_attr": "SK"}])
def test_persistence_store(config, request) -> TestPersistenceStore:
expected_key = (
lambda idempotency_key: {"PK": "powertools#idempotency", "SK": idempotency_key}
if request.param
else {"id": idempotency_key}
)
expected_key_values = (
lambda idempotency_key: {"PK": {"S": "powertools#idempotency"}, "SK": {"S": idempotency_key}}
if request.param
else {"id": {"S": idempotency_key}}
)
return TestPersistenceStore(
persistence_layer=DynamoDBPersistenceLayer(table_name=TABLE_NAME, boto_config=config, **request.param),
key_attr="PK" if request.param else "id",
expected_key=expected_key,
expected_key_values=expected_key_values,
)


@pytest.fixture
def persistence_store(config):
return DynamoDBPersistenceLayer(table_name=TABLE_NAME, boto_config=config)
def persistence_store_with_composite_key(config):
return DynamoDBPersistenceLayer(
table_name=TABLE_NAME,
boto_config=config,
)


@pytest.fixture
Expand Down
Loading