Skip to content

feat(parser): Adds DDB deserialization to DynamoDBStreamChangedRecordModel #4401

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
May 28, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
94 changes: 94 additions & 0 deletions aws_lambda_powertools/shared/dynamodb_deserializer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
from decimal import Clamped, Context, Decimal, Inexact, Overflow, Rounded, Underflow
from typing import Any, Callable, Dict, Optional, Sequence, Set

# NOTE: DynamoDB numbers carry at most 38 digits of precision.
# Building Decimals through this context keeps them aligned with what the table stores:
# the listed signals are trapped, so a value that would need rounding or clamping
# raises instead of being silently altered.
DYNAMODB_CONTEXT = Context(
    prec=38,
    Emax=126,
    Emin=-128,
    traps=[Clamped, Overflow, Inexact, Rounded, Underflow],
)


class TypeDeserializer:
    """
    Deserializes DynamoDB types to Python types.

    It's based on boto3's [DynamoDB TypeDeserializer](https://boto3.amazonaws.com/v1/documentation/api/latest/_modules/boto3/dynamodb/types.html).

    The only notable difference is that for Binary (`B`, `BS`) values we return Python Bytes directly,
    since we don't support Python 2.
    """

    def deserialize(self, value: Dict) -> Any:
        """Deserialize DynamoDB data types into Python types.

        Here are the various conversions:

        DynamoDB                                Python
        --------                                ------
        {'NULL': True}                          None
        {'BOOL': True/False}                    True/False
        {'N': str(value)}                       Decimal(value)
        {'S': string}                           string
        {'B': bytes}                            bytes
        {'NS': [str(value)]}                    set([Decimal(value)])
        {'SS': [string]}                        set([string])
        {'BS': [bytes]}                         set([bytes])
        {'L': list}                             list
        {'M': dict}                             dict

        Parameters
        ----------
        value: Dict
            DynamoDB value to be deserialized to a python type.
            Its first key is the DynamoDB type descriptor (matched case-insensitively).

        Returns
        --------
        Any
            Python native type converted from DynamoDB type

        Raises
        ------
        TypeError
            When `value` is empty, or its type descriptor has no matching deserializer
        """
        # An empty value has no type descriptor to dispatch on; fail with the same
        # exception type as an unsupported descriptor instead of a bare IndexError.
        if not value:
            raise TypeError("Value must be a nonempty dictionary whose key is a valid dynamodb type.")

        dynamodb_type = next(iter(value))
        # Dispatch by naming convention: {'SS': ...} is handled by `_deserialize_ss`, and so on.
        deserializer: Optional[Callable] = getattr(self, f"_deserialize_{dynamodb_type}".lower(), None)
        if deserializer is None:
            raise TypeError(f"Dynamodb type {dynamodb_type} is not supported")

        return deserializer(value[dynamodb_type])

    def _deserialize_null(self, value: bool) -> None:
        # The payload of a NULL attribute is irrelevant; it always maps to None
        return None

    def _deserialize_bool(self, value: bool) -> bool:
        return value

    def _deserialize_n(self, value: str) -> Decimal:
        # Numbers arrive as strings; build the Decimal under the DynamoDB-specific context
        return DYNAMODB_CONTEXT.create_decimal(value)

    def _deserialize_s(self, value: str) -> str:
        return value

    def _deserialize_b(self, value: bytes) -> bytes:
        return value

    def _deserialize_ns(self, value: Sequence[str]) -> Set[Decimal]:
        return set(map(self._deserialize_n, value))

    def _deserialize_ss(self, value: Sequence[str]) -> Set[str]:
        return set(map(self._deserialize_s, value))

    def _deserialize_bs(self, value: Sequence[bytes]) -> Set[bytes]:
        return set(map(self._deserialize_b, value))

    def _deserialize_l(self, value: Sequence[Dict]) -> Sequence[Any]:
        # Each element is itself a typed DynamoDB value, hence the recursion
        return [self.deserialize(v) for v in value]

    def _deserialize_m(self, value: Dict) -> Dict:
        # Map values are typed DynamoDB values; keys are kept as-is
        return {k: self.deserialize(v) for k, v in value.items()}
