Skip to content

fix(idempotency): skip persistence for optional idempotency key #1507

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 9 commits into from
Jun 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
"scripts": {
"init-environment": "husky install",
"test": "npm t -ws",
"test:e2e": "npm run test:e2e -ws",
"commit": "commit",
"package": "npm run package -ws",
"setup-local": "npm ci && npm run build && npm run init-environment",
Expand Down
12 changes: 3 additions & 9 deletions packages/commons/tests/utils/e2eUtils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
* E2E utils is used by e2e tests. They are helper function that calls either CDK or SDK
* to interact with services.
*/
import { App, CfnOutput, Stack, Duration } from 'aws-cdk-lib';
import { App, CfnOutput, Duration, Stack } from 'aws-cdk-lib';
import {
NodejsFunction,
NodejsFunctionProps,
Expand Down Expand Up @@ -91,15 +91,11 @@ export const invokeFunction = async (
): Promise<InvocationLogs[]> => {
const invocationLogs: InvocationLogs[] = [];

const promiseFactory = (
index?: number,
includeIndex = true
): Promise<void> => {
const promiseFactory = (index?: number): Promise<void> => {
// in some cases we need to send a payload without the index, i.e. idempotency tests
const payloadToSend = includeIndex
? { invocation: index, ...payload }
: { ...payload };

const invokePromise = lambdaClient
.send(
new InvokeCommand({
Expand All @@ -126,9 +122,7 @@ export const invokeFunction = async (

const invocation =
invocationMode == 'PARALLEL'
? Promise.all(
promiseFactories.map((factory, index) => factory(index, includeIndex))
)
? Promise.all(promiseFactories.map((factory, index) => factory(index)))
: chainPromises(promiseFactories);
await invocation;

Expand Down
2 changes: 2 additions & 0 deletions packages/idempotency/src/IdempotencyConfig.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ class IdempotencyConfig {
/**
* Throw an error if the idempotency key is not found in the event.
* In some cases, you may want to allow the request to continue without idempotency.
* If set to false and idempotency key is not found, the request will continue without idempotency.
* @default false
*/
public throwOnNoIdempotencyKey: boolean;
/**
Expand Down
31 changes: 31 additions & 0 deletions packages/idempotency/src/IdempotencyHandler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import {
import { BasePersistenceLayer, IdempotencyRecord } from './persistence';
import { IdempotencyConfig } from './IdempotencyConfig';
import { MAX_RETRIES } from './constants';
import { search } from 'jmespath';

/**
* @internal
Expand Down Expand Up @@ -127,6 +128,17 @@ export class IdempotencyHandler<U> {
}

public async processIdempotency(): Promise<U> {
// early return if we should skip idempotency completely
if (
IdempotencyHandler.shouldSkipIdempotency(
this.idempotencyConfig.eventKeyJmesPath,
this.idempotencyConfig.throwOnNoIdempotencyKey,
this.fullFunctionPayload
)
) {
return await this.functionToMakeIdempotent(this.fullFunctionPayload);
}

try {
await this.persistenceStore.saveInProgress(
this.functionPayloadToBeHashed
Expand All @@ -146,4 +158,23 @@ export class IdempotencyHandler<U> {

return this.getFunctionResult();
}

/**
* avoid idempotency if the eventKeyJmesPath is not present in the payload and throwOnNoIdempotencyKey is false
* static so {@link makeHandlerIdempotent} middleware can use it
* TOOD: refactor so middy uses IdempotencyHandler internally wihtout reimplementing the logic
* @param eventKeyJmesPath
* @param throwOnNoIdempotencyKey
* @param fullFunctionPayload
* @private
*/
public static shouldSkipIdempotency(
eventKeyJmesPath: string,
throwOnNoIdempotencyKey: boolean,
fullFunctionPayload: Record<string, unknown>
): boolean {
return (eventKeyJmesPath &&
!throwOnNoIdempotencyKey &&
!search(fullFunctionPayload, eventKeyJmesPath)) as boolean;
}
}
24 changes: 22 additions & 2 deletions packages/idempotency/src/middleware/makeHandlerIdempotent.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@ import { IdempotencyHandler } from '../IdempotencyHandler';
import { IdempotencyConfig } from '../IdempotencyConfig';
import { cleanupMiddlewares } from '@aws-lambda-powertools/commons/lib/middleware';
import {
IdempotencyInconsistentStateError,
IdempotencyItemAlreadyExistsError,
IdempotencyPersistenceLayerError,
IdempotencyInconsistentStateError,
} from '../Exceptions';
import { IdempotencyRecord } from '../persistence';
import { MAX_RETRIES } from '../constants';
Expand Down Expand Up @@ -50,6 +50,9 @@ const makeHandlerIdempotent = (
config: idempotencyConfig,
});

// keep the flag for after and onError checks
let shouldSkipIdempotency = false;

/**
* Function called before the handler is executed.
*
Expand All @@ -72,6 +75,18 @@ const makeHandlerIdempotent = (
request: MiddyLikeRequest,
retryNo = 0
): Promise<unknown | void> => {
if (
IdempotencyHandler.shouldSkipIdempotency(
idempotencyConfig.eventKeyJmesPath,
idempotencyConfig.throwOnNoIdempotencyKey,
request.event as Record<string, unknown>
)
) {
// set the flag to skip checks in after and onError
shouldSkipIdempotency = true;

return;
}
try {
await persistenceStore.saveInProgress(
request.event as Record<string, unknown>,
Expand Down Expand Up @@ -114,7 +129,6 @@ const makeHandlerIdempotent = (
}
}
};

/**
* Function called after the handler has executed successfully.
*
Expand All @@ -125,6 +139,9 @@ const makeHandlerIdempotent = (
* @param request - The Middy request object
*/
const after = async (request: MiddyLikeRequest): Promise<void> => {
if (shouldSkipIdempotency) {
return;
}
try {
await persistenceStore.saveSuccess(
request.event as Record<string, unknown>,
Expand All @@ -146,6 +163,9 @@ const makeHandlerIdempotent = (
* @param request - The Middy request object
*/
const onError = async (request: MiddyLikeRequest): Promise<void> => {
if (shouldSkipIdempotency) {
return;
}
try {
await persistenceStore.deleteRecord(
request.event as Record<string, unknown>
Expand Down
7 changes: 5 additions & 2 deletions packages/idempotency/src/persistence/BasePersistenceLayer.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { createHash, Hash } from 'node:crypto';
import { search } from 'jmespath';
import { IdempotencyRecordStatus } from '../types';
import type { BasePersistenceLayerOptions } from '../types';
import { IdempotencyRecordStatus } from '../types';
import { EnvironmentVariablesService } from '../config';
import { IdempotencyRecord } from './IdempotencyRecord';
import { BasePersistenceLayerInterface } from './BasePersistenceLayerInterface';
Expand Down Expand Up @@ -176,10 +176,13 @@ abstract class BasePersistenceLayer implements BasePersistenceLayerInterface {
}

protected abstract _deleteRecord(record: IdempotencyRecord): Promise<void>;

protected abstract _getRecord(
idempotencyKey: string
): Promise<IdempotencyRecord>;

protected abstract _putRecord(record: IdempotencyRecord): Promise<void>;

protected abstract _updateRecord(record: IdempotencyRecord): Promise<void>;

private deleteFromCache(idempotencyKey: string): void {
Expand Down Expand Up @@ -294,7 +297,7 @@ abstract class BasePersistenceLayer implements BasePersistenceLayerInterface {
* Save record to local cache except for when status is `INPROGRESS`.
*
* We can't cache `INPROGRESS` records because we have no way to reflect updates
* that might happen to the record outside of the execution context of the function.
* that might happen to the record outside the execution context of the function.
*
* @param record - record to save
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { LambdaInterface } from '@aws-lambda-powertools/commons';
import { idempotentFunction, idempotentLambdaHandler } from '../../src';
import { Logger } from '../../../logger';
import { DynamoDBPersistenceLayer } from '../../src/persistence/DynamoDBPersistenceLayer';
import { IdempotencyConfig } from '../../src/';

const IDEMPOTENCY_TABLE_NAME =
process.env.IDEMPOTENCY_TABLE_NAME || 'table_name';
Expand Down Expand Up @@ -62,10 +63,25 @@ class DefaultLambda implements LambdaInterface {
_context: Context
): Promise<string> {
logger.info(`Got test event: ${JSON.stringify(_event)}`);
// sleep for 5 seconds

throw new Error('Failed');
}

@idempotentLambdaHandler({
persistenceStore: dynamoDBPersistenceLayer,
config: new IdempotencyConfig({
eventKeyJmesPath: 'idempotencyKey',
throwOnNoIdempotencyKey: false,
}),
})
public async handlerWithOptionalIdempoitencyKey(
_event: TestEvent,
_context: Context
): Promise<string> {
logger.info(`Got test event: ${JSON.stringify(_event)}`);

return 'This should not be stored in DynamoDB';
}
}

const defaultLambda = new DefaultLambda();
Expand All @@ -74,6 +90,9 @@ export const handlerCustomized =
defaultLambda.handlerCustomized.bind(defaultLambda);
export const handlerFails = defaultLambda.handlerFails.bind(defaultLambda);

export const handlerWithOptionalIdempoitencyKey =
defaultLambda.handlerWithOptionalIdempoitencyKey.bind(defaultLambda);

const logger = new Logger();

class LambdaWithKeywordArgument implements LambdaInterface {
Expand Down
40 changes: 39 additions & 1 deletion packages/idempotency/tests/e2e/idempotencyDecorator.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import {
destroyStack,
} from '../../../commons/tests/utils/cdk-cli';
import { LEVEL } from '../../../commons/tests/utils/InvocationLogs';
import { GetCommand } from '@aws-sdk/lib-dynamodb';
import { GetCommand, ScanCommand } from '@aws-sdk/lib-dynamodb';
import { createHash } from 'node:crypto';
import { createIdempotencyResources } from '../helpers/idempotencyUtils';

Expand Down Expand Up @@ -110,6 +110,23 @@ createIdempotencyResources(
functionNameFails,
'handlerFails'
);

const functionNameOptionalIdempotencyKey = generateUniqueName(
RESOURCE_NAME_PREFIX,
uuid,
runtime,
'optionalIdempotencyKey'
);
const ddbTableNameOptionalIdempotencyKey =
stackName + '-optional-idempotencyKey-table';
createIdempotencyResources(
stack,
runtime,
ddbTableNameOptionalIdempotencyKey,
decoratorFunctionFile,
functionNameOptionalIdempotencyKey,
'handlerWithOptionalIdempoitencyKey'
);
describe('Idempotency e2e test decorator, default settings', () => {
beforeAll(async () => {
await deployStack(app, stack);
Expand Down Expand Up @@ -285,6 +302,27 @@ describe('Idempotency e2e test decorator, default settings', () => {
TEST_CASE_TIMEOUT
);

test(
'when called with a function with optional idempotency key and thorwOnNoIdempotencyKey is false, it does not create ddb entry',
async () => {
const payload = { foo: 'baz' }; // we set eventKeyJmesPath: 'idempotencyKey' in the idempotency configuration
await invokeFunction(
functionNameOptionalIdempotencyKey,
2,
'PARALLEL',
payload,
false
);
const result = await ddb.send(
new ScanCommand({
TableName: ddbTableNameOptionalIdempotencyKey,
})
);
expect(result?.Items).toEqual([]);
},
TEST_CASE_TIMEOUT
);

afterAll(async () => {
if (!process.env.DISABLE_TEARDOWN) {
await destroyStack(app, stack);
Expand Down
38 changes: 38 additions & 0 deletions packages/idempotency/tests/unit/IdempotencyHandler.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,44 @@ describe('Class IdempotencyHandler', () => {
expect(mockGetRecord).toHaveBeenCalledTimes(1);
expect(mockDetermineResultFromIdempotencyRecord).toHaveBeenCalledTimes(1);
});

test('when throwOnNoIdempotencyKey is false and the key is missing, we skip idempotency', async () => {
const idempotentHandlerSkips = new IdempotencyHandler({
functionToMakeIdempotent: mockFunctionToMakeIdempotent,
functionPayloadToBeHashed: mockFunctionPayloadToBeHashed,
persistenceStore: mockIdempotencyOptions.persistenceStore,
fullFunctionPayload: mockFullFunctionPayload,
idempotencyConfig: new IdempotencyConfig({
throwOnNoIdempotencyKey: false,
eventKeyJmesPath: 'idempotencyKey',
}),
});

const mockSaveInProgress = jest.spyOn(
mockIdempotencyOptions.persistenceStore,
'saveInProgress'
);

const mockSaveSuccessfulResult = jest.spyOn(
mockIdempotencyOptions.persistenceStore,
'saveSuccess'
);
const mockGetRecord = jest.spyOn(
mockIdempotencyOptions.persistenceStore,
'getRecord'
);

mockFunctionToMakeIdempotent.mockImplementation(() => {
return 'result';
});

await expect(idempotentHandlerSkips.processIdempotency()).resolves.toBe(
'result'
);
expect(mockSaveInProgress).toHaveBeenCalledTimes(0);
expect(mockGetRecord).toHaveBeenCalledTimes(0);
expect(mockSaveSuccessfulResult).toHaveBeenCalledTimes(0);
});
});

describe('Method: getFunctionResult', () => {
Expand Down
Loading