Skip to content

feat(idempotency): leverage new dynamodB Failed conditional writes behavior #1779

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 12 commits into from
Jan 4, 2024
5 changes: 3 additions & 2 deletions packages/idempotency/src/IdempotencyHandler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -314,9 +314,10 @@ export class IdempotencyHandler<Func extends AnyFunction> {
} catch (e) {
if (e instanceof IdempotencyItemAlreadyExistsError) {
const idempotencyRecord: IdempotencyRecord =
await this.#persistenceStore.getRecord(
e.existingRecord ||
(await this.#persistenceStore.getRecord(
this.#functionPayloadToBeHashed
);
));

return IdempotencyHandler.determineResultFromIdempotencyRecord(
idempotencyRecord
Expand Down
11 changes: 10 additions & 1 deletion packages/idempotency/src/errors.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,16 @@
import type { IdempotencyRecord } from './persistence';

/**
* Item attempting to be inserted into persistence store already exists and is not expired
*/
class IdempotencyItemAlreadyExistsError extends Error {}
class IdempotencyItemAlreadyExistsError extends Error {
public existingRecord?: IdempotencyRecord;

public constructor(message?: string, existingRecord?: IdempotencyRecord) {
super(message);
this.existingRecord = existingRecord;
}
}

/**
* Item does not exist in persistence store
Expand Down
5 changes: 4 additions & 1 deletion packages/idempotency/src/persistence/BasePersistenceLayer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,10 @@ abstract class BasePersistenceLayer implements BasePersistenceLayerInterface {
}

if (this.getFromCache(idempotencyRecord.idempotencyKey)) {
throw new IdempotencyItemAlreadyExistsError();
throw new IdempotencyItemAlreadyExistsError(
`Failed to put record for already existing idempotency key: ${idempotencyRecord.idempotencyKey}`,
idempotencyRecord
);
}

await this._putRecord(idempotencyRecord);
Expand Down
25 changes: 18 additions & 7 deletions packages/idempotency/src/persistence/DynamoDBPersistenceLayer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,10 @@ import { IdempotencyRecordStatus } from '../constants';
import type { DynamoDBPersistenceOptions } from '../types';
import {
AttributeValue,
ConditionalCheckFailedException,
DeleteItemCommand,
DynamoDBClient,
DynamoDBClientConfig,
DynamoDBServiceException,
GetItemCommand,
PutItemCommand,
UpdateItemCommand,
Expand Down Expand Up @@ -198,15 +198,26 @@ class DynamoDBPersistenceLayer extends BasePersistenceLayer {
':inprogress': IdempotencyRecordStatus.INPROGRESS,
}),
ConditionExpression: conditionExpression,
ReturnValuesOnConditionCheckFailure: 'ALL_OLD',
})
);
} catch (error) {
if (error instanceof DynamoDBServiceException) {
if (error.name === 'ConditionalCheckFailedException') {
throw new IdempotencyItemAlreadyExistsError(
`Failed to put record for already existing idempotency key: ${record.idempotencyKey}`
);
}
if (error instanceof ConditionalCheckFailedException) {
const item = error.Item && unmarshall(error.Item);
const idempotencyRecord =
item &&
new IdempotencyRecord({
idempotencyKey: item[this.keyAttr],
status: item[this.statusAttr],
expiryTimestamp: item[this.expiryAttr],
inProgressExpiryTimestamp: item[this.inProgressExpiryAttr],
responseData: item[this.dataAttr],
payloadHash: item[this.validationKeyAttr],
});
throw new IdempotencyItemAlreadyExistsError(
`Failed to put record for already existing idempotency key: ${record.idempotencyKey}`,
idempotencyRecord
);
}

throw error;
Expand Down
22 changes: 22 additions & 0 deletions packages/idempotency/tests/unit/IdempotencyHandler.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,28 @@ describe('Class IdempotencyHandler', () => {
expect(saveInProgressSpy).toHaveBeenCalledTimes(1);
});

test('when IdempotencyAlreadyInProgressError is thrown and it contains the existing item, it returns it directly', async () => {
// Prepare
const saveInProgressSpy = jest
.spyOn(persistenceStore, 'saveInProgress')
.mockRejectedValueOnce(
new IdempotencyItemAlreadyExistsError(
'Failed to put record for already existing idempotency key: idempotence-key',
new IdempotencyRecord({
idempotencyKey: 'key',
status: IdempotencyRecordStatus.COMPLETED,
responseData: 'Hi',
})
)
);
const getRecordSpy = jest.spyOn(persistenceStore, 'getRecord');

// Act & Assess
await expect(idempotentHandler.handle()).resolves.toEqual('Hi');
expect(saveInProgressSpy).toHaveBeenCalledTimes(1);
expect(getRecordSpy).toHaveBeenCalledTimes(0);
});

test('when IdempotencyInconsistentStateError is thrown, it retries until max retries are exhausted', async () => {
// Prepare
const mockProcessIdempotency = jest
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -407,7 +407,7 @@ describe('Class: BasePersistenceLayer', () => {
// Act & Assess
await expect(
persistenceLayer.saveInProgress({ foo: 'bar' })
).rejects.toThrow(new IdempotencyItemAlreadyExistsError());
).rejects.toThrow(IdempotencyItemAlreadyExistsError);
expect(putRecordSpy).toHaveBeenCalledTimes(0);
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ import { IdempotencyRecord } from '../../../src/persistence';
import type { DynamoDBPersistenceOptions } from '../../../src/types';
import { IdempotencyRecordStatus } from '../../../src';
import {
ConditionalCheckFailedException,
DynamoDBClient,
DynamoDBServiceException,
PutItemCommand,
GetItemCommand,
UpdateItemCommand,
Expand Down Expand Up @@ -395,19 +395,30 @@ describe('Class: DynamoDBPersistenceLayer', () => {
expiryTimestamp: 0,
});
client.on(PutItemCommand).rejects(
new DynamoDBServiceException({
$fault: 'client',
new ConditionalCheckFailedException({
$metadata: {
httpStatusCode: 400,
requestId: 'someRequestId',
},
name: 'ConditionalCheckFailedException',
message: 'Conditional check failed',
Item: {
id: { S: 'test-key' },
status: { S: 'INPROGRESS' },
expiration: { N: Date.now().toString() },
},
})
);

// Act & Assess
await expect(persistenceLayer._putRecord(record)).rejects.toThrowError(
IdempotencyItemAlreadyExistsError
new IdempotencyItemAlreadyExistsError(
`Failed to put record for already existing idempotency key: ${record.idempotencyKey}`,
new IdempotencyRecord({
idempotencyKey: record.idempotencyKey,
status: IdempotencyRecordStatus.EXPIRED,
expiryTimestamp: Date.now() / 1000 - 1,
})
)
);
});

Expand Down Expand Up @@ -676,4 +687,26 @@ describe('Class: DynamoDBPersistenceLayer', () => {
});
});
});

test('_putRecord throws Error when Item is undefined', async () => {
// Prepare
const persistenceLayer = new TestDynamoDBPersistenceLayer({
tableName: dummyTableName,
});
const mockRecord = new IdempotencyRecord({
idempotencyKey: 'test-key',
status: 'INPROGRESS',
expiryTimestamp: Date.now(),
});

DynamoDBClient.prototype.send = jest.fn().mockRejectedValueOnce(
new ConditionalCheckFailedException({
message: 'Conditional check failed',
$metadata: {},
})
);
await expect(
persistenceLayer._putRecord(mockRecord)
).rejects.toThrowError();
});
});