feat(kafka): New Kafka utility #6821

Merged
34 commits merged on Jun 18, 2025
Commits
f998fdf
Adding support for Kafka Consumer - first commit
leandrodamascena May 12, 2025
32dc4b2
Adding support for Kafka Consumer - first commit
leandrodamascena May 13, 2025
e3ce2a8
Adding exports
leandrodamascena May 25, 2025
5e3fa47
Refactoring functions
leandrodamascena Jun 5, 2025
e9a4d21
Refactoring functions
leandrodamascena Jun 5, 2025
3fe79fe
Adding docstring
leandrodamascena Jun 5, 2025
5e8380a
Adding docstring
leandrodamascena Jun 6, 2025
217f2a1
Adding docstring
leandrodamascena Jun 6, 2025
243d973
Fix mypy stuff
leandrodamascena Jun 9, 2025
735ce0f
Fix mypy stuff
leandrodamascena Jun 11, 2025
bc0d83a
Fix mypy stuff
leandrodamascena Jun 11, 2025
d371657
Fix mypy stuff
leandrodamascena Jun 11, 2025
cf5bd4b
Adding protobuf tests
leandrodamascena Jun 11, 2025
8ea9fc7
Adding json tests
leandrodamascena Jun 11, 2025
e93d74d
Adding docs
leandrodamascena Jun 11, 2025
cc70001
Adding docs
leandrodamascena Jun 12, 2025
1099cba
Internal refactoring
leandrodamascena Jun 18, 2025
7d2bae7
Internal refactoring
leandrodamascena Jun 18, 2025
1c9606a
Cleaning up the PR
leandrodamascena Jun 18, 2025
7ec47f1
Renaming namespace
leandrodamascena Jun 18, 2025
b5e1f40
Refactoring tests
leandrodamascena Jun 18, 2025
7c565e7
Refactoring tests
leandrodamascena Jun 18, 2025
13284b3
Make mypy happy
leandrodamascena Jun 18, 2025
2445889
Refactoring tests
leandrodamascena Jun 18, 2025
34072ed
Refactoring tests
leandrodamascena Jun 18, 2025
b0d6aed
Refactoring tests
leandrodamascena Jun 18, 2025
f7ac013
Refactoring tests
leandrodamascena Jun 18, 2025
01f72a2
Refactoring tests
leandrodamascena Jun 18, 2025
ffaf334
Refactoring tests
leandrodamascena Jun 18, 2025
8059b5f
Refactoring tests
leandrodamascena Jun 18, 2025
eb64050
Refactoring tests
leandrodamascena Jun 18, 2025
c4e1f72
Refactoring tests
leandrodamascena Jun 18, 2025
9840c70
Refactoring tests
leandrodamascena Jun 18, 2025
ab5a37d
Refactoring tests
leandrodamascena Jun 18, 2025
2 changes: 1 addition & 1 deletion aws_lambda_powertools/event_handler/api_gateway.py
@@ -407,7 +407,7 @@ def __init__(

# OpenAPI spec only understands paths with { }. So we'll have to convert Powertools' < >.
# https://swagger.io/specification/#path-templating
self.openapi_path = re.sub(r"<(.*?)>", lambda m: f"{{{''.join(m.group(1))}}}", self.path)
self.openapi_path = re.sub(r"<(.*?)>", lambda m: f"{{{''.join(m.group(1))}}}", self.path) # type: ignore[arg-type]

self.rule = rule
self.func = func
58 changes: 45 additions & 13 deletions aws_lambda_powertools/utilities/data_classes/kafka_event.py
@@ -10,7 +10,19 @@
from collections.abc import Iterator


class KafkaEventRecord(DictWrapper):
class KafkaEventRecordSchemaMetadata(DictWrapper):
@property
def data_format(self) -> str | None:
"""The data format of the Kafka record."""
return self.get("dataFormat", None)

@property
def schema_id(self) -> str | None:
"""The schema id of the Kafka record."""
return self.get("schemaId", None)


class KafkaEventRecordBase(DictWrapper):
@property
def topic(self) -> str:
"""The Kafka topic."""
@@ -36,6 +48,24 @@ def timestamp_type(self) -> str:
"""The Kafka record timestamp type."""
return self["timestampType"]

@property
def key_schema_metadata(self) -> KafkaEventRecordSchemaMetadata | None:
"""The metadata of the Key Kafka record."""
return (
None if self.get("keySchemaMetadata") is None else KafkaEventRecordSchemaMetadata(self["keySchemaMetadata"])
)

@property
def value_schema_metadata(self) -> KafkaEventRecordSchemaMetadata | None:
"""The metadata of the Value Kafka record."""
return (
None
if self.get("valueSchemaMetadata") is None
else KafkaEventRecordSchemaMetadata(self["valueSchemaMetadata"])
)


class KafkaEventRecord(KafkaEventRecordBase):
@property
def key(self) -> str | None:
"""
@@ -83,18 +113,7 @@ def decoded_headers(self) -> dict[str, bytes]:
return CaseInsensitiveDict((k, bytes(v)) for chunk in self.headers for k, v in chunk.items())


class KafkaEvent(DictWrapper):
"""Self-managed or MSK Apache Kafka event trigger
Documentation:
--------------
- https://docs.aws.amazon.com/lambda/latest/dg/with-kafka.html
- https://docs.aws.amazon.com/lambda/latest/dg/with-msk.html
"""

def __init__(self, data: dict[str, Any]):
super().__init__(data)
self._records: Iterator[KafkaEventRecord] | None = None

class KafkaEventBase(DictWrapper):
@property
def event_source(self) -> str:
"""The AWS service from which the Kafka event record originated."""
@@ -115,6 +134,19 @@ def decoded_bootstrap_servers(self) -> list[str]:
"""The decoded Kafka bootstrap URL."""
return self.bootstrap_servers.split(",")


class KafkaEvent(KafkaEventBase):
"""Self-managed or MSK Apache Kafka event trigger
Documentation:
--------------
- https://docs.aws.amazon.com/lambda/latest/dg/with-kafka.html
- https://docs.aws.amazon.com/lambda/latest/dg/with-msk.html
"""

def __init__(self, data: dict[str, Any]):
super().__init__(data)
self._records: Iterator[KafkaEventRecord] | None = None

@property
def records(self) -> Iterator[KafkaEventRecord]:
"""The Kafka records."""
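
The new key_schema_metadata and value_schema_metadata properties expose the schema registry information Lambda attaches to each record. A minimal sketch of reading them from a KafkaEvent; the payload below is illustrative and only hand-built to match the field names used above:

from aws_lambda_powertools.utilities.data_classes.kafka_event import KafkaEvent

# Illustrative event; topic, offsets and schema id are made up for the example.
event = {
    "eventSource": "aws:kafka",
    "bootstrapServers": "b-1.example.kafka.us-east-1.amazonaws.com:9092",
    "records": {
        "my-topic-0": [
            {
                "topic": "my-topic",
                "partition": 0,
                "offset": 15,
                "timestamp": 1545084650987,
                "timestampType": "CREATE_TIME",
                "key": "cmVjb3JkS2V5",                # base64-encoded key
                "value": "eyJrZXkiOiAidmFsdWUifQ==",  # base64-encoded value
                "headers": [],
                "valueSchemaMetadata": {"dataFormat": "AVRO", "schemaId": "123"},
            },
        ],
    },
}

for record in KafkaEvent(event).records:
    metadata = record.value_schema_metadata
    if metadata is not None:
        print(metadata.data_format, metadata.schema_id)  # AVRO 123
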
9 changes: 9 additions & 0 deletions aws_lambda_powertools/utilities/kafka/__init__.py
@@ -0,0 +1,9 @@
from aws_lambda_powertools.utilities.kafka.consumer_records import ConsumerRecords
from aws_lambda_powertools.utilities.kafka.kafka_consumer import kafka_consumer
from aws_lambda_powertools.utilities.kafka.schema_config import SchemaConfig

__all__ = [
"kafka_consumer",
"ConsumerRecords",
"SchemaConfig",
]
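
Taken together, the new namespace is meant to be used roughly like this. The exact kafka_consumer decorator and SchemaConfig constructor signatures are defined in files not shown in this section, so the keyword arguments below (schema_config, value_schema_type) are assumptions inferred from the attributes consumer_records.py reads:

from aws_lambda_powertools.utilities.kafka import ConsumerRecords, SchemaConfig, kafka_consumer

# Assumed constructor keyword; consumer_records.py reads value_schema_type,
# value_schema and value_output_serializer from the config object.
schema_config = SchemaConfig(value_schema_type="JSON")

@kafka_consumer(schema_config=schema_config)  # assumed decorator signature
def lambda_handler(event: ConsumerRecords, context):
    for record in event.records:
        print(record.value)  # deserialized according to the schema config
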
144 changes: 144 additions & 0 deletions aws_lambda_powertools/utilities/kafka/consumer_records.py
@@ -0,0 +1,144 @@
from __future__ import annotations

from functools import cached_property
from typing import TYPE_CHECKING, Any

from aws_lambda_powertools.utilities.data_classes.common import CaseInsensitiveDict
from aws_lambda_powertools.utilities.data_classes.kafka_event import KafkaEventBase, KafkaEventRecordBase
from aws_lambda_powertools.utilities.kafka.deserializer.deserializer import get_deserializer
from aws_lambda_powertools.utilities.kafka.serialization.serialization import serialize_to_output_type

if TYPE_CHECKING:
from collections.abc import Iterator

from aws_lambda_powertools.utilities.kafka.schema_config import SchemaConfig


class ConsumerRecordRecords(KafkaEventRecordBase):
"""
A Kafka Consumer Record
"""

def __init__(self, data: dict[str, Any], schema_config: SchemaConfig | None = None):
super().__init__(data)
self.schema_config = schema_config

@cached_property
def key(self) -> Any:
key = self.get("key")

# Return None if key doesn't exist
if not key:
return None

# Determine schema type and schema string
schema_type = None
schema_str = None
output_serializer = None

if self.schema_config and self.schema_config.key_schema_type:
schema_type = self.schema_config.key_schema_type
schema_str = self.schema_config.key_schema
output_serializer = self.schema_config.key_output_serializer

# Always call get_deserializer; when schema_type is None it falls back to the default deserializer
deserializer = get_deserializer(schema_type, schema_str)
deserialized_value = deserializer.deserialize(key)

# Apply output serializer if specified
if output_serializer:
return serialize_to_output_type(deserialized_value, output_serializer)

return deserialized_value

@cached_property
def value(self) -> Any:
value = self["value"]

# Determine schema type and schema string
schema_type = None
schema_str = None
output_serializer = None

if self.schema_config and self.schema_config.value_schema_type:
schema_type = self.schema_config.value_schema_type
schema_str = self.schema_config.value_schema
output_serializer = self.schema_config.value_output_serializer

# Always call get_deserializer; when schema_type is None it falls back to the default deserializer
deserializer = get_deserializer(schema_type, schema_str)
deserialized_value = deserializer.deserialize(value)

# Apply output serializer if specified
if output_serializer:
return serialize_to_output_type(deserialized_value, output_serializer)

return deserialized_value

@property
def original_value(self) -> str:
"""The original (base64 encoded) Kafka record value."""
return self["value"]

@property
def original_key(self) -> str | None:
"""
The original (base64 encoded) Kafka record key.

This key is optional; if not provided,
a round-robin algorithm will be used to determine
the partition for the message.
"""

return self.get("key")

@property
def original_headers(self) -> list[dict[str, list[int]]]:
"""The raw Kafka record headers."""
return self["headers"]

@cached_property
def headers(self) -> dict[str, bytes]:
"""Decodes the headers as a single dictionary."""
return CaseInsensitiveDict((k, bytes(v)) for chunk in self.original_headers for k, v in chunk.items())


class ConsumerRecords(KafkaEventBase):
"""Self-managed or MSK Apache Kafka event trigger
Documentation:
--------------
- https://docs.aws.amazon.com/lambda/latest/dg/with-kafka.html
- https://docs.aws.amazon.com/lambda/latest/dg/with-msk.html
"""

def __init__(self, data: dict[str, Any], schema_config: SchemaConfig | None = None):
super().__init__(data)
self._records: Iterator[ConsumerRecordRecords] | None = None
self.schema_config = schema_config

@property
def records(self) -> Iterator[ConsumerRecordRecords]:
"""The Kafka records."""
for chunk in self["records"].values():
for record in chunk:
yield ConsumerRecordRecords(data=record, schema_config=self.schema_config)

@property
def record(self) -> ConsumerRecordRecords:
"""
Returns the next Kafka record using an iterator.

Returns
-------
ConsumerRecordRecords
The next Kafka record.

Raises
------
StopIteration
If there are no more records available.

"""
if self._records is None:
self._records = self.records
return next(self._records)
Empty file.
71 changes: 71 additions & 0 deletions aws_lambda_powertools/utilities/kafka/deserializer/avro.py
@@ -0,0 +1,71 @@
from __future__ import annotations

import io

from avro.io import BinaryDecoder, DatumReader
from avro.schema import parse as parse_schema

from aws_lambda_powertools.utilities.kafka.deserializer.base import DeserializerBase
from aws_lambda_powertools.utilities.kafka.exceptions import (
KafkaConsumerAvroSchemaParserError,
KafkaConsumerDeserializationError,
)


class AvroDeserializer(DeserializerBase):
"""
Deserializer for Apache Avro formatted data.

This class provides functionality to deserialize Avro binary data using
a provided Avro schema definition.
"""

def __init__(self, schema_str: str):
try:
self.parsed_schema = parse_schema(schema_str)
self.reader = DatumReader(self.parsed_schema)
except Exception as e:
raise KafkaConsumerAvroSchemaParserError(
f"Invalid Avro schema. Please ensure the provided avro schema is valid: {type(e).__name__}: {str(e)}",
) from e

def deserialize(self, data: bytes | str) -> object:
"""
Deserialize Avro binary data to a Python dictionary.

Parameters
----------
data : bytes or str
The Avro binary data to deserialize. If provided as a string,
it will be decoded to bytes first.

Returns
-------
dict[str, Any]
Deserialized data as a dictionary.

Raises
------
KafkaConsumerDeserializationError
When the data cannot be deserialized according to the schema,
typically due to data format incompatibility.

Examples
--------
>>> deserializer = AvroDeserializer(schema_str)
>>> avro_data = b'...' # binary Avro data
>>> try:
... result = deserializer.deserialize(avro_data)
... # Process the deserialized data
... except KafkaConsumerDeserializationError as e:
... print(f"Failed to deserialize: {e}")
"""
try:
value = self._decode_input(data)
bytes_reader = io.BytesIO(value)
decoder = BinaryDecoder(bytes_reader)
return self.reader.read(decoder)
except Exception as e:
raise KafkaConsumerDeserializationError(
f"Error trying to deserialize avro data - {type(e).__name__}: {str(e)}",
) from e
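
A standalone sketch of the deserializer above. The schema and record are illustrative, and the payload is built with the avro package the deserializer itself depends on, base64-encoded the way Lambda delivers record values:

import base64
import io

from avro.io import BinaryEncoder, DatumWriter
from avro.schema import parse as parse_schema

from aws_lambda_powertools.utilities.kafka.deserializer.avro import AvroDeserializer

USER_SCHEMA = """
{
  "type": "record",
  "name": "User",
  "fields": [
    {"name": "name", "type": "string"},
    {"name": "age", "type": "int"}
  ]
}
"""

# Encode a record as Avro binary, then base64, mimicking a Kafka record value.
writer = DatumWriter(parse_schema(USER_SCHEMA))
buffer = io.BytesIO()
writer.write({"name": "Ada", "age": 36}, BinaryEncoder(buffer))
encoded_value = base64.b64encode(buffer.getvalue()).decode()

deserializer = AvroDeserializer(schema_str=USER_SCHEMA)
print(deserializer.deserialize(encoded_value))  # {'name': 'Ada', 'age': 36}
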
52 changes: 52 additions & 0 deletions aws_lambda_powertools/utilities/kafka/deserializer/base.py
@@ -0,0 +1,52 @@
from __future__ import annotations

import base64
from abc import ABC, abstractmethod
from typing import Any


class DeserializerBase(ABC):
"""
Abstract base class for deserializers.

This class defines the interface for all deserializers in the Kafka consumer utility
and provides a common method for decoding input data.

Methods
-------
deserialize(data)
Abstract method that must be implemented by subclasses to deserialize data.
_decode_input(data)
Helper method to decode input data to bytes.

Examples
--------
>>> class MyDeserializer(DeserializerBase):
... def deserialize(self, data: bytes | str) -> dict[str, Any]:
... value = self._decode_input(data)
... # Custom deserialization logic here
... return {"key": "value"}
"""

@abstractmethod
def deserialize(self, data: str) -> dict[str, Any] | str | object:
"""
Deserialize input data to a Python dictionary.

This abstract method must be implemented by subclasses to provide
specific deserialization logic.

Parameters
----------
data : str
The data to deserialize; this is always a base64-encoded string

Returns
-------
dict[str, Any]
The deserialized data as a dictionary.
"""
raise NotImplementedError("Subclasses must implement the deserialize method")

def _decode_input(self, data: bytes | str) -> bytes:
return base64.b64decode(data)
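
A sketch of a custom deserializer built on this base class, assuming a plain JSON payload; it mirrors the pattern in the class docstring:

from __future__ import annotations

import json
from typing import Any

from aws_lambda_powertools.utilities.kafka.deserializer.base import DeserializerBase


class JsonDeserializer(DeserializerBase):
    """Illustrative deserializer: base64-decode the input, then parse it as JSON."""

    def deserialize(self, data: bytes | str) -> dict[str, Any]:
        # _decode_input handles the base64 decoding shared by all deserializers
        return json.loads(self._decode_input(data))


# Record values arrive base64-encoded, e.g. '{"id": 1}' -> "eyJpZCI6IDF9"
print(JsonDeserializer().deserialize("eyJpZCI6IDF9"))  # {'id': 1}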