Skip to content

Commit f41c82c

Browse files
feat(parser): changes in envelope
1 parent 86ef6de commit f41c82c

File tree

2 files changed

+13
-5
lines changed

2 files changed

+13
-5
lines changed

aws_lambda_powertools/utilities/parser/envelopes/kafka.py

+7-4
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import logging
2-
from typing import Any, Dict, List, Optional, Type, Union
2+
from typing import Any, Dict, List, Optional, Type, Union, cast
33

4-
from ..models import KafkaMskEventModel
4+
from ..models import KafkaMskEventModel, KafkaSelfManagedEventModel
55
from ..types import Model
66
from .base import BaseEnvelope
77

@@ -32,8 +32,11 @@ def parse(self, data: Optional[Union[Dict[str, Any], Any]], model: Type[Model])
3232
List
3333
List of records parsed with model provided
3434
"""
35-
logger.debug(f"Parsing incoming data with Kafka event model {KafkaMskEventModel}")
36-
parsed_envelope: KafkaMskEventModel = KafkaMskEventModel.parse_obj(data)
35+
event_source = cast(dict, data).get("eventSource")
36+
model_parse_event = KafkaMskEventModel if event_source == "aws:kafka" else KafkaSelfManagedEventModel
37+
38+
logger.debug(f"Parsing incoming data with Kafka event model {model_parse_event}")
39+
parsed_envelope = model_parse_event.parse_obj(data)
3740
logger.debug(f"Parsing Kafka event records in `value` with {model}")
3841
ret_list = []
3942
for records in parsed_envelope.records.values():

tests/functional/parser/test_kafka.py

+6-1
Original file line numberDiff line numberDiff line change
@@ -22,11 +22,16 @@ def handle_kafka_event(event: KafkaSelfManagedEventModel, _: LambdaContext):
2222
return event
2323

2424

25-
def test_kafka_event_with_envelope():
25+
def test_kafka_msk_event_with_envelope():
2626
event = load_event("kafkaEventMsk.json")
2727
handle_lambda_kafka_with_envelope(event, LambdaContext())
2828

2929

30+
def test_kafka_self_managed_event_with_envelope():
31+
event = load_event("kafkaEventSelfManaged.json")
32+
handle_lambda_kafka_with_envelope(event, LambdaContext())
33+
34+
3035
def test_self_managed_kafka_event():
3136
json_event = load_event("kafkaEventSelfManaged.json")
3237
event: KafkaSelfManagedEventModel = handle_kafka_event(json_event, LambdaContext())

0 commit comments

Comments
 (0)