diff --git a/powertools-idempotency/src/main/java/software/amazon/lambda/powertools/idempotency/persistence/DataRecord.java b/powertools-idempotency/src/main/java/software/amazon/lambda/powertools/idempotency/persistence/DataRecord.java index 934ec3d09..54001c449 100644 --- a/powertools-idempotency/src/main/java/software/amazon/lambda/powertools/idempotency/persistence/DataRecord.java +++ b/powertools-idempotency/src/main/java/software/amazon/lambda/powertools/idempotency/persistence/DataRecord.java @@ -26,9 +26,30 @@ public class DataRecord { private final String idempotencyKey; private final String status; + + /** + * This field is controlling how long the result of the idempotent + * event is cached. It is stored in _seconds since epoch_. + * + * DynamoDB's TTL mechanism is used to remove the record once the + * expiry has been reached, and subsequent execution of the request + * will be permitted. The user must configure this on their table. + */ private final long expiryTimestamp; private final String responseData; private final String payloadHash; + + /** + * The in-progress field is set to the remaining lambda execution time + * when the record is created. + * This field is stored in _milliseconds since epoch_. + * + * This ensures that: + * + * 1/ other concurrently executing requests are blocked from starting + * 2/ if a lambda times out, subsequent requests will be allowed again, despite + * the fact that the idempotency record is already in the table + */ private final OptionalLong inProgressExpiryTimestamp; public DataRecord(String idempotencyKey, Status status, long expiryTimestamp, String responseData, String payloadHash) { diff --git a/powertools-idempotency/src/main/java/software/amazon/lambda/powertools/idempotency/persistence/DynamoDBPersistenceStore.java b/powertools-idempotency/src/main/java/software/amazon/lambda/powertools/idempotency/persistence/DynamoDBPersistenceStore.java index 47ddf4c5c..6b5d0fcb2 100644 --- a/powertools-idempotency/src/main/java/software/amazon/lambda/powertools/idempotency/persistence/DynamoDBPersistenceStore.java +++ b/powertools-idempotency/src/main/java/software/amazon/lambda/powertools/idempotency/persistence/DynamoDBPersistenceStore.java @@ -126,6 +126,19 @@ public DataRecord getRecord(String idempotencyKey) throws IdempotencyItemNotFoun return itemToRecord(response.item()); } + /** + * Store's the given idempotency record in the DDB store. If there + * is an existing record that has expired - either due to the + * cache expiry or due to the in_progress_expiry - the record + * will be overwritten and the idempotent operation can continue. + * + * Note: This method writes only expiry and status information - not + * the results of the operation itself. + * + * @param record DataRecord instance to store + * @param now + * @throws IdempotencyItemAlreadyExistsException + */ @Override public void putRecord(DataRecord record, Instant now) throws IdempotencyItemAlreadyExistsException { Map item = new HashMap<>(getKey(record.getIdempotencyKey())); @@ -152,6 +165,7 @@ public void putRecord(DataRecord record, Instant now) throws IdempotencyItemAlre Map expressionAttributeValues = Stream.of( new AbstractMap.SimpleEntry<>(":now", AttributeValue.builder().n(String.valueOf(now.getEpochSecond())).build()), + new AbstractMap.SimpleEntry<>(":now_milliseconds", AttributeValue.builder().n(String.valueOf(now.toEpochMilli())).build()), new AbstractMap.SimpleEntry<>(":inprogress", AttributeValue.builder().s(INPROGRESS.toString()).build()) ).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); @@ -160,7 +174,7 @@ public void putRecord(DataRecord record, Instant now) throws IdempotencyItemAlre PutItemRequest.builder() .tableName(tableName) .item(item) - .conditionExpression("attribute_not_exists(#id) OR #expiry < :now OR (attribute_exists(#in_progress_expiry) AND #in_progress_expiry < :now AND #status = :inprogress)") + .conditionExpression("attribute_not_exists(#id) OR #expiry < :now OR (attribute_exists(#in_progress_expiry) AND #in_progress_expiry < :now_milliseconds AND #status = :inprogress)") .expressionAttributeNames(expressionAttributeNames) .expressionAttributeValues(expressionAttributeValues) .build() diff --git a/powertools-idempotency/src/test/java/software/amazon/lambda/powertools/idempotency/persistence/DynamoDBPersistenceStoreTest.java b/powertools-idempotency/src/test/java/software/amazon/lambda/powertools/idempotency/persistence/DynamoDBPersistenceStoreTest.java index 86e35cd33..768da2eaa 100644 --- a/powertools-idempotency/src/test/java/software/amazon/lambda/powertools/idempotency/persistence/DynamoDBPersistenceStoreTest.java +++ b/powertools-idempotency/src/test/java/software/amazon/lambda/powertools/idempotency/persistence/DynamoDBPersistenceStoreTest.java @@ -95,7 +95,7 @@ public void putRecord_shouldCreateRecordInDynamoDB_IfLambdaWasInProgressAndTimed Map item = new HashMap<>(key); Instant now = Instant.now(); long expiry = now.plus(30, ChronoUnit.SECONDS).getEpochSecond(); - long progressExpiry = now.minus(30, ChronoUnit.SECONDS).getEpochSecond(); + long progressExpiry = now.minus(30, ChronoUnit.SECONDS).toEpochMilli(); item.put("expiration", AttributeValue.builder().n(String.valueOf(expiry)).build()); item.put("status", AttributeValue.builder().s(DataRecord.Status.INPROGRESS.toString()).build()); item.put("data", AttributeValue.builder().s("Fake Data").build()); @@ -152,14 +152,14 @@ public void putRecord_shouldThrowIdempotencyItemAlreadyExistsException_IfRecordA } @Test - public void putRecord_shouldThrowIdempotencyItemAlreadyExistsException_IfRecordAlreadyExistAndProgressNotExpiredAfterLambdaTimedOut() { + public void putRecord_shouldBlockUpdate_IfRecordAlreadyExistAndProgressNotExpiredAfterLambdaTimedOut() { key = Collections.singletonMap("id", AttributeValue.builder().s("key").build()); // GIVEN: Insert a fake item with same id Map item = new HashMap<>(key); Instant now = Instant.now(); long expiry = now.plus(30, ChronoUnit.SECONDS).getEpochSecond(); // not expired - long progressExpiry = now.plus(30, ChronoUnit.SECONDS).getEpochSecond(); // not expired + long progressExpiry = now.plus(30, ChronoUnit.SECONDS).toEpochMilli(); // not expired item.put("expiration", AttributeValue.builder().n(String.valueOf(expiry)).build()); item.put("status", AttributeValue.builder().s(DataRecord.Status.INPROGRESS.toString()).build()); item.put("data", AttributeValue.builder().s("Fake Data").build()); @@ -172,10 +172,10 @@ public void putRecord_shouldThrowIdempotencyItemAlreadyExistsException_IfRecordA new DataRecord("key", DataRecord.Status.INPROGRESS, expiry2, - null, + "Fake Data 2", null - ), now) - ).isInstanceOf(IdempotencyItemAlreadyExistsException.class); + ), now)) + .isInstanceOf(IdempotencyItemAlreadyExistsException.class); // THEN: item was not updated, retrieve the initial one Map itemInDb = client.getItem(GetItemRequest.builder().tableName(TABLE_NAME).key(key).build()).item();