Original file line number Diff line number Diff line change
@@ -1,101 +1,9 @@
from decimal import Clamped, Context, Decimal, Inexact, Overflow, Rounded, Underflow
from enum import Enum
from typing import Any, Callable, Dict, Iterator, Optional, Sequence, Set
from typing import Any, Dict, Iterator, Optional

from aws_lambda_powertools.shared.dynamodb_deserializer import TypeDeserializer
from aws_lambda_powertools.utilities.data_classes.common import DictWrapper

# NOTE: DynamoDB numbers carry at most 38 digits of precision.
# Building Decimals through this context keeps them aligned with what the table stores:
# the listed signals are trapped, so a value that would need rounding or clamping
# raises instead of being silently altered.
DYNAMODB_CONTEXT = Context(
    prec=38,
    Emax=126,
    Emin=-128,
    traps=[Clamped, Overflow, Inexact, Rounded, Underflow],
)


class TypeDeserializer:
    """
    Deserializes DynamoDB types to Python types.

    It's based on boto3's [DynamoDB TypeDeserializer](https://boto3.amazonaws.com/v1/documentation/api/latest/_modules/boto3/dynamodb/types.html).

    The only notable difference is that for Binary (`B`, `BS`) values we return Python Bytes directly,
    since we don't support Python 2.
    """

    def deserialize(self, value: Dict) -> Any:
        """Deserialize DynamoDB data types into Python types.

        Here are the various conversions:

        DynamoDB                                Python
        --------                                ------
        {'NULL': True}                          None
        {'BOOL': True/False}                    True/False
        {'N': str(value)}                       Decimal(value)
        {'S': string}                           string
        {'B': bytes}                            bytes
        {'NS': [str(value)]}                    set([Decimal(value)])
        {'SS': [string]}                        set([string])
        {'BS': [bytes]}                         set([bytes])
        {'L': list}                             list
        {'M': dict}                             dict

        Parameters
        ----------
        value: Dict
            DynamoDB value to be deserialized to a python type.
            Its first key is the DynamoDB type descriptor (matched case-insensitively).

        Returns
        --------
        Any
            Python native type converted from DynamoDB type

        Raises
        ------
        TypeError
            When `value` is empty, or its type descriptor has no matching deserializer
        """
        # An empty value has no type descriptor to dispatch on; fail with the same
        # exception type as an unsupported descriptor instead of a bare IndexError.
        if not value:
            raise TypeError("Value must be a nonempty dictionary whose key is a valid dynamodb type.")

        dynamodb_type = next(iter(value))
        # Dispatch by naming convention: {'SS': ...} is handled by `_deserialize_ss`, and so on.
        deserializer: Optional[Callable] = getattr(self, f"_deserialize_{dynamodb_type}".lower(), None)
        if deserializer is None:
            raise TypeError(f"Dynamodb type {dynamodb_type} is not supported")

        return deserializer(value[dynamodb_type])

    def _deserialize_null(self, value: bool) -> None:
        # The payload of a NULL attribute is irrelevant; it always maps to None
        return None

    def _deserialize_bool(self, value: bool) -> bool:
        return value

    def _deserialize_n(self, value: str) -> Decimal:
        # Numbers arrive as strings; build the Decimal under the DynamoDB-specific context
        return DYNAMODB_CONTEXT.create_decimal(value)

    def _deserialize_s(self, value: str) -> str:
        return value

    def _deserialize_b(self, value: bytes) -> bytes:
        return value

    def _deserialize_ns(self, value: Sequence[str]) -> Set[Decimal]:
        return set(map(self._deserialize_n, value))

    def _deserialize_ss(self, value: Sequence[str]) -> Set[str]:
        return set(map(self._deserialize_s, value))

    def _deserialize_bs(self, value: Sequence[bytes]) -> Set[bytes]:
        return set(map(self._deserialize_b, value))

    def _deserialize_l(self, value: Sequence[Dict]) -> Sequence[Any]:
        # Each element is itself a typed DynamoDB value, hence the recursion
        return [self.deserialize(v) for v in value]

    def _deserialize_m(self, value: Dict) -> Dict:
        # Map values are typed DynamoDB values; keys are kept as-is
        return {k: self.deserialize(v) for k, v in value.items()}


