1
- import base64
2
- import logging
3
- from binascii import Error as BinAsciiError
4
1
from datetime import datetime
5
- from typing import Dict , List , Optional , Type , Union
2
+ from typing import Dict , List , Type , Union
6
3
7
4
from pydantic import BaseModel , validator
8
5
6
+ from aws_lambda_powertools .shared .functions import base64_decode , bytes_to_string
9
7
from aws_lambda_powertools .utilities .parser .types import Literal
10
8
11
9
SERVERS_DELIMITER = ","
12
10
13
- logger = logging .getLogger (__name__ )
14
-
15
-
16
- def _base64_decode (value : str ) -> bytes :
17
- try :
18
- logger .debug ("Decoding base64 Kafka record item before parsing" )
19
- return base64 .b64decode (value )
20
- except (BinAsciiError , TypeError ):
21
- raise ValueError ("base64 decode failed" )
22
-
23
-
24
- def _bytes_to_string (value : bytes ) -> str :
25
- try :
26
- return value .decode ("utf-8" )
27
- except (BinAsciiError , TypeError ):
28
- raise ValueError ("base64 UTF-8 decode failed" )
29
-
30
11
31
12
class KafkaRecordModel (BaseModel ):
32
13
topic : str
@@ -39,12 +20,12 @@ class KafkaRecordModel(BaseModel):
39
20
headers : List [Dict [str , bytes ]]
40
21
41
22
# validators
42
- _decode_key = validator ("key" , allow_reuse = True )(_base64_decode )
23
+ _decode_key = validator ("key" , allow_reuse = True )(base64_decode )
43
24
44
25
@validator ("value" , pre = True , allow_reuse = True )
45
26
def data_base64_decode (cls , value ):
46
- as_bytes = _base64_decode (value )
47
- return _bytes_to_string (as_bytes )
27
+ as_bytes = base64_decode (value )
28
+ return bytes_to_string (as_bytes )
48
29
49
30
@validator ("headers" , pre = True , allow_reuse = True )
50
31
def decode_headers_list (cls , value ):
@@ -55,7 +36,7 @@ def decode_headers_list(cls, value):
55
36
56
37
57
38
class KafkaBaseEventModel (BaseModel ):
58
- bootstrapServers : Optional [ List [str ] ]
39
+ bootstrapServers : List [str ]
59
40
records : Dict [str , List [KafkaRecordModel ]]
60
41
61
42
@validator ("bootstrapServers" , pre = True , allow_reuse = True )
0 commit comments