
Commit 70de885
Author: Ran Isenberg (committed)
feature: Kafka Parser support
1 parent f2a6c9b

File tree: 10 files changed, +1574 −764 lines
+39 −15

@@ -1,29 +1,53 @@
-module.exports = async ({github, context, core}) => {
-    const prBody = context.payload.body;
-    const prNumber = context.payload.number;
-    const releaseLabel = process.env.RELEASE_LABEL;
-    const maintainersTeam = process.env.MAINTAINERS_TEAM
+const {
+    PR_AUTHOR,
+    PR_BODY,
+    PR_NUMBER,
+    IGNORE_AUTHORS,
+    LABEL_PENDING_RELEASE,
+    HANDLE_MAINTAINERS_TEAM,
+    PR_IS_MERGED,
+} = require("./constants")
 
-    const RELATED_ISSUE_REGEX = /Issue number:[^\d\r\n]+(?<issue>\d+)/;
+module.exports = async ({ github, context, core }) => {
+    if (IGNORE_AUTHORS.includes(PR_AUTHOR)) {
+        return core.notice("Author in IGNORE_AUTHORS list; skipping...")
+    }
 
-    const isMatch = RELATED_ISSUE_REGEX.exec(prBody);
+    if (PR_IS_MERGED == "false") {
+        return core.notice("Only merged PRs to avoid spam; skipping")
+    }
+
+    const RELATED_ISSUE_REGEX = /Issue number:[^\d\r\n]+(?<issue>\d+)/;
+
+    const isMatch = RELATED_ISSUE_REGEX.exec(PR_BODY);
+
+    try {
     if (!isMatch) {
-        core.setFailed(`Unable to find related issue for PR number ${prNumber}.\n\n Body details: ${prBody}`);
+        core.setFailed(`Unable to find related issue for PR number ${PR_NUMBER}.\n\n Body details: ${PR_BODY}`);
         return await github.rest.issues.createComment({
             owner: context.repo.owner,
             repo: context.repo.repo,
-            body: `${maintainersTeam} No related issues found. Please ensure '${releaseLabel}' label is applied before releasing.`,
-            issue_number: prNumber,
+            body: `${HANDLE_MAINTAINERS_TEAM} No related issues found. Please ensure '${LABEL_PENDING_RELEASE}' label is applied before releasing.`,
+            issue_number: PR_NUMBER,
         });
     }
+    } catch (error) {
+        core.setFailed(`Unable to create comment on PR number ${PR_NUMBER}.\n\n Error details: ${error}`);
+        throw new Error(error);
+    }
 
-    const { groups: {relatedIssueNumber} } = isMatch
+    const { groups: { issue } } = isMatch
 
-    core.info(`Auto-labeling related issue ${relatedIssueNumber} for release`)
+    try {
+        core.info(`Auto-labeling related issue ${issue} for release`)
     return await github.rest.issues.addLabels({
-        issue_number: relatedIssueNumber,
+        issue_number: issue,
         owner: context.repo.owner,
         repo: context.repo.repo,
-        labels: [releaseLabel]
+        labels: [LABEL_PENDING_RELEASE]
     })
-}
+    } catch (error) {
+        core.setFailed(`Is this issue number (${issue}) valid? Perhaps a discussion?`);
+        throw new Error(error);
+    }
+}
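Since the rest of this commit is Python, here is a quick sanity check of the script's issue-matching pattern, rewritten in Python's named-group syntax; the sample PR body line is made up for illustration:

import re

# Equivalent of RELATED_ISSUE_REGEX above; JS `(?<issue>...)` becomes `(?P<issue>...)` in Python
RELATED_ISSUE_REGEX = re.compile(r"Issue number:[^\d\r\n]+(?P<issue>\d+)")

match = RELATED_ISSUE_REGEX.search("Issue number: #1234")
assert match is not None
assert match.group("issue") == "1234"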

CHANGELOG.md

+1,330 −749

Large diffs are not rendered by default.

aws_lambda_powertools/utilities/parser/envelopes/__init__.py

+2

@@ -4,6 +4,7 @@
 from .cloudwatch import CloudWatchLogsEnvelope
 from .dynamodb import DynamoDBStreamEnvelope
 from .event_bridge import EventBridgeEnvelope
+from .kafka import KafkaEnvelope
 from .kinesis import KinesisDataStreamEnvelope
 from .lambda_function_url import LambdaFunctionUrlEnvelope
 from .sns import SnsEnvelope, SnsSqsEnvelope
@@ -20,5 +21,6 @@
     "SnsEnvelope",
     "SnsSqsEnvelope",
     "SqsEnvelope",
+    "KafkaEnvelope",
     "BaseEnvelope",
 ]
aws_lambda_powertools/utilities/parser/envelopes/kafka.py

+41

@@ -0,0 +1,41 @@
+import logging
+from typing import Any, Dict, List, Optional, Type, Union
+
+from ..models import KafkaEventModel
+from ..types import Model
+from .base import BaseEnvelope
+
+logger = logging.getLogger(__name__)
+
+
+class KafkaEnvelope(BaseEnvelope):
+    """Kafka event envelope to extract data within the `value` key of each record.
+    The record's `value` parameter is a string, though it can also be a JSON-encoded string.
+    Regardless of its type, it will be parsed into a BaseModel object.
+
+    Note: All records are parsed the same way, so if model is str,
+    all items in the list will be parsed as str and not as JSON (and vice versa)
+    """
+
+    def parse(self, data: Optional[Union[Dict[str, Any], Any]], model: Type[Model]) -> List[Optional[Model]]:
+        """Parses data found with model provided
+
+        Parameters
+        ----------
+        data : Dict
+            Lambda event to be parsed
+        model : Type[Model]
+            Data model provided to parse after extracting data using envelope
+
+        Returns
+        -------
+        Any
+            Parsed detail payload with model provided
+        """
+        logger.debug(f"Parsing incoming data with Kafka event model {KafkaEventModel}")
+        parsed_envelope: KafkaEventModel = KafkaEventModel.parse_obj(data)
+        logger.debug(f"Parsing Kafka event records in `value` with {model}")
+        ret_list = []
+        for records in parsed_envelope.records.values():
+            ret_list += [self._parse(data=record.value, model=model) for record in records]
+        return ret_list
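For context, a minimal handler sketch using the new envelope, mirroring the functional test added later in this commit; MyRecord is a hypothetical user-defined model for the record's decoded value:

from typing import List

from pydantic import BaseModel

from aws_lambda_powertools.utilities.parser import envelopes, event_parser
from aws_lambda_powertools.utilities.typing import LambdaContext


class MyRecord(BaseModel):
    key: str  # hypothetical field; the shape depends on your producer's payload


@event_parser(model=MyRecord, envelope=envelopes.KafkaEnvelope)
def handler(event: List[MyRecord], context: LambdaContext):
    # KafkaEnvelope has already base64-decoded each record's `value`
    # and parsed it into MyRecord before the handler body runs
    for record in event:
        print(record.key)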

aws_lambda_powertools/utilities/parser/models/__init__.py

+3

@@ -17,6 +17,7 @@
 from .cloudwatch import CloudWatchLogsData, CloudWatchLogsDecode, CloudWatchLogsLogEvent, CloudWatchLogsModel
 from .dynamodb import DynamoDBStreamChangedRecordModel, DynamoDBStreamModel, DynamoDBStreamRecordModel
 from .event_bridge import EventBridgeModel
+from .kafka import KafkaEventModel, KafkaRecordModel
 from .kinesis import KinesisDataStreamModel, KinesisDataStreamRecord, KinesisDataStreamRecordPayload
 from .lambda_function_url import LambdaFunctionUrlModel
 from .s3 import S3Model, S3RecordModel
@@ -98,4 +99,6 @@
     "APIGatewayEventRequestContext",
     "APIGatewayEventAuthorizer",
     "APIGatewayEventIdentity",
+    "KafkaEventModel",
+    "KafkaRecordModel",
 ]
aws_lambda_powertools/utilities/parser/models/kafka.py

+70

@@ -0,0 +1,70 @@
+import base64
+import logging
+from binascii import Error as BinAsciiError
+from datetime import datetime
+from typing import Dict, List, Optional, Type, Union
+
+from pydantic import BaseModel, validator
+
+from aws_lambda_powertools.utilities.parser.types import Literal
+
+SERVERS_DELIMITER = ","
+
+logger = logging.getLogger(__name__)
+
+
+def _base64_decode(value: str) -> bytes:
+    try:
+        logger.debug("Decoding base64 Kafka record item before parsing")
+        return base64.b64decode(value)
+    except (BinAsciiError, TypeError):
+        raise ValueError("base64 decode failed")
+
+
+def _bytes_to_string(value: bytes) -> str:
+    try:
+        return value.decode("utf-8")
+    except (BinAsciiError, TypeError):
+        raise ValueError("base64 UTF-8 decode failed")
+
+
+class KafkaRecordModel(BaseModel):
+    topic: str
+    partition: int
+    offset: int
+    timestamp: datetime
+    timestampType: str
+    key: bytes
+    value: Union[str, Type[BaseModel]]
+    headers: List[Dict[str, bytes]]
+
+    # validators
+    _decode_key = validator("key", allow_reuse=True)(_base64_decode)
+
+    @validator("value", pre=True, allow_reuse=True)
+    def data_base64_decode(cls, value):
+        as_bytes = _base64_decode(value)
+        return _bytes_to_string(as_bytes)
+
+    @validator("headers", pre=True, allow_reuse=True)
+    def decode_headers_list(cls, value):
+        for header in value:
+            for key, values in header.items():
+                header[key] = bytes(values)
+        return value
+
+
+class KafkaEventModel(BaseModel):
+    """Self-managed Apache Kafka event trigger
+    Documentation:
+    --------------
+    - https://docs.aws.amazon.com/lambda/latest/dg/with-kafka.html
+    """
+
+    eventSource: Literal["aws:SelfManagedKafka"]
+    bootstrapServers: Optional[List[str]]
+    records: Dict[str, List[KafkaRecordModel]]
+
+    @validator("bootstrapServers", pre=True, allow_reuse=True)
+    def split_servers(cls, value):
+        return None if not value else value.split(SERVERS_DELIMITER)
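To illustrate what the validators above do to a single record, a minimal sketch with made-up base64 sample values (not the real test fixture):

from aws_lambda_powertools.utilities.parser.models import KafkaRecordModel

record = KafkaRecordModel.parse_obj(
    {
        "topic": "orders",
        "partition": 0,
        "offset": 42,
        "timestamp": 1545084650987,  # epoch milliseconds; pydantic coerces to datetime
        "timestampType": "CREATE_TIME",
        "key": "b3JkZXItMQ==",  # base64 of b"order-1"
        "value": "eyJpZCI6MX0=",  # base64 of '{"id":1}'
        "headers": [],
    }
)

assert record.key == b"order-1"  # `key` stays bytes after base64 decoding
assert record.value == '{"id":1}'  # `value` is decoded into a UTF-8 string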

docs/utilities/parser.md

+2

@@ -168,6 +168,7 @@ Parser comes with the following built-in models:
 | **APIGatewayProxyEventModel** | Lambda Event Source payload for Amazon API Gateway |
 | **APIGatewayProxyEventV2Model** | Lambda Event Source payload for Amazon API Gateway v2 payload |
 | **LambdaFunctionUrlModel** | Lambda Event Source payload for Lambda Function URL payload |
+| **KafkaEventModel** | Lambda Event Source payload for self-managed Kafka payload |
 
 ### extending built-in models
 
@@ -308,6 +309,7 @@ Parser comes with the following built-in envelopes, where `Model` in the return
 | **ApiGatewayEnvelope** | 1. Parses data using `APIGatewayProxyEventModel`. <br/> 2. Parses `body` key using your model and returns it. | `Model` |
 | **ApiGatewayV2Envelope** | 1. Parses data using `APIGatewayProxyEventV2Model`. <br/> 2. Parses `body` key using your model and returns it. | `Model` |
 | **LambdaFunctionUrlEnvelope** | 1. Parses data using `LambdaFunctionUrlModel`. <br/> 2. Parses `body` key using your model and returns it. | `Model` |
+| **KafkaEnvelope** | 1. Parses data using `KafkaEventModel`. <br/> 2. Parses `value` key of each record using your model and returns it. | `Model` |
 
 ### Bringing your own envelope
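Complementing the envelope row above, a minimal sketch of using the model row directly (no envelope), so the handler receives the whole event, including the bootstrapServers list split by the model's validator:

from aws_lambda_powertools.utilities.parser import event_parser
from aws_lambda_powertools.utilities.parser.models import KafkaEventModel
from aws_lambda_powertools.utilities.typing import LambdaContext


@event_parser(model=KafkaEventModel)
def handler(event: KafkaEventModel, context: LambdaContext):
    # records is a dict keyed by "topic-partition", each holding a list of records
    for topic_partition, records in event.records.items():
        for record in records:
            print(topic_partition, record.offset, record.value)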

tests/events/kafkaEvent.json

+34

@@ -0,0 +1,34 @@
+{
+  "eventSource": "aws:SelfManagedKafka",
+  "bootstrapServers": "b-2.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092,b-1.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092",
+  "records": {
+    "mytopic-0": [
+      {
+        "topic": "mytopic",
+        "partition": 0,
+        "offset": 15,
+        "timestamp": 1545084650987,
+        "timestampType": "CREATE_TIME",
+        "key": "cmVjb3JkS2V5",
+        "value": "eyJrZXkiOiJ2YWx1ZSJ9",
+        "headers": [
+          {
+            "headerKey": [
+              104,
+              101,
+              97,
+              100,
+              101,
+              114,
+              86,
+              97,
+              108,
+              117,
+              101
+            ]
+          }
+        ]
+      }
+    ]
+  }
+}
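A quick check of what the encoded fields in this fixture decode to (these equalities are also asserted by the test below):

import base64

assert base64.b64decode("cmVjb3JkS2V5") == b"recordKey"  # record key
assert base64.b64decode("eyJrZXkiOiJ2YWx1ZSJ9") == b'{"key":"value"}'  # record value
assert bytes([104, 101, 97, 100, 101, 114, 86, 97, 108, 117, 101]) == b"headerValue"  # header bytes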

tests/functional/parser/schemas.py

+4

@@ -91,3 +91,7 @@ class MyApiGatewayBusiness(BaseModel):
 class MyALambdaFuncUrlBusiness(BaseModel):
     message: str
     username: str
+
+
+class MyALambdaKafkaBusiness(BaseModel):
+    key: str

tests/functional/parser/test_kafka.py

+49

@@ -0,0 +1,49 @@
+from typing import List
+
+from aws_lambda_powertools.utilities.parser import envelopes, event_parser
+from aws_lambda_powertools.utilities.parser.models import KafkaEventModel, KafkaRecordModel
+from aws_lambda_powertools.utilities.typing import LambdaContext
+from tests.functional.parser.schemas import MyALambdaKafkaBusiness
+from tests.functional.utils import load_event
+
+
+@event_parser(model=MyALambdaKafkaBusiness, envelope=envelopes.KafkaEnvelope)
+def handle_lambda_kafka_with_envelope(event: List[MyALambdaKafkaBusiness], _: LambdaContext):
+    assert event[0].key == "value"
+    assert len(event) == 1
+
+
+@event_parser(model=KafkaEventModel)
+def handle_kafka_event(event: KafkaEventModel, _: LambdaContext):
+    return event
+
+
+def test_kafka_event_with_envelope():
+    event = load_event("kafkaEvent.json")
+    handle_lambda_kafka_with_envelope(event, LambdaContext())
+
+
+def test_kafka_event():
+    json_event = load_event("kafkaEvent.json")
+    event: KafkaEventModel = handle_kafka_event(json_event, LambdaContext())
+    assert event.eventSource == "aws:SelfManagedKafka"
+    bootstrap_servers = [
+        "b-2.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092",
+        "b-1.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092",
+    ]
+    assert event.bootstrapServers == bootstrap_servers
+
+    records = list(event.records["mytopic-0"])
+    assert len(records) == 1
+    record: KafkaRecordModel = records[0]
+    assert record.topic == "mytopic"
+    assert record.partition == 0
+    assert record.offset == 15
+    assert record.timestamp is not None
+    convert_time = int(round(record.timestamp.timestamp() * 1000))
+    assert convert_time == 1545084650987
+    assert record.timestampType == "CREATE_TIME"
+    assert record.key == b"recordKey"
+    assert record.value == '{"key":"value"}'
+    assert len(record.headers) == 1
+    assert record.headers[0]["headerKey"] == b"headerValue"
