
Commit 9dd3376

feat(kafka): add support for Confluent Producers (#6833)
* Adding support for Confluent Producers
1 parent fe9bdcb commit 9dd3376

4 files changed (+190, -1 lines)


aws_lambda_powertools/utilities/kafka/deserializer/protobuf.py

Lines changed: 53 additions & 1 deletion
@@ -2,6 +2,7 @@
 
 from typing import Any
 
+from google.protobuf.internal.decoder import _DecodeVarint  # type: ignore[attr-defined]
 from google.protobuf.json_format import MessageToDict
 
 from aws_lambda_powertools.utilities.kafka.deserializer.base import DeserializerBase
@@ -43,6 +44,12 @@ def deserialize(self, data: bytes | str) -> dict:
             When the data cannot be deserialized according to the message class,
             typically due to data format incompatibility or incorrect message class.
 
+        Notes
+        -----
+        This deserializer handles both standard Protocol Buffer format and the Confluent
+        Schema Registry format which includes message index information. It will first try
+        standard deserialization and fall back to message index handling if needed.
+
         Example
         --------
         >>> # Assuming proper protobuf setup
@@ -54,11 +61,56 @@ def deserialize(self, data: bytes | str) -> dict:
         ... except KafkaConsumerDeserializationError as e:
         ...     print(f"Failed to deserialize: {e}")
         """
+        value = self._decode_input(data)
         try:
-            value = self._decode_input(data)
             message = self.message_class()
             message.ParseFromString(value)
             return MessageToDict(message, preserving_proto_field_name=True)
+        except Exception:
+            return self._deserialize_with_message_index(value, self.message_class())
+
+    def _deserialize_with_message_index(self, data: bytes, parser: Any) -> dict:
+        """
+        Deserialize protobuf message with Confluent message index handling.
+
+        Parameters
+        ----------
+        data : bytes
+            data
+        parser : google.protobuf.message.Message
+            Protobuf message instance to parse the data into
+
+        Returns
+        -------
+        dict
+            Dictionary representation of the parsed protobuf message with original field names
+
+        Raises
+        ------
+        KafkaConsumerDeserializationError
+            If deserialization fails
+
+        Notes
+        -----
+        This method handles the special case of Confluent Schema Registry's message index
+        format, where the message is prefixed with either a single 0 (for the first schema)
+        or a list of schema indexes. The actual protobuf message follows these indexes.
+        """
+
+        buffer = memoryview(data)
+        pos = 0
+
+        try:
+            first_value, new_pos = _DecodeVarint(buffer, pos)
+            pos = new_pos
+
+            if first_value != 0:
+                for _ in range(first_value):
+                    _, new_pos = _DecodeVarint(buffer, pos)
+                    pos = new_pos
+
+            parser.ParseFromString(data[pos:])
+            return MessageToDict(parser, preserving_proto_field_name=True)
         except Exception as e:
             raise KafkaConsumerDeserializationError(
                 f"Error trying to deserialize protobuf data - {type(e).__name__}: {str(e)}",
Lines changed: 13 additions & 0 deletions
@@ -0,0 +1,13 @@
+syntax = "proto3";
+
+package org.demo.kafka.protobuf;
+
+option java_package = "org.demo.kafka.protobuf";
+option java_outer_classname = "ProtobufProductOuterClass";
+option java_multiple_files = true;
+
+message ProtobufProduct {
+  int32 id = 1;
+  string name = 2;
+  double price = 3;
+}
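
The next file is the Python module protoc generates for this schema. As a rough sketch of how that generated class round-trips data (the protoc invocation and the source file name confluent_protobuf.proto are inferred from the _pb2 naming convention, not shown in this commit):

# Presumably generated with something like:
#   protoc --python_out=. confluent_protobuf.proto
from confluent_protobuf_pb2 import ProtobufProduct

# id and name match the values asserted in the new test; the price is illustrative.
product = ProtobufProduct(id=1001, name="Laptop", price=999.99)

payload = product.SerializeToString()  # bare protobuf bytes, no Confluent framing

decoded = ProtobufProduct()
decoded.ParseFromString(payload)
assert decoded.id == 1001 and decoded.name == "Laptop"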

tests/functional/kafka_consumer/_protobuf/confluent_protobuf_pb2.py

Lines changed: 37 additions & 0 deletions
Generated protobuf module; contents are not rendered in the diff view.

tests/functional/kafka_consumer/_protobuf/test_kafka_consumer_with_protobuf.py

Lines changed: 87 additions & 0 deletions
@@ -12,6 +12,9 @@
 from aws_lambda_powertools.utilities.kafka.kafka_consumer import kafka_consumer
 from aws_lambda_powertools.utilities.kafka.schema_config import SchemaConfig
 
+# Import confluent complex schema
+from .confluent_protobuf_pb2 import ProtobufProduct
+
 # Import the generated protobuf classes
 from .user_pb2 import Key, User
 
@@ -335,3 +338,87 @@ def test_kafka_consumer_without_protobuf_key_schema():
     # Verify the error message mentions the missing key schema
     assert "key_schema" in str(excinfo.value)
     assert "PROTOBUF" in str(excinfo.value)
+
+
+def test_confluent_complex_schema(lambda_context):
+    # GIVEN
+    # A scenario where a complex schema is used with the PROTOBUF schema type
+    complex_event = {
+        "eventSource": "aws:kafka",
+        "eventSourceArn": "arn:aws:kafka:us-east-1:0123456789019:cluster/SalesCluster/abcd1234",
+        "bootstrapServers": "b-2.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092",
+        "records": {
+            "mytopic-0": [
+                {
+                    "topic": "mytopic",
+                    "partition": 0,
+                    "offset": 15,
+                    "timestamp": 1545084650987,
+                    "timestampType": "CREATE_TIME",
+                    "key": "NDI=",
+                    "value": "COkHEgZMYXB0b3AZUrgehes/j0A=",
+                    "headers": [{"headerKey": [104, 101, 97, 100, 101, 114, 86, 97, 108, 117, 101]}],
+                },
+                {
+                    "topic": "mytopic",
+                    "partition": 0,
+                    "offset": 16,
+                    "timestamp": 1545084650988,
+                    "timestampType": "CREATE_TIME",
+                    "key": "NDI=",
+                    "value": "AAjpBxIGTGFwdG9wGVK4HoXrP49A",
+                    "headers": [{"headerKey": [104, 101, 97, 100, 101, 114, 86, 97, 108, 117, 101]}],
+                },
+                {
+                    "topic": "mytopic",
+                    "partition": 0,
+                    "offset": 17,
+                    "timestamp": 1545084650989,
+                    "timestampType": "CREATE_TIME",
+                    "key": "NDI=",
+                    "value": "AgEACOkHEgZMYXB0b3AZUrgehes/j0A=",
+                    "headers": [{"headerKey": [104, 101, 97, 100, 101, 114, 86, 97, 108, 117, 101]}],
+                },
+            ],
+        },
+    }
+
+    # GIVEN A Kafka consumer configured to deserialize Protobuf data
+    # using the ProtobufProduct protobuf message type as the schema
+    schema_config = SchemaConfig(
+        value_schema_type="PROTOBUF",
+        value_schema=ProtobufProduct,
+    )
+
+    processed_records = []
+
+    @kafka_consumer(schema_config=schema_config)
+    def handler(event: ConsumerRecords, context):
+        for record in event.records:
+            processed_records.append(
+                {"id": record.value["id"], "name": record.value["name"], "price": record.value["price"]},
+            )
+        return {"processed": len(processed_records)}
+
+    # WHEN The handler processes a Kafka event containing Protobuf-encoded data
+    result = handler(complex_event, lambda_context)
+
+    # THEN
+    # The handler should successfully process all three records
+    # and return the correct count
+    assert result == {"processed": 3}
+
+    # All records should be correctly deserialized with proper values
+    assert len(processed_records) == 3
+
+    # First record should contain decoded values
+    assert processed_records[0]["id"] == 1001
+    assert processed_records[0]["name"] == "Laptop"
+
+    # Second record should contain decoded values
+    assert processed_records[1]["id"] == 1001
+    assert processed_records[1]["name"] == "Laptop"
+
+    # Third record should contain decoded values
+    assert processed_records[2]["id"] == 1001
+    assert processed_records[2]["name"] == "Laptop"
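
For reference, the three base64 value fields in this test appear to exercise all of the deserializer's paths: a bare protobuf message, a Confluent payload prefixed with a single 0, and a Confluent payload prefixed with a message-index list. A small standard-library-only sketch to inspect the prefixes:

import base64

values = {
    "offset 15 (bare protobuf)": "COkHEgZMYXB0b3AZUrgehes/j0A=",
    "offset 16 (single 0 prefix)": "AAjpBxIGTGFwdG9wGVK4HoXrP49A",
    "offset 17 (index list prefix)": "AgEACOkHEgZMYXB0b3AZUrgehes/j0A=",
}

for label, b64 in values.items():
    raw = base64.b64decode(b64)
    # 0x08 is the protobuf tag of field 1 (id); any bytes before it are Confluent framing.
    print(label, "->", raw[:3].hex(" "))

This prints "08 e9 07", "00 08 e9", and "02 01 00" respectively, which is exactly the framing the new _deserialize_with_message_index fallback strips before parsing.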
