-
Notifications
You must be signed in to change notification settings - Fork 421
/
Copy pathkinesis_stream_event.py
122 lines (92 loc) · 3.87 KB
/
kinesis_stream_event.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
from __future__ import annotations
import base64
import json
import zlib
from typing import TYPE_CHECKING
from aws_lambda_powertools.utilities.data_classes.cloud_watch_logs_event import (
CloudWatchLogsDecodedData,
)
from aws_lambda_powertools.utilities.data_classes.common import DictWrapper
if TYPE_CHECKING:
from collections.abc import Iterator
class KinesisStreamRecordPayload(DictWrapper):
    """Accessors for the ``kinesis`` section of a Kinesis stream record.

    The properties read individual fields by key; the ``data_*`` methods
    decode the base64-encoded ``data`` field into bytes, text, or JSON.
    """

    @property
    def approximate_arrival_timestamp(self) -> float:
        """Approximate time at which the record was inserted into the stream."""
        raw_timestamp = self["approximateArrivalTimestamp"]
        return float(raw_timestamp)

    @property
    def data(self) -> str:
        """The data blob, still base64-encoded (see ``data_as_bytes``)."""
        return self["data"]

    @property
    def kinesis_schema_version(self) -> str:
        """Version of the schema used by this record."""
        return self["kinesisSchemaVersion"]

    @property
    def partition_key(self) -> str:
        """Key identifying the shard in the stream that the data record is assigned to."""
        return self["partitionKey"]

    @property
    def sequence_number(self) -> str:
        """Unique identifier of the record within its shard."""
        return self["sequenceNumber"]

    def data_as_bytes(self) -> bytes:
        """Return the data blob base64-decoded into raw bytes."""
        encoded = self.data
        return base64.b64decode(encoded)

    def data_as_text(self) -> str:
        """Return the base64-decoded data blob decoded as UTF-8 text."""
        raw = self.data_as_bytes()
        return raw.decode("utf-8")

    def data_as_json(self) -> dict:
        """Return the data blob parsed as JSON (base64 -> UTF-8 text -> JSON)."""
        text = self.data_as_text()
        return json.loads(text)

    def data_zlib_compressed_as_json(self) -> dict:
        """Return the data blob parsed as JSON after base64-decoding and decompressing it."""
        compressed = self.data_as_bytes()
        # MAX_WBITS | 32 asks zlib to auto-detect a zlib or gzip header.
        inflated = zlib.decompress(compressed, zlib.MAX_WBITS | 32)
        return json.loads(inflated)
class KinesisStreamRecord(DictWrapper):
    """Accessors for a single record of a Kinesis stream event.

    Each property reads one field by key; ``kinesis`` wraps the nested
    payload in a ``KinesisStreamRecordPayload``.
    """

    @property
    def aws_region(self) -> str:
        """AWS region in which the event originated, e.g. us-east-1."""
        return self["awsRegion"]

    @property
    def event_id(self) -> str:
        """Globally unique identifier of the event recorded in this stream record."""
        return self["eventID"]

    @property
    def event_name(self) -> str:
        """Type of the event, e.g. aws:kinesis:record."""
        return self["eventName"]

    @property
    def event_source(self) -> str:
        """AWS service the event originated from; for Kinesis this is aws:kinesis."""
        return self["eventSource"]

    @property
    def event_source_arn(self) -> str:
        """Amazon Resource Name (ARN) of the event source."""
        return self["eventSourceARN"]

    @property
    def event_version(self) -> str:
        """Event version, containing a major and minor version in the form <major>.<minor>."""
        return self["eventVersion"]

    @property
    def invoke_identity_arn(self) -> str:
        """ARN of the identity used to invoke the Lambda function."""
        return self["invokeIdentityArn"]

    @property
    def kinesis(self) -> KinesisStreamRecordPayload:
        """The underlying Kinesis record payload associated with the event."""
        payload = self["kinesis"]
        return KinesisStreamRecordPayload(payload)
class KinesisStreamEvent(DictWrapper):
    """Kinesis stream event delivered to a Lambda function.

    Documentation:
    --------------
    - https://docs.aws.amazon.com/lambda/latest/dg/with-kinesis.html
    """

    @property
    def records(self) -> Iterator[KinesisStreamRecord]:
        """Lazily yield each entry of ``Records`` wrapped as a ``KinesisStreamRecord``."""
        yield from map(KinesisStreamRecord, self["Records"])
def extract_cloudwatch_logs_from_event(event: KinesisStreamEvent) -> list[CloudWatchLogsDecodedData]:
    """Decode every record of a Kinesis stream event as CloudWatch Logs data.

    Each record's data blob is base64-decoded, decompressed and parsed as JSON
    (via ``data_zlib_compressed_as_json``) and wrapped in a
    ``CloudWatchLogsDecodedData``.

    Parameters
    ----------
    event : KinesisStreamEvent
        Event whose records carry compressed CloudWatch Logs payloads.

    Returns
    -------
    list[CloudWatchLogsDecodedData]
        One decoded entry per record, in the order the records are yielded.
    """
    # Lazy stage: each payload is decoded just before it is wrapped.
    decoded_payloads = (entry.kinesis.data_zlib_compressed_as_json() for entry in event.records)
    return [CloudWatchLogsDecodedData(payload) for payload in decoded_payloads]
def extract_cloudwatch_logs_from_record(record: KinesisStreamRecord) -> CloudWatchLogsDecodedData:
    """Decode a single Kinesis stream record as CloudWatch Logs data.

    The record's data blob is base64-decoded, decompressed and parsed as JSON
    (via ``data_zlib_compressed_as_json``) before being wrapped.

    Parameters
    ----------
    record : KinesisStreamRecord
        Record carrying a compressed CloudWatch Logs payload.

    Returns
    -------
    CloudWatchLogsDecodedData
        Wrapper around the decoded JSON payload.
    """
    payload = record.kinesis.data_zlib_compressed_as_json()
    return CloudWatchLogsDecodedData(data=payload)