Skip to content

fix: idempotency timeout bug #1285

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Jul 17, 2023
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,30 @@
public class DataRecord {
private final String idempotencyKey;
private final String status;

/**
* This field is control how long the result of the idempotent
* event is cached. It is stored in _seconds since epoch_.
*
* DynamoDB's TTL mechanism is used to remove the record once the
* expiry has been reached, and subsequent execution of the request
* will be permitted. The user must configure this on their table.
*/
private final long expiryTimestamp;
private final String responseData;
private final String payloadHash;

/**
* The in-progress field is set to the remaining lambda execution time
* when the record is created.
* This field is stored in _milliseconds since epoch_.
*
* This ensures that:
*
* 1/ other concurrently executing requests are blocked from starting
* 2/ if a lambda times out, subsequent requests will be allowed again, despite
* the fact that the idempotency record is already in the table
*/
private final OptionalLong inProgressExpiryTimestamp;

public DataRecord(String idempotencyKey, Status status, long expiryTimestamp, String responseData, String payloadHash) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,19 @@ public DataRecord getRecord(String idempotencyKey) throws IdempotencyItemNotFoun
return itemToRecord(response.item());
}

/**
* Store's the given idempotency record in the DDB store. If there
* is an existing record that has expired - either due to the
* cache expiry or due to the in_progress_expiry - the record
* will be overwritten and the idempotent operation can continue.
*
* <b>Note: This method writes only expiry and status information - not
* the results of the operation itself.</b>
*
* @param record DataRecord instance to store
* @param now
* @throws IdempotencyItemAlreadyExistsException
*/
@Override
public void putRecord(DataRecord record, Instant now) throws IdempotencyItemAlreadyExistsException {
Map<String, AttributeValue> item = new HashMap<>(getKey(record.getIdempotencyKey()));
Expand All @@ -152,6 +165,7 @@ public void putRecord(DataRecord record, Instant now) throws IdempotencyItemAlre

Map<String, AttributeValue> expressionAttributeValues = Stream.of(
new AbstractMap.SimpleEntry<>(":now", AttributeValue.builder().n(String.valueOf(now.getEpochSecond())).build()),
new AbstractMap.SimpleEntry<>(":now_milliseconds", AttributeValue.builder().n(String.valueOf(now.getEpochSecond() * 1000)).build()),
new AbstractMap.SimpleEntry<>(":inprogress", AttributeValue.builder().s(INPROGRESS.toString()).build())
).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));

Expand All @@ -160,7 +174,7 @@ public void putRecord(DataRecord record, Instant now) throws IdempotencyItemAlre
PutItemRequest.builder()
.tableName(tableName)
.item(item)
.conditionExpression("attribute_not_exists(#id) OR #expiry < :now OR (attribute_exists(#in_progress_expiry) AND #in_progress_expiry < :now AND #status = :inprogress)")
.conditionExpression("attribute_not_exists(#id) OR #expiry < :now OR (attribute_exists(#in_progress_expiry) AND #in_progress_expiry < :now_milliseconds AND #status = :inprogress)")
.expressionAttributeNames(expressionAttributeNames)
.expressionAttributeValues(expressionAttributeValues)
.build()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ public void putRecord_shouldCreateRecordInDynamoDB_IfLambdaWasInProgressAndTimed
Map<String, AttributeValue> item = new HashMap<>(key);
Instant now = Instant.now();
long expiry = now.plus(30, ChronoUnit.SECONDS).getEpochSecond();
long progressExpiry = now.minus(30, ChronoUnit.SECONDS).getEpochSecond();
long progressExpiry = now.minus(30, ChronoUnit.SECONDS).toEpochMilli();
item.put("expiration", AttributeValue.builder().n(String.valueOf(expiry)).build());
item.put("status", AttributeValue.builder().s(DataRecord.Status.INPROGRESS.toString()).build());
item.put("data", AttributeValue.builder().s("Fake Data").build());
Expand Down Expand Up @@ -152,14 +152,14 @@ public void putRecord_shouldThrowIdempotencyItemAlreadyExistsException_IfRecordA
}

@Test
public void putRecord_shouldThrowIdempotencyItemAlreadyExistsException_IfRecordAlreadyExistAndProgressNotExpiredAfterLambdaTimedOut() {
public void putRecord_shouldBlockUpdate_IfRecordAlreadyExistAndProgressNotExpiredAfterLambdaTimedOut() {
key = Collections.singletonMap("id", AttributeValue.builder().s("key").build());

// GIVEN: Insert a fake item with same id
Map<String, AttributeValue> item = new HashMap<>(key);
Instant now = Instant.now();
long expiry = now.plus(30, ChronoUnit.SECONDS).getEpochSecond(); // not expired
long progressExpiry = now.plus(30, ChronoUnit.SECONDS).getEpochSecond(); // not expired
long progressExpiry = now.plus(30, ChronoUnit.SECONDS).toEpochMilli(); // not expired
item.put("expiration", AttributeValue.builder().n(String.valueOf(expiry)).build());
item.put("status", AttributeValue.builder().s(DataRecord.Status.INPROGRESS.toString()).build());
item.put("data", AttributeValue.builder().s("Fake Data").build());
Expand All @@ -172,10 +172,10 @@ public void putRecord_shouldThrowIdempotencyItemAlreadyExistsException_IfRecordA
new DataRecord("key",
DataRecord.Status.INPROGRESS,
expiry2,
null,
"Fake Data 2",
null
), now)
).isInstanceOf(IdempotencyItemAlreadyExistsException.class);
), now))
.isInstanceOf(IdempotencyItemAlreadyExistsException.class);

// THEN: item was not updated, retrieve the initial one
Map<String, AttributeValue> itemInDb = client.getItem(GetItemRequest.builder().tableName(TABLE_NAME).key(key).build()).item();
Expand Down