forked from aws-powertools/powertools-lambda-python
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtest_kinesis_firehose_response.py
67 lines (55 loc) · 2.48 KB
/
test_kinesis_firehose_response.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
from aws_lambda_powertools.utilities.data_classes import (
KinesisFirehoseEvent,
KinesisFirehoseResponse,
KinesisFirehoseResponseRecord,
KinesisFirehoseResponseRecordMetadata,
)
from tests.functional.utils import load_event
def test_kinesis_firehose_response():
raw_event = load_event("kinesisFirehoseKinesisEvent.json")
parsed_event = KinesisFirehoseEvent(data=raw_event)
response = KinesisFirehoseResponse()
for record in parsed_event.records:
# if data was delivered as json; caches loaded value
data = record.data_as_text
metadata_partition = KinesisFirehoseResponseRecordMetadata(partition_keys={"year": 2023})
processed_record = KinesisFirehoseResponseRecord(
record_id=record.record_id,
result="Ok",
metadata=metadata_partition,
)
processed_record.data_from_text(data=data)
response.add_record(record=processed_record)
response_dict = response.asdict()
res_records = list(response_dict["records"])
assert len(res_records) == 2
record_01, record_02 = res_records[:]
record01_raw = raw_event["records"][0]
assert record_01["result"] == "Ok"
assert record_01["recordId"] == record01_raw["recordId"]
assert record_01["data"] == record01_raw["data"]
assert record_01["metadata"]["partitionKeys"]["year"] == 2023
assert response.records[0].data_as_bytes == b"Hello World"
assert response.records[0].data_as_text == "Hello World"
def test_kinesis_firehose_create_response():
raw_event = load_event("kinesisFirehoseKinesisEvent.json")
parsed_event = KinesisFirehoseEvent(data=raw_event)
response = KinesisFirehoseResponse()
for record in parsed_event.records:
# if data was delivered as json; caches loaded value
data = record.data_as_text
processed_record = record.create_firehose_response_record(
result="Ok",
)
processed_record.data_from_text(data=data)
response.add_record(record=processed_record)
response_dict = response.asdict()
res_records = list(response_dict["records"])
assert len(res_records) == 2
record_01, record_02 = res_records[:]
record01_raw = raw_event["records"][0]
assert record_01["result"] == "Ok"
assert record_01["recordId"] == record01_raw["recordId"]
assert record_01["data"] == record01_raw["data"]
assert response.records[0].data_as_bytes == b"Hello World"
assert response.records[0].data_as_text == "Hello World"