Skip to content

feat(idempotency): Add support for ReturnValuesOnConditionCheckFailure in Idempotency. #1821

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Apr 22, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,17 @@

package software.amazon.lambda.powertools.idempotency.exceptions;

import java.util.Optional;

import software.amazon.lambda.powertools.idempotency.persistence.DataRecord;

/**
* Exception thrown when trying to store an item which already exists.
*/
public class IdempotencyItemAlreadyExistsException extends RuntimeException {
private static final long serialVersionUID = 9027152772149436500L;
// transient because we don't want to accidentally dump any payloads in logs / stack traces
private transient Optional<DataRecord> dr = Optional.empty();

public IdempotencyItemAlreadyExistsException() {
super();
Expand All @@ -27,4 +33,13 @@ public IdempotencyItemAlreadyExistsException() {
public IdempotencyItemAlreadyExistsException(String msg, Throwable e) {
super(msg, e);
}

public IdempotencyItemAlreadyExistsException(String msg, Throwable e, DataRecord dr) {
super(msg, e);
this.dr = Optional.ofNullable(dr);
}

public Optional<DataRecord> getDataRecord() {
return dr;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -94,9 +94,11 @@ private Object processIdempotency() throws Throwable {
// already exists. If it succeeds, there's no need to call getRecord.
persistenceStore.saveInProgress(data, Instant.now(), getRemainingTimeInMillis());
} catch (IdempotencyItemAlreadyExistsException iaee) {
DataRecord record = getIdempotencyRecord();
if (record != null) {
return handleForStatus(record);
// If a DataRecord is already present on the Exception we can immediately take that one instead of trying
// to fetch it first.
DataRecord dr = iaee.getDataRecord().orElseGet(this::getIdempotencyRecord);
if (dr != null) {
return handleForStatus(dr);
}
} catch (IdempotencyKeyException ike) {
throw ike;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public class DataRecord {
private final OptionalLong inProgressExpiryTimestamp;

public DataRecord(String idempotencyKey, Status status, long expiryTimestamp, String responseData,
String payloadHash) {
String payloadHash) {
this.idempotencyKey = idempotencyKey;
this.status = status.toString();
this.expiryTimestamp = expiryTimestamp;
Expand All @@ -63,7 +63,7 @@ public DataRecord(String idempotencyKey, Status status, long expiryTimestamp, St
}

public DataRecord(String idempotencyKey, Status status, long expiryTimestamp, String responseData,
String payloadHash, OptionalLong inProgressExpiryTimestamp) {
String payloadHash, OptionalLong inProgressExpiryTimestamp) {
this.idempotencyKey = idempotencyKey;
this.status = status.toString();
this.expiryTimestamp = expiryTimestamp;
Expand Down Expand Up @@ -131,13 +131,22 @@ public int hashCode() {
return Objects.hash(idempotencyKey, status, expiryTimestamp, responseData, payloadHash);
}

@Override
public String toString() {
return "DataRecord{" +
"idempotencyKey='" + idempotencyKey + '\'' +
", status='" + status + '\'' +
", expiryTimestamp=" + expiryTimestamp +
", payloadHash='" + payloadHash + '\'' +
'}';
}

/**
* Status of the record:
* <ul>
* <li>INPROGRESS: record initialized when function starts</li>
* <li>COMPLETED: record updated with the result of the function when it ends</li>
* <li>EXPIRED: record expired, idempotency will not happen</li>
* <li>INPROGRESS: record initialized when function starts</li>
* <li>COMPLETED: record updated with the result of the function when it ends</li>
* <li>EXPIRED: record expired, idempotency will not happen</li>
* </ul>
*/
public enum Status {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoInteractions;
Expand Down Expand Up @@ -154,7 +155,7 @@ public void secondCall_notExpired_shouldGetFromStore() throws JsonProcessingExce
.build())
.configure();

doThrow(IdempotencyItemAlreadyExistsException.class).when(store).saveInProgress(any(), any(), any());
doThrow(new IdempotencyItemAlreadyExistsException()).when(store).saveInProgress(any(), any(), any());

Product p = new Product(42, "fake product", 12);
Basket b = new Basket(p);
Expand All @@ -175,6 +176,44 @@ public void secondCall_notExpired_shouldGetFromStore() throws JsonProcessingExce
assertThat(function.handlerCalled()).isFalse();
}

@Test
public void secondCall_notExpired_shouldNotGetFromStoreIfPresentOnIdempotencyException()
throws JsonProcessingException {
// GIVEN
Idempotency.config()
.withPersistenceStore(store)
.withConfig(IdempotencyConfig.builder()
.withEventKeyJMESPath("id")
.build())
.configure();

Product p = new Product(42, "fake product", 12);
Basket b = new Basket(p);
DataRecord dr = new DataRecord(
"42",
DataRecord.Status.COMPLETED,
Instant.now().plus(356, SECONDS).getEpochSecond(),
JsonConfig.get().getObjectMapper().writer().writeValueAsString(b),
null);

// A data record on this exception should take precedence over fetching a record from the store / cache
doThrow(new IdempotencyItemAlreadyExistsException(
"Test message",
new RuntimeException("Test Cause"),
dr))
.when(store).saveInProgress(any(), any(), any());

// WHEN
IdempotencyEnabledFunction function = new IdempotencyEnabledFunction();
Basket basket = function.handleRequest(p, context);

// THEN
assertThat(basket).isEqualTo(b);
assertThat(function.handlerCalled()).isFalse();
// Should never call the store because item is already present on IdempotencyItemAlreadyExistsException
verify(store, never()).getRecord(any(), any());
}

@Test
public void secondCall_notExpired_shouldGetStringFromStore() {
// GIVEN
Expand All @@ -185,7 +224,7 @@ public void secondCall_notExpired_shouldGetStringFromStore() {
.build())
.configure();

doThrow(IdempotencyItemAlreadyExistsException.class).when(store).saveInProgress(any(), any(), any());
doThrow(new IdempotencyItemAlreadyExistsException()).when(store).saveInProgress(any(), any(), any());

Product p = new Product(42, "fake product", 12);
DataRecord dr = new DataRecord(
Expand Down Expand Up @@ -220,7 +259,7 @@ public void secondCall_notExpired_shouldGetStringFromStoreWithResponseHook() {
.build())
.configure();

doThrow(IdempotencyItemAlreadyExistsException.class).when(store).saveInProgress(any(), any(), any());
doThrow(new IdempotencyItemAlreadyExistsException()).when(store).saveInProgress(any(), any(), any());

Product p = new Product(42, "fake product", 12);
DataRecord dr = new DataRecord(
Expand Down Expand Up @@ -251,7 +290,7 @@ public void secondCall_inProgress_shouldThrowIdempotencyAlreadyInProgressExcepti
.build())
.configure();

doThrow(IdempotencyItemAlreadyExistsException.class).when(store).saveInProgress(any(), any(), any());
doThrow(new IdempotencyItemAlreadyExistsException()).when(store).saveInProgress(any(), any(), any());

Product p = new Product(42, "fake product", 12);
Basket b = new Basket(p);
Expand Down Expand Up @@ -283,7 +322,7 @@ public void secondCall_inProgress_lambdaTimeout_timeoutExpired_shouldThrowIncons
.build())
.configure();

doThrow(IdempotencyItemAlreadyExistsException.class).when(store).saveInProgress(any(), any(), any());
doThrow(new IdempotencyItemAlreadyExistsException()).when(store).saveInProgress(any(), any(), any());

Product p = new Product(42, "fake product", 12);
Basket b = new Basket(p);
Expand Down Expand Up @@ -412,7 +451,7 @@ public void idempotencyOnSubMethodAnnotated_secondCall_notExpired_shouldGetFromS
.withPersistenceStore(store)
.configure();

doThrow(IdempotencyItemAlreadyExistsException.class).when(store).saveInProgress(any(), any(), any());
doThrow(new IdempotencyItemAlreadyExistsException()).when(store).saveInProgress(any(), any(), any());

Product p = new Product(42, "fake product", 12);
Basket b = new Basket(p);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,10 +189,17 @@ public void putRecord(DataRecord record, Instant now) throws IdempotencyItemAlre
"attribute_not_exists(#id) OR #expiry < :now OR (attribute_exists(#in_progress_expiry) AND #in_progress_expiry < :now_milliseconds AND #status = :inprogress)")
.expressionAttributeNames(expressionAttributeNames)
.expressionAttributeValues(expressionAttributeValues)
.build()
);
.returnValuesOnConditionCheckFailure("ALL_OLD")
.build());
} catch (ConditionalCheckFailedException e) {
LOG.debug("Failed to put record for already existing idempotency key: {}", record.getIdempotencyKey());
if (e.hasItem()) {
DataRecord existingRecord = itemToRecord(e.item());
throw new IdempotencyItemAlreadyExistsException(
"Failed to put record for already existing idempotency key: " + record.getIdempotencyKey()
+ ". Existing record: " + existingRecord,
e, existingRecord);
}
throw new IdempotencyItemAlreadyExistsException(
"Failed to put record for already existing idempotency key: " + record.getIdempotencyKey(), e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,18 @@
*
*/

package software.amazon.lambda.powertools.idempotency.dynamodb;
package software.amazon.lambda.powertools.idempotency.persistence.dynamodb;

import com.amazonaws.services.dynamodbv2.local.main.ServerRunner;
import com.amazonaws.services.dynamodbv2.local.server.DynamoDBProxyServer;
import java.io.IOException;
import java.net.ServerSocket;
import java.net.URI;

import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;

import com.amazonaws.services.dynamodbv2.local.main.ServerRunner;
import com.amazonaws.services.dynamodbv2.local.server.DynamoDBProxyServer;

import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.http.urlconnection.UrlConnectionHttpClient;
Expand Down Expand Up @@ -66,13 +69,12 @@ public static void setupDynamo() {
.tableName(TABLE_NAME)
.keySchema(KeySchemaElement.builder().keyType(KeyType.HASH).attributeName("id").build())
.attributeDefinitions(
AttributeDefinition.builder().attributeName("id").attributeType(ScalarAttributeType.S).build()
)
AttributeDefinition.builder().attributeName("id").attributeType(ScalarAttributeType.S).build())
.billingMode(BillingMode.PAY_PER_REQUEST)
.build());

DescribeTableResponse response =
client.describeTable(DescribeTableRequest.builder().tableName(TABLE_NAME).build());
DescribeTableResponse response = client
.describeTable(DescribeTableRequest.builder().tableName(TABLE_NAME).build());
if (response == null) {
throw new RuntimeException("Table was not created within expected time");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
import software.amazon.awssdk.services.dynamodb.model.ScanRequest;
import software.amazon.lambda.powertools.idempotency.Constants;
import software.amazon.lambda.powertools.idempotency.IdempotencyConfig;
import software.amazon.lambda.powertools.idempotency.dynamodb.DynamoDBConfig;

import software.amazon.lambda.powertools.idempotency.exceptions.IdempotencyItemAlreadyExistsException;
import software.amazon.lambda.powertools.idempotency.exceptions.IdempotencyItemNotFoundException;
import software.amazon.lambda.powertools.idempotency.persistence.DataRecord;
Expand Down Expand Up @@ -155,13 +155,14 @@ public void putRecord_shouldThrowIdempotencyItemAlreadyExistsException_IfRecordA
DataRecord.Status.INPROGRESS,
expiry2,
null,
null
), now)
).isInstanceOf(IdempotencyItemAlreadyExistsException.class);
null),
now)).isInstanceOf(IdempotencyItemAlreadyExistsException.class)
// DataRecord should be present due to returnValuesOnConditionCheckFailure("ALL_OLD")
.matches(e -> ((IdempotencyItemAlreadyExistsException) e).getDataRecord().isPresent());

// THEN: item was not updated, retrieve the initial one
Map<String, AttributeValue> itemInDb =
client.getItem(GetItemRequest.builder().tableName(TABLE_NAME).key(key).build()).item();
Map<String, AttributeValue> itemInDb = client
.getItem(GetItemRequest.builder().tableName(TABLE_NAME).key(key).build()).item();
assertThat(itemInDb).isNotNull();
assertThat(itemInDb.get("status").s()).isEqualTo("COMPLETED");
assertThat(itemInDb.get("expiration").n()).isEqualTo(String.valueOf(expiry));
Expand Down Expand Up @@ -190,13 +191,16 @@ public void putRecord_shouldBlockUpdate_IfRecordAlreadyExistAndProgressNotExpire
DataRecord.Status.INPROGRESS,
expiry2,
"Fake Data 2",
null
), now))
.isInstanceOf(IdempotencyItemAlreadyExistsException.class);
null),
now))
.isInstanceOf(IdempotencyItemAlreadyExistsException.class)
// DataRecord should be present due to returnValuesOnConditionCheckFailure("ALL_OLD")
.matches(e -> ((IdempotencyItemAlreadyExistsException) e).getDataRecord().isPresent());
;

