Skip to content

improv(idempotency): handle functions with no return value #2521

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
May 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 52 additions & 35 deletions packages/idempotency/src/IdempotencyHandler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -155,8 +155,9 @@ export class IdempotencyHandler<Func extends AnyFunction> {
let e;
for (let retryNo = 0; retryNo <= MAX_RETRIES; retryNo++) {
try {
const result = await this.#saveInProgressOrReturnExistingResult();
if (result) return result as ReturnType<Func>;
const { isIdempotent, result } =
await this.#saveInProgressOrReturnExistingResult();
if (isIdempotent) return result as ReturnType<Func>;

return await this.getFunctionResult();
} catch (error) {
Expand Down Expand Up @@ -215,8 +216,9 @@ export class IdempotencyHandler<Func extends AnyFunction> {
): Promise<ReturnType<Func> | void> {
for (let retryNo = 0; retryNo <= MAX_RETRIES; retryNo++) {
try {
const result = await this.#saveInProgressOrReturnExistingResult();
if (result) {
const { isIdempotent, result } =
await this.#saveInProgressOrReturnExistingResult();
if (isIdempotent) {
await callback(request);

return result as ReturnType<Func>;
Expand Down Expand Up @@ -310,43 +312,58 @@ export class IdempotencyHandler<Func extends AnyFunction> {
* Before returning a result, we might neede to look up the idempotency record
* and validate it to ensure that it is consistent with the payload to be hashed.
*/
#saveInProgressOrReturnExistingResult =
async (): Promise<JSONValue | void> => {
try {
await this.#persistenceStore.saveInProgress(
this.#functionPayloadToBeHashed,
this.#idempotencyConfig.lambdaContext?.getRemainingTimeInMillis()
);
} catch (e) {
if (e instanceof IdempotencyItemAlreadyExistsError) {
let idempotencyRecord = e.existingRecord;
if (idempotencyRecord !== undefined) {
// If the error includes the existing record, we can use it to validate
// the record being processed and cache it in memory.
idempotencyRecord = this.#persistenceStore.processExistingRecord(
idempotencyRecord,
this.#functionPayloadToBeHashed
);
// If the error doesn't include the existing record, we need to fetch
// it from the persistence layer. In doing so, we also call the processExistingRecord
// method to validate the record and cache it in memory.
} else {
idempotencyRecord = await this.#persistenceStore.getRecord(
this.#functionPayloadToBeHashed
);
}
#saveInProgressOrReturnExistingResult = async (): Promise<{
isIdempotent: boolean;
result: JSONValue;
}> => {
const returnValue: {
isIdempotent: boolean;
result: JSONValue;
} = {
isIdempotent: false,
result: undefined,
};
try {
await this.#persistenceStore.saveInProgress(
this.#functionPayloadToBeHashed,
this.#idempotencyConfig.lambdaContext?.getRemainingTimeInMillis()
);

return IdempotencyHandler.determineResultFromIdempotencyRecord(
idempotencyRecord
return returnValue;
} catch (e) {
if (e instanceof IdempotencyItemAlreadyExistsError) {
let idempotencyRecord = e.existingRecord;
if (idempotencyRecord !== undefined) {
// If the error includes the existing record, we can use it to validate
// the record being processed and cache it in memory.
idempotencyRecord = this.#persistenceStore.processExistingRecord(
idempotencyRecord,
this.#functionPayloadToBeHashed
);
// If the error doesn't include the existing record, we need to fetch
// it from the persistence layer. In doing so, we also call the processExistingRecord
// method to validate the record and cache it in memory.
} else {
throw new IdempotencyPersistenceLayerError(
'Failed to save in progress record to idempotency store',
e as Error
idempotencyRecord = await this.#persistenceStore.getRecord(
this.#functionPayloadToBeHashed
);
}

returnValue.isIdempotent = true;
returnValue.result =
IdempotencyHandler.determineResultFromIdempotencyRecord(
idempotencyRecord
);

return returnValue;
} else {
throw new IdempotencyPersistenceLayerError(
'Failed to save in progress record to idempotency store',
e as Error
);
}
};
}
};

/**
* Save a successful result to the idempotency store.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -226,21 +226,24 @@ class DynamoDBPersistenceLayer extends BasePersistenceLayer {

protected async _updateRecord(record: IdempotencyRecord): Promise<void> {
const updateExpressionFields: string[] = [
'#response_data = :response_data',
'#expiry = :expiry',
'#status = :status',
];
const expressionAttributeNames: Record<string, string> = {
'#response_data': this.dataAttr,
'#expiry': this.expiryAttr,
'#status': this.statusAttr,
};
const expressionAttributeValues: Record<string, unknown> = {
':response_data': record.responseData,
':expiry': record.expiryTimestamp,
':status': record.getStatus(),
};

if (record.responseData !== undefined) {
updateExpressionFields.push('#response_data = :response_data');
expressionAttributeNames['#response_data'] = this.dataAttr;
expressionAttributeValues[':response_data'] = record.responseData;
}

if (this.isPayloadValidationEnabled()) {
updateExpressionFields.push('#validation_key = :validation_key');
expressionAttributeNames['#validation_key'] = this.validationKeyAttr;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,13 @@ class DefaultLambda implements LambdaInterface {
public async handler(
_event: Record<string, unknown>,
_context: Context
): Promise<string> {
): Promise<void> {
logger.info(`Got test event: ${JSON.stringify(_event)}`);
// sleep to enforce error with parallel execution
await new Promise((resolve) => setTimeout(resolve, 1000));

return 'Hello World';
// We return void to test that the utility handles it correctly
return;
}

@idempotent({
Expand Down
2 changes: 1 addition & 1 deletion packages/idempotency/tests/e2e/idempotentDecorator.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ describe('Idempotency e2e test decorator, default settings', () => {
expect(idempotencyRecord.Items?.[0].id).toEqual(
`${functionNameDefault}#${payloadHash}`
);
expect(idempotencyRecord.Items?.[0].data).toEqual('Hello World');
expect(idempotencyRecord.Items?.[0].data).toBeUndefined();
expect(idempotencyRecord.Items?.[0].status).toEqual('COMPLETED');
// During the first invocation the handler should be called, so the logs should contain 1 log
expect(functionLogs[0]).toHaveLength(1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -604,7 +604,7 @@ describe('Class: DynamoDBPersistenceLayer', () => {
id: dummyKey,
}),
UpdateExpression:
'SET #response_data = :response_data, #expiry = :expiry, #status = :status',
'SET #expiry = :expiry, #status = :status, #response_data = :response_data',
ExpressionAttributeNames: {
'#status': 'status',
'#expiry': 'expiration',
Expand All @@ -618,6 +618,41 @@ describe('Class: DynamoDBPersistenceLayer', () => {
});
});

it('updates the item when the response_data is undefined', async () => {
// Prepare
const persistenceLayer = new TestDynamoDBPersistenceLayer({
tableName: dummyTableName,
});
const status = IdempotencyRecordStatus.EXPIRED;
const expiryTimestamp = Date.now();
const record = new IdempotencyRecord({
idempotencyKey: dummyKey,
status,
expiryTimestamp,
responseData: undefined,
});

// Act
persistenceLayer._updateRecord(record);

// Assess
expect(client).toReceiveCommandWith(UpdateItemCommand, {
TableName: dummyTableName,
Key: marshall({
id: dummyKey,
}),
UpdateExpression: 'SET #expiry = :expiry, #status = :status',
ExpressionAttributeNames: {
'#status': 'status',
'#expiry': 'expiration',
},
ExpressionAttributeValues: marshall({
':status': IdempotencyRecordStatus.EXPIRED,
':expiry': expiryTimestamp,
}),
});
});

test('when called to update a record and payload validation is enabled, it adds the payload hash to the update expression', async () => {
// Prepare
const persistenceLayer = new TestDynamoDBPersistenceLayer({
Expand Down Expand Up @@ -646,7 +681,7 @@ describe('Class: DynamoDBPersistenceLayer', () => {
id: dummyKey,
}),
UpdateExpression:
'SET #response_data = :response_data, #expiry = :expiry, #status = :status, #validation_key = :validation_key',
'SET #expiry = :expiry, #status = :status, #response_data = :response_data, #validation_key = :validation_key',
ExpressionAttributeNames: {
'#status': 'status',
'#expiry': 'expiration',
Expand Down