|
1 | 1 | import logging
|
2 |
| -from typing import List, Optional, Tuple |
| 2 | +from typing import Dict, List, Optional, Tuple |
3 | 3 |
|
4 | 4 | from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType
|
5 | 5 | from aws_lambda_powertools.utilities.batch.types import BatchSqsTypeModel
|
@@ -106,23 +106,36 @@ def process(self) -> List[Tuple]:
|
106 | 106 | # If a processed message fail and skip_group_on_error is True,
|
107 | 107 | # mark subsequent messages from the same MessageGroupId as skipped
|
108 | 108 | if processed_messages[0] == "fail" and self._skip_group_on_error:
|
109 |
| - _attributes_record = record.get("attributes", {}) |
110 |
| - for subsequent_record in self.records[i + 1 :]: |
111 |
| - _attributes = subsequent_record.get("attributes", {}) |
112 |
| - if _attributes.get("MessageGroupId") == _attributes_record.get("MessageGroupId"): |
113 |
| - skip_messages_group_id.append(subsequent_record.get("messageId")) |
114 |
| - data = self._to_batch_type( |
115 |
| - record=subsequent_record, |
116 |
| - event_type=self.event_type, |
117 |
| - model=self.model, |
118 |
| - ) |
119 |
| - result.append(self.failure_handler(record=data, exception=self.circuit_breaker_exc)) |
| 109 | + self._process_failed_subsequent_messages(record, i, skip_messages_group_id, result) |
120 | 110 |
|
121 | 111 | # Append the processed message normally
|
122 | 112 | result.append(processed_messages)
|
123 | 113 |
|
124 | 114 | return result
|
125 | 115 |
|
| 116 | + def _process_failed_subsequent_messages( |
| 117 | + self, |
| 118 | + record: Dict, |
| 119 | + i: int, |
| 120 | + skip_messages_group_id: List, |
| 121 | + result: List[Tuple], |
| 122 | + ) -> None: |
| 123 | + """ |
| 124 | + Process failed subsequent messages from the same MessageGroupId and mark them as skipped. |
| 125 | + """ |
| 126 | + _attributes_record = record.get("attributes", {}) |
| 127 | + |
| 128 | + for subsequent_record in self.records[i + 1 :]: |
| 129 | + _attributes = subsequent_record.get("attributes", {}) |
| 130 | + if _attributes.get("MessageGroupId") == _attributes_record.get("MessageGroupId"): |
| 131 | + skip_messages_group_id.append(subsequent_record.get("messageId")) |
| 132 | + data = self._to_batch_type( |
| 133 | + record=subsequent_record, |
| 134 | + event_type=self.event_type, |
| 135 | + model=self.model, |
| 136 | + ) |
| 137 | + result.append(self.failure_handler(record=data, exception=self.circuit_breaker_exc)) |
| 138 | + |
126 | 139 | def _short_circuit_processing(self, first_failure_index: int, result: List[Tuple]) -> List[Tuple]:
|
127 | 140 | """
|
128 | 141 | Starting from the first failure index, fail all the remaining messages, and append them to the result list.
|
|
0 commit comments