Skip to content

feat(idempotency): handle lambda timeout scenarios for INPROGRESS records #933

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 9 commits into from
Dec 14, 2022
1 change: 1 addition & 0 deletions powertools-idempotency/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-lambda-java-tests</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,16 @@ public static Idempotency getInstance() {
return Holder.instance;
}

/**
* Can be used in a method which is not the handler to capture the Lambda context,
* to calculate the remaining time before the invocation times out.
*
* @param lambdaContext
*/
public static void registerLambdaContext(Context lambdaContext) {
getInstance().getConfig().setLambdaContext(lambdaContext);
}

/**
* Acts like a builder that can be used to configure {@link Idempotency}
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,10 @@
*/
package software.amazon.lambda.powertools.idempotency;

import com.amazonaws.services.lambda.runtime.Context;
import software.amazon.lambda.powertools.idempotency.internal.cache.LRUCache;

import java.security.MessageDigest;
import java.time.Duration;

/**
Expand All @@ -28,6 +30,7 @@ public class IdempotencyConfig {
private final String payloadValidationJMESPath;
private final boolean throwOnNoIdempotencyKey;
private final String hashFunction;
private Context lambdaContext;

private IdempotencyConfig(String eventKeyJMESPath, String payloadValidationJMESPath, boolean throwOnNoIdempotencyKey, boolean useLocalCache, int localCacheMaxItems, long expirationInSeconds, String hashFunction) {
this.localCacheMaxItems = localCacheMaxItems;
Expand Down Expand Up @@ -71,12 +74,20 @@ public String getHashFunction() {
/**
* Create a builder that can be used to configure and create a {@link IdempotencyConfig}.
*
* @return a new instance of {@link IdempotencyConfig.Builder}
* @return a new instance of {@link Builder}
*/
public static Builder builder() {
return new Builder();
}

public void setLambdaContext(Context lambdaContext) {
this.lambdaContext = lambdaContext;
}

public Context getLambdaContext() {
return lambdaContext;
}

