
Commit a9324f9

ran-isenberg, leandrodamascena, heitorlessa, Release bot, and peterschutt authored and committed
feat(parser): add KafkaMskEventModel and KafkaSelfManagedEventModel (#1499)
Co-authored-by: Leandro Damascena <[email protected]>
Co-authored-by: Heitor Lessa <[email protected]>
Co-authored-by: heitorlessa <[email protected]>
Co-authored-by: Release bot <[email protected]>
Co-authored-by: Peter Schutt <[email protected]>
Co-authored-by: Ran Isenberg <[email protected]>
Co-authored-by: Ran Isenberg <[email protected]>
1 parent 4e9bd6e commit a9324f9

File tree: 12 files changed (+242 -5)


CHANGELOG.md  (+1 -1)

@@ -2393,4 +2393,4 @@
 [v1.0.1]: https://github.com/awslabs/aws-lambda-powertools-python/compare/v1.0.0...v1.0.1
 [v1.0.0]: https://github.com/awslabs/aws-lambda-powertools-python/compare/v0.11.0...v1.0.0
 [v0.11.0]: https://github.com/awslabs/aws-lambda-powertools-python/compare/v0.10.1...v0.11.0
-[v0.10.1]: https://github.com/awslabs/aws-lambda-powertools-python/compare/v0.10.0...v0.10.1
+[v0.10.1]: https://github.com/awslabs/aws-lambda-powertools-python/compare/v0.10.0...v0.10.1

aws_lambda_powertools/shared/functions.py  (+20)

@@ -1,5 +1,10 @@
+import base64
+import logging
+from binascii import Error as BinAsciiError
 from typing import Optional, Union
 
+logger = logging.getLogger(__name__)
+
 
 def strtobool(value: str) -> bool:
     """Convert a string representation of truth to True or False.
@@ -58,3 +63,18 @@ def resolve_env_var_choice(
         resolved choice as either bool or environment value
     """
     return choice if choice is not None else env
+
+
+def base64_decode(value: str) -> bytes:
+    try:
+        logger.debug("Decoding base64 Kafka record item before parsing")
+        return base64.b64decode(value)
+    except (BinAsciiError, TypeError):
+        raise ValueError("base64 decode failed")
+
+
+def bytes_to_string(value: bytes) -> str:
+    try:
+        return value.decode("utf-8")
+    except (BinAsciiError, TypeError):
+        raise ValueError("base64 UTF-8 decode failed")

aws_lambda_powertools/utilities/data_classes/kafka_event.py  (+3 -2)

@@ -85,10 +85,11 @@ def get_header_value(
 
 
 class KafkaEvent(DictWrapper):
-    """Self-managed Apache Kafka event trigger
+    """Self-managed or MSK Apache Kafka event trigger
     Documentation:
     --------------
     - https://docs.aws.amazon.com/lambda/latest/dg/with-kafka.html
+    - https://docs.aws.amazon.com/lambda/latest/dg/with-msk.html
     """
 
     @property
@@ -98,7 +99,7 @@ def event_source(self) -> str:
 
     @property
     def event_source_arn(self) -> Optional[str]:
-        """The AWS service ARN from which the Kafka event record originated."""
+        """The AWS service ARN from which the Kafka event record originated, mandatory for AWS MSK."""
         return self.get("eventSourceArn")
 
     @property

aws_lambda_powertools/utilities/parser/envelopes/__init__.py  (+2)

@@ -4,6 +4,7 @@
 from .cloudwatch import CloudWatchLogsEnvelope
 from .dynamodb import DynamoDBStreamEnvelope
 from .event_bridge import EventBridgeEnvelope
+from .kafka import KafkaEnvelope
 from .kinesis import KinesisDataStreamEnvelope
 from .lambda_function_url import LambdaFunctionUrlEnvelope
 from .sns import SnsEnvelope, SnsSqsEnvelope
@@ -20,5 +21,6 @@
     "SnsEnvelope",
     "SnsSqsEnvelope",
     "SqsEnvelope",
+    "KafkaEnvelope",
     "BaseEnvelope",
 ]
aws_lambda_powertools/utilities/parser/envelopes/kafka.py  (new file, +44)

@@ -0,0 +1,44 @@
+import logging
+from typing import Any, Dict, List, Optional, Type, Union, cast
+
+from ..models import KafkaMskEventModel, KafkaSelfManagedEventModel
+from ..types import Model
+from .base import BaseEnvelope
+
+logger = logging.getLogger(__name__)
+
+
+class KafkaEnvelope(BaseEnvelope):
+    """Kafka event envelope to extract data within the value key
+    The record's value parameter is a string, though it can also be a JSON encoded string.
+    Regardless of its type it'll be parsed into a BaseModel object.
+
+    Note: Records will be parsed the same way, so if model is str,
+    all items in the list will be parsed as str and not as JSON (and vice versa)
+    """
+
+    def parse(self, data: Optional[Union[Dict[str, Any], Any]], model: Type[Model]) -> List[Optional[Model]]:
+        """Parses data found with model provided
+
+        Parameters
+        ----------
+        data : Dict
+            Lambda event to be parsed
+        model : Type[Model]
+            Data model provided to parse after extracting data using envelope
+
+        Returns
+        -------
+        List
+            List of records parsed with model provided
+        """
+        event_source = cast(dict, data).get("eventSource")
+        model_parse_event = KafkaMskEventModel if event_source == "aws:kafka" else KafkaSelfManagedEventModel
+
+        logger.debug(f"Parsing incoming data with Kafka event model {model_parse_event}")
+        parsed_envelope = model_parse_event.parse_obj(data)
+        logger.debug(f"Parsing Kafka event records in `value` with {model}")
+        ret_list = []
+        for records in parsed_envelope.records.values():
+            ret_list += [self._parse(data=record.value, model=model) for record in records]
+        return ret_list
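A minimal usage sketch for the new envelope, assuming a hypothetical user model MyRecordValue and handler name: the envelope picks KafkaMskEventModel or KafkaSelfManagedEventModel from eventSource, then parses each record's decoded value with the model you pass.

from typing import List, Optional

from pydantic import BaseModel

from aws_lambda_powertools.utilities.parser import envelopes, parse


class MyRecordValue(BaseModel):  # hypothetical shape of the JSON carried in each record's value
    key: str


def handler(event: dict, context) -> List[Optional[MyRecordValue]]:
    # returns one parsed item per Kafka record, across all topic partitions
    return parse(event=event, model=MyRecordValue, envelope=envelopes.KafkaEnvelope)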

aws_lambda_powertools/utilities/parser/models/__init__.py  (+5)

@@ -17,6 +17,7 @@
 from .cloudwatch import CloudWatchLogsData, CloudWatchLogsDecode, CloudWatchLogsLogEvent, CloudWatchLogsModel
 from .dynamodb import DynamoDBStreamChangedRecordModel, DynamoDBStreamModel, DynamoDBStreamRecordModel
 from .event_bridge import EventBridgeModel
+from .kafka import KafkaBaseEventModel, KafkaMskEventModel, KafkaRecordModel, KafkaSelfManagedEventModel
 from .kinesis import KinesisDataStreamModel, KinesisDataStreamRecord, KinesisDataStreamRecordPayload
 from .lambda_function_url import LambdaFunctionUrlModel
 from .s3 import S3Model, S3RecordModel
@@ -98,4 +99,8 @@
     "APIGatewayEventRequestContext",
     "APIGatewayEventAuthorizer",
     "APIGatewayEventIdentity",
+    "KafkaSelfManagedEventModel",
+    "KafkaRecordModel",
+    "KafkaMskEventModel",
+    "KafkaBaseEventModel",
 ]
aws_lambda_powertools/utilities/parser/models/kafka.py  (new file, +65)

@@ -0,0 +1,65 @@
+from datetime import datetime
+from typing import Dict, List, Type, Union
+
+from pydantic import BaseModel, validator
+
+from aws_lambda_powertools.shared.functions import base64_decode, bytes_to_string
+from aws_lambda_powertools.utilities.parser.types import Literal
+
+SERVERS_DELIMITER = ","
+
+
+class KafkaRecordModel(BaseModel):
+    topic: str
+    partition: int
+    offset: int
+    timestamp: datetime
+    timestampType: str
+    key: bytes
+    value: Union[str, Type[BaseModel]]
+    headers: List[Dict[str, bytes]]
+
+    # validators
+    _decode_key = validator("key", allow_reuse=True)(base64_decode)
+
+    @validator("value", pre=True, allow_reuse=True)
+    def data_base64_decode(cls, value):
+        as_bytes = base64_decode(value)
+        return bytes_to_string(as_bytes)
+
+    @validator("headers", pre=True, allow_reuse=True)
+    def decode_headers_list(cls, value):
+        for header in value:
+            for key, values in header.items():
+                header[key] = bytes(values)
+        return value
+
+
+class KafkaBaseEventModel(BaseModel):
+    bootstrapServers: List[str]
+    records: Dict[str, List[KafkaRecordModel]]
+
+    @validator("bootstrapServers", pre=True, allow_reuse=True)
+    def split_servers(cls, value):
+        return None if not value else value.split(SERVERS_DELIMITER)
+
+
+class KafkaSelfManagedEventModel(KafkaBaseEventModel):
+    """Self-managed Apache Kafka event trigger
+    Documentation:
+    --------------
+    - https://docs.aws.amazon.com/lambda/latest/dg/with-kafka.html
+    """
+
+    eventSource: Literal["aws:SelfManagedKafka"]
+
+
+class KafkaMskEventModel(KafkaBaseEventModel):
+    """Fully-managed AWS Apache Kafka event trigger
+    Documentation:
+    --------------
+    - https://docs.aws.amazon.com/lambda/latest/dg/with-msk.html
+    """
+
+    eventSource: Literal["aws:kafka"]
+    eventSourceArn: str
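A hedged sketch of using KafkaMskEventModel directly (no envelope); raw_event and handler are placeholders for the payload Lambda delivers and your own function, and the field access mirrors the model above:

from aws_lambda_powertools.utilities.parser import parse
from aws_lambda_powertools.utilities.parser.models import KafkaMskEventModel


def handler(raw_event: dict, context):
    event = parse(event=raw_event, model=KafkaMskEventModel)
    for topic_partition, records in event.records.items():
        for record in records:
            # the validators above decode base64: key becomes bytes, value a UTF-8 string
            print(topic_partition, record.offset, record.key, record.value)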

docs/utilities/parser.md  (+3)

@@ -168,6 +168,8 @@ Parser comes with the following built-in models:
 | **APIGatewayProxyEventModel** | Lambda Event Source payload for Amazon API Gateway |
 | **APIGatewayProxyEventV2Model** | Lambda Event Source payload for Amazon API Gateway v2 payload |
 | **LambdaFunctionUrlModel** | Lambda Event Source payload for Lambda Function URL payload |
+| **KafkaSelfManagedEventModel** | Lambda Event Source payload for self managed Kafka payload |
+| **KafkaMskEventModel** | Lambda Event Source payload for AWS MSK payload |
 
 ### extending built-in models
 
@@ -308,6 +310,7 @@ Parser comes with the following built-in envelopes, where `Model` in the return
 | **ApiGatewayEnvelope** | 1. Parses data using `APIGatewayProxyEventModel`. <br/> 2. Parses `body` key using your model and returns it. | `Model` |
 | **ApiGatewayV2Envelope** | 1. Parses data using `APIGatewayProxyEventV2Model`. <br/> 2. Parses `body` key using your model and returns it. | `Model` |
 | **LambdaFunctionUrlEnvelope** | 1. Parses data using `LambdaFunctionUrlModel`. <br/> 2. Parses `body` key using your model and returns it. | `Model` |
+| **KafkaEnvelope** | 1. Parses data using `KafkaRecordModel`. <br/> 2. Parses `value` key using your model and returns it. | `Model` |
 
 ### Bringing your own envelope
 
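The table rows above describe the new models and envelope; a decorator-style sketch of the documented pattern, with MyKafkaValue as a hypothetical user model and handler as a placeholder function name:

from typing import List, Optional

from pydantic import BaseModel

from aws_lambda_powertools.utilities.parser import envelopes, event_parser
from aws_lambda_powertools.utilities.typing import LambdaContext


class MyKafkaValue(BaseModel):  # hypothetical model for each record's JSON value
    key: str


@event_parser(model=MyKafkaValue, envelope=envelopes.KafkaEnvelope)
def handler(event: List[Optional[MyKafkaValue]], context: LambdaContext):
    # event is the list of parsed record values produced by KafkaEnvelope
    return {"records": len(event)}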

tests/events/kafkaEventSelfManaged.json  (+1 -1)

@@ -1,5 +1,5 @@
 {
-  "eventSource":"aws:aws:SelfManagedKafka",
+  "eventSource":"aws:SelfManagedKafka",
   "bootstrapServers":"b-2.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092,b-1.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092",
   "records":{
     "mytopic-0":[

tests/functional/parser/schemas.py  (+4)

@@ -91,3 +91,7 @@ class MyApiGatewayBusiness(BaseModel):
 class MyALambdaFuncUrlBusiness(BaseModel):
     message: str
     username: str
+
+
+class MyLambdaKafkaBusiness(BaseModel):
+    key: str

tests/functional/parser/test_kafka.py  (new file, +93)

@@ -0,0 +1,93 @@
+from typing import List
+
+from aws_lambda_powertools.utilities.parser import envelopes, event_parser
+from aws_lambda_powertools.utilities.parser.models import (
+    KafkaMskEventModel,
+    KafkaRecordModel,
+    KafkaSelfManagedEventModel,
+)
+from aws_lambda_powertools.utilities.typing import LambdaContext
+from tests.functional.parser.schemas import MyLambdaKafkaBusiness
+from tests.functional.utils import load_event
+
+
+@event_parser(model=MyLambdaKafkaBusiness, envelope=envelopes.KafkaEnvelope)
+def handle_lambda_kafka_with_envelope(event: List[MyLambdaKafkaBusiness], _: LambdaContext):
+    assert event[0].key == "value"
+    assert len(event) == 1
+
+
+@event_parser(model=KafkaSelfManagedEventModel)
+def handle_kafka_event(event: KafkaSelfManagedEventModel, _: LambdaContext):
+    return event
+
+
+def test_kafka_msk_event_with_envelope():
+    event = load_event("kafkaEventMsk.json")
+    handle_lambda_kafka_with_envelope(event, LambdaContext())
+
+
+def test_kafka_self_managed_event_with_envelope():
+    event = load_event("kafkaEventSelfManaged.json")
+    handle_lambda_kafka_with_envelope(event, LambdaContext())
+
+
+def test_self_managed_kafka_event():
+    json_event = load_event("kafkaEventSelfManaged.json")
+    event: KafkaSelfManagedEventModel = handle_kafka_event(json_event, LambdaContext())
+    assert event.eventSource == "aws:SelfManagedKafka"
+    bootstrap_servers = [
+        "b-2.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092",
+        "b-1.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092",
+    ]
+    assert event.bootstrapServers == bootstrap_servers
+
+    records = list(event.records["mytopic-0"])
+    assert len(records) == 1
+    record: KafkaRecordModel = records[0]
+    assert record.topic == "mytopic"
+    assert record.partition == 0
+    assert record.offset == 15
+    assert record.timestamp is not None
+    convert_time = int(round(record.timestamp.timestamp() * 1000))
+    assert convert_time == 1545084650987
+    assert record.timestampType == "CREATE_TIME"
+    assert record.key == b"recordKey"
+    assert record.value == '{"key":"value"}'
+    assert len(record.headers) == 1
+    assert record.headers[0]["headerKey"] == b"headerValue"
+
+
+@event_parser(model=KafkaMskEventModel)
+def handle_msk_event(event: KafkaMskEventModel, _: LambdaContext):
+    return event
+
+
+def test_kafka_msk_event():
+    json_event = load_event("kafkaEventMsk.json")
+    event: KafkaMskEventModel = handle_msk_event(json_event, LambdaContext())
+    assert event.eventSource == "aws:kafka"
+    bootstrap_servers = [
+        "b-2.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092",
+        "b-1.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092",
+    ]
+    assert event.bootstrapServers == bootstrap_servers
+    assert (
+        event.eventSourceArn
+        == "arn:aws:kafka:us-east-1:0123456789019:cluster/SalesCluster/abcd1234-abcd-cafe-abab-9876543210ab-4"
+    )
+
+    records = list(event.records["mytopic-0"])
+    assert len(records) == 1
+    record: KafkaRecordModel = records[0]
+    assert record.topic == "mytopic"
+    assert record.partition == 0
+    assert record.offset == 15
+    assert record.timestamp is not None
+    convert_time = int(round(record.timestamp.timestamp() * 1000))
+    assert convert_time == 1545084650987
+    assert record.timestampType == "CREATE_TIME"
+    assert record.key == b"recordKey"
+    assert record.value == '{"key":"value"}'
+    assert len(record.headers) == 1
+    assert record.headers[0]["headerKey"] == b"headerValue"

tests/functional/test_data_classes.py  (+1 -1)

@@ -1174,7 +1174,7 @@ def test_kafka_msk_event():
 
 def test_kafka_self_managed_event():
     event = KafkaEvent(load_event("kafkaEventSelfManaged.json"))
-    assert event.event_source == "aws:aws:SelfManagedKafka"
+    assert event.event_source == "aws:SelfManagedKafka"
 
     bootstrap_servers_raw = "b-2.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092,b-1.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092"  # noqa E501
 