// THEN: item was not updated, retrieve the initial one
Map<String, AttributeValue> itemInDb =
client.getItem(GetItemRequest.builder().tableName(TABLE_NAME).key(key).build()).item();
Map<String, AttributeValue> itemInDb = client
.getItem(GetItemRequest.builder().tableName(TABLE_NAME).key(key).build()).item();
assertThat(itemInDb).isNotNull();
assertThat(itemInDb.get("status").s()).isEqualTo("INPROGRESS");
assertThat(itemInDb.get("expiration").n()).isEqualTo(String.valueOf(expiry));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,20 +12,21 @@
*
*/

package software.amazon.lambda.powertools.idempotency.dynamodb;

package software.amazon.lambda.powertools.idempotency.persistence.dynamodb;

import static org.assertj.core.api.Assertions.assertThat;

import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.events.APIGatewayProxyResponseEvent;
import com.amazonaws.services.lambda.runtime.tests.EventLoader;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;

import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.events.APIGatewayProxyResponseEvent;
import com.amazonaws.services.lambda.runtime.tests.EventLoader;

import software.amazon.awssdk.services.dynamodb.model.ScanRequest;
import software.amazon.lambda.powertools.idempotency.dynamodb.handlers.IdempotencyFunction;
import software.amazon.lambda.powertools.idempotency.persistence.dynamodb.handlers.IdempotencyFunction;

public class IdempotencyTest extends DynamoDBConfig {

Expand All @@ -41,14 +42,14 @@ void setUp() {
public void endToEndTest() {
IdempotencyFunction function = new IdempotencyFunction(client);

APIGatewayProxyResponseEvent response =
function.handleRequest(EventLoader.loadApiGatewayRestEvent("apigw_event2.json"), context);
APIGatewayProxyResponseEvent response = function
.handleRequest(EventLoader.loadApiGatewayRestEvent("apigw_event2.json"), context);
assertThat(function.handlerExecuted).isTrue();

function.handlerExecuted = false;

APIGatewayProxyResponseEvent response2 =
function.handleRequest(EventLoader.loadApiGatewayRestEvent("apigw_event2.json"), context);
APIGatewayProxyResponseEvent response2 = function
.handleRequest(EventLoader.loadApiGatewayRestEvent("apigw_event2.json"), context);
assertThat(function.handlerExecuted).isFalse();

assertThat(response).isEqualTo(response2);
Expand Down
Loading
Loading