public static class Builder {

private int localCacheMaxItems = 256;
Expand Down Expand Up @@ -203,7 +214,7 @@ public Builder withThrowOnNoIdempotencyKey() {
/**
* Function to use for calculating hashes, by default MD5.
*
* @param hashFunction Can be any algorithm supported by {@link java.security.MessageDigest}, most commons are<ul>
* @param hashFunction Can be any algorithm supported by {@link MessageDigest}, most commons are<ul>
* <li>MD5</li>
* <li>SHA-1</li>
* <li>SHA-256</li></ul>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

/**
* IdempotencyInconsistentStateException can happen under rare but expected cases
* when persistent state changes in the small-time between put & get requests.
* when persistent state changes in the small-time between put &amp; get requests.
*/
public class IdempotencyInconsistentStateException extends RuntimeException {
private static final long serialVersionUID = -4293951999802300672L;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
*/
package software.amazon.lambda.powertools.idempotency.internal;

import com.amazonaws.services.lambda.runtime.Context;
import com.fasterxml.jackson.databind.JsonNode;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.reflect.MethodSignature;
Expand All @@ -25,6 +26,8 @@
import software.amazon.lambda.powertools.utilities.JsonConfig;

import java.time.Instant;
import java.util.Optional;
import java.util.OptionalInt;

import static software.amazon.lambda.powertools.idempotency.persistence.DataRecord.Status.EXPIRED;
import static software.amazon.lambda.powertools.idempotency.persistence.DataRecord.Status.INPROGRESS;
Expand All @@ -40,10 +43,12 @@ public class IdempotencyHandler {
private final ProceedingJoinPoint pjp;
private final JsonNode data;
private final BasePersistenceStore persistenceStore;
private final Context lambdaContext;

public IdempotencyHandler(ProceedingJoinPoint pjp, String functionName, JsonNode payload) {
public IdempotencyHandler(ProceedingJoinPoint pjp, String functionName, JsonNode payload, Context lambdaContext) {
this.pjp = pjp;
this.data = payload;
this.lambdaContext = lambdaContext;
persistenceStore = Idempotency.getInstance().getPersistenceStore();
persistenceStore.configure(Idempotency.getInstance().getConfig(), functionName);
}
Expand Down Expand Up @@ -77,7 +82,7 @@ private Object processIdempotency() throws Throwable {
try {
// We call saveInProgress first as an optimization for the most common case where no idempotent record
// already exists. If it succeeds, there's no need to call getRecord.
persistenceStore.saveInProgress(data, Instant.now());
persistenceStore.saveInProgress(data, Instant.now(), getRemainingTimeInMillis());
} catch (IdempotencyItemAlreadyExistsException iaee) {
DataRecord record = getIdempotencyRecord();
return handleForStatus(record);
Expand All @@ -89,6 +94,21 @@ private Object processIdempotency() throws Throwable {
return getFunctionResponse();
}

/**
* Tries to determine the remaining time available for the current lambda invocation.
* Currently, it only works if the idempotent handler decorator is used or using {@link Idempotency#registerLambdaContext(Context)}
*
* @return the remaining time in milliseconds or empty if the context was not provided/found
*/
private OptionalInt getRemainingTimeInMillis() {
if (lambdaContext != null) {
return OptionalInt.of(lambdaContext.getRemainingTimeInMillis());
} else {
LOG.warn("Couldn't determine the remaining time left. Did you call registerLambdaContext on Idempotency?");
}
return OptionalInt.empty();
}

/**
* Retrieve the idempotency record from the persistence layer.
*
Expand Down Expand Up @@ -121,6 +141,10 @@ private Object handleForStatus(DataRecord record) {
}

if (INPROGRESS.equals(record.getStatus())) {
if (record.getInProgressExpiryTimestamp().isPresent()
&& record.getInProgressExpiryTimestamp().getAsLong() < Instant.now().toEpochMilli()) {
throw new IdempotencyInconsistentStateException("Item should have been expired in-progress because it already time-outed.");
}
throw new IdempotencyAlreadyInProgressException("Execution already in progress with idempotency key: " + record.getIdempotencyKey());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Pointcut;
import org.aspectj.lang.reflect.MethodSignature;
import com.amazonaws.services.lambda.runtime.Context;
import software.amazon.lambda.powertools.idempotency.Constants;
import software.amazon.lambda.powertools.idempotency.Idempotency;
import software.amazon.lambda.powertools.idempotency.IdempotencyKey;
import software.amazon.lambda.powertools.idempotency.Idempotent;
import software.amazon.lambda.powertools.idempotency.exceptions.IdempotencyConfigurationException;
Expand Down Expand Up @@ -56,12 +58,20 @@ public Object around(ProceedingJoinPoint pjp,
throw new IdempotencyConfigurationException("The annotated method doesn't return anything. Unable to perform idempotency on void return type");
}

JsonNode payload = getPayload(pjp, method);
boolean isHandler = (isHandlerMethod(pjp) && placedOnRequestHandler(pjp));
JsonNode payload = getPayload(pjp, method, isHandler);
if (payload == null) {
throw new IdempotencyConfigurationException("Unable to get payload from the method. Ensure there is at least one parameter or that you use @IdempotencyKey");
}

IdempotencyHandler idempotencyHandler = new IdempotencyHandler(pjp, method.getName(), payload);
Context lambdaContext;
if (isHandler) {
lambdaContext = (Context) pjp.getArgs()[1];
} else {
lambdaContext = Idempotency.getInstance().getConfig().getLambdaContext();
}

IdempotencyHandler idempotencyHandler = new IdempotencyHandler(pjp, method.getName(), payload, lambdaContext);
return idempotencyHandler.handle();
}

Expand All @@ -71,11 +81,10 @@ public Object around(ProceedingJoinPoint pjp,
* @param method the annotated method
* @return the payload used for idempotency
*/
private JsonNode getPayload(ProceedingJoinPoint pjp, Method method) {
private JsonNode getPayload(ProceedingJoinPoint pjp, Method method, boolean isHandler) {
JsonNode payload = null;
// handleRequest or method with one parameter: get the first one
if ((isHandlerMethod(pjp) && placedOnRequestHandler(pjp))
|| pjp.getArgs().length == 1) {
if (isHandler || pjp.getArgs().length == 1) {
payload = JsonConfig.get().getObjectMapper().valueToTree(pjp.getArgs()[0]);
} else {
// Look for a parameter annotated with @IdempotencyKey
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,7 @@
import java.security.NoSuchAlgorithmException;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.Map;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.*;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

Expand Down Expand Up @@ -131,19 +129,25 @@ public void saveSuccess(JsonNode data, Object result, Instant now) {
* @param data Payload
* @param now
*/
public void saveInProgress(JsonNode data, Instant now) throws IdempotencyItemAlreadyExistsException {
public void saveInProgress(JsonNode data, Instant now, OptionalInt remainingTimeInMs) throws IdempotencyItemAlreadyExistsException {
String idempotencyKey = getHashedIdempotencyKey(data);

if (retrieveFromCache(idempotencyKey, now) != null) {
throw new IdempotencyItemAlreadyExistsException();
}

OptionalLong inProgressExpirationMsTimestamp = OptionalLong.empty();
if (remainingTimeInMs.isPresent()) {
inProgressExpirationMsTimestamp = OptionalLong.of(now.plus(remainingTimeInMs.getAsInt(), ChronoUnit.MILLIS).toEpochMilli());
}

DataRecord record = new DataRecord(
idempotencyKey,
DataRecord.Status.INPROGRESS,
getExpiryEpochSecond(now),
null,
getHashedPayload(data)
getHashedPayload(data),
inProgressExpirationMsTimestamp
);
LOG.debug("saving in progress record for idempotency key: {}", record.getIdempotencyKey());
putRecord(record, now);
Expand Down Expand Up @@ -212,7 +216,8 @@ private String getHashedIdempotencyKey(JsonNode data) {
}

String hash = generateHash(node);
return functionName + "#" + hash;
hash = functionName + "#" + hash;
return hash;
}

private boolean isMissingIdemPotencyKey(JsonNode data) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,12 @@
*/
package software.amazon.lambda.powertools.idempotency.persistence;

import software.amazon.lambda.powertools.idempotency.IdempotencyConfig;

import java.time.Instant;
import java.util.Objects;
import java.util.OptionalInt;
import java.util.OptionalLong;

/**
* Data Class for idempotency records. This is actually the item that will be stored in the persistence layer.
Expand All @@ -25,21 +29,32 @@ public class DataRecord {
private final long expiryTimestamp;
private final String responseData;
private final String payloadHash;
private final OptionalLong inProgressExpiryTimestamp;

public DataRecord(String idempotencyKey, Status status, long expiryTimestamp, String responseData, String payloadHash) {
this.idempotencyKey = idempotencyKey;
this.status = status.toString();
this.expiryTimestamp = expiryTimestamp;
this.responseData = responseData;
this.payloadHash = payloadHash;
this.inProgressExpiryTimestamp = OptionalLong.empty();
}

public DataRecord(String idempotencyKey, Status status, long expiryTimestamp, String responseData, String payloadHash, OptionalLong inProgressExpiryTimestamp) {
this.idempotencyKey = idempotencyKey;
this.status = status.toString();
this.expiryTimestamp = expiryTimestamp;
this.responseData = responseData;
this.payloadHash = payloadHash;
this.inProgressExpiryTimestamp = inProgressExpiryTimestamp;
}

public String getIdempotencyKey() {
return idempotencyKey;
}

/**
* Check if data record is expired (based on expiration configured in the {@link software.amazon.lambda.powertools.idempotency.IdempotencyConfig})
* Check if data record is expired (based on expiration configured in the {@link IdempotencyConfig})
*
* @return Whether the record is currently expired or not
*/
Expand All @@ -60,6 +75,10 @@ public long getExpiryTimestamp() {
return expiryTimestamp;
}

public OptionalLong getInProgressExpiryTimestamp() {
return inProgressExpiryTimestamp;
}

public String getResponseData() {
return responseData;
}
Expand All @@ -85,6 +104,7 @@ public int hashCode() {
return Objects.hash(idempotencyKey, status, expiryTimestamp, responseData, payloadHash);
}


/**
* Status of the record:
* <ul>
Expand Down
Loading