Skip to content

feat: add support for SNS->SQS protocol #272

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Jan 19, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from .dynamodb import DynamoDBStreamEnvelope
from .event_bridge import EventBridgeEnvelope
from .kinesis import KinesisDataStreamEnvelope
from .sns import SnsEnvelope
from .sns import SnsEnvelope, SnsSqsEnvelope
from .sqs import SqsEnvelope

__all__ = [
Expand All @@ -12,6 +12,7 @@
"EventBridgeEnvelope",
"KinesisDataStreamEnvelope",
"SnsEnvelope",
"SnsSqsEnvelope",
"SqsEnvelope",
"BaseEnvelope",
]
37 changes: 36 additions & 1 deletion aws_lambda_powertools/utilities/parser/envelopes/sns.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import logging
from typing import Any, Dict, List, Optional, Union

from ..models import SnsModel
from ..models import SnsModel, SnsNotificationModel, SqsModel
from ..types import Model
from .base import BaseEnvelope

Expand Down Expand Up @@ -37,3 +37,38 @@ def parse(self, data: Optional[Union[Dict[str, Any], Any]], model: Model) -> Lis
parsed_envelope = SnsModel.parse_obj(data)
logger.debug(f"Parsing SNS records in `body` with {model}")
return [self._parse(data=record.Sns.Message, model=model) for record in parsed_envelope.Records]


class SnsSqsEnvelope(BaseEnvelope):
"""SNS plus SQS Envelope to extract array of Records

Published messages from SNS to SQS has a slightly different payload.
Since SNS payload is marshalled into `Record` key in SQS, we have to:

1. Parse SQS schema with incoming data
2. Unmarshall SNS payload and parse against SNS Notification model not SNS/SNS Record
3. Finally, parse provided model against payload extracted
"""

def parse(self, data: Optional[Union[Dict[str, Any], Any]], model: Model) -> List[Optional[Model]]:
"""Parses records found with model provided

Parameters
----------
data : Dict
Lambda event to be parsed
model : Model
Data model provided to parse after extracting data using envelope

Returns
-------
List
List of records parsed with model provided
"""
logger.debug(f"Parsing incoming data with SQS model {SqsModel}")
parsed_envelope = SqsModel.parse_obj(data)
output = []
for record in parsed_envelope.Records:
sns_notification = SnsNotificationModel.parse_raw(record.body)
output.append(self._parse(data=sns_notification.Message, model=model))
return output
12 changes: 10 additions & 2 deletions aws_lambda_powertools/utilities/parser/models/sns.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from datetime import datetime
from typing import Dict, List, Optional

from pydantic import BaseModel
from pydantic import BaseModel, root_validator
from pydantic.networks import HttpUrl
from typing_extensions import Literal

Expand All @@ -16,14 +16,22 @@ class SnsNotificationModel(BaseModel):
TopicArn: str
UnsubscribeUrl: HttpUrl
Type: Literal["Notification"]
MessageAttributes: Dict[str, SnsMsgAttributeModel]
MessageAttributes: Optional[Dict[str, SnsMsgAttributeModel]]
Message: str
MessageId: str
SigningCertUrl: HttpUrl
Signature: str
Timestamp: datetime
SignatureVersion: str

@root_validator(pre=True)
def check_sqs_protocol(cls, values):
sqs_rewritten_keys = ("UnsubscribeURL", "SigningCertURL")
if any(key in sqs_rewritten_keys for key in values):
values["UnsubscribeUrl"] = values.pop("UnsubscribeURL")
values["SigningCertUrl"] = values.pop("SigningCertURL")
return values


class SnsRecordModel(BaseModel):
EventSource: Literal["aws:sns"]
Expand Down
16 changes: 10 additions & 6 deletions docs/content/utilities/parser.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ def my_function():
}
```

## Extending built-in models
## Built-in models

Parser comes with the following built-in models:

Expand All @@ -163,6 +163,8 @@ Model name | Description
**SesModel** | Lambda Event Source payload for Amazon Simple Email Service
**SnsModel** | Lambda Event Source payload for Amazon Simple Notification Service

### extending built-in models

You can extend them to include your own models, and yet have all other known fields parsed along the way.

**EventBridge example**
Expand Down Expand Up @@ -289,7 +291,7 @@ def handler(event: UserModel, context: LambdaContext):
4. Parser then parsed the `detail` key using `UserModel`


### Built-in envelopes
### built-in envelopes

Parser comes with the following built-in envelopes, where `Model` in the return section is your given model.

Expand All @@ -300,8 +302,10 @@ Envelope name | Behaviour | Return
**SqsEnvelope** | 1. Parses data using `SqsModel`. <br/> 2. Parses records in `body` key using your model and return them in a list. | `List[Model]`
**CloudWatchLogsEnvelope** | 1. Parses data using `CloudwatchLogsModel` which will base64 decode and decompress it. <br/> 2. Parses records in `message` key using your model and return them in a list. | `List[Model]`
**KinesisDataStreamEnvelope** | 1. Parses data using `KinesisDataStreamModel` which will base64 decode it. <br/> 2. Parses records in in `Records` key using your model and returns them in a list. | `List[Model]`
**SnsEnvelope** | 1. Parses data using `SqsModel`. <br/> 2. Parses records in `body` key using your model and return them in a list. | `List[Model]`
**SnsSqsEnvelope** | 1. Parses data using `SqsModel`. <br/> 2. Parses SNS records in `body` key using `SnsNotificationModel`. <br/> 3. Parses data in `Message` key using your model and return them in a list. | `List[Model]`

### Bringing your own envelope
### bringing your own envelope

You can create your own Envelope model and logic by inheriting from `BaseEnvelope`, and implementing the `parse` method.

Expand Down Expand Up @@ -366,7 +370,7 @@ class EventBridgeEnvelope(BaseEnvelope): # highlight-line
3. Then, we parsed the incoming data with our envelope to confirm it matches EventBridge's structure defined in `EventBridgeModel`
4. Lastly, we call `_parse` from `BaseEnvelope` to parse the data in our envelope (.detail) using the customer model

### Data model validation
## Data model validation

<Note type="warning">
This is radically different from the <strong>Validator utility</strong> which validates events against JSON Schema.
Expand All @@ -384,7 +388,7 @@ Keep the following in mind regardless of which decorator you end up using it:
* You must raise either `ValueError`, `TypeError`, or `AssertionError` when value is not compliant
* You must return the value(s) itself if compliant

#### Validating fields
### validating fields

Quick validation to verify whether the field `message` has the value of `hello world`.

Expand Down Expand Up @@ -429,7 +433,7 @@ class HelloWorldModel(BaseModel):
parse(model=HelloWorldModel, event={"message": "hello universe", "sender": "universe"})
```

