forked from aws-powertools/powertools-lambda-python
-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathactive_mq_event.py
125 lines (98 loc) · 3.55 KB
/
active_mq_event.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
import base64
import json
from typing import Any, Iterator, Optional
from aws_lambda_powertools.utilities.data_classes.common import DictWrapper
class ActiveMQMessage(DictWrapper):
@property
def message_id(self) -> str:
"""Unique identifier for the message"""
return self["messageID"]
@property
def message_type(self) -> str:
return self["messageType"]
@property
def data(self) -> str:
return self["data"]
@property
def decoded_data(self) -> str:
"""Decodes the data as a str"""
return base64.b64decode(self.data.encode()).decode()
@property
def json_data(self) -> Any:
"""Parses the data as json"""
return json.loads(self.decoded_data)
@property
def connection_id(self) -> str:
return self["connectionId"]
@property
def redelivered(self) -> bool:
"""true if the message is being resent to the consumer"""
return self["redelivered"]
@property
def timestamp(self) -> int:
"""Time in milliseconds."""
return self["timestamp"]
@property
def broker_in_time(self) -> int:
"""Time stamp (in milliseconds) for when the message arrived at the broker."""
return self["brokerInTime"]
@property
def broker_out_time(self) -> int:
"""Time stamp (in milliseconds) for when the message left the broker."""
return self["brokerOutTime"]
@property
def destination_physicalname(self) -> str:
return self["destination"]["physicalname"]
@property
def delivery_mode(self) -> Optional[int]:
"""persistent or non-persistent delivery"""
return self.get("deliveryMode")
@property
def correlation_id(self) -> Optional[str]:
"""User defined correlation id"""
return self.get("correlationID")
@property
def reply_to(self) -> Optional[str]:
"""User defined reply to"""
return self.get("replyTo")
@property
def get_type(self) -> Optional[str]:
"""User defined message type"""
return self.get("type")
@property
def expiration(self) -> Optional[int]:
"""Expiration attribute whose value is given in milliseconds"""
return self.get("expiration")
@property
def priority(self) -> Optional[int]:
"""
JMS defines a ten-level priority value, with 0 as the lowest priority and 9
as the highest. In addition, clients should consider priorities 0-4 as
gradations of normal priority and priorities 5-9 as gradations of expedited
priority.
JMS does not require that a provider strictly implement priority ordering
of messages; however, it should do its best to deliver expedited messages
ahead of normal messages.
"""
return self.get("priority")
class ActiveMQEvent(DictWrapper):
"""Represents an Active MQ event sent to Lambda
Documentation:
--------------
- https://docs.aws.amazon.com/lambda/latest/dg/with-mq.html
- https://aws.amazon.com/blogs/compute/using-amazon-mq-as-an-event-source-for-aws-lambda/
"""
@property
def event_source(self) -> str:
return self["eventSource"]
@property
def event_source_arn(self) -> str:
"""The Amazon Resource Name (ARN) of the event source"""
return self["eventSourceArn"]
@property
def messages(self) -> Iterator[ActiveMQMessage]:
for record in self["messages"]:
yield ActiveMQMessage(record)
@property
def message(self) -> ActiveMQMessage:
return next(self.messages)