Skip to content

Commit d57bb46

Browse files
docs(kafka): add kafka documentation (#6834)
Adding snippets
1 parent 9dd3376 commit d57bb46

9 files changed

+1165
-0
lines changed

docs/utilities/kafka.md

Lines changed: 973 additions & 0 deletions
Large diffs are not rendered by default.
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
from aws_lambda_powertools import Logger
from aws_lambda_powertools.utilities.kafka import ConsumerRecords, SchemaConfig, kafka_consumer
from aws_lambda_powertools.utilities.typing import LambdaContext

logger = Logger()

# Avro schema describing the shape of each message value.
AVRO_SCHEMA = """
{
    "type": "record",
    "name": "User",
    "namespace": "com.example",
    "fields": [
        {"name": "name", "type": "string"},
        {"name": "age", "type": "int"}
    ]
}
"""

# Deserialize message values as Avro using the schema above.
schema_config = SchemaConfig(value_schema_type="AVRO", value_schema=AVRO_SCHEMA)


@kafka_consumer(schema_config=schema_config)
def lambda_handler(event: ConsumerRecords, context: LambdaContext):
    """Log each Kafka record whose value is an Avro-encoded User message."""
    for record in event.records:
        user = record.value  # Dictionary from avro message

        logger.info(f"Processing user: {user['name']}, age {user['age']}")

    return {"statusCode": 200}
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
from aws_lambda_powertools import Logger
from aws_lambda_powertools.utilities.kafka import ConsumerRecords, SchemaConfig, kafka_consumer
from aws_lambda_powertools.utilities.typing import LambdaContext

logger = Logger()

# Configure schema: values are JSON-encoded, so no schema definition is needed
schema_config = SchemaConfig(value_schema_type="JSON")


@kafka_consumer(schema_config=schema_config)
def lambda_handler(event: ConsumerRecords, context: LambdaContext):
    """Log each Kafka record whose value is a JSON-encoded user object."""
    for record in event.records:
        user = record.value  # Dictionary from JSON message

        logger.info(f"Processing user: {user['name']}, age {user['age']}")

    return {"statusCode": 200}
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
from aws_lambda_powertools import Logger
from aws_lambda_powertools.utilities.kafka import ConsumerRecords, SchemaConfig, kafka_consumer
from aws_lambda_powertools.utilities.typing import LambdaContext

# Import generated protobuf class
from .user_pb2 import User  # type: ignore[import-not-found]

logger = Logger()

# Configure schema for protobuf
schema_config = SchemaConfig(
    value_schema_type="PROTOBUF",
    value_schema=User,  # The protobuf message class
)


@kafka_consumer(schema_config=schema_config)
def lambda_handler(event: ConsumerRecords, context: LambdaContext):
    """Log each Kafka record whose value is a Protocol Buffers User message."""
    for record in event.records:
        user = record.value  # Dictionary from protobuf message

        logger.info(f"Processing user: {user['name']}, age {user['age']}")

    return {"statusCode": 200}
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
from aws_lambda_powertools import Logger
from aws_lambda_powertools.utilities.kafka import ConsumerRecords, SchemaConfig, kafka_consumer
from aws_lambda_powertools.utilities.typing import LambdaContext

logger = Logger()

# Avro schema for the record key.
PRODUCT_KEY_SCHEMA = """
{
    "type": "record",
    "name": "ProductKey",
    "fields": [
        {"name": "product_id", "type": "string"}
    ]
}
"""

# Avro schema for the record value.
PRODUCT_VALUE_SCHEMA = """
{
    "type": "record",
    "name": "ProductInfo",
    "fields": [
        {"name": "name", "type": "string"},
        {"name": "price", "type": "double"},
        {"name": "in_stock", "type": "boolean"}
    ]
}
"""

# Deserialize both components: key and value each get their own Avro schema.
schema_config = SchemaConfig(
    key_schema_type="AVRO",
    key_schema=PRODUCT_KEY_SCHEMA,
    value_schema_type="AVRO",
    value_schema=PRODUCT_VALUE_SCHEMA,
)


@kafka_consumer(schema_config=schema_config)
def lambda_handler(event: ConsumerRecords, context: LambdaContext):
    """Log the Avro-deserialized key and value of every Kafka record."""
    for record in event.records:
        key = record.key
        value = record.value

        logger.info(f"Processing key: {key['product_id']}")
        logger.info(f"Processing value: {value['name']}")

    return {"statusCode": 200}
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
from aws_lambda_powertools import Logger
from aws_lambda_powertools.utilities.kafka import ConsumerRecords, SchemaConfig, kafka_consumer
from aws_lambda_powertools.utilities.typing import LambdaContext

logger = Logger()

# Only the value schema is configured; keys fall back to UTF-8 string decoding.
schema_config = SchemaConfig(value_schema_type="JSON")


@kafka_consumer(schema_config=schema_config)
def lambda_handler(event: ConsumerRecords, context: LambdaContext):
    """Log each record's UTF-8 string key and JSON-deserialized value."""
    for record in event.records:
        key = record.key  # automatically decoded as a UTF-8 string
        value = record.value  # deserialized as JSON

        logger.info(f"Processing key: {key}")
        logger.info(f"Processing value: {value['name']}")

    return {"statusCode": 200}
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
from aws_lambda_powertools import Logger
from aws_lambda_powertools.utilities.kafka import ConsumerRecords, kafka_consumer
from aws_lambda_powertools.utilities.typing import LambdaContext

logger = Logger()


@kafka_consumer
def lambda_handler(event: ConsumerRecords, context: LambdaContext):
    """Log each record's key and value.

    With no schema configuration, both are automatically decoded as UTF-8 strings.
    """
    for record in event.records:
        key = record.key  # decoded as a UTF-8 string
        value = record.value  # decoded as a UTF-8 string

        logger.info(f"Processing key: {key}")
        logger.info(f"Processing value: {value}")

    return {"statusCode": 200}
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
from aws_lambda_powertools import Logger
from aws_lambda_powertools.utilities.kafka import ConsumerRecords, SchemaConfig, kafka_consumer
from aws_lambda_powertools.utilities.typing import LambdaContext

logger = Logger()

# Deserialize values as JSON; keys stay as plain strings.
schema_config = SchemaConfig(value_schema_type="JSON")


@kafka_consumer(schema_config=schema_config)
def lambda_handler(event: ConsumerRecords, context: LambdaContext):
    """Log the optional string key and the JSON-deserialized order value."""
    for record in event.records:
        # A record may arrive without a key; only log it when present.
        if record.key is not None:
            logger.info(f"Message key: {record.key}")

        value = record.value  # deserialized as JSON
        logger.info(f"Order #{value['order_id']} - Total: ${value['total']}")

    return {"statusCode": 200}

mkdocs.yml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ nav:
2828
- core/event_handler/bedrock_agents.md
2929
- utilities/parameters.md
3030
- utilities/batch.md
31+
- utilities/kafka.md
3132
- utilities/typing.md
3233
- utilities/validation.md
3334
- utilities/data_classes.md
@@ -220,6 +221,7 @@ plugins:
220221
- utilities/parameters.md
221222
- utilities/batch.md
222223
- utilities/typing.md
224+
- utilities/kafka.md
223225
- utilities/validation.md
224226
- utilities/data_classes.md
225227
- utilities/parser.md

0 commit comments

Comments
 (0)