Skip to content

Commit b5817f7

Browse files
committed
[issues/6508](-) Make the `key` field in KafkaRecordModel optional, add a new Kafka record without a key to the JSON event files, and add tests
1 parent 04fef34 commit b5817f7

File tree

6 files changed

+98
-22
lines changed

6 files changed

+98
-22
lines changed

aws_lambda_powertools/utilities/data_classes/kafka_event.py

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -37,14 +37,25 @@ def timestamp_type(self) -> str:
3737
return self["timestampType"]
3838

3939
@property
40-
def key(self) -> str:
41-
"""The raw (base64 encoded) Kafka record key."""
42-
return self["key"]
40+
def key(self) -> str | None:
41+
"""
42+
The raw (base64 encoded) Kafka record key.
43+
44+
This key is optional; if not provided,
45+
a round-robin algorithm will be used to determine
46+
the partition for the message.
47+
"""
48+
49+
return self.get("key")
4350

4451
@property
45-
def decoded_key(self) -> bytes:
46-
"""Decode the base64 encoded key as bytes."""
47-
return base64.b64decode(self.key)
52+
def decoded_key(self) -> bytes | None:
53+
"""
54+
Decode the base64 encoded key as bytes.
55+
56+
If the key is not provided, this will return None.
57+
"""
58+
return None if self.key is None else base64.b64decode(self.key)
4859

4960
@property
5061
def value(self) -> str:

aws_lambda_powertools/utilities/parser/models/kafka.py

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
from datetime import datetime
2-
from typing import Dict, List, Literal, Type, Union
2+
from typing import Dict, List, Literal, Optional, Type, Union
33

44
from pydantic import BaseModel, field_validator
55

@@ -14,12 +14,16 @@ class KafkaRecordModel(BaseModel):
1414
offset: int
1515
timestamp: datetime
1616
timestampType: str
17-
key: bytes
17+
key: Optional[bytes] = None
1818
value: Union[str, Type[BaseModel]]
1919
headers: List[Dict[str, bytes]]
2020

21-
# Added type ignore to keep compatibility between Pydantic v1 and v2
22-
_decode_key = field_validator("key")(base64_decode) # type: ignore[type-var, unused-ignore]
21+
# key is optional; only decode if not None
22+
@field_validator("key", mode="before")
23+
def decode_key(cls, value):
24+
if value is not None:
25+
return base64_decode(value)
26+
return value
2327

2428
@field_validator("value", mode="before")
2529
def data_base64_decode(cls, value):

tests/events/kafkaEventMsk.json

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,31 @@
2929
]
3030
}
3131
]
32+
},
33+
{
34+
"topic":"mytopic",
35+
"partition":0,
36+
"offset":15,
37+
"timestamp":1545084650987,
38+
"timestampType":"CREATE_TIME",
39+
"value":"eyJrZXkiOiJ2YWx1ZSJ9",
40+
"headers":[
41+
{
42+
"headerKey":[
43+
104,
44+
101,
45+
97,
46+
100,
47+
101,
48+
114,
49+
86,
50+
97,
51+
108,
52+
117,
53+
101
54+
]
55+
}
56+
]
3257
}
3358
]
3459
}

tests/events/kafkaEventSelfManaged.json

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,31 @@
2828
]
2929
}
3030
]
31+
},
32+
{
33+
"topic": "mytopic",
34+
"partition": 0,
35+
"offset": 15,
36+
"timestamp": 1545084650987,
37+
"timestampType": "CREATE_TIME",
38+
"value": "eyJrZXkiOiJ2YWx1ZSJ9",
39+
"headers": [
40+
{
41+
"headerKey": [
42+
104,
43+
101,
44+
97,
45+
100,
46+
101,
47+
114,
48+
86,
49+
97,
50+
108,
51+
117,
52+
101
53+
]
54+
}
55+
]
3156
}
3257
]
3358
}

tests/unit/data_classes/required_dependencies/test_kafka_event.py

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ def test_kafka_msk_event():
2121
assert parsed_event.decoded_bootstrap_servers == bootstrap_servers_list
2222

2323
records = list(parsed_event.records)
24-
assert len(records) == 1
24+
assert len(records) == 2
2525
record = records[0]
2626
raw_record = raw_event["records"]["mytopic-0"][0]
2727
assert record.topic == raw_record["topic"]
@@ -37,6 +37,9 @@ def test_kafka_msk_event():
3737

