Skip to content

Commit 64c7093

Browse files
author
Tom McCarthy
committed
fix: throw exception by default if messages processing fails
1 parent 8517dd1 commit 64c7093

File tree

6 files changed

+136
-27
lines changed

6 files changed

+136
-27
lines changed

aws_lambda_powertools/utilities/batch/base.py

+2
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ class BasePartialProcessor(ABC):
1616
def __init__(self):
1717
self.success_messages: List = []
1818
self.fail_messages: List = []
19+
self.exceptions: List = []
1920

2021
@abstractmethod
2122
def _prepare(self):
@@ -89,5 +90,6 @@ def failure_handler(self, record: Any, exception: Exception):
8990
"fail", exceptions args, original record
9091
"""
9192
entry = ("fail", exception.args, record)
93+
self.exceptions.append(exception)
9294
self.fail_messages.append(record)
9395
return entry
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
"""
2+
Batch processing exceptions
3+
"""
4+
5+
6+
class SQSBatchProcessingError(Exception):
7+
"""When at least one message within a batch could not be processed"""

aws_lambda_powertools/utilities/batch/middlewares.py

+10-3
Original file line numberDiff line numberDiff line change
@@ -60,10 +60,15 @@ def batch_processor(
6060

6161
@lambda_handler_decorator
6262
def sqs_batch_processor(
63-
handler: Callable, event: Dict, context: Dict, record_handler: Callable, config: Optional[Config] = None
63+
handler: Callable,
64+
event: Dict,
65+
context: Dict,
66+
record_handler: Callable,
67+
config: Optional[Config] = None,
68+
suppress_exception: bool = False,
6469
):
6570
"""
66-
Middleware to handle batch event processing
71+
Middleware to handle SQS batch event processing
6772
6873
Parameters
6974
----------
@@ -77,6 +82,8 @@ def sqs_batch_processor(
7782
Callable to process each record from the batch
7883
config: Config
7984
botocore config object
85+
suppress_exception: bool, optional
86+
Supress exception raised if any messages fail processing, by default False
8087
8188
Examples
8289
--------
@@ -96,7 +103,7 @@ def sqs_batch_processor(
96103
97104
"""
98105
config = config or Config()
99-
processor = PartialSQSProcessor(config=config)
106+
processor = PartialSQSProcessor(config=config, suppress_exception=suppress_exception)
100107

101108
records = event["Records"]
102109

aws_lambda_powertools/utilities/batch/sqs.py

+18-6
Original file line numberDiff line numberDiff line change
@@ -9,20 +9,24 @@
99
from botocore.config import Config
1010

1111
from .base import BasePartialProcessor
12+
from .exceptions import SQSBatchProcessingError
1213

1314

1415
class PartialSQSProcessor(BasePartialProcessor):
1516
"""
1617
Amazon SQS batch processor to delete successes from the Queue.
1718
18-
Only the **special** case of partial failure is handled, thus a batch in
19-
which all records failed is **not** going to be removed from the queue, and
20-
the same is valid for a full success.
19+
The whole batch will be processed, even if failures occur. After all records are processed,
20+
SQSBatchProcessingError will be raised if there were any failures, causing messages to
21+
be returned to the SQS queue. This behaviour can be disabled by passing suppress_exception.
2122
2223
Parameters
2324
----------
2425
config: Config
2526
botocore config object
27+
suppress_exception: bool, optional
28+
Supress exception raised if any messages fail processing, by default False
29+
2630
2731
Example
2832
-------
@@ -46,12 +50,13 @@ class PartialSQSProcessor(BasePartialProcessor):
4650
>>> return result
4751
"""
4852

49-
def __init__(self, config: Optional[Config] = None):
53+
def __init__(self, config: Optional[Config] = None, suppress_exception: bool = False):
5054
"""
5155
Initializes sqs client.
5256
"""
5357
config = config or Config()
5458
self.client = boto3.client("sqs", config=config)
59+
self.suppress_exception = suppress_exception
5560

5661
super().__init__()
5762

@@ -97,10 +102,17 @@ def _clean(self):
97102
"""
98103
Delete messages from Queue in case of partial failure.
99104
"""
100-
if not (self.fail_messages and self.success_messages):
105+
# If all messages were successful, fall back to the default SQS -
106+
# Lambda behaviour which deletes messages if Lambda responds successfully
107+
if not self.fail_messages:
101108
return
102109

103110
queue_url = self._get_queue_url()
104111
entries_to_remove = self._get_entries_to_clean()
105112

106-
return self.client.delete_message_batch(QueueUrl=queue_url, Entries=entries_to_remove)
113+
delete_message_response = self.client.delete_message_batch(QueueUrl=queue_url, Entries=entries_to_remove)
114+
115+
if self.fail_messages and not self.suppress_exception:
116+
raise SQSBatchProcessingError(list(self.exceptions))
117+
118+
return delete_message_response

tests/functional/test_utilities_batch.py

+96-16
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
from botocore.stub import Stubber
77

88
from aws_lambda_powertools.utilities.batch import PartialSQSProcessor, batch_processor, sqs_batch_processor
9+
from aws_lambda_powertools.utilities.batch.exceptions import SQSBatchProcessingError
910

1011

1112
@pytest.fixture(scope="module")
@@ -47,13 +48,25 @@ def partial_processor(config) -> PartialSQSProcessor:
4748
return PartialSQSProcessor(config=config)
4849

4950

51+
@pytest.fixture(scope="function")
52+
def partial_processor_suppressed(config) -> PartialSQSProcessor:
53+
return PartialSQSProcessor(config=config, suppress_exception=True)
54+
55+
5056
@pytest.fixture(scope="function")
5157
def stubbed_partial_processor(config) -> PartialSQSProcessor:
5258
processor = PartialSQSProcessor(config=config)
5359
with Stubber(processor.client) as stubber:
5460
yield stubber, processor
5561

5662

63+
@pytest.fixture(scope="function")
64+
def stubbed_partial_processor_suppressed(config) -> PartialSQSProcessor:
65+
processor = PartialSQSProcessor(config=config, suppress_exception=True)
66+
with Stubber(processor.client) as stubber:
67+
yield stubber, processor
68+
69+
5770
def test_partial_sqs_processor_context_with_failure(sqs_event_factory, record_handler, partial_processor):
5871
"""
5972
Test processor with one failing record
@@ -68,16 +81,13 @@ def test_partial_sqs_processor_context_with_failure(sqs_event_factory, record_ha
6881
with Stubber(partial_processor.client) as stubber:
6982
stubber.add_response("delete_message_batch", response)
7083

71-
with partial_processor(records, record_handler) as ctx:
72-
result = ctx.process()
84+
with pytest.raises(SQSBatchProcessingError) as error:
85+
with partial_processor(records, record_handler) as ctx:
86+
ctx.process()
7387

88+
assert len(error.value.args[0]) == 1
7489
stubber.assert_no_pending_responses()
7590

76-
assert result == [
77-
("fail", ("Failed to process record.",), fail_record),
78-
("success", success_record["body"], success_record),
79-
]
80-
8191

8292
def test_partial_sqs_processor_context_only_success(sqs_event_factory, record_handler, partial_processor):
8393
"""
@@ -126,18 +136,17 @@ def lambda_handler(event, context):
126136

127137
fail_record = sqs_event_factory("fail")
128138

129-
event = {"Records": [sqs_event_factory("fail"), sqs_event_factory("success")]}
139+
event = {"Records": [sqs_event_factory("fail"), sqs_event_factory("fail"), sqs_event_factory("success")]}
130140
response = {"Successful": [{"Id": fail_record["messageId"]}], "Failed": []}
131141

132142
with Stubber(partial_processor.client) as stubber:
133143
stubber.add_response("delete_message_batch", response)
144+
with pytest.raises(SQSBatchProcessingError) as error:
145+
lambda_handler(event, {})
134146

135-
result = lambda_handler(event, {})
136-
147+
assert len(error.value.args[0]) == 2
137148
stubber.assert_no_pending_responses()
138149

139-
assert result is True
140-
141150

142151
@patch("aws_lambda_powertools.utilities.batch.middlewares.PartialSQSProcessor")
143152
def test_sqs_batch_processor_middleware(
@@ -159,10 +168,11 @@ def lambda_handler(event, context):
159168
event = {"Records": [sqs_event_factory("fail"), sqs_event_factory("success")]}
160169
response = {"Successful": [{"Id": fail_record["messageId"]}], "Failed": []}
161170
stubber.add_response("delete_message_batch", response)
162-
result = lambda_handler(event, {})
163-
stubber.assert_no_pending_responses()
171+
with pytest.raises(SQSBatchProcessingError) as error:
172+
lambda_handler(event, {})
164173

165-
assert result is True
174+
assert len(error.value.args[0]) == 1
175+
stubber.assert_no_pending_responses()
166176

167177

168178
def test_batch_processor_middleware_with_custom_processor(capsys, sqs_event_factory, record_handler, config):
@@ -188,10 +198,80 @@ def lambda_handler(event, context):
188198

189199
with Stubber(processor.client) as stubber:
190200
stubber.add_response("delete_message_batch", response)
201+
with pytest.raises(SQSBatchProcessingError) as error:
202+
lambda_handler(event, {})
203+
204+
stubber.assert_no_pending_responses()
205+
206+
assert len(error.value.args[0]) == 1
207+
assert capsys.readouterr().out == "Oh no ! It's a failure.\n"
208+
209+
210+
def test_batch_processor_middleware_suppressed_exceptions(
211+
sqs_event_factory, record_handler, partial_processor_suppressed
212+
):
213+
"""
214+
Test middleware's integration with PartialSQSProcessor
215+
"""
216+
217+
@batch_processor(record_handler=record_handler, processor=partial_processor_suppressed)
218+
def lambda_handler(event, context):
219+
return True
220+
221+
fail_record = sqs_event_factory("fail")
222+
223+
event = {"Records": [sqs_event_factory("fail"), sqs_event_factory("fail"), sqs_event_factory("success")]}
224+
response = {"Successful": [{"Id": fail_record["messageId"]}], "Failed": []}
191225

226+
with Stubber(partial_processor_suppressed.client) as stubber:
227+
stubber.add_response("delete_message_batch", response)
192228
result = lambda_handler(event, {})
193229

194230
stubber.assert_no_pending_responses()
231+
assert result is True
232+
233+
234+
def test_partial_sqs_processor_suppressed_exceptions(sqs_event_factory, record_handler, partial_processor_suppressed):
235+
"""
236+
Test processor without failure
237+
"""
195238

239+
first_record = sqs_event_factory("success")
240+
second_record = sqs_event_factory("fail")
241+
records = [first_record, second_record]
242+
243+
fail_record = sqs_event_factory("fail")
244+
response = {"Successful": [{"Id": fail_record["messageId"]}], "Failed": []}
245+
246+
with Stubber(partial_processor_suppressed.client) as stubber:
247+
stubber.add_response("delete_message_batch", response)
248+
with partial_processor_suppressed(records, record_handler) as ctx:
249+
ctx.process()
250+
251+
assert partial_processor_suppressed.success_messages == [first_record]
252+
253+
254+
@patch("aws_lambda_powertools.utilities.batch.middlewares.PartialSQSProcessor")
255+
def test_sqs_batch_processor_middleware_suppressed_exception(
256+
patched_sqs_processor, sqs_event_factory, record_handler, stubbed_partial_processor_suppressed
257+
):
258+
"""
259+
Test middleware's integration with PartialSQSProcessor
260+
"""
261+
262+
@sqs_batch_processor(record_handler=record_handler)
263+
def lambda_handler(event, context):
264+
return True
265+
266+
stubber, processor = stubbed_partial_processor_suppressed
267+
patched_sqs_processor.return_value = processor
268+
269+
fail_record = sqs_event_factory("fail")
270+
271+
event = {"Records": [sqs_event_factory("fail"), sqs_event_factory("success")]}
272+
response = {"Successful": [{"Id": fail_record["messageId"]}], "Failed": []}
273+
stubber.add_response("delete_message_batch", response)
274+
result = lambda_handler(event, {})
275+
276+
stubber.assert_no_pending_responses()
196277
assert result is True
197-
assert capsys.readouterr().out == "Oh no ! It's a failure.\n"

tests/unit/test_utilities_batch.py

+3-2
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
from botocore.config import Config
33

44
from aws_lambda_powertools.utilities.batch import PartialSQSProcessor
5+
from aws_lambda_powertools.utilities.batch.exceptions import SQSBatchProcessingError
56

67

78
@pytest.fixture(scope="function")
@@ -126,8 +127,8 @@ def test_partial_sqs_clean(monkeypatch, mocker, partial_sqs_processor):
126127
entries_to_clean_mock.return_value = mocker.sentinel.entries_to_clean
127128

128129
client_mock = mocker.patch.object(partial_sqs_processor, "client", autospec=True)
129-
130-
partial_sqs_processor._clean()
130+
with pytest.raises(SQSBatchProcessingError):
131+
partial_sqs_processor._clean()
131132

132133
client_mock.delete_message_batch.assert_called_once_with(
133134
QueueUrl=mocker.sentinel.queue_url, Entries=mocker.sentinel.entries_to_clean

0 commit comments

Comments
 (0)