Skip to content

Commit b538772

Browse files
authored
feat: add support for SNS->SQS protocol (#272)
* feat: add support for SNS->SQS protocol * docs: add SnsEnvelope, SnsSqsEnvelope; fix headers Signed-off-by: heitorlessa <[email protected]>
1 parent 722b4a3 commit b538772

File tree

6 files changed

+90
-10
lines changed

6 files changed

+90
-10
lines changed

aws_lambda_powertools/utilities/parser/envelopes/__init__.py

+2-1
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
from .dynamodb import DynamoDBStreamEnvelope
44
from .event_bridge import EventBridgeEnvelope
55
from .kinesis import KinesisDataStreamEnvelope
6-
from .sns import SnsEnvelope
6+
from .sns import SnsEnvelope, SnsSqsEnvelope
77
from .sqs import SqsEnvelope
88

99
__all__ = [
@@ -12,6 +12,7 @@
1212
"EventBridgeEnvelope",
1313
"KinesisDataStreamEnvelope",
1414
"SnsEnvelope",
15+
"SnsSqsEnvelope",
1516
"SqsEnvelope",
1617
"BaseEnvelope",
1718
]

aws_lambda_powertools/utilities/parser/envelopes/sns.py

+36-1
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import logging
22
from typing import Any, Dict, List, Optional, Union
33

4-
from ..models import SnsModel
4+
from ..models import SnsModel, SnsNotificationModel, SqsModel
55
from ..types import Model
66
from .base import BaseEnvelope
77

@@ -37,3 +37,38 @@ def parse(self, data: Optional[Union[Dict[str, Any], Any]], model: Model) -> Lis
3737
parsed_envelope = SnsModel.parse_obj(data)
3838
logger.debug(f"Parsing SNS records in `body` with {model}")
3939
return [self._parse(data=record.Sns.Message, model=model) for record in parsed_envelope.Records]
40+
41+
42+
class SnsSqsEnvelope(BaseEnvelope):
43+
"""SNS plus SQS Envelope to extract array of Records
44+
45+
Published messages from SNS to SQS has a slightly different payload.
46+
Since SNS payload is marshalled into `Record` key in SQS, we have to:
47+
48+
1. Parse SQS schema with incoming data
49+
2. Unmarshall SNS payload and parse against SNS Notification model not SNS/SNS Record
50+
3. Finally, parse provided model against payload extracted
51+
"""
52+
53+
def parse(self, data: Optional[Union[Dict[str, Any], Any]], model: Model) -> List[Optional[Model]]:
54+
"""Parses records found with model provided
55+
56+
Parameters
57+
----------
58+
data : Dict
59+
Lambda event to be parsed
60+
model : Model
61+
Data model provided to parse after extracting data using envelope
62+
63+
Returns
64+
-------
65+
List
66+
List of records parsed with model provided
67+
"""
68+
logger.debug(f"Parsing incoming data with SQS model {SqsModel}")
69+
parsed_envelope = SqsModel.parse_obj(data)
70+
output = []
71+
for record in parsed_envelope.Records:
72+
sns_notification = SnsNotificationModel.parse_raw(record.body)
73+
output.append(self._parse(data=sns_notification.Message, model=model))
74+
return output

aws_lambda_powertools/utilities/parser/models/sns.py

+10-2
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
from datetime import datetime
22
from typing import Dict, List, Optional
33

4-
from pydantic import BaseModel
4+
from pydantic import BaseModel, root_validator
55
from pydantic.networks import HttpUrl
66
from typing_extensions import Literal
77

@@ -16,14 +16,22 @@ class SnsNotificationModel(BaseModel):
1616
TopicArn: str
1717
UnsubscribeUrl: HttpUrl
1818
Type: Literal["Notification"]
19-
MessageAttributes: Dict[str, SnsMsgAttributeModel]
19+
MessageAttributes: Optional[Dict[str, SnsMsgAttributeModel]]
2020
Message: str
2121
MessageId: str
2222
SigningCertUrl: HttpUrl
2323
Signature: str
2424
Timestamp: datetime
2525
SignatureVersion: str
2626

27+
@root_validator(pre=True)
28+
def check_sqs_protocol(cls, values):
29+
sqs_rewritten_keys = ("UnsubscribeURL", "SigningCertURL")
30+
if any(key in sqs_rewritten_keys for key in values):
31+
values["UnsubscribeUrl"] = values.pop("UnsubscribeURL")
32+
values["SigningCertUrl"] = values.pop("SigningCertURL")
33+
return values
34+
2735

2836
class SnsRecordModel(BaseModel):
2937
EventSource: Literal["aws:sns"]

docs/content/utilities/parser.mdx

+10-6
Original file line numberDiff line numberDiff line change
@@ -147,7 +147,7 @@ def my_function():
147147
}
148148
```
149149

150-
## Extending built-in models
150+
## Built-in models
151151

152152
Parser comes with the following built-in models:
153153

@@ -163,6 +163,8 @@ Model name | Description
163163
**SesModel** | Lambda Event Source payload for Amazon Simple Email Service
164164
**SnsModel** | Lambda Event Source payload for Amazon Simple Notification Service
165165

166+
### extending built-in models
167+
166168
You can extend them to include your own models, and yet have all other known fields parsed along the way.
167169

168170
**EventBridge example**
@@ -289,7 +291,7 @@ def handler(event: UserModel, context: LambdaContext):
289291
4. Parser then parsed the `detail` key using `UserModel`
290292

291293

292-
### Built-in envelopes
294+
### built-in envelopes
293295

294296
Parser comes with the following built-in envelopes, where `Model` in the return section is your given model.
295297

@@ -300,8 +302,10 @@ Envelope name | Behaviour | Return
300302
**SqsEnvelope** | 1. Parses data using `SqsModel`. <br/> 2. Parses records in `body` key using your model and return them in a list. | `List[Model]`
301303
**CloudWatchLogsEnvelope** | 1. Parses data using `CloudwatchLogsModel` which will base64 decode and decompress it. <br/> 2. Parses records in `message` key using your model and return them in a list. | `List[Model]`
302304
**KinesisDataStreamEnvelope** | 1. Parses data using `KinesisDataStreamModel` which will base64 decode it. <br/> 2. Parses records in in `Records` key using your model and returns them in a list. | `List[Model]`
305+
**SnsEnvelope** | 1. Parses data using `SqsModel`. <br/> 2. Parses records in `body` key using your model and return them in a list. | `List[Model]`
306+
**SnsSqsEnvelope** | 1. Parses data using `SqsModel`. <br/> 2. Parses SNS records in `body` key using `SnsNotificationModel`. <br/> 3. Parses data in `Message` key using your model and return them in a list. | `List[Model]`
303307

304-
### Bringing your own envelope
308+
### bringing your own envelope
305309

306310
You can create your own Envelope model and logic by inheriting from `BaseEnvelope`, and implementing the `parse` method.
307311

@@ -366,7 +370,7 @@ class EventBridgeEnvelope(BaseEnvelope): # highlight-line
366370
3. Then, we parsed the incoming data with our envelope to confirm it matches EventBridge's structure defined in `EventBridgeModel`
367371
4. Lastly, we call `_parse` from `BaseEnvelope` to parse the data in our envelope (.detail) using the customer model
368372

369-
### Data model validation
373+
## Data model validation
370374

371375
<Note type="warning">
372376
This is radically different from the <strong>Validator utility</strong> which validates events against JSON Schema.
@@ -384,7 +388,7 @@ Keep the following in mind regardless of which decorator you end up using it:
384388
* You must raise either `ValueError`, `TypeError`, or `AssertionError` when value is not compliant
385389
* You must return the value(s) itself if compliant
386390

387-
#### Validating fields
391+
### validating fields
388392

389393
Quick validation to verify whether the field `message` has the value of `hello world`.
390394

@@ -429,7 +433,7 @@ class HelloWorldModel(BaseModel):
429433
parse(model=HelloWorldModel, event={"message": "hello universe", "sender": "universe"})
430434
```
431435

