Skip to content

Commit 3bd8fdb

Browse files
tolutheodreamorosiam29d
committed
feat(idempotency): leverage new dynamodB Failed conditional writes behavior (#1779)
* created custom error with existingRecord field * fetch items in DB on ReturnValuesOnConditionCheckFailure * custom error field to fetch original Item from failed Item from Db * set base persitence layer to undefined. * converted error type to IdempotencyRecord * dev branch commit and unit tests * pull request corrections * chore: remove comment * chore: simplify conditional item hydration * chore: rebase * fix: use returned items + tests * add info about ReturnValuesOnConditionCheckFailure and conditional expression --------- Co-authored-by: Andrea Amorosi <[email protected]> Co-authored-by: Alexander Schueren <[email protected]>
1 parent 90981de commit 3bd8fdb

File tree

8 files changed

+101
-21
lines changed

8 files changed

+101
-21
lines changed

Diff for: docs/utilities/idempotency.md

+5-4
Original file line numberDiff line numberDiff line change
@@ -109,10 +109,11 @@ If you're not [changing the default configuration for the DynamoDB persistence l
109109
Larger items cannot be written to DynamoDB and will cause exceptions.
110110

111111
???+ info "Info: DynamoDB"
112-
Each function invocation will generally make 2 requests to DynamoDB. If the
113-
result returned by your Lambda is less than 1kb, you can expect 2 WCUs per invocation. For retried invocations, you will
114-
see 1WCU and 1RCU. Review the [DynamoDB pricing documentation](https://aws.amazon.com/dynamodb/pricing/){target="_blank"} to
115-
estimate the cost.
112+
Each function invocation will make only 1 request to DynamoDB by using DynamoDB's [conditional expressions](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Expressions.ConditionExpressions.html){target="_blank"} to ensure that we don't overwrite existing records,
113+
and [ReturnValuesOnConditionCheckFailure](https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_PutItem.html#DDB-PutItem-request-ReturnValuesOnConditionCheckFailure){target="_blank"} to return the record if it exists.
114+
See [AWS Blog post on handling conditional write errors](https://aws.amazon.com/blogs/database/handle-conditional-write-errors-in-high-concurrency-scenarios-with-amazon-dynamodb/) for more details.
115+
For retried invocations, you will see 1WCU and 1RCU.
116+
Review the [DynamoDB pricing documentation](https://aws.amazon.com/dynamodb/pricing/){target="_blank"} to estimate the cost.
116117

117118
### MakeIdempotent function wrapper
118119

Diff for: packages/idempotency/src/IdempotencyHandler.ts

+3-2
Original file line numberDiff line numberDiff line change
@@ -318,9 +318,10 @@ export class IdempotencyHandler<Func extends AnyFunction> {
318318
} catch (e) {
319319
if (e instanceof IdempotencyItemAlreadyExistsError) {
320320
const idempotencyRecord: IdempotencyRecord =
321-
await this.#persistenceStore.getRecord(
321+
e.existingRecord ||
322+
(await this.#persistenceStore.getRecord(
322323
this.#functionPayloadToBeHashed
323-
);
324+
));
324325

325326
return IdempotencyHandler.determineResultFromIdempotencyRecord(
326327
idempotencyRecord

Diff for: packages/idempotency/src/errors.ts

+10-1
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,16 @@
1+
import type { IdempotencyRecord } from './persistence';
2+
13
/**
24
* Item attempting to be inserted into persistence store already exists and is not expired
35
*/
4-
class IdempotencyItemAlreadyExistsError extends Error {}
6+
class IdempotencyItemAlreadyExistsError extends Error {
7+
public existingRecord?: IdempotencyRecord;
8+
9+
public constructor(message?: string, existingRecord?: IdempotencyRecord) {
10+
super(message);
11+
this.existingRecord = existingRecord;
12+
}
13+
}
514

615
/**
716
* Item does not exist in persistence store

Diff for: packages/idempotency/src/persistence/BasePersistenceLayer.ts

+4-1
Original file line numberDiff line numberDiff line change
@@ -157,7 +157,10 @@ abstract class BasePersistenceLayer implements BasePersistenceLayerInterface {
157157
}
158158

159159
if (this.getFromCache(idempotencyRecord.idempotencyKey)) {
160-
throw new IdempotencyItemAlreadyExistsError();
160+
throw new IdempotencyItemAlreadyExistsError(
161+
`Failed to put record for already existing idempotency key: ${idempotencyRecord.idempotencyKey}`,
162+
idempotencyRecord
163+
);
161164
}
162165

163166
await this._putRecord(idempotencyRecord);

Diff for: packages/idempotency/src/persistence/DynamoDBPersistenceLayer.ts

+18-7
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,10 @@ import { IdempotencyRecordStatus } from '../constants.js';
66
import type { DynamoDBPersistenceOptions } from '../types/DynamoDBPersistence.js';
77
import {
88
AttributeValue,
9+
ConditionalCheckFailedException,
910
DeleteItemCommand,
1011
DynamoDBClient,
1112
DynamoDBClientConfig,
12-
DynamoDBServiceException,
1313
GetItemCommand,
1414
PutItemCommand,
1515
UpdateItemCommand,
@@ -198,15 +198,26 @@ class DynamoDBPersistenceLayer extends BasePersistenceLayer {
198198
':inprogress': IdempotencyRecordStatus.INPROGRESS,
199199
}),
200200
ConditionExpression: conditionExpression,
201+
ReturnValuesOnConditionCheckFailure: 'ALL_OLD',
201202
})
202203
);
203204
} catch (error) {
204-
if (error instanceof DynamoDBServiceException) {
205-
if (error.name === 'ConditionalCheckFailedException') {
206-
throw new IdempotencyItemAlreadyExistsError(
207-
`Failed to put record for already existing idempotency key: ${record.idempotencyKey}`
208-
);
209-
}
205+
if (error instanceof ConditionalCheckFailedException) {
206+
const item = error.Item && unmarshall(error.Item);
207+
const idempotencyRecord =
208+
item &&
209+
new IdempotencyRecord({
210+
idempotencyKey: item[this.keyAttr],
211+
status: item[this.statusAttr],
212+
expiryTimestamp: item[this.expiryAttr],
213+
inProgressExpiryTimestamp: item[this.inProgressExpiryAttr],
214+
responseData: item[this.dataAttr],
215+
payloadHash: item[this.validationKeyAttr],
216+
});
217+
throw new IdempotencyItemAlreadyExistsError(
218+
`Failed to put record for already existing idempotency key: ${record.idempotencyKey}`,
219+
idempotencyRecord
220+
);
210221
}
211222

212223
throw error;

Diff for: packages/idempotency/tests/unit/IdempotencyHandler.test.ts

+22
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,28 @@ describe('Class IdempotencyHandler', () => {
119119
expect(saveInProgressSpy).toHaveBeenCalledTimes(1);
120120
});
121121

122+
test('when IdempotencyAlreadyInProgressError is thrown and it contains the existing item, it returns it directly', async () => {
123+
// Prepare
124+
const saveInProgressSpy = jest
125+
.spyOn(persistenceStore, 'saveInProgress')
126+
.mockRejectedValueOnce(
127+
new IdempotencyItemAlreadyExistsError(
128+
'Failed to put record for already existing idempotency key: idempotence-key',
129+
new IdempotencyRecord({
130+
idempotencyKey: 'key',
131+
status: IdempotencyRecordStatus.COMPLETED,
132+
responseData: 'Hi',
133+
})
134+
)
135+
);
136+
const getRecordSpy = jest.spyOn(persistenceStore, 'getRecord');
137+
138+
// Act & Assess
139+
await expect(idempotentHandler.handle()).resolves.toEqual('Hi');
140+
expect(saveInProgressSpy).toHaveBeenCalledTimes(1);
141+
expect(getRecordSpy).toHaveBeenCalledTimes(0);
142+
});
143+
122144
test('when IdempotencyInconsistentStateError is thrown, it retries until max retries are exhausted', async () => {
123145
// Prepare
124146
const mockProcessIdempotency = jest

Diff for: packages/idempotency/tests/unit/persistence/BasePersistenceLayer.test.ts

+1-1
Original file line numberDiff line numberDiff line change
@@ -410,7 +410,7 @@ describe('Class: BasePersistenceLayer', () => {
410410
// Act & Assess
411411
await expect(
412412
persistenceLayer.saveInProgress({ foo: 'bar' })
413-
).rejects.toThrow(new IdempotencyItemAlreadyExistsError());
413+
).rejects.toThrow(IdempotencyItemAlreadyExistsError);
414414
expect(putRecordSpy).toHaveBeenCalledTimes(0);
415415
});
416416

Diff for: packages/idempotency/tests/unit/persistence/DynamoDbPersistenceLayer.test.ts

+38-5
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,8 @@ import {
1212
IdempotencyItemNotFoundError,
1313
} from '../../../src/index.js';
1414
import {
15+
ConditionalCheckFailedException,
1516
DynamoDBClient,
16-
DynamoDBServiceException,
1717
PutItemCommand,
1818
GetItemCommand,
1919
UpdateItemCommand,
@@ -395,19 +395,30 @@ describe('Class: DynamoDBPersistenceLayer', () => {
395395
expiryTimestamp: 0,
396396
});
397397
client.on(PutItemCommand).rejects(
398-
new DynamoDBServiceException({
399-
$fault: 'client',
398+
new ConditionalCheckFailedException({
400399
$metadata: {
401400
httpStatusCode: 400,
402401
requestId: 'someRequestId',
403402
},
404-
name: 'ConditionalCheckFailedException',
403+
message: 'Conditional check failed',
404+
Item: {
405+
id: { S: 'test-key' },
406+
status: { S: 'INPROGRESS' },
407+
expiration: { N: Date.now().toString() },
408+
},
405409
})
406410
);
407411

408412
// Act & Assess
409413
await expect(persistenceLayer._putRecord(record)).rejects.toThrowError(
410-
IdempotencyItemAlreadyExistsError
414+
new IdempotencyItemAlreadyExistsError(
415+
`Failed to put record for already existing idempotency key: ${record.idempotencyKey}`,
416+
new IdempotencyRecord({
417+
idempotencyKey: record.idempotencyKey,
418+
status: IdempotencyRecordStatus.EXPIRED,
419+
expiryTimestamp: Date.now() / 1000 - 1,
420+
})
421+
)
411422
);
412423
});
413424

@@ -676,4 +687,26 @@ describe('Class: DynamoDBPersistenceLayer', () => {
676687
});
677688
});
678689
});
690+
691+
test('_putRecord throws Error when Item is undefined', async () => {
692+
// Prepare
693+
const persistenceLayer = new TestDynamoDBPersistenceLayer({
694+
tableName: dummyTableName,
695+
});
696+
const mockRecord = new IdempotencyRecord({
697+
idempotencyKey: 'test-key',
698+
status: 'INPROGRESS',
699+
expiryTimestamp: Date.now(),
700+
});
701+
702+
DynamoDBClient.prototype.send = jest.fn().mockRejectedValueOnce(
703+
new ConditionalCheckFailedException({
704+
message: 'Conditional check failed',
705+
$metadata: {},
706+
})
707+
);
708+
await expect(
709+
persistenceLayer._putRecord(mockRecord)
710+
).rejects.toThrowError();
711+
});
679712
});

0 commit comments

Comments
 (0)