Skip to content

feat(data-classes): ActiveMQ and RabbitMQ support #770

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 9 commits into from
Oct 29, 2021
125 changes: 125 additions & 0 deletions aws_lambda_powertools/utilities/data_classes/active_mq_event.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
import base64
import json
from typing import Any, Iterator, Optional

from aws_lambda_powertools.utilities.data_classes.common import DictWrapper


class ActiveMQMessage(DictWrapper):
@property
def message_id(self) -> str:
"""Unique identifier for the message"""
return self["messageID"]

@property
def message_type(self) -> str:
return self["messageType"]

@property
def data(self) -> str:
return self["data"]

@property
def decoded_data(self) -> str:
"""Decodes the data as a str"""
return base64.b64decode(self.data.encode()).decode()

@property
def json_data(self) -> Any:
"""Parses the data as json"""
return json.loads(self.decoded_data)

@property
def connection_id(self) -> str:
return self["connectionId"]

@property
def redelivered(self) -> bool:
"""true if the message is being resent to the consumer"""
return self["redelivered"]

@property
def timestamp(self) -> int:
"""Time in milliseconds."""
return self["timestamp"]

@property
def broker_in_time(self) -> int:
"""Time stamp (in milliseconds) for when the message arrived at the broker."""
return self["brokerInTime"]

@property
def broker_out_time(self) -> int:
"""Time stamp (in milliseconds) for when the message left the broker."""
return self["brokerOutTime"]

@property
def destination_physicalname(self) -> str:
return self["destination"]["physicalname"]

@property
def delivery_mode(self) -> Optional[int]:
"""persistent or non-persistent delivery"""
return self.get("deliveryMode")

@property
def correlation_id(self) -> Optional[str]:
"""User defined correlation id"""
return self.get("correlationID")

@property
def reply_to(self) -> Optional[str]:
"""User defined reply to"""
return self.get("replyTo")

@property
def get_type(self) -> Optional[str]:
"""User defined message type"""
return self.get("type")

@property
def expiration(self) -> Optional[int]:
"""Expiration attribute whose value is given in milliseconds"""
return self.get("expiration")

@property
def priority(self) -> Optional[int]:
"""
JMS defines a ten-level priority value, with 0 as the lowest priority and 9
as the highest. In addition, clients should consider priorities 0-4 as
gradations of normal priority and priorities 5-9 as gradations of expedited
priority.

JMS does not require that a provider strictly implement priority ordering
of messages; however, it should do its best to deliver expedited messages
ahead of normal messages.
"""
return self.get("priority")


class ActiveMQEvent(DictWrapper):
"""Represents an Active MQ event sent to Lambda

Documentation:
--------------
- https://docs.aws.amazon.com/lambda/latest/dg/with-mq.html
- https://aws.amazon.com/blogs/compute/using-amazon-mq-as-an-event-source-for-aws-lambda/
"""

@property
def event_source(self) -> str:
return self["eventSource"]

@property
def event_source_arn(self) -> str:
"""The Amazon Resource Name (ARN) of the event source"""
return self["eventSourceArn"]

@property
def messages(self) -> Iterator[ActiveMQMessage]:
for record in self["messages"]:
yield ActiveMQMessage(record)

@property
def message(self) -> ActiveMQMessage:
return next(self.messages)
121 changes: 121 additions & 0 deletions aws_lambda_powertools/utilities/data_classes/rabbit_mq_event.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
import base64
import json
from typing import Any, Dict, List

from aws_lambda_powertools.utilities.data_classes.common import DictWrapper


class BasicProperties(DictWrapper):
@property
def content_type(self) -> str:
return self["contentType"]

@property
def content_encoding(self) -> str:
return self["contentEncoding"]

@property
def headers(self) -> Dict[str, Any]:
return self["headers"]

@property
def delivery_mode(self) -> int:
return self["deliveryMode"]

