-
Notifications
You must be signed in to change notification settings - Fork 421
/
Copy pathtest_kafka_event.py
81 lines (60 loc) · 3 KB
/
test_kafka_event.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
from __future__ import annotations
import pytest
from aws_lambda_powertools.utilities.data_classes import KafkaEvent
from tests.functional.utils import load_event
def test_kafka_msk_event():
raw_event = load_event("kafkaEventMsk.json")
parsed_event = KafkaEvent(raw_event)
assert parsed_event.event_source == raw_event["eventSource"]
assert parsed_event.event_source_arn == raw_event["eventSourceArn"]
bootstrap_servers_raw = raw_event["bootstrapServers"]
bootstrap_servers_list = raw_event["bootstrapServers"].split(",")
assert parsed_event.bootstrap_servers == bootstrap_servers_raw
assert parsed_event.decoded_bootstrap_servers == bootstrap_servers_list
records = list(parsed_event.records)
assert len(records) == 1
record = records[0]
raw_record = raw_event["records"]["mytopic-0"][0]
assert record.topic == raw_record["topic"]
assert record.partition == raw_record["partition"]
assert record.offset == raw_record["offset"]
assert record.timestamp == raw_record["timestamp"]
assert record.timestamp_type == raw_record["timestampType"]
assert record.decoded_key == b"recordKey"
assert record.value == raw_record["value"]
assert record.json_value == {"key": "value"}
assert record.decoded_headers == {"headerKey": b"headerValue"}
assert record.decoded_headers["HeaderKey"] == b"headerValue"
assert parsed_event.record == records[0]
def test_kafka_self_managed_event():
raw_event = load_event("kafkaEventSelfManaged.json")
parsed_event = KafkaEvent(raw_event)
assert parsed_event.event_source == raw_event["eventSource"]
bootstrap_servers_raw = raw_event["bootstrapServers"]
bootstrap_servers_list = raw_event["bootstrapServers"].split(",")
assert parsed_event.bootstrap_servers == bootstrap_servers_raw
assert parsed_event.decoded_bootstrap_servers == bootstrap_servers_list
records = list(parsed_event.records)
assert len(records) == 1
record = records[0]
raw_record = raw_event["records"]["mytopic-0"][0]
assert record.topic == raw_record["topic"]
assert record.partition == raw_record["partition"]
assert record.offset == raw_record["offset"]
assert record.timestamp == raw_record["timestamp"]
assert record.timestamp_type == raw_record["timestampType"]
assert record.decoded_key == b"recordKey"
assert record.value == raw_record["value"]
assert record.json_value == {"key": "value"}
assert record.decoded_headers == {"headerKey": b"headerValue"}
assert record.decoded_headers["HeaderKey"] == b"headerValue"
assert parsed_event.record == records[0]
def test_kafka_record_property_with_stopiteration_error():
# GIVEN a kafka event with one record
raw_event = load_event("kafkaEventMsk.json")
parsed_event = KafkaEvent(raw_event)
# WHEN calling record property twice
# THEN raise StopIteration
with pytest.raises(StopIteration):
assert parsed_event.record.topic is not None
assert parsed_event.record.partition is not None