
Commit e3aa284

Author: Ran Isenberg
feature: Kafka Parser support
1 parent f2a6c9b commit e3aa284

File tree

4 files changed: +194 -0 lines changed

aws_lambda_powertools/utilities/parser/envelopes/kafka.py

+41
@@ -0,0 +1,41 @@
import logging
from typing import Any, Dict, List, Optional, Type, Union

from ..models import KafkaEventModel
from ..types import Model
from .base import BaseEnvelope

logger = logging.getLogger(__name__)


class KafkaEnvelope(BaseEnvelope):
    """Kafka event envelope to extract data within the `value` key of each record

    The record's `value` is a base64-encoded string, which may itself be a JSON-encoded string.
    Either way, it is parsed into the provided model.

    Note: all records are parsed the same way, so if the model is str,
    every item in the list is parsed as str and not as JSON (and vice versa).
    """

    def parse(self, data: Optional[Union[Dict[str, Any], Any]], model: Type[Model]) -> List[Optional[Model]]:
        """Parses data found with model provided

        Parameters
        ----------
        data : Dict
            Lambda event to be parsed
        model : Type[Model]
            Data model provided to parse after extracting data using envelope

        Returns
        -------
        List
            List of records parsed with the model provided
        """
        logger.debug(f"Parsing incoming data with Kafka event model {KafkaEventModel}")
        parsed_envelope: KafkaEventModel = KafkaEventModel.parse_obj(data)
        logger.debug(f"Parsing Kafka event records in `value` with {model}")
        ret_list = []
        for records in parsed_envelope.records.values():
            ret_list += [self._parse(data=record.value, model=model) for record in records]
        return ret_list
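
For context, a minimal usage sketch, not part of this commit: the envelope parses the whole event with KafkaEventModel, then hands each record's decoded value to the model given to event_parser. Order is a hypothetical payload model used only for illustration.

from typing import List

from pydantic import BaseModel

from aws_lambda_powertools.utilities.parser import envelopes, event_parser
from aws_lambda_powertools.utilities.typing import LambdaContext


class Order(BaseModel):  # hypothetical business model, for illustration only
    item_id: str
    quantity: int


@event_parser(model=Order, envelope=envelopes.KafkaEnvelope)
def lambda_handler(event: List[Order], context: LambdaContext):
    # `event` is a flat list of Order objects, one per Kafka record,
    # built from each record's base64-decoded `value`
    for order in event:
        print(order.item_id, order.quantity)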
aws_lambda_powertools/utilities/parser/models/kafka.py

+70

@@ -0,0 +1,70 @@
import base64
import logging
from binascii import Error as BinAsciiError
from datetime import datetime
from typing import Dict, List, Optional, Type, Union

from pydantic import BaseModel, validator

from aws_lambda_powertools.utilities.parser.types import Literal

SERVERS_DELIMITER = ","

logger = logging.getLogger(__name__)


def _base64_decode(value: str) -> bytes:
    try:
        logger.debug("Decoding base64 Kafka record item before parsing")
        return base64.b64decode(value)
    except (BinAsciiError, TypeError):
        raise ValueError("base64 decode failed")


def _bytes_to_string(value: bytes) -> str:
    try:
        return value.decode("utf-8")
    except (BinAsciiError, TypeError):
        raise ValueError("base64 UTF-8 decode failed")


class KafkaRecordModel(BaseModel):
    topic: str
    partition: int
    offset: int
    timestamp: datetime
    timestampType: str
    key: bytes
    value: Union[str, Type[BaseModel]]
    headers: List[Dict[str, bytes]]

    # validators
    _decode_key = validator("key", allow_reuse=True)(_base64_decode)

    @validator("value", pre=True, allow_reuse=True)
    def data_base64_decode(cls, value):
        as_bytes = _base64_decode(value)
        return _bytes_to_string(as_bytes)

    @validator("headers", pre=True, allow_reuse=True)
    def decode_headers_list(cls, value):
        for header in value:
            for key, values in header.items():
                header[key] = bytes(values)
        return value


class KafkaEventModel(BaseModel):
    """Self-managed Apache Kafka event trigger

    Documentation:
    --------------
    - https://docs.aws.amazon.com/lambda/latest/dg/with-kafka.html
    """

    eventSource: Literal["aws:SelfManagedKafka"]
    bootstrapServers: Optional[List[str]]
    records: Dict[str, List[KafkaRecordModel]]

    @validator("bootstrapServers", pre=True, allow_reuse=True)
    def split_servers(cls, value):
        return None if not value else value.split(SERVERS_DELIMITER)
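
A standalone sketch of what the record validators do, using the encoded values from the fixture below; the keyword construction is illustrative only and not part of the commit:

import base64

from aws_lambda_powertools.utilities.parser.models import KafkaRecordModel

record = KafkaRecordModel(
    topic="mytopic",
    partition=0,
    offset=15,
    timestamp=1545084650987,  # millisecond epoch; pydantic parses it into a datetime
    timestampType="CREATE_TIME",
    key=base64.b64encode(b"recordKey"),
    value=base64.b64encode(b'{"key":"value"}').decode(),
    headers=[{"headerKey": list(b"headerValue")}],
)
assert record.key == b"recordKey"  # `_decode_key` validator: base64 -> bytes
assert record.value == '{"key":"value"}'  # `data_base64_decode`: base64 -> str
assert record.headers[0]["headerKey"] == b"headerValue"  # `decode_headers_list`: int list -> bytes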

tests/events/kafkaEvent.json

+34
@@ -0,0 +1,34 @@
{
  "eventSource": "aws:SelfManagedKafka",
  "bootstrapServers": "b-2.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092,b-1.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092",
  "records": {
    "mytopic-0": [
      {
        "topic": "mytopic",
        "partition": 0,
        "offset": 15,
        "timestamp": 1545084650987,
        "timestampType": "CREATE_TIME",
        "key": "cmVjb3JkS2V5",
        "value": "eyJrZXkiOiJ2YWx1ZSJ9",
        "headers": [
          {
            "headerKey": [104, 101, 97, 100, 101, 114, 86, 97, 108, 117, 101]
          }
        ]
      }
    ]
  }
}
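
For reference, the encoded fields in this fixture decode as follows (a quick check, not part of the commit):

import base64

assert base64.b64decode("cmVjb3JkS2V5") == b"recordKey"
assert base64.b64decode("eyJrZXkiOiJ2YWx1ZSJ9") == b'{"key":"value"}'
assert bytes([104, 101, 97, 100, 101, 114, 86, 97, 108, 117, 101]) == b"headerValue"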

tests/functional/parser/test_kafka.py

+49
@@ -0,0 +1,49 @@
from typing import List

from aws_lambda_powertools.utilities.parser import envelopes, event_parser
from aws_lambda_powertools.utilities.parser.models import KafkaEventModel, KafkaRecordModel
from aws_lambda_powertools.utilities.typing import LambdaContext
from tests.functional.parser.schemas import MyALambdaKafkaBusiness
from tests.functional.utils import load_event


@event_parser(model=MyALambdaKafkaBusiness, envelope=envelopes.KafkaEnvelope)
def handle_lambda_kafka_with_envelope(event: List[MyALambdaKafkaBusiness], _: LambdaContext):
    assert event[0].key == "value"
    assert len(event) == 1


@event_parser(model=KafkaEventModel)
def handle_kafka_event(event: KafkaEventModel, _: LambdaContext):
    return event


def test_kafka_event_with_envelope():
    event = load_event("kafkaEvent.json")
    handle_lambda_kafka_with_envelope(event, LambdaContext())


def test_kafka_event():
    json_event = load_event("kafkaEvent.json")
    event: KafkaEventModel = handle_kafka_event(json_event, LambdaContext())
    assert event.eventSource == "aws:SelfManagedKafka"
    bootstrap_servers = [
        "b-2.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092",
        "b-1.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092",
    ]
    assert event.bootstrapServers == bootstrap_servers

    records = list(event.records["mytopic-0"])
    assert len(records) == 1
    record: KafkaRecordModel = records[0]
    assert record.topic == "mytopic"
    assert record.partition == 0
    assert record.offset == 15
    assert record.timestamp is not None
    # the millisecond epoch in the fixture round-trips through the parsed datetime
    convert_time = int(round(record.timestamp.timestamp() * 1000))
    assert convert_time == 1545084650987
    assert record.timestampType == "CREATE_TIME"
    assert record.key == b"recordKey"
    assert record.value == '{"key":"value"}'
    assert len(record.headers) == 1
    assert record.headers[0]["headerKey"] == b"headerValue"
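
The same assertions could be made without the decorator through the standalone parse function exported by aws_lambda_powertools.utilities.parser; a brief sketch under that assumption:

from aws_lambda_powertools.utilities.parser import parse


def check_kafka_event_without_decorator():
    raw_event = load_event("kafkaEvent.json")
    # parse the raw event directly into the model, no Lambda handler needed
    event = parse(event=raw_event, model=KafkaEventModel)
    assert event.records["mytopic-0"][0].key == b"recordKey"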