432-
#### Validating entire model
436+
### validating entire model
433437

434438
`root_validator` can help when you have a complex validation mechanism. For example finding whether data has been omitted, comparing field values, etc.
435439

tests/events/snsSqsEvent.json

+20
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
{
2+
"Records": [
3+
{
4+
"messageId": "79406a00-bf15-46ca-978c-22c3613fcb30",
5+
"receiptHandle": "AQEB3fkqlBqq239bMCAHIr5mZkxJYKtxsTTy1lMImmpY7zqpQdfcAE8zFiuRh7X5ciROy24taT2rRXfuJFN/yEUVcQ6d5CIOCEK4htmRJJOHIyGdZPAm2NUUG5nNn2aEzgfzVvrkPBsrCbr7XTzK5s6eUZNH/Nn9AJtHKHpzweRK34Bon9OU/mvyIT7EJbwHPsdhL14NrCp8pLWBiIhkaJkG2G6gPO89dwHtGVUARJL+zP70AuIu/f7QgmPtY2eeE4AVbcUT1qaIlSGHUXxoHq/VMHLd/c4zWl0EXQOo/90DbyCUMejTIKL7N15YfkHoQDHprvMiAr9S75cdMiNOduiHzZLg/qVcv4kxsksKLFMKjwlzmYuQYy2KslVGwoHMd4PD",
6+
"body": "{\n \"Type\" : \"Notification\",\n \"MessageId\" : \"d88d4479-6ec0-54fe-b63f-1cf9df4bb16e\",\n \"TopicArn\" : \"arn:aws:sns:eu-west-1:231436140809:powertools265\",\n \"Message\" : \"{\\\"message\\\": \\\"hello world\\\", \\\"username\\\": \\\"lessa\\\"}\",\n \"Timestamp\" : \"2021-01-19T10:07:07.287Z\",\n \"SignatureVersion\" : \"1\",\n \"Signature\" : \"tEo2i6Lw6/Dr7Jdlulh0sXgnkF0idd3hqs8QZCorQpzkIWVOuu583NT0Gv0epuZD1Bo+tex6NgP5p6415yNVujGHJKnkrA9ztzXaVgFiol8rf8AFGQbmb7RsM9BqATQUJeg9nCTe0jksmWXmjxEFr8XKyyRuQBwSlRTngAvOw8jUnCe1vyYD5xPec1xpfOEGLi5BqSog+6tBtsry3oAtcENX8SV1tVuMpp6D+UrrU8xNT/5D70uRDppkPE3vq+t7rR0fVSdQRdUV9KmQD2bflA1Dyb2y37EzwJOMHDDQ82aOhj/JmPxvEAlV8RkZl6J0HIveraRy9wbNLbI7jpiOCw==\",\n \"SigningCertURL\" : \"https://sns.eu-west-1.amazonaws.com/SimpleNotificationService-010a507c1833636cd94bdb98bd93083a.pem\",\n \"UnsubscribeURL\" : \"https://sns.eu-west-1.amazonaws.com/?Action=Unsubscribe&SubscriptionArn=arn:aws:sns:eu-west-1:231436140809:powertools265:15189ad7-870e-40e5-a7dd-a48898cd9f86\"\n}",
7+
"attributes": {
8+
"ApproximateReceiveCount": "1",
9+
"SentTimestamp": "1611050827340",
10+
"SenderId": "AIDAISMY7JYY5F7RTT6AO",
11+
"ApproximateFirstReceiveTimestamp": "1611050827344"
12+
},
13+
"messageAttributes": {},
14+
"md5OfBody": "8910bdaaf9a30a607f7891037d4af0b0",
15+
"eventSource": "aws:sqs",
16+
"eventSourceARN": "arn:aws:sqs:eu-west-1:231436140809:powertools265",
17+
"awsRegion": "eu-west-1"
18+
}
19+
]
20+
}

tests/functional/parser/test_sns.py

+12
Original file line numberDiff line numberDiff line change
@@ -91,3 +91,15 @@ def handle_sns_no_envelope(event: MyAdvancedSnsBusiness, _: LambdaContext):
9191
def test_handle_sns_trigger_event_no_envelope():
9292
event_dict = load_event("snsEvent.json")
9393
handle_sns_no_envelope(event_dict, LambdaContext())
94+
95+
96+
@event_parser(model=MySnsBusiness, envelope=envelopes.SnsSqsEnvelope)
97+
def handle_sns_sqs_json_body(event: List[MySnsBusiness], _: LambdaContext):
98+
assert len(event) == 1
99+
assert event[0].message == "hello world"
100+
assert event[0].username == "lessa"
101+
102+
103+
def test_handle_sns_sqs_trigger_event_json_body(): # noqa: F811
104+
event_dict = load_event("snsSqsEvent.json")
105+
handle_sns_sqs_json_body(event_dict, LambdaContext())

0 commit comments

Comments
 (0)