-
Notifications
You must be signed in to change notification settings - Fork 154
feat(batch): Implementation of base batch processing classes #1588
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 10 commits
31b2306
a3dddbf
90d4ce0
d87b895
fdcb59f
8bb91d5
8767d6a
a90e2b6
fc32c81
d417d71
863edc8
ea50345
eaee175
500fbe0
002ef9e
d9be091
604f04d
9f1696b
3b5ccc1
e34d1d9
0fc4f1e
5266d55
16719c3
3f03c5c
3251b9c
7cfac60
840208f
3de49db
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,28 @@ | ||
module.exports = { | ||
displayName: { | ||
name: 'Powertools for AWS Lambda (TypeScript) utility: BATCH', | ||
color: 'orange', | ||
}, | ||
runner: 'groups', | ||
preset: 'ts-jest', | ||
transform: { | ||
'^.+\\.ts?$': 'ts-jest', | ||
}, | ||
moduleFileExtensions: ['js', 'ts'], | ||
collectCoverageFrom: ['**/src/**/*.ts', '!**/node_modules/**'], | ||
testMatch: ['**/?(*.)+(spec|test).ts'], | ||
roots: ['<rootDir>/src', '<rootDir>/tests'], | ||
testPathIgnorePatterns: ['/node_modules/'], | ||
testEnvironment: 'node', | ||
coveragePathIgnorePatterns: ['/node_modules/', '/types/'], | ||
coverageThreshold: { | ||
global: { | ||
statements: 100, | ||
branches: 100, | ||
functions: 100, | ||
lines: 100, | ||
}, | ||
}, | ||
coverageReporters: ['json-summary', 'text', 'lcov'], | ||
setupFiles: ['<rootDir>/tests/helpers/populateEnvironmentVariables.ts'], | ||
}; |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,53 @@ | ||
{ | ||
"name": "@aws-lambda-powertools/batch", | ||
"version": "1.10.0", | ||
"description": "The batch processing package for the Powertools for AWS Lambda (TypeScript) library.", | ||
"author": { | ||
"name": "Amazon Web Services", | ||
"url": "https://aws.amazon.com" | ||
}, | ||
"private": true, | ||
"scripts": { | ||
"test": "npm run test:unit", | ||
"test:unit": "jest --group=unit --detectOpenHandles --coverage --verbose", | ||
"test:e2e:nodejs14x": "echo 'Not Implemented'", | ||
"test:e2e:nodejs16x": "echo 'Not Implemented'", | ||
"test:e2e:nodejs18x": "echo 'Not Implemented'", | ||
"test:e2e": "echo 'Not Implemented'", | ||
"watch": "jest --watch", | ||
"build": "tsc", | ||
"lint": "eslint --ext .ts,.js --no-error-on-unmatched-pattern .", | ||
"lint-fix": "eslint --fix --ext .ts,.js --no-error-on-unmatched-pattern .", | ||
"prebuild": "rimraf ./lib", | ||
"prepack": "node ../../.github/scripts/release_patch_package_json.js ." | ||
}, | ||
"lint-staged": { | ||
"*.ts": "npm run lint-fix", | ||
"*.js": "npm run lint-fix" | ||
}, | ||
"homepage": "https://github.com/aws-powertools/powertools-lambda-typescript/tree/main/packages/batch#readme", | ||
"license": "MIT-0", | ||
"main": "./lib/index.js", | ||
"types": "./lib/index.d.ts", | ||
"files": [ | ||
"lib" | ||
], | ||
"repository": { | ||
"type": "git", | ||
"url": "git+https://github.com/aws-powertools/powertools-lambda-typescript.git" | ||
}, | ||
"bugs": { | ||
"url": "https://github.com/aws-powertools/powertools-lambda-typescript/issues" | ||
}, | ||
"dependencies": {}, | ||
"keywords": [ | ||
"aws", | ||
"lambda", | ||
"powertools", | ||
"batch", | ||
"batch-processing", | ||
"serverless", | ||
"nodejs" | ||
], | ||
"devDependencies": {} | ||
} |
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
@@ -0,0 +1,161 @@ | ||||||
/** | ||||||
* Process batch and partially report failed items | ||||||
*/ | ||||||
import { DynamoDBRecord, KinesisStreamRecord, SQSRecord } from 'aws-lambda'; | ||||||
import { | ||||||
BasePartialProcessor, | ||||||
BatchProcessingError, | ||||||
DATA_CLASS_MAPPING, | ||||||
DEFAULT_RESPONSE, | ||||||
EventSourceDataClassTypes, | ||||||
EventType, | ||||||
ItemIdentifier, | ||||||
BatchResponse, | ||||||
} from '.'; | ||||||
|
||||||
abstract class BasePartialBatchProcessor extends BasePartialProcessor { | ||||||
public COLLECTOR_MAPPING; | ||||||
|
||||||
public batchResponse: BatchResponse; | ||||||
|
||||||
public eventType: EventType; | ||||||
|
||||||
/** | ||||||
* Initializes base batch processing class | ||||||
* @param eventType Whether this is SQS, DynamoDB stream, or Kinesis data stream event | ||||||
*/ | ||||||
public constructor(eventType: EventType) { | ||||||
super(); | ||||||
this.eventType = eventType; | ||||||
this.batchResponse = DEFAULT_RESPONSE; | ||||||
this.COLLECTOR_MAPPING = { | ||||||
[EventType.SQS]: () => this.collectSqsFailures(), | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should we just set the function reference, rather than wrapping it in a function?
this way when someone calls |
||||||
[EventType.KinesisDataStreams]: () => this.collectKinesisFailures(), | ||||||
[EventType.DynamoDBStreams]: () => this.collectDynamoDBFailures(), | ||||||
}; | ||||||
} | ||||||
|
||||||
/** | ||||||
* Report messages to be deleted in case of partial failures | ||||||
*/ | ||||||
public clean(): void { | ||||||
if (!this.hasMessagesToReport()) { | ||||||
return; | ||||||
} | ||||||
|
||||||
if (this.entireBatchFailed()) { | ||||||
throw new BatchProcessingError( | ||||||
'All records failed processing. ' + | ||||||
this.exceptions.length + | ||||||
' individual errors logged separately below.', | ||||||
this.exceptions | ||||||
); | ||||||
} | ||||||
|
||||||
const messages: ItemIdentifier[] = this.getMessagesToReport(); | ||||||
this.batchResponse = { batchItemFailures: messages }; | ||||||
} | ||||||
|
||||||
/** | ||||||
* Collects identifiers of failed items for a DynamoDB stream | ||||||
* @returns list of identifiers for failed items | ||||||
*/ | ||||||
public collectDynamoDBFailures(): ItemIdentifier[] { | ||||||
const failures: ItemIdentifier[] = []; | ||||||
|
||||||
for (const msg of this.failureMessages) { | ||||||
const msgId = (msg as DynamoDBRecord).dynamodb?.SequenceNumber; | ||||||
if (msgId) { | ||||||
failures.push({ itemIdentifier: msgId }); | ||||||
} | ||||||
} | ||||||
|
||||||
return failures; | ||||||
} | ||||||
|
||||||
/** | ||||||
* Collects identifiers of failed items for a Kinesis stream | ||||||
* @returns list of identifiers for failed items | ||||||
*/ | ||||||
public collectKinesisFailures(): ItemIdentifier[] { | ||||||
const failures: ItemIdentifier[] = []; | ||||||
|
||||||
for (const msg of this.failureMessages) { | ||||||
const msgId = (msg as KinesisStreamRecord).kinesis.sequenceNumber; | ||||||
failures.push({ itemIdentifier: msgId }); | ||||||
} | ||||||
|
||||||
return failures; | ||||||
} | ||||||
|
||||||
/** | ||||||
* Collects identifiers of failed items for an SQS batch | ||||||
* @returns list of identifiers for failed items | ||||||
*/ | ||||||
public collectSqsFailures(): ItemIdentifier[] { | ||||||
const failures: ItemIdentifier[] = []; | ||||||
|
||||||
for (const msg of this.failureMessages) { | ||||||
const msgId = (msg as SQSRecord).messageId; | ||||||
failures.push({ itemIdentifier: msgId }); | ||||||
} | ||||||
|
||||||
return failures; | ||||||
} | ||||||
|
||||||
/** | ||||||
* Determines whether all records in a batch failed to process | ||||||
* @returns true if all records resulted in exception results | ||||||
*/ | ||||||
public entireBatchFailed(): boolean { | ||||||
return this.exceptions.length == this.records.length; | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
} | ||||||
|
||||||
/** | ||||||
* Collects identifiers for failed batch items | ||||||
* @returns formatted messages to use in batch deletion | ||||||
*/ | ||||||
public getMessagesToReport(): ItemIdentifier[] { | ||||||
return this.COLLECTOR_MAPPING[this.eventType](); | ||||||
} | ||||||
|
||||||
/** | ||||||
* Determines if any records failed to process | ||||||
* @returns true if any records resulted in exception | ||||||
*/ | ||||||
public hasMessagesToReport(): boolean { | ||||||
if (this.failureMessages.length != 0) { | ||||||
return true; | ||||||
} | ||||||
|
||||||
// console.debug('All ' + this.successMessages.length + ' records successfully processed'); | ||||||
|
||||||
return false; | ||||||
} | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. return this.failureMessages.length > 0; |
||||||
|
||||||
/** | ||||||
* Remove results from previous execution | ||||||
*/ | ||||||
public prepare(): void { | ||||||
this.successMessages.length = 0; | ||||||
this.failureMessages.length = 0; | ||||||
this.exceptions.length = 0; | ||||||
this.batchResponse = DEFAULT_RESPONSE; | ||||||
} | ||||||
|
||||||
/** | ||||||
* @returns Batch items that failed processing, if any | ||||||
*/ | ||||||
public response(): BatchResponse { | ||||||
return this.batchResponse; | ||||||
} | ||||||
|
||||||
public toBatchType( | ||||||
record: EventSourceDataClassTypes, | ||||||
eventType: EventType | ||||||
): SQSRecord | KinesisStreamRecord | DynamoDBRecord { | ||||||
return DATA_CLASS_MAPPING[eventType](record); | ||||||
} | ||||||
} | ||||||
|
||||||
export { BasePartialBatchProcessor }; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we make this lowercase & without snake_case?