3838
assert parsed_event.record == records[0]
3939

40+
record = records[1]
41+
assert record.key is None
42+
4043

4144
def test_kafka_self_managed_event():
4245
raw_event = load_event("kafkaEventSelfManaged.json")
@@ -52,7 +55,7 @@ def test_kafka_self_managed_event():
5255
assert parsed_event.decoded_bootstrap_servers == bootstrap_servers_list
5356

5457
records = list(parsed_event.records)
55-
assert len(records) == 1
58+
assert len(records) == 2
5659
record = records[0]
5760
raw_record = raw_event["records"]["mytopic-0"][0]
5861
assert record.topic == raw_record["topic"]
@@ -68,14 +71,18 @@ def test_kafka_self_managed_event():
6871

6972
assert parsed_event.record == records[0]
7073

74+
record = records[1]
75+
assert record.key is None
76+
7177

7278
def test_kafka_record_property_with_stopiteration_error():
7379
# GIVEN a kafka event with one record
7480
raw_event = load_event("kafkaEventMsk.json")
7581
parsed_event = KafkaEvent(raw_event)
7682

77-
# WHEN calling record property twice
83+
# WHEN calling record property thrice
7884
# THEN raise StopIteration
7985
with pytest.raises(StopIteration):
8086
assert parsed_event.record.topic is not None
81-
assert parsed_event.record.partition is not None
87+
assert parsed_event.record.topic is not None
88+
assert parsed_event.record.topic is not None

tests/unit/parser/_pydantic/test_kafka.py

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,9 @@ def test_kafka_msk_event_with_envelope():
1515
model=MyLambdaKafkaBusiness,
1616
envelope=envelopes.KafkaEnvelope,
1717
)
18-
19-
assert parsed_event[0].key == "value"
20-
assert len(parsed_event) == 1
18+
for i in [0, 1]:
19+
assert parsed_event[i].key == "value"
20+
assert len(parsed_event) == 2
2121

2222

2323
def test_kafka_self_managed_event_with_envelope():
@@ -27,9 +27,9 @@ def test_kafka_self_managed_event_with_envelope():
2727
model=MyLambdaKafkaBusiness,
2828
envelope=envelopes.KafkaEnvelope,
2929
)
30-
31-
assert parsed_event[0].key == "value"
32-
assert len(parsed_event) == 1
30+
for i in [0, 1]:
31+
assert parsed_event[i].key == "value"
32+
assert len(parsed_event) == 2
3333

3434

3535
def test_self_managed_kafka_event():
@@ -41,7 +41,7 @@ def test_self_managed_kafka_event():
4141
assert parsed_event.bootstrapServers == raw_event["bootstrapServers"].split(",")
4242

4343
records = list(parsed_event.records["mytopic-0"])
44-
assert len(records) == 1
44+
assert len(records) == 2
4545
record: KafkaRecordModel = records[0]
4646
raw_record = raw_event["records"]["mytopic-0"][0]
4747
assert record.topic == raw_record["topic"]
@@ -55,6 +55,8 @@ def test_self_managed_kafka_event():
5555
assert record.value == '{"key":"value"}'
5656
assert len(record.headers) == 1
5757
assert record.headers[0]["headerKey"] == b"headerValue"
58+
record: KafkaRecordModel = records[1]
59+
assert record.key is None
5860

5961

6062
def test_kafka_msk_event():
@@ -66,7 +68,7 @@ def test_kafka_msk_event():
6668
assert parsed_event.eventSourceArn == raw_event["eventSourceArn"]
6769

6870
records = list(parsed_event.records["mytopic-0"])
69-
assert len(records) == 1
71+
assert len(records) == 2
7072
record: KafkaRecordModel = records[0]
7173
raw_record = raw_event["records"]["mytopic-0"][0]
7274
assert record.topic == raw_record["topic"]
@@ -80,3 +82,5 @@ def test_kafka_msk_event():
8082
assert record.value == '{"key":"value"}'
8183
assert len(record.headers) == 1
8284
assert record.headers[0]["headerKey"] == b"headerValue"
85+
record: KafkaRecordModel = records[1]
86+
assert record.key is None

0 commit comments

Comments
 (0)