
Commit 40b159a (2 parents: 2a1c703 + 70c35b1)

* 'develop' of https://github.com/awslabs/aws-lambda-powertools-python:
  fix(batch): report multiple failures (aws-powertools#967)
  docs(tutorial): fix path to images (aws-powertools#963)
  Update documentation mistake (aws-powertools#961)

File tree: 4 files changed (+85 -44)

- aws_lambda_powertools/utilities/batch/base.py (+17 -12)
- docs/tutorial/index.md (+7 -7)
- docs/utilities/batch.md (+1 -1)
- tests/functional/test_utilities_batch.py (+60 -24)

aws_lambda_powertools/utilities/batch/base.py (+17 -12)

```diff
@@ -385,7 +385,7 @@ def _clean(self):
         )

         messages = self._get_messages_to_report()
-        self.batch_response = {"batchItemFailures": [messages]}
+        self.batch_response = {"batchItemFailures": messages}

     def _has_messages_to_report(self) -> bool:
         if self.fail_messages:
@@ -397,7 +397,7 @@ def _has_messages_to_report(self) -> bool:
     def _entire_batch_failed(self) -> bool:
         return len(self.exceptions) == len(self.records)

-    def _get_messages_to_report(self) -> Dict[str, str]:
+    def _get_messages_to_report(self) -> List[Dict[str, str]]:
         """
         Format messages to use in batch deletion
         """
@@ -406,20 +406,25 @@ def _get_messages_to_report(self) -> Dict[str, str]:
     # Event Source Data Classes follow python idioms for fields
     # while Parser/Pydantic follows the event field names to the latter
     def _collect_sqs_failures(self):
-        if self.model:
-            return {"itemIdentifier": msg.messageId for msg in self.fail_messages}
-        return {"itemIdentifier": msg.message_id for msg in self.fail_messages}
+        failures = []
+        for msg in self.fail_messages:
+            msg_id = msg.messageId if self.model else msg.message_id
+            failures.append({"itemIdentifier": msg_id})
+        return failures

     def _collect_kinesis_failures(self):
-        if self.model:
-            # Pydantic model uses int but Lambda poller expects str
-            return {"itemIdentifier": msg.kinesis.sequenceNumber for msg in self.fail_messages}
-        return {"itemIdentifier": msg.kinesis.sequence_number for msg in self.fail_messages}
+        failures = []
+        for msg in self.fail_messages:
+            msg_id = msg.kinesis.sequenceNumber if self.model else msg.kinesis.sequence_number
+            failures.append({"itemIdentifier": msg_id})
+        return failures

     def _collect_dynamodb_failures(self):
-        if self.model:
-            return {"itemIdentifier": msg.dynamodb.SequenceNumber for msg in self.fail_messages}
-        return {"itemIdentifier": msg.dynamodb.sequence_number for msg in self.fail_messages}
+        failures = []
+        for msg in self.fail_messages:
+            msg_id = msg.dynamodb.SequenceNumber if self.model else msg.dynamodb.sequence_number
+            failures.append({"itemIdentifier": msg_id})
+        return failures

     @overload
     def _to_batch_type(self, record: dict, event_type: EventType, model: "BatchTypeModels") -> "BatchTypeModels":
```
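Why the removed collectors were wrong: a dict comprehension with a constant key keeps overwriting the same `"itemIdentifier"` entry, so at most one failed record ever reached the response, and the old `[messages]` wrapping then boxed that single dict in a list. A standalone sketch of the before/after behavior (the message IDs are made up for illustration):

```python
fail_ids = ["msg-1", "msg-2", "msg-3"]  # hypothetical failed message IDs

# Old behavior: a constant-key dict comprehension re-assigns "itemIdentifier"
# on every iteration, so only the last failure survives.
buggy = {"itemIdentifier": msg_id for msg_id in fail_ids}
print({"batchItemFailures": [buggy]})
# {'batchItemFailures': [{'itemIdentifier': 'msg-3'}]}

# New behavior: one dict per failed record, collected into a list -- the
# shape Lambda expects for partial batch responses.
fixed = [{"itemIdentifier": msg_id} for msg_id in fail_ids]
print({"batchItemFailures": fixed})
# {'batchItemFailures': [{'itemIdentifier': 'msg-1'},
#                        {'itemIdentifier': 'msg-2'},
#                        {'itemIdentifier': 'msg-3'}]}
```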

docs/tutorial/index.md (+7 -7)

```diff
@@ -511,7 +511,7 @@ From here, we could [set specific keys](./core/logger.md#append_keys-method){tar
 By having structured logs like this, we can easily search and analyse them in [CloudWatch Logs Insight](https://docs.aws.amazon.com/AmazonCloudWatch/latest/logs/AnalyzingLogData.html){target="_blank"}.

 === "CloudWatch Logs Insight Example"
-    ![CloudWatch Logs Insight Example](./media/cloudwatch_logs_insight_example.png)
+    ![CloudWatch Logs Insight Example](../media/cloudwatch_logs_insight_example.png)

 ## Tracing
@@ -625,7 +625,7 @@ We've made the following changes in `template.yaml` for this to work seamless:

 You can now build and deploy our updates with `sam build && sam deploy`. Once deployed, try invoking the application via the API endpoint, and visit [AWS X-Ray Console](https://console.aws.amazon.com/xray/home#/traces/){target="_blank"} to see how much progress we've made so far!!

-![AWS X-Ray Console trace view](./media/tracer_xray_sdk_showcase.png)
+![AWS X-Ray Console trace view](../media/tracer_xray_sdk_showcase.png)

 ### Enriching our generated traces
@@ -709,11 +709,11 @@ Let's break it down:

 Repeat the process of building, deploying, and invoking your application via the API endpoint. Within the [AWS X-Ray Console](https://console.aws.amazon.com/xray/home#/traces/){target="_blank"}, you should now be able to group traces by the `User` and `ColdStart` annotation.

-![Filtering traces by annotations](./media/tracer_xray_sdk_enriched.png)
+![Filtering traces by annotations](../media/tracer_xray_sdk_enriched.png)

 If you choose any of the traces available, try opening the `handler` subsegment and you should see the response of your Lambda function under the `Metadata` tab.

-![Filtering traces by metadata](./media/tracer_xray_sdk_enriched_2.png)
+![Filtering traces by metadata](../media/tracer_xray_sdk_enriched_2.png)

 ### Simplifying with Tracer
@@ -773,14 +773,14 @@ Lambda Powertools optimizes for Lambda compute environment. As such, we add thes
 Repeat the process of building, deploying, and invoking your application via the API endpoint. Within the [AWS X-Ray Console](https://console.aws.amazon.com/xray/home#/traces/){target="_blank"}, you should see a similar view:


-![AWS X-Ray Console trace view using Lambda Powertools Tracer](./media/tracer_utility_showcase_2.png)
+![AWS X-Ray Console trace view using Lambda Powertools Tracer](../media/tracer_utility_showcase_2.png)

 ???+ tip
     Consider using [Amazon CloudWatch ServiceLens view](https://console.aws.amazon.com/cloudwatch/home#servicelens:service-map/map){target="_blank"} as it aggregates AWS X-Ray traces and CloudWatch metrics and logs in one view.

 From here, you can browse to specific logs in CloudWatch Logs Insight, Metrics Dashboard or AWS X-Ray traces.

-![CloudWatch ServiceLens View](./media/tracer_utility_showcase_3.png)
+![CloudWatch ServiceLens View](../media/tracer_utility_showcase_3.png)

 ???+ info
     For more information on Amazon CloudWatch ServiceLens, please visit [link](https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/ServiceLens.html).
@@ -990,7 +990,7 @@ That's a lot less boilerplate code! Let's break this down:

 Repeat the process of building, deploying, and invoking your application via the API endpoint a few times to generate metrics - [Artillery](https://www.artillery.io/){target="_blank"} and [K6.io](https://k6.io/open-source){target="_blank"} are quick ways to generate some load. Within [CloudWatch Metrics view](https://console.aws.amazon.com/cloudwatch/home#metricsV2:graph=~()){target="_blank}, you should see `MyApp` custom namespace with your custom metrics there and `SuccessfulGreetings` available to graph.

-![Custom Metrics Example](./media/metrics_utility_showcase.png)
+![Custom Metrics Example](../media/metrics_utility_showcase.png)

 If you're curious about how the EMF portion of your function logs look like, you can quickly go to [CloudWatch ServiceLens view](https://console.aws.amazon.com/cloudwatch/home#servicelens:service-map/map){target="_blank"}, choose your function and open logs. You will see a similar entry that looks like this:
```

docs/utilities/batch.md (+1 -1)

```diff
@@ -921,7 +921,7 @@ class MyPartialProcessor(BasePartialProcessor):
     def _clean(self):
         # It's called once, *after* closing processing all records (closing the context manager)
         # Here we're sending, at once, all successful messages to a ddb table
-        with ddb_table.batch_writer() as batch:
+        with self.ddb_table.batch_writer() as batch:
             for result in self.success_messages:
                 batch.put_item(Item=result)
```

tests/functional/test_utilities_batch.py (+60 -24)

```diff
@@ -414,7 +414,8 @@ def test_batch_processor_middleware_with_failure(sqs_event_factory, record_handl
     # GIVEN
     first_record = SQSRecord(sqs_event_factory("fail"))
     second_record = SQSRecord(sqs_event_factory("success"))
-    event = {"Records": [first_record.raw_event, second_record.raw_event]}
+    third_record = SQSRecord(sqs_event_factory("fail"))
+    event = {"Records": [first_record.raw_event, second_record.raw_event, third_record.raw_event]}

     processor = BatchProcessor(event_type=EventType.SQS)

@@ -426,7 +427,7 @@ def lambda_handler(event, context):
     result = lambda_handler(event, {})

     # THEN
-    assert len(result["batchItemFailures"]) == 1
+    assert len(result["batchItemFailures"]) == 2


 def test_batch_processor_context_success_only(sqs_event_factory, record_handler):
@@ -453,7 +454,8 @@ def test_batch_processor_context_with_failure(sqs_event_factory, record_handler)
     # GIVEN
     first_record = SQSRecord(sqs_event_factory("failure"))
     second_record = SQSRecord(sqs_event_factory("success"))
-    records = [first_record.raw_event, second_record.raw_event]
+    third_record = SQSRecord(sqs_event_factory("fail"))
+    records = [first_record.raw_event, second_record.raw_event, third_record.raw_event]
     processor = BatchProcessor(event_type=EventType.SQS)

     # WHEN
@@ -462,8 +464,10 @@ def test_batch_processor_context_with_failure(sqs_event_factory, record_handler)

     # THEN
     assert processed_messages[1] == ("success", second_record.body, second_record.raw_event)
-    assert len(batch.fail_messages) == 1
-    assert batch.response() == {"batchItemFailures": [{"itemIdentifier": first_record.message_id}]}
+    assert len(batch.fail_messages) == 2
+    assert batch.response() == {
+        "batchItemFailures": [{"itemIdentifier": first_record.message_id}, {"itemIdentifier": third_record.message_id}]
+    }


 def test_batch_processor_kinesis_context_success_only(kinesis_event_factory, kinesis_record_handler):
@@ -491,8 +495,9 @@ def test_batch_processor_kinesis_context_with_failure(kinesis_event_factory, kin
     # GIVEN
     first_record = KinesisStreamRecord(kinesis_event_factory("failure"))
     second_record = KinesisStreamRecord(kinesis_event_factory("success"))
+    third_record = KinesisStreamRecord(kinesis_event_factory("failure"))

-    records = [first_record.raw_event, second_record.raw_event]
+    records = [first_record.raw_event, second_record.raw_event, third_record.raw_event]
     processor = BatchProcessor(event_type=EventType.KinesisDataStreams)

     # WHEN
@@ -501,15 +506,21 @@ def test_batch_processor_kinesis_context_with_failure(kinesis_event_factory, kin

     # THEN
     assert processed_messages[1] == ("success", b64_to_str(second_record.kinesis.data), second_record.raw_event)
-    assert len(batch.fail_messages) == 1
-    assert batch.response() == {"batchItemFailures": [{"itemIdentifier": first_record.kinesis.sequence_number}]}
+    assert len(batch.fail_messages) == 2
+    assert batch.response() == {
+        "batchItemFailures": [
+            {"itemIdentifier": first_record.kinesis.sequence_number},
+            {"itemIdentifier": third_record.kinesis.sequence_number},
+        ]
+    }


 def test_batch_processor_kinesis_middleware_with_failure(kinesis_event_factory, kinesis_record_handler):
     # GIVEN
     first_record = KinesisStreamRecord(kinesis_event_factory("failure"))
     second_record = KinesisStreamRecord(kinesis_event_factory("success"))
-    event = {"Records": [first_record.raw_event, second_record.raw_event]}
+    third_record = KinesisStreamRecord(kinesis_event_factory("failure"))
+    event = {"Records": [first_record.raw_event, second_record.raw_event, third_record.raw_event]}

     processor = BatchProcessor(event_type=EventType.KinesisDataStreams)

@@ -521,7 +532,7 @@ def lambda_handler(event, context):
     result = lambda_handler(event, {})

     # THEN
-    assert len(result["batchItemFailures"]) == 1
+    assert len(result["batchItemFailures"]) == 2


 def test_batch_processor_dynamodb_context_success_only(dynamodb_event_factory, dynamodb_record_handler):
@@ -548,7 +559,8 @@ def test_batch_processor_dynamodb_context_with_failure(dynamodb_event_factory, d
     # GIVEN
     first_record = dynamodb_event_factory("failure")
     second_record = dynamodb_event_factory("success")
-    records = [first_record, second_record]
+    third_record = dynamodb_event_factory("failure")
+    records = [first_record, second_record, third_record]
     processor = BatchProcessor(event_type=EventType.DynamoDBStreams)

     # WHEN
@@ -557,15 +569,21 @@ def test_batch_processor_dynamodb_context_with_failure(dynamodb_event_factory, d

     # THEN
     assert processed_messages[1] == ("success", second_record["dynamodb"]["NewImage"]["Message"]["S"], second_record)
-    assert len(batch.fail_messages) == 1
-    assert batch.response() == {"batchItemFailures": [{"itemIdentifier": first_record["dynamodb"]["SequenceNumber"]}]}
+    assert len(batch.fail_messages) == 2
+    assert batch.response() == {
+        "batchItemFailures": [
+            {"itemIdentifier": first_record["dynamodb"]["SequenceNumber"]},
+            {"itemIdentifier": third_record["dynamodb"]["SequenceNumber"]},
+        ]
+    }


 def test_batch_processor_dynamodb_middleware_with_failure(dynamodb_event_factory, dynamodb_record_handler):
     # GIVEN
     first_record = dynamodb_event_factory("failure")
     second_record = dynamodb_event_factory("success")
-    event = {"Records": [first_record, second_record]}
+    third_record = dynamodb_event_factory("failure")
+    event = {"Records": [first_record, second_record, third_record]}

     processor = BatchProcessor(event_type=EventType.DynamoDBStreams)

@@ -577,7 +595,7 @@ def lambda_handler(event, context):
     result = lambda_handler(event, {})

     # THEN
-    assert len(result["batchItemFailures"]) == 1
+    assert len(result["batchItemFailures"]) == 2


 def test_batch_processor_context_model(sqs_event_factory, order_event_factory):
@@ -639,17 +657,23 @@ def record_handler(record: OrderSqs):
     order_event = order_event_factory({"type": "success"})
     order_event_fail = order_event_factory({"type": "fail"})
     first_record = sqs_event_factory(order_event_fail)
+    third_record = sqs_event_factory(order_event_fail)
     second_record = sqs_event_factory(order_event)
-    records = [first_record, second_record]
+    records = [first_record, second_record, third_record]

     # WHEN
     processor = BatchProcessor(event_type=EventType.SQS, model=OrderSqs)
     with processor(records, record_handler) as batch:
         batch.process()

     # THEN
-    assert len(batch.fail_messages) == 1
-    assert batch.response() == {"batchItemFailures": [{"itemIdentifier": first_record["messageId"]}]}
+    assert len(batch.fail_messages) == 2
+    assert batch.response() == {
+        "batchItemFailures": [
+            {"itemIdentifier": first_record["messageId"]},
+            {"itemIdentifier": third_record["messageId"]},
+        ]
+    }


 def test_batch_processor_dynamodb_context_model(dynamodb_event_factory, order_event_factory):
@@ -726,16 +750,22 @@ def record_handler(record: OrderDynamoDBRecord):
     order_event_fail = order_event_factory({"type": "fail"})
     first_record = dynamodb_event_factory(order_event_fail)
     second_record = dynamodb_event_factory(order_event)
-    records = [first_record, second_record]
+    third_record = dynamodb_event_factory(order_event_fail)
+    records = [first_record, second_record, third_record]

     # WHEN
     processor = BatchProcessor(event_type=EventType.DynamoDBStreams, model=OrderDynamoDBRecord)
     with processor(records, record_handler) as batch:
         batch.process()

     # THEN
-    assert len(batch.fail_messages) == 1
-    assert batch.response() == {"batchItemFailures": [{"itemIdentifier": first_record["dynamodb"]["SequenceNumber"]}]}
+    assert len(batch.fail_messages) == 2
+    assert batch.response() == {
+        "batchItemFailures": [
+            {"itemIdentifier": first_record["dynamodb"]["SequenceNumber"]},
+            {"itemIdentifier": third_record["dynamodb"]["SequenceNumber"]},
+        ]
+    }


 def test_batch_processor_kinesis_context_parser_model(kinesis_event_factory, order_event_factory):
@@ -807,16 +837,22 @@ def record_handler(record: OrderKinesisRecord):

     first_record = kinesis_event_factory(order_event_fail)
     second_record = kinesis_event_factory(order_event)
-    records = [first_record, second_record]
+    third_record = kinesis_event_factory(order_event_fail)
+    records = [first_record, second_record, third_record]

     # WHEN
     processor = BatchProcessor(event_type=EventType.KinesisDataStreams, model=OrderKinesisRecord)
     with processor(records, record_handler) as batch:
         batch.process()

     # THEN
-    assert len(batch.fail_messages) == 1
-    assert batch.response() == {"batchItemFailures": [{"itemIdentifier": first_record["kinesis"]["sequenceNumber"]}]}
+    assert len(batch.fail_messages) == 2
+    assert batch.response() == {
+        "batchItemFailures": [
+            {"itemIdentifier": first_record["kinesis"]["sequenceNumber"]},
+            {"itemIdentifier": third_record["kinesis"]["sequenceNumber"]},
+        ]
+    }


 def test_batch_processor_error_when_entire_batch_fails(sqs_event_factory, record_handler):
```
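The middleware-style tests above invoke a decorated `lambda_handler`; for context, a minimal sketch of the equivalent application code (the payload check in `record_handler` is illustrative only, not from this commit):

```python
from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType, batch_processor
from aws_lambda_powertools.utilities.data_classes.sqs_event import SQSRecord

processor = BatchProcessor(event_type=EventType.SQS)


def record_handler(record: SQSRecord):
    # Raising marks this record as failed; with this commit, *every*
    # failed record is reported back in batchItemFailures, not just one.
    if "fail" in record.body:
        raise ValueError("simulated processing error")


@batch_processor(record_handler=record_handler, processor=processor)
def lambda_handler(event, context):
    # e.g. {"batchItemFailures": [{"itemIdentifier": "..."}, ...]}
    return processor.response()
```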
