feat(batch): Implementation of base batch processing classes #1588

Merged
merged 28 commits into from
Jul 11, 2023
Changes from 10 commits
Commits
31b2306
chore: init workspace
dreamorosi Jun 27, 2023
a3dddbf
chore: init workspace
dreamorosi Jun 27, 2023
90d4ce0
Merge branch 'aws-powertools:main' into batch
erikayao93 Jun 30, 2023
d87b895
Initial base class implementation
erikayao93 Jun 30, 2023
fdcb59f
Added BatchProcessor implementation, attempted fix for async
erikayao93 Jul 3, 2023
8bb91d5
Added unit tests
erikayao93 Jul 6, 2023
8767d6a
Refactoring unit tests
erikayao93 Jul 6, 2023
a90e2b6
Lint fix, updated docstrings
erikayao93 Jul 7, 2023
fc32c81
Merge branch 'aws-powertools:main' into batch
erikayao93 Jul 7, 2023
d417d71
Added response and identifier typings
erikayao93 Jul 7, 2023
863edc8
test(idempotency): improve integration tests for utility (#1591)
dreamorosi Jul 10, 2023
ea50345
Removed unnecessary type casting
erikayao93 Jul 10, 2023
eaee175
Moved exports for handlers and factories
erikayao93 Jul 10, 2023
500fbe0
Updated imports, refactored randomization in factories
erikayao93 Jul 10, 2023
002ef9e
Refactored EventType to be const instead of enum
erikayao93 Jul 10, 2023
d9be091
Refactored and added documentation for errors
erikayao93 Jul 10, 2023
604f04d
Removed debugging line
erikayao93 Jul 10, 2023
9f1696b
chore(ci): add canary to layer deployment (#1593)
Jul 10, 2023
3b5ccc1
docs(idempotency): write utility docs (#1592)
dreamorosi Jul 11, 2023
e34d1d9
build(internal): bump semver from 5.7.1 to 5.7.2 (#1594)
dependabot[bot] Jul 11, 2023
0fc4f1e
chore(idempotency): mark the utility ready public beta (#1595)
dreamorosi Jul 11, 2023
5266d55
docs(internal): update AWS SDK links to new docs (#1597)
dreamorosi Jul 11, 2023
16719c3
chore(maintenance): remove parameters utility from layer bundling and…
Jul 11, 2023
3f03c5c
chore(release): v1.11.1 [skip ci]
github-actions[bot] Jul 11, 2023
3251b9c
fix canary deploy in ci with correct workspace name (#1601)
Jul 11, 2023
7cfac60
chore: update layer ARN on documentation
Jul 11, 2023
840208f
Merge branch 'feat/batch' into batch
erikayao93 Jul 11, 2023
3de49db
Merge branch 'aws-powertools:main' into batch
erikayao93 Jul 11, 2023
13 changes: 12 additions & 1 deletion package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion package.json
@@ -12,7 +12,8 @@
"docs/snippets",
"layers",
"examples/cdk",
- "examples/sam"
+ "examples/sam",
+ "packages/batch"
],
"scripts": {
"init-environment": "husky install",
Empty file added packages/batch/README.md
Empty file.
28 changes: 28 additions & 0 deletions packages/batch/jest.config.js
@@ -0,0 +1,28 @@
module.exports = {
displayName: {
name: 'Powertools for AWS Lambda (TypeScript) utility: BATCH',
color: 'orange',
},
runner: 'groups',
preset: 'ts-jest',
transform: {
'^.+\\.ts?$': 'ts-jest',
},
moduleFileExtensions: ['js', 'ts'],
collectCoverageFrom: ['**/src/**/*.ts', '!**/node_modules/**'],
testMatch: ['**/?(*.)+(spec|test).ts'],
roots: ['<rootDir>/src', '<rootDir>/tests'],
testPathIgnorePatterns: ['/node_modules/'],
testEnvironment: 'node',
coveragePathIgnorePatterns: ['/node_modules/', '/types/'],
coverageThreshold: {
global: {
statements: 100,
branches: 100,
functions: 100,
lines: 100,
},
},
coverageReporters: ['json-summary', 'text', 'lcov'],
setupFiles: ['<rootDir>/tests/helpers/populateEnvironmentVariables.ts'],
};
53 changes: 53 additions & 0 deletions packages/batch/package.json
@@ -0,0 +1,53 @@
{
"name": "@aws-lambda-powertools/batch",
"version": "1.10.0",
"description": "The batch processing package for the Powertools for AWS Lambda (TypeScript) library.",
"author": {
"name": "Amazon Web Services",
"url": "https://aws.amazon.com"
},
"private": true,
"scripts": {
"test": "npm run test:unit",
"test:unit": "jest --group=unit --detectOpenHandles --coverage --verbose",
"test:e2e:nodejs14x": "echo 'Not Implemented'",
"test:e2e:nodejs16x": "echo 'Not Implemented'",
"test:e2e:nodejs18x": "echo 'Not Implemented'",
"test:e2e": "echo 'Not Implemented'",
"watch": "jest --watch",
"build": "tsc",
"lint": "eslint --ext .ts,.js --no-error-on-unmatched-pattern .",
"lint-fix": "eslint --fix --ext .ts,.js --no-error-on-unmatched-pattern .",
"prebuild": "rimraf ./lib",
"prepack": "node ../../.github/scripts/release_patch_package_json.js ."
},
"lint-staged": {
"*.ts": "npm run lint-fix",
"*.js": "npm run lint-fix"
},
"homepage": "https://github.com/aws-powertools/powertools-lambda-typescript/tree/main/packages/batch#readme",
"license": "MIT-0",
"main": "./lib/index.js",
"types": "./lib/index.d.ts",
"files": [
"lib"
],
"repository": {
"type": "git",
"url": "git+https://github.com/aws-powertools/powertools-lambda-typescript.git"
},
"bugs": {
"url": "https://github.com/aws-powertools/powertools-lambda-typescript/issues"
},
"dependencies": {},
"keywords": [
"aws",
"lambda",
"powertools",
"batch",
"batch-processing",
"serverless",
"nodejs"
],
"devDependencies": {}
}
161 changes: 161 additions & 0 deletions packages/batch/src/BasePartialBatchProcessor.ts
@@ -0,0 +1,161 @@
/**
* Process batch and partially report failed items
*/
import { DynamoDBRecord, KinesisStreamRecord, SQSRecord } from 'aws-lambda';
import {
BasePartialProcessor,
BatchProcessingError,
DATA_CLASS_MAPPING,
DEFAULT_RESPONSE,
EventSourceDataClassTypes,
EventType,
ItemIdentifier,
BatchResponse,
} from '.';

abstract class BasePartialBatchProcessor extends BasePartialProcessor {
public COLLECTOR_MAPPING;
Review comment (Contributor):

Can we make this lowercase & without snake_case?


public batchResponse: BatchResponse;

public eventType: EventType;

/**
* Initializes base batch processing class
* @param eventType Whether this is SQS, DynamoDB stream, or Kinesis data stream event
*/
public constructor(eventType: EventType) {
super();
this.eventType = eventType;
this.batchResponse = DEFAULT_RESPONSE;
this.COLLECTOR_MAPPING = {
[EventType.SQS]: () => this.collectSqsFailures(),
Review comment (Contributor):

Should we just set the function reference, rather than wrapping it in a function?

[EventType.SQS]: this.collectSqsFailures

This way, when someone calls this.COLLECTOR_MAPPING[EventType.SQS](), they call the function itself.
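
One caveat with the suggestion above: a bare method reference stored in an object loses its `this` binding. When it is later invoked as `mapping[key]()`, `this` is the mapping object, not the processor. The sketch below uses a hypothetical `Collector` class (not the class in this PR) to contrast the arrow wrapper, a bare reference, and a `.bind(this)` reference. It is an illustration under those assumptions, not a statement about what the PR ended up doing.

```typescript
// Hypothetical stand-in for the processor; all names here are illustrative only.
class Collector {
  public failureMessages: string[] = ['msg-1', 'msg-2'];

  // Arrow wrapper: `this` is captured lexically when the map is built.
  public wrapped: Record<string, () => string[]>;
  // Bare reference: `this` is decided at call time by how the function is invoked.
  public unbound: Record<string, () => string[]>;
  // Bound reference: a plain function reference with `this` fixed up front.
  public bound: Record<string, () => string[]>;

  public constructor() {
    this.wrapped = { SQS: () => this.collectSqsFailures() };
    this.unbound = { SQS: this.collectSqsFailures };
    this.bound = { SQS: this.collectSqsFailures.bind(this) };
  }

  public collectSqsFailures(): string[] {
    // Reads instance state, so it needs `this` to be the Collector instance.
    return this.failureMessages.map((id) => id.toUpperCase());
  }
}

const collector = new Collector();
```

Here `collector.wrapped['SQS']()` and `collector.bound['SQS']()` both read the instance's `failureMessages`, while `collector.unbound['SQS']()` throws a `TypeError` because `this` is the `unbound` object, which has no `failureMessages`. If the wrapper is dropped, binding the reference is one way to keep the behaviour.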

[EventType.KinesisDataStreams]: () => this.collectKinesisFailures(),
[EventType.DynamoDBStreams]: () => this.collectDynamoDBFailures(),
};
}

/**
* Report messages to be deleted in case of partial failures
*/
public clean(): void {
if (!this.hasMessagesToReport()) {
return;
}

if (this.entireBatchFailed()) {
throw new BatchProcessingError(
'All records failed processing. ' +
this.exceptions.length +
' individual errors logged separately below.',
this.exceptions
);
}

const messages: ItemIdentifier[] = this.getMessagesToReport();
this.batchResponse = { batchItemFailures: messages };
}

/**
* Collects identifiers of failed items for a DynamoDB stream
* @returns list of identifiers for failed items
*/
public collectDynamoDBFailures(): ItemIdentifier[] {
const failures: ItemIdentifier[] = [];

for (const msg of this.failureMessages) {
const msgId = (msg as DynamoDBRecord).dynamodb?.SequenceNumber;
if (msgId) {
failures.push({ itemIdentifier: msgId });
}
}

return failures;
}

/**
* Collects identifiers of failed items for a Kinesis stream
* @returns list of identifiers for failed items
*/
public collectKinesisFailures(): ItemIdentifier[] {
const failures: ItemIdentifier[] = [];

for (const msg of this.failureMessages) {
const msgId = (msg as KinesisStreamRecord).kinesis.sequenceNumber;
failures.push({ itemIdentifier: msgId });
}

return failures;
}

/**
* Collects identifiers of failed items for an SQS batch
* @returns list of identifiers for failed items
*/
public collectSqsFailures(): ItemIdentifier[] {
const failures: ItemIdentifier[] = [];

for (const msg of this.failureMessages) {
const msgId = (msg as SQSRecord).messageId;
failures.push({ itemIdentifier: msgId });
}

return failures;
}

/**
* Determines whether all records in a batch failed to process
* @returns true if all records resulted in exception results
*/
public entireBatchFailed(): boolean {
return this.exceptions.length == this.records.length;
Review comment (Contributor):

Suggested change
- return this.exceptions.length == this.records.length;
+ return this.exceptions.length === this.records.length;
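
For context on the suggested change: both operands here are array lengths, so `==` and `===` should give the same result in this method, and the change mainly guards against coercion surprises elsewhere. A minimal sketch of the difference, using hypothetical values typed as `unknown` to stand in for data whose runtime type is not guaranteed:

```typescript
// Hypothetical values; in the PR both sides are numbers (array lengths).
const exceptionCount: unknown = '3';
const recordCount: unknown = 3;

// Loose equality coerces the string '3' to the number 3 before comparing.
const looseMatch = exceptionCount == recordCount;
// Strict equality compares type and value with no coercion.
const strictMatch = exceptionCount === recordCount;
```

With these values `looseMatch` is `true` and `strictMatch` is `false`.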

}

/**
* Collects identifiers for failed batch items
* @returns formatted messages to use in batch deletion
*/
public getMessagesToReport(): ItemIdentifier[] {
return this.COLLECTOR_MAPPING[this.eventType]();
}

/**
* Determines if any records failed to process
* @returns true if any records resulted in exception
*/
public hasMessagesToReport(): boolean {
if (this.failureMessages.length != 0) {
return true;
}

// console.debug('All ' + this.successMessages.length + ' records successfully processed');

return false;
}
Review comment (Contributor):

return this.failureMessages.length > 0;


/**
* Remove results from previous execution
*/
public prepare(): void {
this.successMessages.length = 0;
this.failureMessages.length = 0;
this.exceptions.length = 0;
this.batchResponse = DEFAULT_RESPONSE;
}

/**
* @returns Batch items that failed processing, if any
*/
public response(): BatchResponse {
return this.batchResponse;
}

public toBatchType(
record: EventSourceDataClassTypes,
eventType: EventType
): SQSRecord | KinesisStreamRecord | DynamoDBRecord {
return DATA_CLASS_MAPPING[eventType](record);
}
}

export { BasePartialBatchProcessor };
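
The three branches of `clean()` in the file above (nothing failed, everything failed, some records failed) can be sketched as a standalone function. This is a simplified illustration, not the PR's implementation: `buildBatchResponse` is a hypothetical helper, the types are reduced local copies, the empty response shape is an assumption (the value of `DEFAULT_RESPONSE` is not shown in this diff), and a plain `Error` stands in for `BatchProcessingError`. It also uses a single list of failed identifiers, whereas the class tracks exceptions and failure messages separately.

```typescript
// Simplified, hypothetical types; the real ones live in the package under review.
type ItemIdentifier = { itemIdentifier: string };
type BatchResponse = { batchItemFailures: ItemIdentifier[] };

// Decide what to report for a batch, mirroring the three branches of clean().
function buildBatchResponse(
  recordCount: number,
  failedIds: string[]
): BatchResponse {
  // No failures: nothing to report (assumed to be an empty failure list).
  if (failedIds.length === 0) {
    return { batchItemFailures: [] };
  }

  // Every record failed: raise an error instead of returning a partial response.
  if (failedIds.length === recordCount) {
    throw new Error(
      `All records failed processing. ${failedIds.length} individual errors.`
    );
  }

  // Partial failure: report only the identifiers of the failed records.
  return {
    batchItemFailures: failedIds.map((id) => ({ itemIdentifier: id })),
  };
}

const partial = buildBatchResponse(3, ['id-2']);
```

For a batch of three records where only `id-2` failed, `partial` holds a single entry for `id-2`. Passing two failed identifiers for a two-record batch throws instead.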