Skip to content

feat(idempotency): add local cache to BasePersistenceLayer #1396

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Apr 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions packages/idempotency/src/IdempotencyConfig.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ class IdempotencyConfig {
public expiresAfterSeconds: number;
public hashFunction: string;
public lambdaContext?: Context;
public maxLocalCacheSize: number;
public payloadValidationJmesPath?: string;
public throwOnNoIdempotencyKey: boolean;
public useLocalCache: boolean;
Expand All @@ -16,6 +17,7 @@ class IdempotencyConfig {
this.throwOnNoIdempotencyKey = config.throwOnNoIdempotencyKey ?? false;
this.expiresAfterSeconds = config.expiresAfterSeconds ?? 3600; // 1 hour default
this.useLocalCache = config.useLocalCache ?? false;
this.maxLocalCacheSize = config.maxLocalCacheSize ?? 1000;
this.hashFunction = config.hashFunction ?? 'md5';
this.lambdaContext = config.lambdaContext;
}
Expand Down
82 changes: 68 additions & 14 deletions packages/idempotency/src/persistence/BasePersistenceLayer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,12 @@ import type {
import { EnvironmentVariablesService } from '../config';
import { IdempotencyRecord } from './IdempotencyRecord';
import { BasePersistenceLayerInterface } from './BasePersistenceLayerInterface';
import { IdempotencyValidationError } from '../Exceptions';
import { IdempotencyItemAlreadyExistsError, IdempotencyValidationError } from '../Exceptions';
import { LRUCache } from './LRUCache';

abstract class BasePersistenceLayer implements BasePersistenceLayerInterface {
public idempotencyKeyPrefix: string;
private cache?: LRUCache<string, IdempotencyRecord>;
private configured: boolean = false;
// envVarsService is always initialized in the constructor
private envVarsService!: EnvironmentVariablesService;
Expand All @@ -25,7 +27,7 @@ abstract class BasePersistenceLayer implements BasePersistenceLayerInterface {
private useLocalCache: boolean = false;
private validationKeyJmesPath?: string;

public constructor() {
public constructor() {
this.envVarsService = new EnvironmentVariablesService();
this.idempotencyKeyPrefix = this.getEnvVarsService().getFunctionName();
}
Expand Down Expand Up @@ -55,7 +57,10 @@ abstract class BasePersistenceLayer implements BasePersistenceLayerInterface {
this.throwOnNoIdempotencyKey = idempotencyConfig?.throwOnNoIdempotencyKey || false;
this.eventKeyJmesPath = idempotencyConfig.eventKeyJmesPath;
this.expiresAfterSeconds = idempotencyConfig.expiresAfterSeconds; // 1 hour default
// TODO: Add support for local cache
this.useLocalCache = idempotencyConfig.useLocalCache;
if (this.useLocalCache) {
this.cache = new LRUCache({ maxSize: idempotencyConfig.maxLocalCacheSize });
}
this.hashFunction = idempotencyConfig.hashFunction;
}

Expand All @@ -64,13 +69,15 @@ abstract class BasePersistenceLayer implements BasePersistenceLayerInterface {
*
* @param data - the data payload that will be hashed to create the hash portion of the idempotency key
*/
public async deleteRecord(data: Record<string, unknown>): Promise<void> {
const idempotencyRecord = new IdempotencyRecord({
public async deleteRecord(data: Record<string, unknown>): Promise<void> {
const idempotencyRecord = new IdempotencyRecord({
idempotencyKey: this.getHashedIdempotencyKey(data),
status: IdempotencyRecordStatus.EXPIRED
});

await this._deleteRecord(idempotencyRecord);

this.deleteFromCache(idempotencyRecord.idempotencyKey);
}

/**
Expand All @@ -81,7 +88,15 @@ abstract class BasePersistenceLayer implements BasePersistenceLayerInterface {
public async getRecord(data: Record<string, unknown>): Promise<IdempotencyRecord> {
const idempotencyKey = this.getHashedIdempotencyKey(data);

const cachedRecord = this.getFromCache(idempotencyKey);
if (cachedRecord) {
this.validatePayload(data, cachedRecord);

return cachedRecord;
}

const record = await this._getRecord(idempotencyKey);
this.saveToCache(record);
this.validatePayload(data, record);

return record;
Expand All @@ -97,7 +112,7 @@ abstract class BasePersistenceLayer implements BasePersistenceLayerInterface {
* @param data - the data payload that will be hashed to create the hash portion of the idempotency key
* @param remainingTimeInMillis - the remaining time left in the lambda execution context
*/
public async saveInProgress(data: Record<string, unknown>, remainingTimeInMillis?: number): Promise<void> {
public async saveInProgress(data: Record<string, unknown>, remainingTimeInMillis?: number): Promise<void> {
const idempotencyRecord = new IdempotencyRecord({
idempotencyKey: this.getHashedIdempotencyKey(data),
status: IdempotencyRecordStatus.INPROGRESS,
Expand All @@ -113,6 +128,10 @@ abstract class BasePersistenceLayer implements BasePersistenceLayerInterface {
);
}

if (this.getFromCache(idempotencyRecord.idempotencyKey)) {
throw new IdempotencyItemAlreadyExistsError();
}

await this._putRecord(idempotencyRecord);
}

Expand All @@ -123,7 +142,7 @@ abstract class BasePersistenceLayer implements BasePersistenceLayerInterface {
* @param data - the data payload that will be hashed to create the hash portion of the idempotency key
* @param result - the result of the successfully completed function
*/
public async saveSuccess(data: Record<string, unknown>, result: Record<string, unknown>): Promise<void> {
public async saveSuccess(data: Record<string, unknown>, result: Record<string, unknown>): Promise<void> {
const idempotencyRecord = new IdempotencyRecord({
idempotencyKey: this.getHashedIdempotencyKey(data),
status: IdempotencyRecordStatus.COMPLETED,
Expand All @@ -133,23 +152,33 @@ abstract class BasePersistenceLayer implements BasePersistenceLayerInterface {
});

await this._updateRecord(idempotencyRecord);

this.saveToCache(idempotencyRecord);
}

protected abstract _deleteRecord(record: IdempotencyRecord): Promise<void>;
protected abstract _getRecord(idempotencyKey: string): Promise<IdempotencyRecord>;
protected abstract _putRecord(record: IdempotencyRecord): Promise<void>;
protected abstract _updateRecord(record: IdempotencyRecord): Promise<void>;

private deleteFromCache(idempotencyKey: string): void {
if (!this.useLocalCache) return;
// Delete from local cache if it exists
if (this.cache?.has(idempotencyKey)) {
this.cache?.remove(idempotencyKey);
}
}

/**
* Generates a hash of the data and returns the digest of that hash
*
* @param data the data payload that will generate the hash
* @returns the digest of the generated hash
*/
private generateHash(data: string): string{
private generateHash(data: string): string {
const hash: Hash = createHash(this.hashFunction);
hash.update(data);

return hash.digest('base64');
}

Expand All @@ -168,10 +197,21 @@ abstract class BasePersistenceLayer implements BasePersistenceLayerInterface {
*/
private getExpiryTimestamp(): number {
const currentTime: number = Date.now() / 1000;

return currentTime + this.expiresAfterSeconds;
}

private getFromCache(idempotencyKey: string): IdempotencyRecord | undefined {
if (!this.useLocalCache) return undefined;
const cachedRecord = this.cache?.get(idempotencyKey);
if (cachedRecord) {
// if record is not expired, return it
if (!cachedRecord.isExpired()) return cachedRecord;
// if record is expired, delete it from cache
this.deleteFromCache(idempotencyKey);
}
}

/**
* Generates the idempotency key used to identify records in the persistence store.
*
Expand All @@ -182,14 +222,14 @@ abstract class BasePersistenceLayer implements BasePersistenceLayerInterface {
if (this.eventKeyJmesPath) {
data = search(data, this.eventKeyJmesPath);
}

if (BasePersistenceLayer.isMissingIdempotencyKey(data)) {
if (this.throwOnNoIdempotencyKey) {
throw new Error('No data found to create a hashed idempotency_key');
}
console.warn(`No value found for idempotency_key. jmespath: ${this.eventKeyJmesPath}`);
}

return `${this.idempotencyKeyPrefix}#${this.generateHash(JSON.stringify(data))}`;
}

Expand All @@ -204,7 +244,7 @@ abstract class BasePersistenceLayer implements BasePersistenceLayerInterface {
// Therefore, the assertion is safe.
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
data = search(data, this.validationKeyJmesPath!);

return this.generateHash(JSON.stringify(data));
}

Expand All @@ -223,6 +263,20 @@ abstract class BasePersistenceLayer implements BasePersistenceLayerInterface {
return !data;
}

/**
* Save record to local cache except for when status is `INPROGRESS`.
*
* We can't cache `INPROGRESS` records because we have no way to reflect updates
* that might happen to the record outside of the execution context of the function.
*
* @param record - record to save
*/
private saveToCache(record: IdempotencyRecord): void {
if (!this.useLocalCache) return;
if (record.getStatus() === IdempotencyRecordStatus.INPROGRESS) return;
this.cache?.add(record.idempotencyKey, record);
}

private validatePayload(data: Record<string, unknown>, record: IdempotencyRecord): void {
if (this.payloadValidationEnabled) {
const hashedPayload: string = this.getHashedPayload(data);
Expand Down
4 changes: 2 additions & 2 deletions packages/idempotency/src/persistence/IdempotencyRecord.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ class IdempotencyRecord {
public responseData?: Record<string, unknown>;
private status: IdempotencyRecordStatus;

public constructor(config: IdempotencyRecordOptions) {
public constructor(config: IdempotencyRecordOptions) {
this.idempotencyKey = config.idempotencyKey;
this.expiryTimestamp = config.expiryTimestamp;
this.inProgressExpiryTimestamp = config.inProgressExpiryTimestamp;
Expand All @@ -38,7 +38,7 @@ class IdempotencyRecord {
}
}

private isExpired(): boolean {
public isExpired(): boolean {
return this.expiryTimestamp !== undefined && ((Date.now() / 1000) > this.expiryTimestamp);
}
}
Expand Down
Loading