@property
def priority(self) -> int:
return self["priority"]

@property
def correlation_id(self) -> str:
return self["correlationId"]

@property
def reply_to(self) -> str:
return self["replyTo"]

@property
def expiration(self) -> str:
return self["expiration"]

@property
def message_id(self) -> str:
return self["messageId"]

@property
def timestamp(self) -> str:
return self["timestamp"]

@property
def get_type(self) -> str:
return self["type"]

@property
def user_id(self) -> str:
return self["userId"]

@property
def app_id(self) -> str:
return self["appId"]

@property
def cluster_id(self) -> str:
return self["clusterId"]

@property
def body_size(self) -> int:
return self["bodySize"]


class RabbitMessage(DictWrapper):
@property
def basic_properties(self) -> BasicProperties:
return BasicProperties(self["basicProperties"])

@property
def redelivered(self) -> bool:
return self["redelivered"]

@property
def data(self) -> str:
return self["data"]

@property
def decoded_data(self) -> str:
"""Decodes the data as a str"""
return base64.b64decode(self.data.encode()).decode()

@property
def json_data(self) -> Any:
"""Parses the data as json"""
return json.loads(self.decoded_data)


class RabbitMQEvent(DictWrapper):
"""Represents a Rabbit MQ event sent to Lambda

Documentation:
--------------
- https://docs.aws.amazon.com/lambda/latest/dg/with-mq.html
- https://aws.amazon.com/blogs/compute/using-amazon-mq-for-rabbitmq-as-an-event-source-for-lambda/
"""

def __init__(self, data: Dict[str, Any]):
super().__init__(data)
self._rmq_messages_by_queue = {
key: [RabbitMessage(message) for message in messages]
for key, messages in self["rmqMessagesByQueue"].items()
}

@property
def event_source(self) -> str:
return self["eventSource"]

@property
def event_source_arn(self) -> str:
"""The Amazon Resource Name (ARN) of the event source"""
return self["eventSourceArn"]

@property
def rmq_messages_by_queue(self) -> Dict[str, List[RabbitMessage]]:
return self._rmq_messages_by_queue
54 changes: 54 additions & 0 deletions docs/utilities/data_classes.md
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ Same example as above, but using the `event_source` decorator

