feat(batch): new BatchProcessor for SQS, DynamoDB, Kinesis #886


Merged
Changes from 19 commits
Commits (33 total)
b1ca7f1
feat(batch): new BatchProcessor for SQS, DynamoDB, Kinesis
heitorlessa Dec 10, 2021
113a44f
refactor: response over report
heitorlessa Dec 11, 2021
5ab2ec7
fix: mutability bug
heitorlessa Dec 11, 2021
4652237
feat(batch): add Kinesis Data streams support
heitorlessa Dec 12, 2021
4cfcd34
fix: item identifier key should be constant
heitorlessa Dec 12, 2021
139f52b
feat(batch): add DynamoDB Streams support
heitorlessa Dec 12, 2021
1822456
feat(batch): use event source data classes by default
heitorlessa Dec 12, 2021
c757316
chore: permanent exceptions TBD in separate PR
heitorlessa Dec 12, 2021
3217097
feat: mypy support
heitorlessa Dec 12, 2021
09257ba
feat: draft implementation
heitorlessa Dec 12, 2021
251541c
fix: mypy typing
heitorlessa Dec 13, 2021
4c95d39
chore: improve mypy support on success/failure
heitorlessa Dec 13, 2021
7756d2c
fix: failure handler record types
heitorlessa Dec 13, 2021
1b242eb
fix: copy data not pointer if one subclasses it
heitorlessa Dec 17, 2021
4a7ceea
chore: linting
heitorlessa Dec 17, 2021
53b8e75
chore: address Tom's feedback on type name
heitorlessa Dec 17, 2021
b0f170e
chore: test model support
heitorlessa Dec 17, 2021
01eb5a7
Merge branch 'develop' of https://github.com/awslabs/aws-lambda-power…
heitorlessa Dec 17, 2021
77a7ab5
chore: remove leftovers
heitorlessa Dec 17, 2021
11ab825
docs: new BatchProcessor for SQS, Kinesis, DynamoDB
heitorlessa Dec 19, 2021
0d5d24e
fix: ensure BatchProcessorError is raised when entire batch fails
heitorlessa Dec 19, 2021
e1dc4cf
fix: exception leftover
heitorlessa Dec 19, 2021
cf3b01a
chore: cleanup exceptions
heitorlessa Dec 19, 2021
3fc3e40
docs: update mechanics section
heitorlessa Dec 19, 2021
9ceb74b
docs: update IAM permission
heitorlessa Dec 19, 2021
be8ab03
docs: keep old section name for stats
heitorlessa Dec 19, 2021
27f2937
docs: update accessing processed messages section
heitorlessa Dec 19, 2021
1496b6f
docs: update sentry section
heitorlessa Dec 19, 2021
9057791
docs: add extension, update create own processor
heitorlessa Dec 19, 2021
580eeae
docs: add pydantic section
heitorlessa Dec 19, 2021
f380f57
docs: add migration guide
heitorlessa Dec 19, 2021
58d78ca
docs: add caveat section
heitorlessa Dec 19, 2021
95715d0
docs: add testing section
heitorlessa Dec 19, 2021
4 changes: 0 additions & 4 deletions .pre-commit-config.yaml
@@ -11,10 +11,6 @@ repos:
- id: trailing-whitespace
- id: end-of-file-fixer
- id: check-toml
- repo: https://github.com/pre-commit/pygrep-hooks
rev: v1.5.1
hooks:
- id: python-use-type-annotations
- repo: local
hooks:
- id: black
13 changes: 10 additions & 3 deletions aws_lambda_powertools/utilities/batch/__init__.py
@@ -4,7 +4,14 @@
Batch processing utility
"""

from .base import BasePartialProcessor, batch_processor
from .sqs import PartialSQSProcessor, sqs_batch_processor
from aws_lambda_powertools.utilities.batch.base import BasePartialProcessor, BatchProcessor, EventType, batch_processor
from aws_lambda_powertools.utilities.batch.sqs import PartialSQSProcessor, sqs_batch_processor

__all__ = ("BasePartialProcessor", "PartialSQSProcessor", "batch_processor", "sqs_batch_processor")
__all__ = (
"BatchProcessor",
"BasePartialProcessor",
"EventType",
"PartialSQSProcessor",
"batch_processor",
"sqs_batch_processor",
)
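
For orientation, a minimal sketch of importing the new public API exposed by the `__all__` above (illustrative only; nothing here is part of the diff itself):

```python
# Illustrative only: the new names exported by aws_lambda_powertools.utilities.batch
from aws_lambda_powertools.utilities.batch import (
    BatchProcessor,
    EventType,
    batch_processor,
)
```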
174 changes: 165 additions & 9 deletions aws_lambda_powertools/utilities/batch/base.py
@@ -3,24 +3,63 @@
"""
Batch processing utilities
"""

import copy
import logging
import sys
from abc import ABC, abstractmethod
from typing import Any, Callable, Dict, List, Tuple
from enum import Enum
from types import TracebackType
from typing import Any, Callable, Dict, List, Optional, Tuple, Type, Union, overload

from aws_lambda_powertools.middleware_factory import lambda_handler_decorator
from aws_lambda_powertools.utilities.data_classes.dynamo_db_stream_event import DynamoDBRecord
from aws_lambda_powertools.utilities.data_classes.kinesis_stream_event import KinesisStreamRecord
from aws_lambda_powertools.utilities.data_classes.sqs_event import SQSRecord

logger = logging.getLogger(__name__)


class EventType(Enum):
SQS = "SQS"
KinesisDataStreams = "KinesisDataStreams"
DynamoDBStreams = "DynamoDBStreams"


#
# type specifics
#
has_pydantic = "pydantic" in sys.modules
_ExcInfo = Tuple[Type[BaseException], BaseException, TracebackType]
_OptExcInfo = Union[_ExcInfo, Tuple[None, None, None]]

# For IntelliSense and Mypy to work, we need to account for possible SQS, Kinesis and DynamoDB subclasses
# We need them as subclasses as we must access their message ID or sequence number metadata via dot notation
if has_pydantic:
from aws_lambda_powertools.utilities.parser.models import DynamoDBStreamRecordModel
from aws_lambda_powertools.utilities.parser.models import KinesisDataStreamRecord as KinesisDataStreamRecordModel
from aws_lambda_powertools.utilities.parser.models import SqsRecordModel

BatchTypeModels = Optional[
Union[Type[SqsRecordModel], Type[DynamoDBStreamRecordModel], Type[KinesisDataStreamRecordModel]]
]

# When using processor with default arguments, records will carry EventSourceDataClassTypes
# and depending on what EventType it's passed it'll correctly map to the right record
# When using Pydantic Models, it'll accept any of the supported models or their subclasses
EventSourceDataClassTypes = Union[SQSRecord, KinesisStreamRecord, DynamoDBRecord]
BatchEventTypes = Union[EventSourceDataClassTypes, "BatchTypeModels"]
SuccessResponse = Tuple[str, Any, BatchEventTypes]
FailureResponse = Tuple[str, str, BatchEventTypes]


class BasePartialProcessor(ABC):
"""
Abstract class for batch processors.
"""

def __init__(self):
self.success_messages: List = []
self.fail_messages: List = []
self.success_messages: List[BatchEventTypes] = []
self.fail_messages: List[BatchEventTypes] = []
self.exceptions: List = []

@abstractmethod
@@ -38,7 +77,7 @@ def _clean(self):
raise NotImplementedError()

@abstractmethod
def _process_record(self, record: Any):
def _process_record(self, record: dict):
"""
Process record with handler.
"""
@@ -57,13 +96,13 @@ def __enter__(self):
def __exit__(self, exception_type, exception_value, traceback):
self._clean()

def __call__(self, records: List[Any], handler: Callable):
def __call__(self, records: List[dict], handler: Callable):
"""
Set instance attributes before execution

Parameters
----------
records: List[Any]
records: List[dict]
List with objects to be processed.
handler: Callable
Callable to process "records" entries.
@@ -72,7 +111,7 @@
self.handler = handler
return self

def success_handler(self, record: Any, result: Any):
def success_handler(self, record, result: Any) -> SuccessResponse:
"""
Success callback

@@ -85,7 +124,7 @@
self.success_messages.append(record)
return entry

def failure_handler(self, record: Any, exception: Tuple):
def failure_handler(self, record, exception: _OptExcInfo) -> FailureResponse:
"""
Failure callback

@@ -146,3 +185,120 @@ def batch_processor(
processor.process()

return handler(event, context)


class BatchProcessor(BasePartialProcessor):
Review comment:

Would it make sense to have separate processors for each event type (SQS, DynamoDB, or Kinesis) instead of growing the complexity of this class? Then you could encapsulate the failure collection in the specific processor.

Author reply (heitorlessa):

That was the initial version we wanted to implement - a KinesisDataStreamProcessor, DynamoDB... then @cakepietoast argued that this would confuse customers with the other available processors (Sqs, PartialProcessor, BaseProcessor), as we can only deprecate them in v2.

I'm 50/50 here, if I'm honest. I prefer a separate one, but I can also see customers easily confused about which one to pick despite the docs changes I'm going to make.

Implementation-wise, this will remain stable. The only two changes I can anticipate are 1/ supporting the new Permanent Exception parameter, and 2/ raising a descriptive exception in case we reach an AttributeError when collecting the message ID/sequence number from a malformed event/model.

Review comment:

What if we changed the nomenclature to “producer” and “consumer” (these processors would be consumers)? I had that other idea earlier to make it easier to use the SQS and DynamoDB batch write methods while taking their batch sizes into account; those could be “producers” 🤷‍♂️

Author reply (heitorlessa):

Where would the producer sit, and what would be its responsibilities?

As for the suggestion on partitioning, we should add it to the Event Source Data Class, as it's a no-brainer.

I think the word Consumer wouldn't be explicit enough about the capabilities Batch provides - maybe something else then?

Features (a minimal usage sketch follows this list):

  • Transform incoming records into Event Source Data Class or Pydantic Models
  • Call a user-defined function for each record in the batch
  • Keep track of exceptions raised for any record
  • For partial successes, extract message IDs (SQS) or sequence numbers (DynamoDB & Kinesis) and build a response to the new BatchItemIdentifiers contract
  • Future: If a raised exception matches the permanent_exceptions set, send these records to the configured DLQ in batches of 10
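
A minimal usage sketch of how these pieces fit together, based on the `BatchProcessor`, `EventType`, and `batch_processor` decorator in this diff; the `record_handler` body is hypothetical:

```python
import json

from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType, batch_processor
from aws_lambda_powertools.utilities.data_classes.sqs_event import SQSRecord

# Without a model, each record is transformed into an SQSRecord data class
processor = BatchProcessor(event_type=EventType.SQS)


def record_handler(record: SQSRecord):
    # Hypothetical business logic: any exception raised here marks the record as failed
    return json.loads(record.body)


@batch_processor(record_handler=record_handler, processor=processor)
def lambda_handler(event, context):
    # Partial failures are reported back to Lambda via the batchItemFailures contract
    return processor.response()
```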

DEFAULT_RESPONSE: Dict[str, List[Optional[dict]]] = {"batchItemFailures": []}

def __init__(self, event_type: EventType, model: Optional["BatchTypeModels"] = None):
"""Process batch and partially report failed items

Parameters
----------
event_type: EventType
Whether this is an SQS, DynamoDB Streams, or Kinesis Data Streams event
model: Optional["BatchTypeModels"]
Parser's data model using either SqsRecordModel, DynamoDBStreamRecordModel, KinesisDataStreamRecord
"""
self.event_type = event_type
self.model = model
self.batch_response = copy.deepcopy(self.DEFAULT_RESPONSE)
self._COLLECTOR_MAPPING = {
EventType.SQS: self._collect_sqs_failures,
EventType.KinesisDataStreams: self._collect_kinesis_failures,
EventType.DynamoDBStreams: self._collect_dynamodb_failures,
}
self._DATA_CLASS_MAPPING = {
EventType.SQS: SQSRecord,
EventType.KinesisDataStreams: KinesisStreamRecord,
EventType.DynamoDBStreams: DynamoDBRecord,
}

super().__init__()

def response(self):
"""Batch items that failed processing, if any"""
return self.batch_response

def _prepare(self):
"""
Remove results from previous execution.
"""
self.success_messages.clear()
self.fail_messages.clear()
self.batch_response = self.DEFAULT_RESPONSE

def _process_record(self, record: dict) -> Union[SuccessResponse, FailureResponse]:
"""
Process a record with instance's handler

Parameters
----------
record: dict
A batch record to be processed.
"""
data = self._to_batch_type(record=record, event_type=self.event_type, model=self.model)
try:
result = self.handler(record=data)
return self.success_handler(record=record, result=result)
except Exception:
return self.failure_handler(record=data, exception=sys.exc_info())

def _clean(self):
"""
Report messages to be deleted in case of partial failure.
"""

if not self._has_messages_to_report():
return

messages = self._get_messages_to_report()
self.batch_response = {"batchItemFailures": [messages]}

def _has_messages_to_report(self) -> bool:
if self.fail_messages:
return True

logger.debug(f"All {len(self.success_messages)} records successfully processed")
return False

def _get_messages_to_report(self) -> Dict[str, str]:
"""
Format messages to use in batch deletion
"""
return self._COLLECTOR_MAPPING[self.event_type]()

# Event Source Data Classes follow Python idioms for fields
# while Parser/Pydantic follows the event field names to the letter
def _collect_sqs_failures(self):
if self.model:
return {"itemIdentifier": msg.messageId for msg in self.fail_messages}
else:
return {"itemIdentifier": msg.message_id for msg in self.fail_messages}

def _collect_kinesis_failures(self):
if self.model:
# Pydantic model uses int but Lambda poller expects str
return {"itemIdentifier": msg.kinesis.sequenceNumber for msg in self.fail_messages}
else:
return {"itemIdentifier": msg.kinesis.sequence_number for msg in self.fail_messages}

def _collect_dynamodb_failures(self):
if self.model:
return {"itemIdentifier": msg.dynamodb.SequenceNumber for msg in self.fail_messages}
else:
return {"itemIdentifier": msg.dynamodb.sequence_number for msg in self.fail_messages}

@overload
def _to_batch_type(self, record: dict, event_type: EventType, model: "BatchTypeModels") -> "BatchTypeModels":
...

@overload
def _to_batch_type(self, record: dict, event_type: EventType) -> EventSourceDataClassTypes:
...

def _to_batch_type(self, record: dict, event_type: EventType, model: Optional["BatchTypeModels"] = None):
if model is not None:
return model.parse_obj(record)
else:
return self._DATA_CLASS_MAPPING[event_type](record)
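
For completeness, a hedged sketch of the Pydantic path that `_to_batch_type` enables (assuming pydantic is installed, per the `has_pydantic` check above): passing `model=` makes the processor call `model.parse_obj(record)` instead of wrapping each record in a data class. The handler body is hypothetical:

```python
from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType, batch_processor
from aws_lambda_powertools.utilities.parser.models import SqsRecordModel

# Each record is parsed with SqsRecordModel.parse_obj() rather than wrapped in SQSRecord
processor = BatchProcessor(event_type=EventType.SQS, model=SqsRecordModel)


def record_handler(record: SqsRecordModel):
    # Hypothetical handler; on failure, the processor reads record.messageId (Pydantic)
    # rather than record.message_id (data class) when building batchItemFailures
    return record.body


@batch_processor(record_handler=record_handler, processor=processor)
def lambda_handler(event, context):
    return processor.response()
```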