Skip to content

Commit 9333e14

Browse files
author
Michal Ploski
committed
Use threading for batch message delete
1 parent b70d73d commit 9333e14

File tree

2 files changed

+47
-19
lines changed

2 files changed

+47
-19
lines changed

Diff for: aws_lambda_powertools/utilities/batch/sqs.py

+35-14
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,10 @@
44
Batch SQS utilities
55
"""
66
import logging
7+
import math
78
import sys
8-
from typing import Callable, Dict, List, Optional, Tuple, cast
9+
from concurrent.futures import ThreadPoolExecutor, as_completed
10+
from typing import Any, Callable, Dict, List, Optional, Tuple, cast
911

1012
import boto3
1113
from botocore.config import Config
@@ -73,6 +75,7 @@ def __init__(
7375
session = boto3_session or boto3.session.Session()
7476
self.client = session.client("sqs", config=config)
7577
self.suppress_exception = suppress_exception
78+
self.max_message_batch = 10
7679

7780
super().__init__()
7881

@@ -120,28 +123,39 @@ def _prepare(self):
120123
self.success_messages.clear()
121124
self.fail_messages.clear()
122125

123-
def _clean(self):
126+
def _clean(self) -> Optional[List]:
124127
"""
125128
Delete messages from Queue in case of partial failure.
126129
"""
130+
127131
# If all messages were successful, fall back to the default SQS -
128-
# Lambda behaviour which deletes messages if Lambda responds successfully
132+
# Lambda behavior which deletes messages if Lambda responds successfully
129133
if not self.fail_messages:
130134
logger.debug(f"All {len(self.success_messages)} records successfully processed")
131-
return
135+
return None
132136

133137
queue_url = self._get_queue_url()
134138
entries_to_remove = self._get_entries_to_clean()
135-
136-
delete_message_response = None
137-
while entries_to_remove:
138-
# Batch delete up to 10 messages at a time (SQS limit)
139-
delete_message_response = self.client.delete_message_batch(
140-
QueueUrl=queue_url,
141-
Entries=entries_to_remove[:10],
142-
)
143-
entries_to_remove = entries_to_remove[10:]
144-
139+
# Batch delete up to 10 messages at a time (SQS limit)
140+
max_workers = math.ceil(len(entries_to_remove) / self.max_message_batch)
141+
142+
if entries_to_remove:
143+
with ThreadPoolExecutor(max_workers=max_workers) as executor:
144+
futures, results = [], []
145+
while entries_to_remove:
146+
futures.append(
147+
executor.submit(
148+
self._delete_messages, queue_url, entries_to_remove[: self.max_message_batch], self.client
149+
)
150+
)
151+
entries_to_remove = entries_to_remove[self.max_message_batch :]
152+
for future in as_completed(futures):
153+
try:
154+
logger.debug("Deleted batch of processed messages from SQS")
155+
results.append(future.result())
156+
except Exception:
157+
logger.exception("Couldn't remove batch of processed messages from SQS")
158+
raise
145159
if self.suppress_exception:
146160
logger.debug(f"{len(self.fail_messages)} records failed processing, but exceptions are suppressed")
147161
else:
@@ -152,6 +166,13 @@ def _clean(self):
152166
child_exceptions=self.exceptions,
153167
)
154168

169+
return results
170+
171+
def _delete_messages(self, queue_url: str, entries_to_remove: List, sqs_client: Any):
172+
delete_message_response = sqs_client.delete_message_batch(
173+
QueueUrl=queue_url,
174+
Entries=entries_to_remove,
175+
)
155176
return delete_message_response
156177

157178

Diff for: tests/functional/test_utilities_batch.py

+12-5
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import json
2+
import math
23
from random import randint
34
from typing import Callable, Dict, Optional
45
from unittest.mock import patch
@@ -166,20 +167,26 @@ def factory(item: Dict) -> str:
166167
return factory
167168

168169

169-
def test_partial_sqs_processor_context_with_failure(sqs_event_factory, record_handler, partial_processor):
170+
@pytest.mark.parametrize(
171+
"success_messages_count",
172+
([1, 18, 34]),
173+
)
174+
def test_partial_sqs_processor_context_with_failure(
175+
success_messages_count, sqs_event_factory, record_handler, partial_processor
176+
):
170177
"""
171178
Test processor with one failing record
172179
"""
173180
fail_record = sqs_event_factory("fail")
174-
success_record = sqs_event_factory("success")
181+
success_records = [sqs_event_factory("success") for i in range(0, success_messages_count)]
175182

176-
records = [fail_record, success_record]
183+
records = [fail_record, *success_records]
177184

178185
response = {"Successful": [{"Id": fail_record["messageId"]}], "Failed": []}
179186

180187
with Stubber(partial_processor.client) as stubber:
181-
stubber.add_response("delete_message_batch", response)
182-
188+
for _ in range(0, math.ceil((success_messages_count / partial_processor.max_message_batch))):
189+
stubber.add_response("delete_message_batch", response)
183190
with pytest.raises(SQSBatchProcessingError) as error:
184191
with partial_processor(records, record_handler) as ctx:
185192
ctx.process()

0 commit comments

Comments
 (0)