Event Source | Data_class
------------------------------------------------- | ---------------------------------------------------------------------------------
[Active MQ](#active-mq) | `ActiveMQEvent`
[API Gateway Authorizer](#api-gateway-authorizer) | `APIGatewayAuthorizerRequestEvent`
[API Gateway Authorizer V2](#api-gateway-authorizer-v2) | `APIGatewayAuthorizerEventV2`
[API Gateway Proxy](#api-gateway-proxy) | `APIGatewayProxyEvent`
Expand All @@ -72,6 +73,7 @@ Event Source | Data_class
[DynamoDB streams](#dynamodb-streams) | `DynamoDBStreamEvent`, `DynamoDBRecordEventName`
[EventBridge](#eventbridge) | `EventBridgeEvent`
[Kinesis Data Stream](#kinesis-streams) | `KinesisStreamEvent`
[Rabbit MQ](#rabbit-mq) | `RabbitMQEvent`
[S3](#s3) | `S3Event`
[S3 Object Lambda](#s3-object-lambda) | `S3ObjectLambdaEvent`
[SES](#ses) | `SESEvent`
Expand All @@ -82,6 +84,31 @@ Event Source | Data_class
The examples provided below are far from exhaustive - the data classes themselves are designed to provide a form of
documentation inherently (via autocompletion, types and docstrings).

### Active MQ

It is used for [Active MQ payloads](https://docs.aws.amazon.com/lambda/latest/dg/with-mq.html){target="_blank"}, also see
the [AWS blog post](https://aws.amazon.com/blogs/compute/using-amazon-mq-as-an-event-source-for-aws-lambda/){target="_blank"}
for more details.

=== "app.py"

```python hl_lines="4-5 9-10"
from typing import Dict

from aws_lambda_powertools import Logger
from aws_lambda_powertools.utilities.data_classes import event_source
from aws_lambda_powertools.utilities.data_classes.active_mq_event import ActiveMQEvent

logger = Logger()

@event_source(data_class=ActiveMQEvent)
def lambda_handler(event: ActiveMQEvent, context):
for message in event.messages:
logger.debug(f"MessageID: {message.message_id}")
data: Dict = message.json_data
logger.debug("Process json in base64 encoded data str", data)
```

### API Gateway Authorizer

> New in 1.20.0
Expand Down Expand Up @@ -810,6 +837,33 @@ or plain text, depending on the original payload.
do_something_with(data)
```

### Rabbit MQ

It is used for [Rabbit MQ payloads](https://docs.aws.amazon.com/lambda/latest/dg/with-mq.html){target="_blank"}, also see
the [blog post](https://aws.amazon.com/blogs/compute/using-amazon-mq-for-rabbitmq-as-an-event-source-for-lambda/){target="_blank"}
for more details.

=== "app.py"

```python hl_lines="4-5 9-10"
from typing import Dict

from aws_lambda_powertools import Logger
from aws_lambda_powertools.utilities.data_classes import event_source
from aws_lambda_powertools.utilities.data_classes.rabbit_mq_event import RabbitMQEvent

logger = Logger()

@event_source(data_class=RabbitMQEvent)
def lambda_handler(event: RabbitMQEvent, context):
for queue_name, messages in event.rmq_messages_by_queue.items():
logger.debug(f"Messages for queue: {queue_name}")
for message in messages:
logger.debug(f"MessageID: {message.basic_properties.message_id}")
data: Dict = message.json_data
logger.debug("Process json in base64 encoded data str", data)
```

### S3

=== "app.py"
Expand Down
45 changes: 45 additions & 0 deletions tests/events/activeMQEvent.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
{
"eventSource": "aws:amq",
"eventSourceArn": "arn:aws:mq:us-west-2:112556298976:broker:test:b-9bcfa592-423a-4942-879d-eb284b418fc8",
"messages": [
{
"messageID": "ID:b-9bcfa592-423a-4942-879d-eb284b418fc8-1.mq.us-west-2.amazonaws.com-37557-1234520418293-4:1:1:1:1",
"messageType": "jms/text-message",
"data": "QUJDOkFBQUE=",
"connectionId": "myJMSCoID",
"redelivered": false,
"destination": {
"physicalname": "testQueue"
},
"timestamp": 1598827811958,
"brokerInTime": 1598827811958,
"brokerOutTime": 1598827811959
},
{
"messageID": "ID:b-9bcfa592-423a-4942-879d-eb284b418fc8-1.mq.us-west-2.amazonaws.com-37557-1234520418293-4:1:1:1:1",
"messageType": "jms/text-message",
"data": "eyJ0aW1lb3V0IjowLCJkYXRhIjoiQ1pybWYwR3c4T3Y0YnFMUXhENEUifQ==",
"connectionId": "myJMSCoID2",
"redelivered": false,
"destination": {
"physicalname": "testQueue"
},
"timestamp": 1598827811958,
"brokerInTime": 1598827811958,
"brokerOutTime": 1598827811959
},
{
"messageID": "ID:b-9bcfa592-423a-4942-879d-eb284b418fc8-1.mq.us-west-2.amazonaws.com-37557-1234520418293-4:1:1:1:1",
"messageType": "jms/bytes-message",
"data": "3DTOOW7crj51prgVLQaGQ82S48k=",
"connectionId": "myJMSCoID1",
"persistent": false,
"destination": {
"physicalname": "testQueue"
},
"timestamp": 1598827811958,
"brokerInTime": 1598827811958,
"brokerOutTime": 1598827811959
}
]
}
Loading