Skip to content

feat(parser): add KafkaMskEventModel and KafkaSelfManagedEventModel #1499

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 27 commits into from
Sep 9, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
fef70b0
docs(core): match code snippet name with filename (#1286)
heitorlessa Jul 8, 2022
d918387
fix(ci): accept core arg in label related issue workflow
heitorlessa Jul 8, 2022
13558d9
chore(ci): update changelog with latest changes
Aug 9, 2022
40ce509
docs(apigateway): removes duplicate admonition (#1426)
peterschutt Aug 9, 2022
5951ab0
chore(ci): update changelog with latest changes
Aug 9, 2022
6b23903
docs(parser): minor grammar fix (#1427)
peterschutt Aug 9, 2022
6380b63
chore(ci): update changelog with latest changes
Aug 9, 2022
8e24ae3
chore(ci): update changelog with latest changes
Aug 9, 2022
af99733
Merge branch 'develop' of https://github.com/ran-isenberg/aws-lambda-…
ran-isenberg Aug 11, 2022
6c405fa
merge
Sep 4, 2022
f2a6c9b
update changelog with latest changes
Sep 4, 2022
70de885
feature: Kafka Parser support
Sep 4, 2022
574a389
feature: Kafka Parser support
Sep 4, 2022
a0f778c
Merge branch 'develop' of https://github.com/awslabs/aws-lambda-power…
Sep 5, 2022
c0df797
cr fixes
Sep 5, 2022
a918a94
cr fixes
Sep 5, 2022
06c94c9
docs fix
Sep 6, 2022
61d833c
fixes
Sep 6, 2022
4e4282c
fixes
Sep 6, 2022
9ee356a
a
Sep 6, 2022
afbd394
fixes
Sep 6, 2022
139ffd7
Update tests/functional/parser/test_kafka.py
ran-isenberg Sep 7, 2022
bfe0803
Update tests/functional/parser/test_kafka.py
ran-isenberg Sep 7, 2022
50571b7
Update tests/functional/parser/test_kafka.py
ran-isenberg Sep 7, 2022
75e84bd
cr fixes
Sep 7, 2022
86ef6de
feat(parser): small changes
leandrodamascena Sep 7, 2022
f41c82c
feat(parser): changes in envelope
leandrodamascena Sep 9, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2389,4 +2389,4 @@
[v1.0.1]: https://github.com/awslabs/aws-lambda-powertools-python/compare/v1.0.0...v1.0.1
[v1.0.0]: https://github.com/awslabs/aws-lambda-powertools-python/compare/v0.11.0...v1.0.0
[v0.11.0]: https://github.com/awslabs/aws-lambda-powertools-python/compare/v0.10.1...v0.11.0
[v0.10.1]: https://github.com/awslabs/aws-lambda-powertools-python/compare/v0.10.0...v0.10.1
[v0.10.1]: https://github.com/awslabs/aws-lambda-powertools-python/compare/v0.10.0...v0.10.1
20 changes: 20 additions & 0 deletions aws_lambda_powertools/shared/functions.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
import base64
import logging
from binascii import Error as BinAsciiError
from typing import Optional, Union

logger = logging.getLogger(__name__)


def strtobool(value: str) -> bool:
"""Convert a string representation of truth to True or False.
Expand Down Expand Up @@ -58,3 +63,18 @@ def resolve_env_var_choice(
resolved choice as either bool or environment value
"""
return choice if choice is not None else env


def base64_decode(value: str) -> bytes:
    """Decode a base64-encoded string into raw bytes.

    Parameters
    ----------
    value : str
        Base64-encoded payload (e.g. a Kafka record key or value)

    Returns
    -------
    bytes
        Decoded payload

    Raises
    ------
    ValueError
        When the input is not valid base64 (or is not a string at all)
    """
    # Keep the try body minimal: only the call that can actually raise
    logger.debug("Decoding base64 Kafka record item before parsing")
    try:
        return base64.b64decode(value)
    except (BinAsciiError, TypeError) as exc:
        # Chain the original error so the root cause survives in tracebacks
        raise ValueError("base64 decode failed") from exc


def bytes_to_string(value: bytes) -> str:
    """Decode UTF-8 bytes into a string.

    Parameters
    ----------
    value : bytes
        UTF-8 encoded payload (typically the result of a base64 decode)

    Returns
    -------
    str
        Decoded text

    Raises
    ------
    ValueError
        When the input is not valid UTF-8 (or is not bytes at all)
    """
    try:
        return value.decode("utf-8")
    except (UnicodeDecodeError, AttributeError, TypeError) as exc:
        # bytes.decode raises UnicodeDecodeError on malformed UTF-8 (it never
        # raises binascii.Error); AttributeError covers non-bytes input.
        raise ValueError("base64 UTF-8 decode failed") from exc
5 changes: 3 additions & 2 deletions aws_lambda_powertools/utilities/data_classes/kafka_event.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,10 +85,11 @@ def get_header_value(


class KafkaEvent(DictWrapper):
"""Self-managed Apache Kafka event trigger
"""Self-managed or MSK Apache Kafka event trigger
Documentation:
--------------
- https://docs.aws.amazon.com/lambda/latest/dg/with-kafka.html
- https://docs.aws.amazon.com/lambda/latest/dg/with-msk.html
"""

@property
Expand All @@ -98,7 +99,7 @@ def event_source(self) -> str:

@property
def event_source_arn(self) -> Optional[str]:
"""The AWS service ARN from which the Kafka event record originated."""
"""The AWS service ARN from which the Kafka event record originated, mandatory for AWS MSK."""
return self.get("eventSourceArn")

@property
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from .cloudwatch import CloudWatchLogsEnvelope
from .dynamodb import DynamoDBStreamEnvelope
from .event_bridge import EventBridgeEnvelope
from .kafka import KafkaEnvelope
from .kinesis import KinesisDataStreamEnvelope
from .lambda_function_url import LambdaFunctionUrlEnvelope
from .sns import SnsEnvelope, SnsSqsEnvelope
Expand All @@ -20,5 +21,6 @@
"SnsEnvelope",
"SnsSqsEnvelope",
"SqsEnvelope",
"KafkaEnvelope",
"BaseEnvelope",
]
44 changes: 44 additions & 0 deletions aws_lambda_powertools/utilities/parser/envelopes/kafka.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
import logging
from typing import Any, Dict, List, Optional, Type, Union, cast

from ..models import KafkaMskEventModel, KafkaSelfManagedEventModel
from ..types import Model
from .base import BaseEnvelope

logger = logging.getLogger(__name__)


class KafkaEnvelope(BaseEnvelope):
    """Kafka event envelope to extract data within the record's `value` key

    The record's `value` parameter is a string, though it can also be a JSON encoded string.
    Regardless of its type it'll be parsed into a BaseModel object.

    Note: Records will be parsed the same way so if model is str,
    all items in the list will be parsed as str and not as JSON (and vice versa)
    """

    def parse(self, data: Optional[Union[Dict[str, Any], Any]], model: Type[Model]) -> List[Optional[Model]]:
        """Parses data found with model provided

        Parameters
        ----------
        data : Dict
            Lambda event to be parsed
        model : Type[Model]
            Data model provided to parse after extracting data using envelope

        Returns
        -------
        List
            List of records parsed with model provided
        """
        # MSK events carry eventSource "aws:kafka"; anything else is treated
        # as a self-managed Apache Kafka event
        event_source = cast(dict, data).get("eventSource")
        model_parse_event = KafkaMskEventModel if event_source == "aws:kafka" else KafkaSelfManagedEventModel

        logger.debug(f"Parsing incoming data with Kafka event model {model_parse_event}")
        parsed_envelope = model_parse_event.parse_obj(data)
        logger.debug(f"Parsing Kafka event records in `value` with {model}")
        # Records are grouped per topic-partition; flatten every group into one list
        return [
            self._parse(data=record.value, model=model)
            for records in parsed_envelope.records.values()
            for record in records
        ]
5 changes: 5 additions & 0 deletions aws_lambda_powertools/utilities/parser/models/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from .cloudwatch import CloudWatchLogsData, CloudWatchLogsDecode, CloudWatchLogsLogEvent, CloudWatchLogsModel
from .dynamodb import DynamoDBStreamChangedRecordModel, DynamoDBStreamModel, DynamoDBStreamRecordModel
from .event_bridge import EventBridgeModel
from .kafka import KafkaBaseEventModel, KafkaMskEventModel, KafkaRecordModel, KafkaSelfManagedEventModel
from .kinesis import KinesisDataStreamModel, KinesisDataStreamRecord, KinesisDataStreamRecordPayload
from .lambda_function_url import LambdaFunctionUrlModel
from .s3 import S3Model, S3RecordModel
Expand Down Expand Up @@ -98,4 +99,8 @@
"APIGatewayEventRequestContext",
"APIGatewayEventAuthorizer",
"APIGatewayEventIdentity",
"KafkaSelfManagedEventModel",
"KafkaRecordModel",
"KafkaMskEventModel",
"KafkaBaseEventModel",
]
65 changes: 65 additions & 0 deletions aws_lambda_powertools/utilities/parser/models/kafka.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
from datetime import datetime
from typing import Dict, List, Type, Union

from pydantic import BaseModel, validator

from aws_lambda_powertools.shared.functions import base64_decode, bytes_to_string
from aws_lambda_powertools.utilities.parser.types import Literal

SERVERS_DELIMITER = ","


class KafkaRecordModel(BaseModel):
    """Single Kafka record as delivered inside a Lambda Kafka event payload.

    `key`, `value`, and header values arrive base64-encoded and are decoded
    by the validators below before standard pydantic validation runs.
    """

    topic: str
    partition: int
    offset: int
    timestamp: datetime
    timestampType: str
    # Decoded from base64 to raw bytes by `_decode_key`
    key: bytes
    # Decoded from base64 to a UTF-8 string by `data_base64_decode`;
    # envelopes may then parse that string into a model
    value: Union[str, Type[BaseModel]]
    headers: List[Dict[str, bytes]]

    # validators
    # Reuses the shared helper: raises ValueError on malformed base64
    _decode_key = validator("key", allow_reuse=True)(base64_decode)

    @validator("value", pre=True, allow_reuse=True)
    def data_base64_decode(cls, value):
        # pre=True: runs on the raw base64 string before type coercion
        as_bytes = base64_decode(value)
        return bytes_to_string(as_bytes)

    @validator("headers", pre=True, allow_reuse=True)
    def decode_headers_list(cls, value):
        # Header values arrive as lists of ints (byte values); coerce each
        # to bytes in place and hand the mutated structure back to pydantic
        for header in value:
            for key, values in header.items():
                header[key] = bytes(values)
        return value


class KafkaBaseEventModel(BaseModel):
    """Fields shared by self-managed Apache Kafka and AWS MSK event payloads."""

    bootstrapServers: List[str]
    records: Dict[str, List[KafkaRecordModel]]

    @validator("bootstrapServers", pre=True, allow_reuse=True)
    def split_servers(cls, value):
        # The raw event sends the broker list as a single comma-delimited
        # string; an empty/missing value is normalized to None so the
        # required-field validation fails loudly.
        if not value:
            return None
        return value.split(SERVERS_DELIMITER)


class KafkaSelfManagedEventModel(KafkaBaseEventModel):
    """Self-managed Apache Kafka event trigger
    Documentation:
    --------------
    - https://docs.aws.amazon.com/lambda/latest/dg/with-kafka.html
    """

    # Discriminator: self-managed Kafka events always carry this literal value
    eventSource: Literal["aws:SelfManagedKafka"]


class KafkaMskEventModel(KafkaBaseEventModel):
    """Fully-managed AWS Apache Kafka event trigger
    Documentation:
    --------------
    - https://docs.aws.amazon.com/lambda/latest/dg/with-msk.html
    """

    # Discriminator: MSK events always carry this literal value
    eventSource: Literal["aws:kafka"]
    # Cluster ARN; required (mandatory) for MSK events, unlike self-managed Kafka
    eventSourceArn: str
3 changes: 3 additions & 0 deletions docs/utilities/parser.md
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,8 @@ Parser comes with the following built-in models:
| **APIGatewayProxyEventModel** | Lambda Event Source payload for Amazon API Gateway |
| **APIGatewayProxyEventV2Model** | Lambda Event Source payload for Amazon API Gateway v2 payload |
| **LambdaFunctionUrlModel** | Lambda Event Source payload for Lambda Function URL payload |
| **KafkaSelfManagedEventModel** | Lambda Event Source payload for self-managed Apache Kafka payload |
| **KafkaMskEventModel** | Lambda Event Source payload for AWS MSK payload |

### extending built-in models

Expand Down Expand Up @@ -308,6 +310,7 @@ Parser comes with the following built-in envelopes, where `Model` in the return
| **ApiGatewayEnvelope** | 1. Parses data using `APIGatewayProxyEventModel`. <br/> 2. Parses `body` key using your model and returns it. | `Model` |
| **ApiGatewayV2Envelope** | 1. Parses data using `APIGatewayProxyEventV2Model`. <br/> 2. Parses `body` key using your model and returns it. | `Model` |
| **LambdaFunctionUrlEnvelope** | 1. Parses data using `LambdaFunctionUrlModel`. <br/> 2. Parses `body` key using your model and returns it. | `Model` |
| **KafkaEnvelope** | 1. Parses data using `KafkaRecordModel`. <br/> 2. Parses `value` key using your model and returns it. | `Model` |

### Bringing your own envelope

Expand Down
2 changes: 1 addition & 1 deletion tests/events/kafkaEventSelfManaged.json
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{
"eventSource":"aws:aws:SelfManagedKafka",
"eventSource":"aws:SelfManagedKafka",
"bootstrapServers":"b-2.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092,b-1.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092",
"records":{
"mytopic-0":[
Expand Down
4 changes: 4 additions & 0 deletions tests/functional/parser/schemas.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,3 +91,7 @@ class MyApiGatewayBusiness(BaseModel):
class MyALambdaFuncUrlBusiness(BaseModel):
    # Minimal body schema used by Lambda Function URL envelope tests
    message: str
    username: str


class MyLambdaKafkaBusiness(BaseModel):
    # Minimal schema for the JSON document carried in a Kafka record's `value`
    key: str
93 changes: 93 additions & 0 deletions tests/functional/parser/test_kafka.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
from typing import List

from aws_lambda_powertools.utilities.parser import envelopes, event_parser
from aws_lambda_powertools.utilities.parser.models import (
KafkaMskEventModel,
KafkaRecordModel,
KafkaSelfManagedEventModel,
)
from aws_lambda_powertools.utilities.typing import LambdaContext
from tests.functional.parser.schemas import MyLambdaKafkaBusiness
from tests.functional.utils import load_event


# Handler under test: the Kafka envelope should extract and parse each record's
# `value` into MyLambdaKafkaBusiness before this body runs.
@event_parser(model=MyLambdaKafkaBusiness, envelope=envelopes.KafkaEnvelope)
def handle_lambda_kafka_with_envelope(event: List[MyLambdaKafkaBusiness], _: LambdaContext):
    # Both fixture events carry exactly one record with payload {"key": "value"}
    assert event[0].key == "value"
    assert len(event) == 1


# Handler under test: parses the whole event with KafkaSelfManagedEventModel
# and returns it so tests can assert on the parsed fields.
@event_parser(model=KafkaSelfManagedEventModel)
def handle_kafka_event(event: KafkaSelfManagedEventModel, _: LambdaContext):
    return event


def test_kafka_msk_event_with_envelope():
    # MSK fixture (eventSource "aws:kafka"); assertions live in the handler
    event = load_event("kafkaEventMsk.json")
    handle_lambda_kafka_with_envelope(event, LambdaContext())


def test_kafka_self_managed_event_with_envelope():
    # Self-managed fixture (eventSource "aws:SelfManagedKafka"); assertions live in the handler
    event = load_event("kafkaEventSelfManaged.json")
    handle_lambda_kafka_with_envelope(event, LambdaContext())


def test_self_managed_kafka_event():
    """Full-event parse of the self-managed Kafka fixture."""
    raw_event = load_event("kafkaEventSelfManaged.json")
    event: KafkaSelfManagedEventModel = handle_kafka_event(raw_event, LambdaContext())

    assert event.eventSource == "aws:SelfManagedKafka"
    expected_servers = [
        "b-2.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092",
        "b-1.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092",
    ]
    assert event.bootstrapServers == expected_servers

    topic_records = list(event.records["mytopic-0"])
    assert len(topic_records) == 1
    record: KafkaRecordModel = topic_records[0]

    assert record.topic == "mytopic"
    assert record.partition == 0
    assert record.offset == 15
    assert record.timestampType == "CREATE_TIME"
    # Timestamp must round-trip back to the fixture's epoch milliseconds
    assert record.timestamp is not None
    epoch_millis = int(round(record.timestamp.timestamp() * 1000))
    assert epoch_millis == 1545084650987
    # key is base64-decoded to bytes; value is base64-decoded to a UTF-8 string
    assert record.key == b"recordKey"
    assert record.value == '{"key":"value"}'
    assert len(record.headers) == 1
    assert record.headers[0]["headerKey"] == b"headerValue"


# Handler under test: parses the whole event with KafkaMskEventModel
# and returns it so tests can assert on the parsed fields.
@event_parser(model=KafkaMskEventModel)
def handle_msk_event(event: KafkaMskEventModel, _: LambdaContext):
    return event


def test_kafka_msk_event():
    """Full-event parse of the MSK fixture, including the mandatory eventSourceArn."""
    raw_event = load_event("kafkaEventMsk.json")
    event: KafkaMskEventModel = handle_msk_event(raw_event, LambdaContext())

    assert event.eventSource == "aws:kafka"
    expected_servers = [
        "b-2.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092",
        "b-1.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092",
    ]
    assert event.bootstrapServers == expected_servers
    expected_arn = (
        "arn:aws:kafka:us-east-1:0123456789019:cluster/SalesCluster/abcd1234-abcd-cafe-abab-9876543210ab-4"
    )
    assert event.eventSourceArn == expected_arn

    topic_records = list(event.records["mytopic-0"])
    assert len(topic_records) == 1
    record: KafkaRecordModel = topic_records[0]

    assert record.topic == "mytopic"
    assert record.partition == 0
    assert record.offset == 15
    assert record.timestampType == "CREATE_TIME"
    # Timestamp must round-trip back to the fixture's epoch milliseconds
    assert record.timestamp is not None
    epoch_millis = int(round(record.timestamp.timestamp() * 1000))
    assert epoch_millis == 1545084650987
    # key is base64-decoded to bytes; value is base64-decoded to a UTF-8 string
    assert record.key == b"recordKey"
    assert record.value == '{"key":"value"}'
    assert len(record.headers) == 1
    assert record.headers[0]["headerKey"] == b"headerValue"
2 changes: 1 addition & 1 deletion tests/functional/test_data_classes.py
Original file line number Diff line number Diff line change
Expand Up @@ -1174,7 +1174,7 @@ def test_kafka_msk_event():

def test_kafka_self_managed_event():
event = KafkaEvent(load_event("kafkaEventSelfManaged.json"))
assert event.event_source == "aws:aws:SelfManagedKafka"
assert event.event_source == "aws:SelfManagedKafka"

bootstrap_servers_raw = "b-2.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092,b-1.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092" # noqa E501

Expand Down