#### Validating entire model
### validating entire model

`root_validator` can help when you have a complex validation mechanism. For example finding whether data has been omitted, comparing field values, etc.

Expand Down
20 changes: 20 additions & 0 deletions tests/events/snsSqsEvent.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
{
"Records": [
{
"messageId": "79406a00-bf15-46ca-978c-22c3613fcb30",
"receiptHandle": "AQEB3fkqlBqq239bMCAHIr5mZkxJYKtxsTTy1lMImmpY7zqpQdfcAE8zFiuRh7X5ciROy24taT2rRXfuJFN/yEUVcQ6d5CIOCEK4htmRJJOHIyGdZPAm2NUUG5nNn2aEzgfzVvrkPBsrCbr7XTzK5s6eUZNH/Nn9AJtHKHpzweRK34Bon9OU/mvyIT7EJbwHPsdhL14NrCp8pLWBiIhkaJkG2G6gPO89dwHtGVUARJL+zP70AuIu/f7QgmPtY2eeE4AVbcUT1qaIlSGHUXxoHq/VMHLd/c4zWl0EXQOo/90DbyCUMejTIKL7N15YfkHoQDHprvMiAr9S75cdMiNOduiHzZLg/qVcv4kxsksKLFMKjwlzmYuQYy2KslVGwoHMd4PD",
"body": "{\n \"Type\" : \"Notification\",\n \"MessageId\" : \"d88d4479-6ec0-54fe-b63f-1cf9df4bb16e\",\n \"TopicArn\" : \"arn:aws:sns:eu-west-1:231436140809:powertools265\",\n \"Message\" : \"{\\\"message\\\": \\\"hello world\\\", \\\"username\\\": \\\"lessa\\\"}\",\n \"Timestamp\" : \"2021-01-19T10:07:07.287Z\",\n \"SignatureVersion\" : \"1\",\n \"Signature\" : \"tEo2i6Lw6/Dr7Jdlulh0sXgnkF0idd3hqs8QZCorQpzkIWVOuu583NT0Gv0epuZD1Bo+tex6NgP5p6415yNVujGHJKnkrA9ztzXaVgFiol8rf8AFGQbmb7RsM9BqATQUJeg9nCTe0jksmWXmjxEFr8XKyyRuQBwSlRTngAvOw8jUnCe1vyYD5xPec1xpfOEGLi5BqSog+6tBtsry3oAtcENX8SV1tVuMpp6D+UrrU8xNT/5D70uRDppkPE3vq+t7rR0fVSdQRdUV9KmQD2bflA1Dyb2y37EzwJOMHDDQ82aOhj/JmPxvEAlV8RkZl6J0HIveraRy9wbNLbI7jpiOCw==\",\n \"SigningCertURL\" : \"https://sns.eu-west-1.amazonaws.com/SimpleNotificationService-010a507c1833636cd94bdb98bd93083a.pem\",\n \"UnsubscribeURL\" : \"https://sns.eu-west-1.amazonaws.com/?Action=Unsubscribe&SubscriptionArn=arn:aws:sns:eu-west-1:231436140809:powertools265:15189ad7-870e-40e5-a7dd-a48898cd9f86\"\n}",
"attributes": {
"ApproximateReceiveCount": "1",
"SentTimestamp": "1611050827340",
"SenderId": "AIDAISMY7JYY5F7RTT6AO",
"ApproximateFirstReceiveTimestamp": "1611050827344"
},
"messageAttributes": {},
"md5OfBody": "8910bdaaf9a30a607f7891037d4af0b0",
"eventSource": "aws:sqs",
"eventSourceARN": "arn:aws:sqs:eu-west-1:231436140809:powertools265",
"awsRegion": "eu-west-1"
}
]
}
12 changes: 12 additions & 0 deletions tests/functional/parser/test_sns.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,3 +91,15 @@ def handle_sns_no_envelope(event: MyAdvancedSnsBusiness, _: LambdaContext):
def test_handle_sns_trigger_event_no_envelope():
event_dict = load_event("snsEvent.json")
handle_sns_no_envelope(event_dict, LambdaContext())


@event_parser(model=MySnsBusiness, envelope=envelopes.SnsSqsEnvelope)
def handle_sns_sqs_json_body(event: List[MySnsBusiness], _: LambdaContext):
assert len(event) == 1
assert event[0].message == "hello world"
assert event[0].username == "lessa"


def test_handle_sns_sqs_trigger_event_json_body(): # noqa: F811
event_dict = load_event("snsSqsEvent.json")
handle_sns_sqs_json_body(event_dict, LambdaContext())