Skip to content

Commit e96e73c

Browse files
fix(parser): make key attribute optional in Kafka model (#6523)
* [issues/6508] The `key` attribute in `KafkaRecordModel` is made optional; a new Kafka record without a key was added to the JSON fixture files, along with tests. * [issues/6508] Added tests for when `key` is set to `None`; a new Kafka record with an explicit `"key": null` was added to kafkaEventMsk.json and kafkaEventSelfManaged.json. --------- Co-authored-by: Leandro Damascena <[email protected]>
1 parent 395aba2 commit e96e73c

File tree

6 files changed

+152
-23
lines changed

6 files changed

+152
-23
lines changed

aws_lambda_powertools/utilities/data_classes/kafka_event.py

+17-6
Original file line numberDiff line numberDiff line change
@@ -37,14 +37,25 @@ def timestamp_type(self) -> str:
3737
return self["timestampType"]
3838

3939
@property
40-
def key(self) -> str:
41-
"""The raw (base64 encoded) Kafka record key."""
42-
return self["key"]
40+
def key(self) -> str | None:
41+
"""
42+
The raw (base64 encoded) Kafka record key.
43+
44+
This key is optional; if not provided,
45+
a round-robin algorithm will be used to determine
46+
the partition for the message.
47+
"""
48+
49+
return self.get("key")
4350

4451
@property
45-
def decoded_key(self) -> bytes:
46-
"""Decode the base64 encoded key as bytes."""
47-
return base64.b64decode(self.key)
52+
def decoded_key(self) -> bytes | None:
53+
"""
54+
Decode the base64 encoded key as bytes.
55+
56+
If the key is not provided, this will return None.
57+
"""
58+
return None if self.key is None else base64.b64decode(self.key)
4859

4960
@property
5061
def value(self) -> str:

aws_lambda_powertools/utilities/parser/models/kafka.py

+8-4
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
from datetime import datetime
2-
from typing import Dict, List, Literal, Type, Union
2+
from typing import Dict, List, Literal, Optional, Type, Union
33

44
from pydantic import BaseModel, field_validator
55

@@ -14,12 +14,16 @@ class KafkaRecordModel(BaseModel):
1414
offset: int
1515
timestamp: datetime
1616
timestampType: str
17-
key: bytes
17+
key: Optional[bytes] = None
1818
value: Union[str, Type[BaseModel]]
1919
headers: List[Dict[str, bytes]]
2020

21-
# Added type ignore to keep compatibility between Pydantic v1 and v2
22-
_decode_key = field_validator("key")(base64_decode) # type: ignore[type-var, unused-ignore]
21+
# key is optional; only decode if not None
22+
@field_validator("key", mode="before")
23+
def decode_key(cls, value):
24+
if value is not None:
25+
return base64_decode(value)
26+
return value
2327

2428
@field_validator("value", mode="before")
2529
def data_base64_decode(cls, value):

tests/events/kafkaEventMsk.json

+51
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,57 @@
2929
]
3030
}
3131
]
32+
},
33+
{
34+
"topic":"mytopic",
35+
"partition":0,
36+
"offset":15,
37+
"timestamp":1545084650987,
38+
"timestampType":"CREATE_TIME",
39+
"value":"eyJrZXkiOiJ2YWx1ZSJ9",
40+
"headers":[
41+
{
42+
"headerKey":[
43+
104,
44+
101,
45+
97,
46+
100,
47+
101,
48+
114,
49+
86,
50+
97,
51+
108,
52+
117,
53+
101
54+
]
55+
}
56+
]
57+
},
58+
{
59+
"topic":"mytopic",
60+
"partition":0,
61+
"offset":15,
62+
"timestamp":1545084650987,
63+
"timestampType":"CREATE_TIME",
64+
"key": null,
65+
"value":"eyJrZXkiOiJ2YWx1ZSJ9",
66+
"headers":[
67+
{
68+
"headerKey":[
69+
104,
70+
101,
71+
97,
72+
100,
73+
101,
74+
114,
75+
86,
76+
97,
77+
108,
78+
117,
79+
101
80+
]
81+
}
82+
]
3283
}
3384
]
3485
}

tests/events/kafkaEventSelfManaged.json

+51
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,57 @@
2828
]
2929
}
3030
]
31+
},
32+
{
33+
"topic": "mytopic",
34+
"partition": 0,
35+
"offset": 15,
36+
"timestamp": 1545084650987,
37+
"timestampType": "CREATE_TIME",
38+
"value": "eyJrZXkiOiJ2YWx1ZSJ9",
39+
"headers": [
40+
{
41+
"headerKey": [
42+
104,
43+
101,
44+
97,
45+
100,
46+
101,
47+
114,
48+
86,
49+
97,
50+
108,
51+
117,
52+
101
53+
]
54+
}
55+
]
56+
},
57+
{
58+
"topic": "mytopic",
59+
"partition": 0,
60+
"offset": 15,
61+
"timestamp": 1545084650987,
62+
"timestampType": "CREATE_TIME",
63+
"key": null,
64+
"value": "eyJrZXkiOiJ2YWx1ZSJ9",
65+
"headers": [
66+
{
67+
"headerKey": [
68+
104,
69+
101,
70+
97,
71+
100,
72+
101,
73+
114,
74+
86,
75+
97,
76+
108,
77+
117,
78+
101
79+
]
80+
}
81+
]
3182
}
3283
]
3384
}

