kafka.py (forked from aws-powertools/powertools-lambda-python)
from __future__ import annotations

import logging
from typing import TYPE_CHECKING, Any, cast

from aws_lambda_powertools.utilities.parser.envelopes.base import BaseEnvelope
from aws_lambda_powertools.utilities.parser.models import KafkaMskEventModel, KafkaSelfManagedEventModel

if TYPE_CHECKING:
    from aws_lambda_powertools.utilities.parser.types import Model

logger = logging.getLogger(__name__)


class KafkaEnvelope(BaseEnvelope):
    """Kafka event envelope to extract data within the record's value key

    Each record's value is a string, though it can also be a JSON encoded string.
    Regardless of its type it'll be parsed into a BaseModel object.

    Note: Records will be parsed the same way, so if model is str,
    all items in the list will be parsed as str and not as JSON (and vice versa)
    """
    def parse(self, data: dict[str, Any] | Any | None, model: type[Model]) -> list[Model | None]:
        """Parses data found with model provided

        Parameters
        ----------
        data : dict
            Lambda event to be parsed
        model : type[Model]
            Data model provided to parse after extracting data using envelope

        Returns
        -------
        list
            List of records parsed with model provided
        """
        # MSK events report eventSource "aws:kafka"; anything else is treated as self-managed Kafka
        event_source = cast(dict, data).get("eventSource")
        model_parse_event: type[KafkaMskEventModel | KafkaSelfManagedEventModel] = (
            KafkaMskEventModel if event_source == "aws:kafka" else KafkaSelfManagedEventModel
        )

        logger.debug(f"Parsing incoming data with Kafka event model {model_parse_event}")
        parsed_envelope = model_parse_event.model_validate(data)

        logger.debug(f"Parsing Kafka event records in `value` with {model}")
        # Flatten records across all topic partitions, parsing each record's value with the provided model
        ret_list = []
        for records in parsed_envelope.records.values():
            ret_list += [self._parse(data=record.value, model=model) for record in records]
        return ret_list