Skip to content

Commit 02d3254

Browse files
Merge branch 'develop' into feat/parameter-env-variable
2 parents 77c41a2 + b57f521 commit 02d3254

36 files changed

+1240
-563
lines changed

CHANGELOG.md

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,18 +4,39 @@
44
<a name="unreleased"></a>
55
# Unreleased
66

7+
## Bug Fixes
8+
9+
* **batch:** handle early validation errors for pydantic models (poison pill) [#2091](https://github.com/awslabs/aws-lambda-powertools-python/issues/2091) ([#2099](https://github.com/awslabs/aws-lambda-powertools-python/issues/2099))
10+
711
## Documentation
812

13+
* **batch:** use newly supported Json model ([#2100](https://github.com/awslabs/aws-lambda-powertools-python/issues/2100))
14+
* **homepage:** remove banner for end-of-support v1 ([#2098](https://github.com/awslabs/aws-lambda-powertools-python/issues/2098))
15+
* **idempotency:** fixes to testing your code section ([#2073](https://github.com/awslabs/aws-lambda-powertools-python/issues/2073))
916
* **idempotency:** new sequence diagrams, fix idempotency record vs DynamoDB TTL confusion ([#2074](https://github.com/awslabs/aws-lambda-powertools-python/issues/2074))
1017
* **parser:** fix highlighted line ([#2064](https://github.com/awslabs/aws-lambda-powertools-python/issues/2064))
1118

19+
## Features
20+
21+
* **batch:** reduce boilerplate with process_partial_response ([#2090](https://github.com/awslabs/aws-lambda-powertools-python/issues/2090))
22+
* **idempotency:** allow custom sdk clients in DynamoDBPersistenceLayer ([#2087](https://github.com/awslabs/aws-lambda-powertools-python/issues/2087))
23+
1224
## Maintenance
1325

26+
* **deps:** bump aws-xray-sdk from 2.11.0 to 2.12.0 ([#2080](https://github.com/awslabs/aws-lambda-powertools-python/issues/2080))
1427
* **deps:** bump peaceiris/actions-gh-pages from 3.9.2 to 3.9.3 ([#2069](https://github.com/awslabs/aws-lambda-powertools-python/issues/2069))
28+
* **deps-dev:** bump aws-cdk-lib from 2.72.1 to 2.73.0 ([#2097](https://github.com/awslabs/aws-lambda-powertools-python/issues/2097))
29+
* **deps-dev:** bump aws-cdk from 2.72.1 to 2.73.0 ([#2093](https://github.com/awslabs/aws-lambda-powertools-python/issues/2093))
30+
* **deps-dev:** bump mypy-boto3-cloudformation from 1.26.60 to 1.26.108 ([#2095](https://github.com/awslabs/aws-lambda-powertools-python/issues/2095))
31+
* **deps-dev:** bump types-python-dateutil from 2.8.19.11 to 2.8.19.12 ([#2085](https://github.com/awslabs/aws-lambda-powertools-python/issues/2085))
32+
* **deps-dev:** bump cfn-lint from 0.76.1 to 0.76.2 ([#2084](https://github.com/awslabs/aws-lambda-powertools-python/issues/2084))
33+
* **deps-dev:** bump aws-cdk from 2.72.0 to 2.72.1 ([#2081](https://github.com/awslabs/aws-lambda-powertools-python/issues/2081))
34+
* **deps-dev:** bump coverage from 7.2.2 to 7.2.3 ([#2092](https://github.com/awslabs/aws-lambda-powertools-python/issues/2092))
1535
* **deps-dev:** bump mkdocs-material from 9.1.4 to 9.1.5 ([#2077](https://github.com/awslabs/aws-lambda-powertools-python/issues/2077))
1636
* **deps-dev:** bump aws-cdk-lib from 2.72.0 to 2.72.1 ([#2076](https://github.com/awslabs/aws-lambda-powertools-python/issues/2076))
1737
* **deps-dev:** bump mypy-boto3-s3 from 1.26.99 to 1.26.104 ([#2075](https://github.com/awslabs/aws-lambda-powertools-python/issues/2075))
1838
* **deps-dev:** bump aws-cdk from 2.71.0 to 2.72.0 ([#2071](https://github.com/awslabs/aws-lambda-powertools-python/issues/2071))
39+
* **deps-dev:** bump filelock from 3.10.7 to 3.11.0 ([#2094](https://github.com/awslabs/aws-lambda-powertools-python/issues/2094))
1940
* **deps-dev:** bump aws-cdk-lib from 2.71.0 to 2.72.0 ([#2070](https://github.com/awslabs/aws-lambda-powertools-python/issues/2070))
2041
* **deps-dev:** bump black from 23.1.0 to 23.3.0 ([#2066](https://github.com/awslabs/aws-lambda-powertools-python/issues/2066))
2142
* **deps-dev:** bump aws-cdk from 2.70.0 to 2.71.0 ([#2067](https://github.com/awslabs/aws-lambda-powertools-python/issues/2067))

aws_lambda_powertools/utilities/batch/__init__.py

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,12 @@
1212
EventType,
1313
FailureResponse,
1414
SuccessResponse,
15+
)
16+
from aws_lambda_powertools.utilities.batch.decorators import (
1517
async_batch_processor,
18+
async_process_partial_response,
1619
batch_processor,
20+
process_partial_response,
1721
)
1822
from aws_lambda_powertools.utilities.batch.exceptions import ExceptionInfo
1923
from aws_lambda_powertools.utilities.batch.sqs_fifo_partial_processor import (
@@ -22,6 +26,10 @@
2226
from aws_lambda_powertools.utilities.batch.types import BatchTypeModels
2327

2428
__all__ = (
29+
"async_batch_processor",
30+
"async_process_partial_response",
31+
"batch_processor",
32+
"process_partial_response",
2533
"BatchProcessor",
2634
"AsyncBatchProcessor",
2735
"BasePartialProcessor",
@@ -32,6 +40,4 @@
3240
"FailureResponse",
3341
"SuccessResponse",
3442
"SqsFifoPartialProcessor",
35-
"batch_processor",
36-
"async_batch_processor",
3743
)

aws_lambda_powertools/utilities/batch/base.py

Lines changed: 39 additions & 111 deletions
Original file line numberDiff line numberDiff line change
@@ -11,19 +11,8 @@
1111
import sys
1212
from abc import ABC, abstractmethod
1313
from enum import Enum
14-
from typing import (
15-
Any,
16-
Awaitable,
17-
Callable,
18-
Dict,
19-
List,
20-
Optional,
21-
Tuple,
22-
Union,
23-
overload,
24-
)
14+
from typing import Any, Callable, Dict, List, Optional, Tuple, Union, overload
2515

26-
from aws_lambda_powertools.middleware_factory import lambda_handler_decorator
2716
from aws_lambda_powertools.shared import constants
2817
from aws_lambda_powertools.utilities.batch.exceptions import (
2918
BatchProcessingError,
@@ -37,6 +26,7 @@
3726
KinesisStreamRecord,
3827
)
3928
from aws_lambda_powertools.utilities.data_classes.sqs_event import SQSRecord
29+
from aws_lambda_powertools.utilities.parser import ValidationError
4030
from aws_lambda_powertools.utilities.typing import LambdaContext
4131

4232
logger = logging.getLogger(__name__)
@@ -316,21 +306,36 @@ def _get_messages_to_report(self) -> List[Dict[str, str]]:
316306
def _collect_sqs_failures(self):
317307
failures = []
318308
for msg in self.fail_messages:
319-
msg_id = msg.messageId if self.model else msg.message_id
309+
# If a message failed due to model validation (e.g., poison pill)
310+
# we convert to an event source data class...but self.model is still true
311+
# therefore, we do an additional check on whether the failed message is still a model
312+
# see https://github.com/awslabs/aws-lambda-powertools-python/issues/2091
313+
if self.model and getattr(msg, "parse_obj", None):
314+
msg_id = msg.messageId
315+
else:
316+
msg_id = msg.message_id
320317
failures.append({"itemIdentifier": msg_id})
321318
return failures
322319

323320
def _collect_kinesis_failures(self):
324321
failures = []
325322
for msg in self.fail_messages:
326-
msg_id = msg.kinesis.sequenceNumber if self.model else msg.kinesis.sequence_number
323+
# # see https://github.com/awslabs/aws-lambda-powertools-python/issues/2091
324+
if self.model and getattr(msg, "parse_obj", None):
325+
msg_id = msg.kinesis.sequenceNumber
326+
else:
327+
msg_id = msg.kinesis.sequence_number
327328
failures.append({"itemIdentifier": msg_id})
328329
return failures
329330

330331
def _collect_dynamodb_failures(self):
331332
failures = []
332333
for msg in self.fail_messages:
333-
msg_id = msg.dynamodb.SequenceNumber if self.model else msg.dynamodb.sequence_number
334+
# see https://github.com/awslabs/aws-lambda-powertools-python/issues/2091
335+
if self.model and getattr(msg, "parse_obj", None):
336+
msg_id = msg.dynamodb.SequenceNumber
337+
else:
338+
msg_id = msg.dynamodb.sequence_number
334339
failures.append({"itemIdentifier": msg_id})
335340
return failures
336341

@@ -347,6 +352,17 @@ def _to_batch_type(self, record: dict, event_type: EventType, model: Optional["B
347352
return model.parse_obj(record)
348353
return self._DATA_CLASS_MAPPING[event_type](record)
349354

355+
def _register_model_validation_error_record(self, record: dict):
356+
"""Convert and register failure due to poison pills where model failed validation early"""
357+
# Parser will fail validation if record is a poison pill (malformed input)
358+
# this means we can't collect the message id if we try transforming again
359+
# so we convert into to the equivalent batch type model (e.g., SQS, Kinesis, DynamoDB Stream)
360+
# and downstream we can correctly collect the correct message id identifier and make the failed record available
361+
# see https://github.com/awslabs/aws-lambda-powertools-python/issues/2091
362+
logger.debug("Record cannot be converted to customer's model; converting without model")
363+
failed_record: "EventSourceDataClassTypes" = self._to_batch_type(record=record, event_type=self.event_type)
364+
return self.failure_handler(record=failed_record, exception=sys.exc_info())
365+
350366

351367
class BatchProcessor(BasePartialBatchProcessor): # Keep old name for compatibility
352368
"""Process native partial responses from SQS, Kinesis Data Streams, and DynamoDB.
@@ -471,63 +487,21 @@ def _process_record(self, record: dict) -> Union[SuccessResponse, FailureRespons
471487
record: dict
472488
A batch record to be processed.
473489
"""
474-
data = self._to_batch_type(record=record, event_type=self.event_type, model=self.model)
490+
data: Optional["BatchTypeModels"] = None
475491
try:
492+
data = self._to_batch_type(record=record, event_type=self.event_type, model=self.model)
476493
if self._handler_accepts_lambda_context:
477494
result = self.handler(record=data, lambda_context=self.lambda_context)
478495
else:
479496
result = self.handler(record=data)
480497

481498
return self.success_handler(record=record, result=result)
499+
except ValidationError:
500+
return self._register_model_validation_error_record(record)
482501
except Exception:
483502
return self.failure_handler(record=data, exception=sys.exc_info())
484503

485504

486-
@lambda_handler_decorator
487-
def batch_processor(
488-
handler: Callable, event: Dict, context: LambdaContext, record_handler: Callable, processor: BatchProcessor
489-
):
490-
"""
491-
Middleware to handle batch event processing
492-
493-
Parameters
494-
----------
495-
handler: Callable
496-
Lambda's handler
497-
event: Dict
498-
Lambda's Event
499-
context: LambdaContext
500-
Lambda's Context
501-
record_handler: Callable
502-
Callable or corutine to process each record from the batch
503-
processor: BatchProcessor
504-
Batch Processor to handle partial failure cases
505-
506-
Examples
507-
--------
508-
**Processes Lambda's event with a BasePartialProcessor**
509-
510-
>>> from aws_lambda_powertools.utilities.batch import batch_processor, BatchProcessor
511-
>>>
512-
>>> def record_handler(record):
513-
>>> return record["body"]
514-
>>>
515-
>>> @batch_processor(record_handler=record_handler, processor=BatchProcessor())
516-
>>> def handler(event, context):
517-
>>> return {"StatusCode": 200}
518-
519-
Limitations
520-
-----------
521-
* Async batch processors. Use `async_batch_processor` instead.
522-
"""
523-
records = event["Records"]
524-
525-
with processor(records, record_handler, lambda_context=context):
526-
processor.process()
527-
528-
return handler(event, context)
529-
530-
531505
class AsyncBatchProcessor(BasePartialBatchProcessor):
532506
"""Process native partial responses from SQS, Kinesis Data Streams, and DynamoDB asynchronously.
533507
@@ -651,62 +625,16 @@ async def _async_process_record(self, record: dict) -> Union[SuccessResponse, Fa
651625
record: dict
652626
A batch record to be processed.
653627
"""
654-
data = self._to_batch_type(record=record, event_type=self.event_type, model=self.model)
628+
data: Optional["BatchTypeModels"] = None
655629
try:
630+
data = self._to_batch_type(record=record, event_type=self.event_type, model=self.model)
656631
if self._handler_accepts_lambda_context:
657632
result = await self.handler(record=data, lambda_context=self.lambda_context)
658633
else:
659634
result = await self.handler(record=data)
660635

661636
return self.success_handler(record=record, result=result)
637+
except ValidationError:
638+
return self._register_model_validation_error_record(record)
662639
except Exception:
663640
return self.failure_handler(record=data, exception=sys.exc_info())
664-
665-
666-
@lambda_handler_decorator
667-
def async_batch_processor(
668-
handler: Callable,
669-
event: Dict,
670-
context: LambdaContext,
671-
record_handler: Callable[..., Awaitable[Any]],
672-
processor: AsyncBatchProcessor,
673-
):
674-
"""
675-
Middleware to handle batch event processing
676-
Parameters
677-
----------
678-
handler: Callable
679-
Lambda's handler
680-
event: Dict
681-
Lambda's Event
682-
context: LambdaContext
683-
Lambda's Context
684-
record_handler: Callable[..., Awaitable[Any]]
685-
Callable to process each record from the batch
686-
processor: AsyncBatchProcessor
687-
Batch Processor to handle partial failure cases
688-
Examples
689-
--------
690-
**Processes Lambda's event with a BasePartialProcessor**
691-
>>> from aws_lambda_powertools.utilities.batch import async_batch_processor, AsyncBatchProcessor
692-
>>>
693-
>>> async def async_record_handler(record):
694-
>>> payload: str = record.body
695-
>>> return payload
696-
>>>
697-
>>> processor = AsyncBatchProcessor(event_type=EventType.SQS)
698-
>>>
699-
>>> @async_batch_processor(record_handler=async_record_handler, processor=processor)
700-
>>> async def lambda_handler(event, context: LambdaContext):
701-
>>> return processor.response()
702-
703-
Limitations
704-
-----------
705-
* Sync batch processors. Use `batch_processor` instead.
706-
"""
707-
records = event["Records"]
708-
709-
with processor(records, record_handler, lambda_context=context):
710-
processor.async_process()
711-
712-
return handler(event, context)

0 commit comments

Comments
 (0)