Skip to content

Commit d6115ac

Browse files
authored
feat(idempotency): handle lambda timeout scenarios for INPROGRESS records (#933)
* fix idempotency when function timeouts
1 parent 4fb0384 commit d6115ac

18 files changed

+563
-40
lines changed

.gitignore

+2
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,8 @@ Desktop.ini
8282
######################
8383
/bin/
8484
/deploy/
85+
/dist/
86+
/site/
8587

8688
######################
8789
# Logs

docs/utilities/idempotency.md

+124-2
Original file line numberDiff line numberDiff line change
@@ -355,19 +355,141 @@ Imagine the function executes successfully, but the client never receives the re
355355

356356
This sequence diagram shows an example flow of what happens in the payment scenario:
357357

358-
![Idempotent sequence](../media/idempotent_sequence.png)
358+
<center>
359+
```mermaid
360+
sequenceDiagram
361+
participant Client
362+
participant Lambda
363+
participant Persistence Layer
364+
alt initial request
365+
Client->>Lambda: Invoke (event)
366+
Lambda->>Persistence Layer: Get or set (id=event.search(payload))
367+
activate Persistence Layer
368+
Note right of Persistence Layer: Locked to prevent concurrent<br/>invocations with <br/> the same payload.
369+
Lambda-->>Lambda: Call handler (event)
370+
Lambda->>Persistence Layer: Update record with result
371+
deactivate Persistence Layer
372+
Persistence Layer-->>Persistence Layer: Update record with result
373+
Lambda-->>Client: Response sent to client
374+
else retried request
375+
Client->>Lambda: Invoke (event)
376+
Lambda->>Persistence Layer: Get or set (id=event.search(payload))
377+
Persistence Layer-->>Lambda: Already exists in persistence layer. Return result
378+
Lambda-->>Client: Response sent to client
379+
end
380+
```
381+
<i>Idempotent sequence</i>
382+
</center>
359383

360384
The client was successful in receiving the result after the retry. Since the Lambda handler was only executed once, our customer hasn't been charged twice.
361385

362386
!!! note
363387
Bear in mind that the entire Lambda handler is treated as a single idempotent operation. If your Lambda handler can cause multiple side effects, consider splitting it into separate functions.
364388

389+
#### Lambda timeouts
390+
391+
This is automatically done when you annotate your Lambda handler with [@Idempotent annotation](#idempotent-annotation).
392+
393+
To prevent against extended failed retries when a [Lambda function times out](https://aws.amazon.com/premiumsupport/knowledge-center/lambda-verify-invocation-timeouts/), Powertools calculates and includes the remaining invocation available time as part of the idempotency record.
394+
395+
!!! example
396+
If a second invocation happens **after** this timestamp, and the record is marked as `INPROGRESS`, we will execute the invocation again as if it was in the `EXPIRED` state.
397+
This means that if an invocation expired during execution, it will be quickly executed again on the next retry.
398+
399+
!!! important
400+
If you are using the [@Idempotent annotation on another method](#idempotent-annotation-on-another-method) to guard isolated parts of your code, you must use `registerLambdaContext` method available in the `Idempotency` object to benefit from this protection.
401+
402+
Here is an example on how you register the Lambda context in your handler:
403+
404+
```java hl_lines="13-19" title="Registering the Lambda context"
405+
public class PaymentHandler implements RequestHandler<SQSEvent, List<String>> {
406+
407+
public PaymentHandler() {
408+
Idempotency.config()
409+
.withPersistenceStore(
410+
DynamoDBPersistenceStore.builder()
411+
.withTableName(System.getenv("IDEMPOTENCY_TABLE"))
412+
.build())
413+
.configure();
414+
}
415+
416+
@Override
417+
public List<String> handleRequest(SQSEvent sqsEvent, Context context) {
418+
Idempotency.registerLambdaContext(context);
419+
return sqsEvent.getRecords().stream().map(record -> process(record.getMessageId(), record.getBody())).collect(Collectors.toList());
420+
}
421+
422+
@Idempotent
423+
private String process(String messageId, @IdempotencyKey String messageBody) {
424+
logger.info("Processing messageId: {}", messageId);
425+
PaymentRequest request = extractDataFrom(messageBody).as(PaymentRequest.class);
426+
return paymentService.process(request);
427+
}
428+
429+
}
430+
```
431+
432+
#### Lambda timeout sequence diagram
433+
434+
This sequence diagram shows an example flow of what happens if a Lambda function times out:
435+
436+
<center>
437+
```mermaid
438+
sequenceDiagram
439+
participant Client
440+
participant Lambda
441+
participant Persistence Layer
442+
alt initial request
443+
Client->>Lambda: Invoke (event)
444+
Lambda->>Persistence Layer: Get or set (id=event.search(payload))
445+
activate Persistence Layer
446+
Note right of Persistence Layer: Locked to prevent concurrent<br/>invocations with <br/> the same payload.
447+
Note over Lambda: Time out
448+
Lambda--xLambda: Call handler (event)
449+
Lambda-->>Client: Return error response
450+
deactivate Persistence Layer
451+
else concurrent request before timeout
452+
Client->>Lambda: Invoke (event)
453+
Lambda->>Persistence Layer: Get or set (id=event.search(payload))
454+
Persistence Layer-->>Lambda: Request already INPROGRESS
455+
Lambda--xClient: Return IdempotencyAlreadyInProgressError
456+
else retry after Lambda timeout
457+
Client->>Lambda: Invoke (event)
458+
Lambda->>Persistence Layer: Get or set (id=event.search(payload))
459+
activate Persistence Layer
460+
Note right of Persistence Layer: Locked to prevent concurrent<br/>invocations with <br/> the same payload.
461+
Lambda-->>Lambda: Call handler (event)
462+
Lambda->>Persistence Layer: Update record with result
463+
deactivate Persistence Layer
464+
Persistence Layer-->>Persistence Layer: Update record with result
465+
Lambda-->>Client: Response sent to client
466+
end
467+
```
468+
<i>Idempotent sequence for Lambda timeouts</i>
469+
</center>
470+
365471
### Handling exceptions
366472

367473
If you are using the `@Idempotent` annotation on your Lambda handler or any other method, any unhandled exceptions that are thrown during the code execution will cause **the record in the persistence layer to be deleted**.
368474
This means that new invocations will execute your code again despite having the same payload. If you don't want the record to be deleted, you need to catch exceptions within the idempotent function and return a successful response.
369475

370-
![Idempotent sequence exception](../media/idempotent_sequence_exception.png)
476+
<center>
477+
```mermaid
478+
sequenceDiagram
479+
participant Client
480+
participant Lambda
481+
participant Persistence Layer
482+
Client->>Lambda: Invoke (event)
483+
Lambda->>Persistence Layer: Get or set (id=event.search(payload))
484+
activate Persistence Layer
485+
Note right of Persistence Layer: Locked during this time. Prevents multiple<br/>Lambda invocations with the same<br/>payload running concurrently.
486+
Lambda--xLambda: Call handler (event).<br/>Raises exception
487+
Lambda->>Persistence Layer: Delete record (id=event.search(payload))
488+
deactivate Persistence Layer
489+
Lambda-->>Client: Return error response
490+
```
491+
<i>Idempotent sequence exception</i>
492+
</center>
371493

372494
If an Exception is raised _outside_ the scope of a decorated method and after your method has been called, the persistent record will not be affected. In this case, idempotency will be maintained for your decorated function. Example:
373495

mkdocs.yml

+5
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,11 @@ markdown_extensions:
5656
- pymdownx.snippets:
5757
base_path: '.'
5858
check_paths: true
59+
- pymdownx.superfences:
60+
custom_fences:
61+
- name: mermaid
62+
class: mermaid
63+
format: !!python/name:pymdownx.superfences.fence_code_format
5964
- meta
6065
- toc:
6166
permalink: true

powertools-idempotency/pom.xml

+1
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,7 @@
122122
<dependency>
123123
<groupId>com.amazonaws</groupId>
124124
<artifactId>aws-lambda-java-tests</artifactId>
125+
<scope>test</scope>
125126
</dependency>
126127
<dependency>
127128
<groupId>com.amazonaws</groupId>

powertools-idempotency/src/main/java/software/amazon/lambda/powertools/idempotency/Idempotency.java

+10
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,16 @@ public static Idempotency getInstance() {
6565
return Holder.instance;
6666
}
6767

68+
/**
69+
* Can be used in a method which is not the handler to capture the Lambda context,
70+
* to calculate the remaining time before the invocation times out.
71+
*
72+
* @param lambdaContext
73+
*/
74+
public static void registerLambdaContext(Context lambdaContext) {
75+
getInstance().getConfig().setLambdaContext(lambdaContext);
76+
}
77+
6878
/**
6979
* Acts like a builder that can be used to configure {@link Idempotency}
7080
*

powertools-idempotency/src/main/java/software/amazon/lambda/powertools/idempotency/IdempotencyConfig.java

+11-1
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
*/
1414
package software.amazon.lambda.powertools.idempotency;
1515

16+
import com.amazonaws.services.lambda.runtime.Context;
1617
import software.amazon.lambda.powertools.idempotency.internal.cache.LRUCache;
1718

1819
import java.time.Duration;
@@ -28,6 +29,7 @@ public class IdempotencyConfig {
2829
private final String payloadValidationJMESPath;
2930
private final boolean throwOnNoIdempotencyKey;
3031
private final String hashFunction;
32+
private Context lambdaContext;
3133

3234
private IdempotencyConfig(String eventKeyJMESPath, String payloadValidationJMESPath, boolean throwOnNoIdempotencyKey, boolean useLocalCache, int localCacheMaxItems, long expirationInSeconds, String hashFunction) {
3335
this.localCacheMaxItems = localCacheMaxItems;
@@ -71,12 +73,20 @@ public String getHashFunction() {
7173
/**
7274
* Create a builder that can be used to configure and create a {@link IdempotencyConfig}.
7375
*
74-
* @return a new instance of {@link IdempotencyConfig.Builder}
76+
* @return a new instance of {@link Builder}
7577
*/
7678
public static Builder builder() {
7779
return new Builder();
7880
}
7981

82+
public void setLambdaContext(Context lambdaContext) {
83+
this.lambdaContext = lambdaContext;
84+
}
85+
86+
public Context getLambdaContext() {
87+
return lambdaContext;
88+
}
89+
8090
public static class Builder {
8191

8292
private int localCacheMaxItems = 256;

powertools-idempotency/src/main/java/software/amazon/lambda/powertools/idempotency/exceptions/IdempotencyInconsistentStateException.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515

1616
/**
1717
* IdempotencyInconsistentStateException can happen under rare but expected cases
18-
* when persistent state changes in the small-time between put & get requests.
18+
* when persistent state changes in the small-time between put &amp; get requests.
1919
*/
2020
public class IdempotencyInconsistentStateException extends RuntimeException {
2121
private static final long serialVersionUID = -4293951999802300672L;

powertools-idempotency/src/main/java/software/amazon/lambda/powertools/idempotency/internal/IdempotencyHandler.java

+27-2
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
*/
1414
package software.amazon.lambda.powertools.idempotency.internal;
1515

16+
import com.amazonaws.services.lambda.runtime.Context;
1617
import com.fasterxml.jackson.databind.JsonNode;
1718
import org.aspectj.lang.ProceedingJoinPoint;
1819
import org.aspectj.lang.reflect.MethodSignature;
@@ -25,6 +26,7 @@
2526
import software.amazon.lambda.powertools.utilities.JsonConfig;
2627

2728
import java.time.Instant;
29+
import java.util.OptionalInt;
2830

2931
import static software.amazon.lambda.powertools.idempotency.persistence.DataRecord.Status.EXPIRED;
3032
import static software.amazon.lambda.powertools.idempotency.persistence.DataRecord.Status.INPROGRESS;
@@ -40,10 +42,12 @@ public class IdempotencyHandler {
4042
private final ProceedingJoinPoint pjp;
4143
private final JsonNode data;
4244
private final BasePersistenceStore persistenceStore;
45+
private final Context lambdaContext;
4346

44-
public IdempotencyHandler(ProceedingJoinPoint pjp, String functionName, JsonNode payload) {
47+
public IdempotencyHandler(ProceedingJoinPoint pjp, String functionName, JsonNode payload, Context lambdaContext) {
4548
this.pjp = pjp;
4649
this.data = payload;
50+
this.lambdaContext = lambdaContext;
4751
persistenceStore = Idempotency.getInstance().getPersistenceStore();
4852
persistenceStore.configure(Idempotency.getInstance().getConfig(), functionName);
4953
}
@@ -77,7 +81,7 @@ private Object processIdempotency() throws Throwable {
7781
try {
7882
// We call saveInProgress first as an optimization for the most common case where no idempotent record
7983
// already exists. If it succeeds, there's no need to call getRecord.
80-
persistenceStore.saveInProgress(data, Instant.now());
84+
persistenceStore.saveInProgress(data, Instant.now(), getRemainingTimeInMillis());
8185
} catch (IdempotencyItemAlreadyExistsException iaee) {
8286
DataRecord record = getIdempotencyRecord();
8387
return handleForStatus(record);
@@ -89,6 +93,21 @@ private Object processIdempotency() throws Throwable {
8993
return getFunctionResponse();
9094
}
9195

96+
/**
97+
* Tries to determine the remaining time available for the current lambda invocation.
98+
* Currently, it only works if the idempotent handler decorator is used or using {@link Idempotency#registerLambdaContext(Context)}
99+
*
100+
* @return the remaining time in milliseconds or empty if the context was not provided/found
101+
*/
102+
private OptionalInt getRemainingTimeInMillis() {
103+
if (lambdaContext != null) {
104+
return OptionalInt.of(lambdaContext.getRemainingTimeInMillis());
105+
} else {
106+
LOG.warn("Couldn't determine the remaining time left. Did you call registerLambdaContext on Idempotency?");
107+
}
108+
return OptionalInt.empty();
109+
}
110+
92111
/**
93112
* Retrieve the idempotency record from the persistence layer.
94113
*
@@ -121,12 +140,18 @@ private Object handleForStatus(DataRecord record) {
121140
}
122141

123142
if (INPROGRESS.equals(record.getStatus())) {
143+
if (record.getInProgressExpiryTimestamp().isPresent()
144+
&& record.getInProgressExpiryTimestamp().getAsLong() < Instant.now().toEpochMilli()) {
145+
throw new IdempotencyInconsistentStateException("Item should have been expired in-progress because it already time-outed.");
146+
}
124147
throw new IdempotencyAlreadyInProgressException("Execution already in progress with idempotency key: " + record.getIdempotencyKey());
125148
}
126149

127150
Class<?> returnType = ((MethodSignature) pjp.getSignature()).getReturnType();
128151
try {
129152
LOG.debug("Response for key '{}' retrieved from idempotency store, skipping the function", record.getIdempotencyKey());
153+
if (returnType.equals(String.class))
154+
return record.getResponseData();
130155
return JsonConfig.get().getObjectMapper().reader().readValue(record.getResponseData(), returnType);
131156
} catch (Exception e) {
132157
throw new IdempotencyPersistenceLayerException("Unable to get function response as " + returnType.getSimpleName(), e);

powertools-idempotency/src/main/java/software/amazon/lambda/powertools/idempotency/internal/IdempotentAspect.java

+14-5
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,9 @@
1919
import org.aspectj.lang.annotation.Aspect;
2020
import org.aspectj.lang.annotation.Pointcut;
2121
import org.aspectj.lang.reflect.MethodSignature;
22+
import com.amazonaws.services.lambda.runtime.Context;
2223
import software.amazon.lambda.powertools.idempotency.Constants;
24+
import software.amazon.lambda.powertools.idempotency.Idempotency;
2325
import software.amazon.lambda.powertools.idempotency.IdempotencyKey;
2426
import software.amazon.lambda.powertools.idempotency.Idempotent;
2527
import software.amazon.lambda.powertools.idempotency.exceptions.IdempotencyConfigurationException;
@@ -56,12 +58,20 @@ public Object around(ProceedingJoinPoint pjp,
5658
throw new IdempotencyConfigurationException("The annotated method doesn't return anything. Unable to perform idempotency on void return type");
5759
}
5860

59-
JsonNode payload = getPayload(pjp, method);
61+
boolean isHandler = (isHandlerMethod(pjp) && placedOnRequestHandler(pjp));
62+
JsonNode payload = getPayload(pjp, method, isHandler);
6063
if (payload == null) {
6164
throw new IdempotencyConfigurationException("Unable to get payload from the method. Ensure there is at least one parameter or that you use @IdempotencyKey");
6265
}
6366

64-
IdempotencyHandler idempotencyHandler = new IdempotencyHandler(pjp, method.getName(), payload);
67+
Context lambdaContext;
68+
if (isHandler) {
69+
lambdaContext = (Context) pjp.getArgs()[1];
70+
} else {
71+
lambdaContext = Idempotency.getInstance().getConfig().getLambdaContext();
72+
}
73+
74+
IdempotencyHandler idempotencyHandler = new IdempotencyHandler(pjp, method.getName(), payload, lambdaContext);
6575
return idempotencyHandler.handle();
6676
}
6777

@@ -71,11 +81,10 @@ public Object around(ProceedingJoinPoint pjp,
7181
* @param method the annotated method
7282
* @return the payload used for idempotency
7383
*/
74-
private JsonNode getPayload(ProceedingJoinPoint pjp, Method method) {
84+
private JsonNode getPayload(ProceedingJoinPoint pjp, Method method, boolean isHandler) {
7585
JsonNode payload = null;
7686
// handleRequest or method with one parameter: get the first one
77-
if ((isHandlerMethod(pjp) && placedOnRequestHandler(pjp))
78-
|| pjp.getArgs().length == 1) {
87+
if (isHandler || pjp.getArgs().length == 1) {
7988
payload = JsonConfig.get().getObjectMapper().valueToTree(pjp.getArgs()[0]);
8089
} else {
8190
// Look for a parameter annotated with @IdempotencyKey

0 commit comments

Comments
 (0)