forked from aws-powertools/powertools-lambda-python
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathsns.py
47 lines (37 loc) · 1.57 KB
/
sns.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
from collections.abc import Mapping
from datetime import datetime
from typing import Dict, List, Literal, Optional, Union
from typing import Type as TypingType

from pydantic import BaseModel, model_validator
from pydantic.networks import HttpUrl
# One entry of an SNS notification's ``MessageAttributes`` map: a declared
# type name plus the attribute value, both kept as plain strings.
class SnsMsgAttributeModel(BaseModel):
    Type: str
    Value: str
# Body of a single SNS notification.
#
# Field names follow the notification payload keys. The signing-related
# fields are optional because, per the notes below, they are absent when the
# FIFO opt-in is used. A "before" validator accepts the alternative
# ``UnsubscribeURL`` / ``SigningCertURL`` key spelling found in SQS payloads.
class SnsNotificationModel(BaseModel):
    Subject: Optional[str] = None
    TopicArn: str
    UnsubscribeUrl: HttpUrl
    Type: Literal["Notification"]
    MessageAttributes: Optional[Dict[str, SnsMsgAttributeModel]] = None
    Message: Union[str, TypingType[BaseModel]]
    MessageId: str
    SigningCertUrl: Optional[HttpUrl] = None  # NOTE: FIFO opt-in removes attribute
    Signature: Optional[str] = None  # NOTE: FIFO opt-in removes attribute
    Timestamp: datetime
    SignatureVersion: Optional[str] = None  # NOTE: FIFO opt-in removes attribute

    @model_validator(mode="before")
    @classmethod
    def check_sqs_protocol(cls, values):
        """Rename SQS-style URL keys to the field names declared on this model.

        Runs before field validation. When the raw input is a mapping that
        contains ``UnsubscribeURL`` and/or ``SigningCertURL``, those keys are
        renamed to ``UnsubscribeUrl`` / ``SigningCertUrl`` in a copy of the
        input.

        Args:
            values: Raw validation input. Usually a dict, but a "before"
                validator can receive any object.

        Returns:
            The input unchanged when there is nothing to rename, otherwise a
            new dict with the renamed keys.
        """
        # Non-mapping input (None, a model instance, ...) is passed through so
        # pydantic reports its normal validation error instead of this method
        # raising TypeError/AttributeError while probing the value.
        if not isinstance(values, Mapping):
            return values

        key_renames = (
            ("UnsubscribeURL", "UnsubscribeUrl"),
            ("SigningCertURL", "SigningCertUrl"),
        )
        if not any(sqs_key in values for sqs_key, _ in key_renames):
            return values

        # Work on a shallow copy: the caller's payload must not be mutated as
        # a side effect of validation.
        normalized = dict(values)
        for sqs_key, field_name in key_renames:
            # Only rename keys that are actually present. A missing key is
            # then reported by pydantic as a missing field (still a validation
            # error, never a KeyError), and a correctly spelled sibling value
            # is not overwritten with None.
            if sqs_key in normalized:
                normalized[field_name] = normalized.pop(sqs_key)
        return normalized
# One record of an SNS event: fixed "aws:sns" source marker, event version,
# subscription ARN, and the notification body under ``Sns``.
class SnsRecordModel(BaseModel):
    EventSource: Literal["aws:sns"]
    EventVersion: str
    EventSubscriptionArn: str
    Sns: SnsNotificationModel
# Top-level SNS event: a list of SNS records under ``Records``.
class SnsModel(BaseModel):
    Records: List[SnsRecordModel]