forked from aws-powertools/powertools-lambda-python
-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathrabbit_mq_event.py
121 lines (91 loc) · 2.85 KB
/
rabbit_mq_event.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
import base64
import json
from typing import Any, Dict, List
from aws_lambda_powertools.utilities.data_classes.common import DictWrapper
class BasicProperties(DictWrapper):
@property
def content_type(self) -> str:
return self["contentType"]
@property
def content_encoding(self) -> str:
return self["contentEncoding"]
@property
def headers(self) -> Dict[str, Any]:
return self["headers"]
@property
def delivery_mode(self) -> int:
return self["deliveryMode"]
@property
def priority(self) -> int:
return self["priority"]
@property
def correlation_id(self) -> str:
return self["correlationId"]
@property
def reply_to(self) -> str:
return self["replyTo"]
@property
def expiration(self) -> str:
return self["expiration"]
@property
def message_id(self) -> str:
return self["messageId"]
@property
def timestamp(self) -> str:
return self["timestamp"]
@property
def get_type(self) -> str:
return self["type"]
@property
def user_id(self) -> str:
return self["userId"]
@property
def app_id(self) -> str:
return self["appId"]
@property
def cluster_id(self) -> str:
return self["clusterId"]
@property
def body_size(self) -> int:
return self["bodySize"]
class RabbitMessage(DictWrapper):
@property
def basic_properties(self) -> BasicProperties:
return BasicProperties(self["basicProperties"])
@property
def redelivered(self) -> bool:
return self["redelivered"]
@property
def data(self) -> str:
return self["data"]
@property
def decoded_data(self) -> str:
"""Decodes the data as a str"""
return base64.b64decode(self.data.encode()).decode()
@property
def json_data(self) -> Any:
"""Parses the data as json"""
return json.loads(self.decoded_data)
class RabbitMQEvent(DictWrapper):
"""Represents a Rabbit MQ event sent to Lambda
Documentation:
--------------
- https://docs.aws.amazon.com/lambda/latest/dg/with-mq.html
- https://aws.amazon.com/blogs/compute/using-amazon-mq-for-rabbitmq-as-an-event-source-for-lambda/
"""
def __init__(self, data: Dict[str, Any]):
super().__init__(data)
self._rmq_messages_by_queue = {
key: [RabbitMessage(message) for message in messages]
for key, messages in self["rmqMessagesByQueue"].items()
}
@property
def event_source(self) -> str:
return self["eventSource"]
@property
def event_source_arn(self) -> str:
"""The Amazon Resource Name (ARN) of the event source"""
return self["eventSourceArn"]
@property
def rmq_messages_by_queue(self) -> Dict[str, List[RabbitMessage]]:
return self._rmq_messages_by_queue