class StreamViewType(Enum):
"""The type of data from the modified DynamoDB item that was captured in this stream record"""
Expand Down
64 changes: 64 additions & 0 deletions aws_lambda_powertools/utilities/parser/models/dynamodb.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,13 @@

from pydantic import BaseModel

from aws_lambda_powertools.shared.dynamodb_deserializer import TypeDeserializer
from aws_lambda_powertools.utilities.parser.types import Literal


class DynamoDBStreamChangedRecordModel(BaseModel):
_deserializer = TypeDeserializer()

ApproximateCreationDateTime: Optional[datetime] = None
Keys: Dict[str, Dict[str, Any]]
NewImage: Optional[Union[Dict[str, Any], Type[BaseModel], BaseModel]] = None
Expand All @@ -26,6 +29,67 @@ class DynamoDBStreamChangedRecordModel(BaseModel):
# raise TypeError("DynamoDB streams model failed validation, missing both new & old stream images") # noqa: ERA001,E501
# return values # noqa: ERA001

def __init__(self, **data: Any):
    """Build the model from the `dynamodb` section of a DynamoDB Stream record

    Parameters
    ----------
    data: Dict[str, Any]
        Represents the dynamodb dict inside DynamoDBStreamEvent's records
    """
    # NOTE: `_deserializer` is already declared at class level, so it is deliberately
    # not re-assigned here. Re-assigning would build a new TypeDeserializer for every
    # record, and pydantic v1 models reject assignment to underscore attributes that
    # were not declared as private attributes.
    super().__init__(**data)

def _deserialize_dynamodb_dict(self, key: str) -> Optional[Dict[str, Any]]:
    """Convert one DynamoDB-typed attribute map of this model into Python native types

    Parameters
    ----------
    key : str
        Name of the model field to read (e.g., Keys, NewImage, or OldImage)

    Returns
    -------
    Optional[Dict[str, Any]]
        Attribute names mapped to their deserialized values, or None when the field is None
    """
    raw_attributes = getattr(self, key)
    if raw_attributes is not None:
        return {
            name: self._deserializer.deserialize(typed_value)
            for name, typed_value in raw_attributes.items()
        }

    return None

@property
def approximate_creation_date_time(self) -> Optional[datetime]:
    """Approximate creation date and time of the stream record, read from `ApproximateCreationDateTime`.

    Returns the field value unchanged, which is None when the field is not set.
    """
    return self.ApproximateCreationDateTime

@property
def keys(self) -> Optional[Dict[str, Any]]:
    """Primary key attribute(s) of the modified DynamoDB item, deserialized from the `Keys` field."""
    return self._deserialize_dynamodb_dict(key="Keys")

@property
def new_image(self) -> Optional[Dict[str, Any]]:
    """Item as it appeared after the modification, deserialized from the `NewImage` field."""
    return self._deserialize_dynamodb_dict(key="NewImage")

@property
def old_image(self) -> Optional[Dict[str, Any]]:
    """Item as it appeared before the modification, deserialized from the `OldImage` field."""
    return self._deserialize_dynamodb_dict(key="OldImage")

@property
def sequence_number(self) -> Optional[str]:
    """Sequence number of the stream record, read from the `SequenceNumber` field."""
    record_sequence_number = self.SequenceNumber
    return record_sequence_number

@property
def size_bytes(self) -> Optional[int]:
    """Size of the stream record in bytes, coerced to int; None when `SizeBytes` is None."""
    raw_size = self.SizeBytes
    if raw_size is None:
        return None

    return int(raw_size)


class UserIdentity(BaseModel):
type: Literal["Service"] # noqa: VNE003, A003
Expand Down
Loading