tests/unit/data_classes/required_dependencies/test_kafka_event.py

+12-5
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ def test_kafka_msk_event():
2121
assert parsed_event.decoded_bootstrap_servers == bootstrap_servers_list
2222

2323
records = list(parsed_event.records)
24-
assert len(records) == 1
24+
assert len(records) == 3
2525
record = records[0]
2626
raw_record = raw_event["records"]["mytopic-0"][0]
2727
assert record.topic == raw_record["topic"]
@@ -36,6 +36,9 @@ def test_kafka_msk_event():
3636
assert record.decoded_headers["HeaderKey"] == b"headerValue"
3737

3838
assert parsed_event.record == records[0]
39+
for i in range(1, 3):
40+
record = records[i]
41+
assert record.key is None
3942

4043

4144
def test_kafka_self_managed_event():
@@ -52,7 +55,7 @@ def test_kafka_self_managed_event():
5255
assert parsed_event.decoded_bootstrap_servers == bootstrap_servers_list
5356

5457
records = list(parsed_event.records)
55-
assert len(records) == 1
58+
assert len(records) == 3
5659
record = records[0]
5760
raw_record = raw_event["records"]["mytopic-0"][0]
5861
assert record.topic == raw_record["topic"]
@@ -68,14 +71,18 @@ def test_kafka_self_managed_event():
6871

6972
assert parsed_event.record == records[0]
7073

74+
for i in range(1, 3):
75+
record = records[i]
76+
assert record.key is None
77+
7178

7279
def test_kafka_record_property_with_stopiteration_error():
7380
# GIVEN a kafka event with one record
7481
raw_event = load_event("kafkaEventMsk.json")
7582
parsed_event = KafkaEvent(raw_event)
7683

77-
# WHEN calling record property twice
84+
# WHEN calling record property thrice
7885
# THEN raise StopIteration
7986
with pytest.raises(StopIteration):
80-
assert parsed_event.record.topic is not None
81-
assert parsed_event.record.partition is not None
87+
for _ in range(4):
88+
assert parsed_event.record.topic is not None

tests/unit/parser/_pydantic/test_kafka.py

+13-8
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,9 @@ def test_kafka_msk_event_with_envelope():
1515
model=MyLambdaKafkaBusiness,
1616
envelope=envelopes.KafkaEnvelope,
1717
)
18-
19-
assert parsed_event[0].key == "value"
20-
assert len(parsed_event) == 1
18+
for i in range(3):
19+
assert parsed_event[i].key == "value"
20+
assert len(parsed_event) == 3
2121

2222

2323
def test_kafka_self_managed_event_with_envelope():
@@ -27,9 +27,9 @@ def test_kafka_self_managed_event_with_envelope():
2727
model=MyLambdaKafkaBusiness,
2828
envelope=envelopes.KafkaEnvelope,
2929
)
30-
31-
assert parsed_event[0].key == "value"
32-
assert len(parsed_event) == 1
30+
for i in range(3):
31+
assert parsed_event[i].key == "value"
32+
assert len(parsed_event) == 3
3333

3434

3535
def test_self_managed_kafka_event():
@@ -41,7 +41,7 @@ def test_self_managed_kafka_event():
4141
assert parsed_event.bootstrapServers == raw_event["bootstrapServers"].split(",")
4242

4343
records = list(parsed_event.records["mytopic-0"])
44-
assert len(records) == 1
44+
assert len(records) == 3
4545
record: KafkaRecordModel = records[0]
4646
raw_record = raw_event["records"]["mytopic-0"][0]
4747
assert record.topic == raw_record["topic"]
@@ -55,6 +55,8 @@ def test_self_managed_kafka_event():
5555
assert record.value == '{"key":"value"}'
5656
assert len(record.headers) == 1
5757
assert record.headers[0]["headerKey"] == b"headerValue"
58+
record: KafkaRecordModel = records[1]
59+
assert record.key is None
5860

5961

6062
def test_kafka_msk_event():
@@ -66,7 +68,7 @@ def test_kafka_msk_event():
6668
assert parsed_event.eventSourceArn == raw_event["eventSourceArn"]
6769

6870
records = list(parsed_event.records["mytopic-0"])
69-
assert len(records) == 1
71+
assert len(records) == 3
7072
record: KafkaRecordModel = records[0]
7173
raw_record = raw_event["records"]["mytopic-0"][0]
7274
assert record.topic == raw_record["topic"]
@@ -80,3 +82,6 @@ def test_kafka_msk_event():
8082
assert record.value == '{"key":"value"}'
8183
assert len(record.headers) == 1
8284
assert record.headers[0]["headerKey"] == b"headerValue"
85+
for i in range(1, 3):
86+
record: KafkaRecordModel = records[i]
87+
assert record.key is None

0 commit comments

Comments
 (0)