Skip to content

Commit ba190dc

Browse files
fix: idempotency timeout bug (#1285)
* Idemp should be fixed, test isnt * Fix some of the tests. Need to go through the rest still * Add some docs * Update powertools-idempotency/src/main/java/software/amazon/lambda/powertools/idempotency/persistence/DataRecord.java Co-authored-by: Jérôme Van Der Linden <[email protected]> * Address review comment --------- Co-authored-by: Jérôme Van Der Linden <[email protected]>
1 parent a7e3782 commit ba190dc

File tree

3 files changed

+42
-7
lines changed

3 files changed

+42
-7
lines changed

powertools-idempotency/src/main/java/software/amazon/lambda/powertools/idempotency/persistence/DataRecord.java

+21
Original file line numberDiff line numberDiff line change
@@ -26,9 +26,30 @@
2626
public class DataRecord {
2727
private final String idempotencyKey;
2828
private final String status;
29+
30+
/**
31+
* This field is controlling how long the result of the idempotent
32+
* event is cached. It is stored in _seconds since epoch_.
33+
*
34+
* DynamoDB's TTL mechanism is used to remove the record once the
35+
* expiry has been reached, and subsequent execution of the request
36+
* will be permitted. The user must configure this on their table.
37+
*/
2938
private final long expiryTimestamp;
3039
private final String responseData;
3140
private final String payloadHash;
41+
42+
/**
43+
* The in-progress field is set to the remaining lambda execution time
44+
* when the record is created.
45+
* This field is stored in _milliseconds since epoch_.
46+
*
47+
* This ensures that:
48+
*
49+
* 1/ other concurrently executing requests are blocked from starting
50+
* 2/ if a lambda times out, subsequent requests will be allowed again, despite
51+
* the fact that the idempotency record is already in the table
52+
*/
3253
private final OptionalLong inProgressExpiryTimestamp;
3354

3455
public DataRecord(String idempotencyKey, Status status, long expiryTimestamp, String responseData, String payloadHash) {

powertools-idempotency/src/main/java/software/amazon/lambda/powertools/idempotency/persistence/DynamoDBPersistenceStore.java

+15-1
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,19 @@ public DataRecord getRecord(String idempotencyKey) throws IdempotencyItemNotFoun
126126
return itemToRecord(response.item());
127127
}
128128

129+
/**
130+
* Store's the given idempotency record in the DDB store. If there
131+
* is an existing record that has expired - either due to the
132+
* cache expiry or due to the in_progress_expiry - the record
133+
* will be overwritten and the idempotent operation can continue.
134+
*
135+
* <b>Note: This method writes only expiry and status information - not
136+
* the results of the operation itself.</b>
137+
*
138+
* @param record DataRecord instance to store
139+
* @param now
140+
* @throws IdempotencyItemAlreadyExistsException
141+
*/
129142
@Override
130143
public void putRecord(DataRecord record, Instant now) throws IdempotencyItemAlreadyExistsException {
131144
Map<String, AttributeValue> item = new HashMap<>(getKey(record.getIdempotencyKey()));
@@ -152,6 +165,7 @@ public void putRecord(DataRecord record, Instant now) throws IdempotencyItemAlre
152165

153166
Map<String, AttributeValue> expressionAttributeValues = Stream.of(
154167
new AbstractMap.SimpleEntry<>(":now", AttributeValue.builder().n(String.valueOf(now.getEpochSecond())).build()),
168+
new AbstractMap.SimpleEntry<>(":now_milliseconds", AttributeValue.builder().n(String.valueOf(now.toEpochMilli())).build()),
155169
new AbstractMap.SimpleEntry<>(":inprogress", AttributeValue.builder().s(INPROGRESS.toString()).build())
156170
).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
157171

@@ -160,7 +174,7 @@ public void putRecord(DataRecord record, Instant now) throws IdempotencyItemAlre
160174
PutItemRequest.builder()
161175
.tableName(tableName)
162176
.item(item)
163-
.conditionExpression("attribute_not_exists(#id) OR #expiry < :now OR (attribute_exists(#in_progress_expiry) AND #in_progress_expiry < :now AND #status = :inprogress)")
177+
.conditionExpression("attribute_not_exists(#id) OR #expiry < :now OR (attribute_exists(#in_progress_expiry) AND #in_progress_expiry < :now_milliseconds AND #status = :inprogress)")
164178
.expressionAttributeNames(expressionAttributeNames)
165179
.expressionAttributeValues(expressionAttributeValues)
166180
.build()

powertools-idempotency/src/test/java/software/amazon/lambda/powertools/idempotency/persistence/DynamoDBPersistenceStoreTest.java

+6-6
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@ public void putRecord_shouldCreateRecordInDynamoDB_IfLambdaWasInProgressAndTimed
9595
Map<String, AttributeValue> item = new HashMap<>(key);
9696
Instant now = Instant.now();
9797
long expiry = now.plus(30, ChronoUnit.SECONDS).getEpochSecond();
98-
long progressExpiry = now.minus(30, ChronoUnit.SECONDS).getEpochSecond();
98+
long progressExpiry = now.minus(30, ChronoUnit.SECONDS).toEpochMilli();
9999
item.put("expiration", AttributeValue.builder().n(String.valueOf(expiry)).build());
100100
item.put("status", AttributeValue.builder().s(DataRecord.Status.INPROGRESS.toString()).build());
101101
item.put("data", AttributeValue.builder().s("Fake Data").build());
@@ -152,14 +152,14 @@ public void putRecord_shouldThrowIdempotencyItemAlreadyExistsException_IfRecordA
152152
}
153153

154154
@Test
155-
public void putRecord_shouldThrowIdempotencyItemAlreadyExistsException_IfRecordAlreadyExistAndProgressNotExpiredAfterLambdaTimedOut() {
155+
public void putRecord_shouldBlockUpdate_IfRecordAlreadyExistAndProgressNotExpiredAfterLambdaTimedOut() {
156156
key = Collections.singletonMap("id", AttributeValue.builder().s("key").build());
157157

158158
// GIVEN: Insert a fake item with same id
159159
Map<String, AttributeValue> item = new HashMap<>(key);
160160
Instant now = Instant.now();
161161
long expiry = now.plus(30, ChronoUnit.SECONDS).getEpochSecond(); // not expired
162-
long progressExpiry = now.plus(30, ChronoUnit.SECONDS).getEpochSecond(); // not expired
162+
long progressExpiry = now.plus(30, ChronoUnit.SECONDS).toEpochMilli(); // not expired
163163
item.put("expiration", AttributeValue.builder().n(String.valueOf(expiry)).build());
164164
item.put("status", AttributeValue.builder().s(DataRecord.Status.INPROGRESS.toString()).build());
165165
item.put("data", AttributeValue.builder().s("Fake Data").build());
@@ -172,10 +172,10 @@ public void putRecord_shouldThrowIdempotencyItemAlreadyExistsException_IfRecordA
172172
new DataRecord("key",
173173
DataRecord.Status.INPROGRESS,
174174
expiry2,
175-
null,
175+
"Fake Data 2",
176176
null
177-
), now)
178-
).isInstanceOf(IdempotencyItemAlreadyExistsException.class);
177+
), now))
178+
.isInstanceOf(IdempotencyItemAlreadyExistsException.class);
179179

180180
// THEN: item was not updated, retrieve the initial one
181181
Map<String, AttributeValue> itemInDb = client.getItem(GetItemRequest.builder().tableName(TABLE_NAME).key(key).build()).item();

0 commit comments

Comments
 (0)