Skip to content

Commit 2cc1135

Browse files
whardier and Michal Ploski authored
fix(batch): delete >10 messages in legacy sqs processor (#818)
Co-authored-by: Michal Ploski <[email protected]>
1 parent 6869155 commit 2cc1135

File tree

3 files changed: +69 -15 lines changed

Diff for: aws_lambda_powertools/utilities/batch/sqs.py

+33 -7
Original file line number | Diff line number | Diff line change
@@ -4,8 +4,10 @@
44
Batch SQS utilities
55
"""
66
import logging
7+
import math
78
import sys
8-
from typing import Callable, Dict, List, Optional, Tuple, cast
9+
from concurrent.futures import ThreadPoolExecutor, as_completed
10+
from typing import Any, Callable, Dict, List, Optional, Tuple, cast
911

1012
import boto3
1113
from botocore.config import Config
@@ -73,6 +75,7 @@ def __init__(
7375
session = boto3_session or boto3.session.Session()
7476
self.client = session.client("sqs", config=config)
7577
self.suppress_exception = suppress_exception
78+
self.max_message_batch = 10
7679

7780
super().__init__()
7881

@@ -120,23 +123,39 @@ def _prepare(self):
120123
self.success_messages.clear()
121124
self.fail_messages.clear()
122125

123-
def _clean(self):
126+
def _clean(self) -> Optional[List]:
124127
"""
125128
Delete messages from Queue in case of partial failure.
126129
"""
130+
127131
# If all messages were successful, fall back to the default SQS -
128-
# Lambda behaviour which deletes messages if Lambda responds successfully
132+
# Lambda behavior which deletes messages if Lambda responds successfully
129133
if not self.fail_messages:
130134
logger.debug(f"All {len(self.success_messages)} records successfully processed")
131-
return
135+
return None
132136

133137
queue_url = self._get_queue_url()
134138
entries_to_remove = self._get_entries_to_clean()
139+
# Batch delete up to 10 messages at a time (SQS limit)
140+
max_workers = math.ceil(len(entries_to_remove) / self.max_message_batch)
135141

136-
delete_message_response = None
137142
if entries_to_remove:
138-
delete_message_response = self.client.delete_message_batch(QueueUrl=queue_url, Entries=entries_to_remove)
139-
143+
with ThreadPoolExecutor(max_workers=max_workers) as executor:
144+
futures, results = [], []
145+
while entries_to_remove:
146+
futures.append(
147+
executor.submit(
148+
self._delete_messages, queue_url, entries_to_remove[: self.max_message_batch], self.client
149+
)
150+
)
151+
entries_to_remove = entries_to_remove[self.max_message_batch :]
152+
for future in as_completed(futures):
153+
try:
154+
logger.debug("Deleted batch of processed messages from SQS")
155+
results.append(future.result())
156+
except Exception:
157+
logger.exception("Couldn't remove batch of processed messages from SQS")
158+
raise
140159
if self.suppress_exception:
141160
logger.debug(f"{len(self.fail_messages)} records failed processing, but exceptions are suppressed")
142161
else:
@@ -147,6 +166,13 @@ def _clean(self):
147166
child_exceptions=self.exceptions,
148167
)
149168

169+
return results
170+
171+
def _delete_messages(self, queue_url: str, entries_to_remove: List, sqs_client: Any):
172+
delete_message_response = sqs_client.delete_message_batch(
173+
QueueUrl=queue_url,
174+
Entries=entries_to_remove,
175+
)
150176
return delete_message_response
151177

152178

Diff for: tests/functional/test_utilities_batch.py

+34 -6
Original file line number | Diff line number | Diff line change
@@ -1,4 +1,5 @@
11
import json
2+
import math
23
from random import randint
34
from typing import Callable, Dict, Optional
45
from unittest.mock import patch
@@ -166,20 +167,26 @@ def factory(item: Dict) -> str:
166167
return factory
167168

168169

169-
def test_partial_sqs_processor_context_with_failure(sqs_event_factory, record_handler, partial_processor):
170+
@pytest.mark.parametrize(
171+
"success_messages_count",
172+
([1, 18, 34]),
173+
)
174+
def test_partial_sqs_processor_context_with_failure(
175+
success_messages_count, sqs_event_factory, record_handler, partial_processor
176+
):
170177
"""
171-
Test processor with one failing record
178+
Test processor with one failing record and multiple processed records
172179
"""
173180
fail_record = sqs_event_factory("fail")
174-
success_record = sqs_event_factory("success")
181+
success_records = [sqs_event_factory("success") for i in range(0, success_messages_count)]
175182

176-
records = [fail_record, success_record]
183+
records = [fail_record, *success_records]
177184

178185
response = {"Successful": [{"Id": fail_record["messageId"]}], "Failed": []}
179186

180187
with Stubber(partial_processor.client) as stubber:
181-
stubber.add_response("delete_message_batch", response)
182-
188+
for _ in range(0, math.ceil((success_messages_count / partial_processor.max_message_batch))):
189+
stubber.add_response("delete_message_batch", response)
183190
with pytest.raises(SQSBatchProcessingError) as error:
184191
with partial_processor(records, record_handler) as ctx:
185192
ctx.process()
@@ -188,6 +195,27 @@ def test_partial_sqs_processor_context_with_failure(sqs_event_factory, record_ha
188195
stubber.assert_no_pending_responses()
189196

190197

198+
def test_partial_sqs_processor_context_with_failure_exception(sqs_event_factory, record_handler, partial_processor):
199+
"""
200+
Test processor with one failing record
201+
"""
202+
fail_record = sqs_event_factory("fail")
203+
success_record = sqs_event_factory("success")
204+
205+
records = [fail_record, success_record]
206+
207+
with Stubber(partial_processor.client) as stubber:
208+
stubber.add_client_error(
209+
method="delete_message_batch", service_error_code="ServiceUnavailable", http_status_code=503
210+
)
211+
with pytest.raises(Exception) as error:
212+
with partial_processor(records, record_handler) as ctx:
213+
ctx.process()
214+
215+
assert "ServiceUnavailable" in str(error.value)
216+
stubber.assert_no_pending_responses()
217+
218+
191219
def test_partial_sqs_processor_context_only_success(sqs_event_factory, record_handler, partial_processor):
192220
"""
193221
Test processor without failure

Diff for: tests/unit/test_utilities_batch.py

+2 -2
Original file line number | Diff line number | Diff line change
@@ -128,12 +128,12 @@ def test_partial_sqs_clean(monkeypatch, mocker, partial_sqs_processor):
128128
entries_to_clean_mock = mocker.patch.object(PartialSQSProcessor, "_get_entries_to_clean")
129129

130130
queue_url_mock.return_value = mocker.sentinel.queue_url
131-
entries_to_clean_mock.return_value = mocker.sentinel.entries_to_clean
131+
entries_to_clean_mock.return_value = [mocker.sentinel.entries_to_clean]
132132

133133
client_mock = mocker.patch.object(partial_sqs_processor, "client", autospec=True)
134134
with pytest.raises(SQSBatchProcessingError):
135135
partial_sqs_processor._clean()
136136

137137
client_mock.delete_message_batch.assert_called_once_with(
138-
QueueUrl=mocker.sentinel.queue_url, Entries=mocker.sentinel.entries_to_clean
138+
QueueUrl=mocker.sentinel.queue_url, Entries=[mocker.sentinel.entries_to_clean]
139139
)

0 commit comments

Comments
 (0)