Skip to content

refactor(batch): type response() method #3023

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Aug 29, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions aws_lambda_powertools/utilities/batch/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,14 @@
import sys
from abc import ABC, abstractmethod
from enum import Enum
from typing import Any, Callable, Dict, List, Optional, Tuple, Union, overload
from typing import Any, Callable, List, Optional, Tuple, Union, overload

from aws_lambda_powertools.shared import constants
from aws_lambda_powertools.utilities.batch.exceptions import (
BatchProcessingError,
ExceptionInfo,
)
from aws_lambda_powertools.utilities.batch.types import BatchTypeModels
from aws_lambda_powertools.utilities.batch.types import BatchTypeModels, PartialItemFailureResponse, PartialItemFailures
from aws_lambda_powertools.utilities.data_classes.dynamo_db_stream_event import (
DynamoDBRecord,
)
Expand Down Expand Up @@ -220,7 +220,7 @@ def failure_handler(self, record, exception: ExceptionInfo) -> FailureResponse:


class BasePartialBatchProcessor(BasePartialProcessor): # noqa
DEFAULT_RESPONSE: Dict[str, List[Optional[dict]]] = {"batchItemFailures": []}
DEFAULT_RESPONSE: PartialItemFailureResponse = {"batchItemFailures": []}

def __init__(self, event_type: EventType, model: Optional["BatchTypeModels"] = None):
"""Process batch and partially report failed items
Expand All @@ -239,7 +239,7 @@ def __init__(self, event_type: EventType, model: Optional["BatchTypeModels"] = N
"""
self.event_type = event_type
self.model = model
self.batch_response = copy.deepcopy(self.DEFAULT_RESPONSE)
self.batch_response: PartialItemFailureResponse = copy.deepcopy(self.DEFAULT_RESPONSE)
self._COLLECTOR_MAPPING = {
EventType.SQS: self._collect_sqs_failures,
EventType.KinesisDataStreams: self._collect_kinesis_failures,
Expand All @@ -253,7 +253,7 @@ def __init__(self, event_type: EventType, model: Optional["BatchTypeModels"] = N

super().__init__()

def response(self):
def response(self) -> PartialItemFailureResponse:
"""Batch items that failed processing, if any"""
return self.batch_response

Expand Down Expand Up @@ -294,7 +294,7 @@ def _has_messages_to_report(self) -> bool:
def _entire_batch_failed(self) -> bool:
return len(self.exceptions) == len(self.records)

def _get_messages_to_report(self) -> List[Dict[str, str]]:
def _get_messages_to_report(self) -> List[PartialItemFailures]:
"""
Format messages to use in batch deletion
"""
Expand Down