-
Notifications
You must be signed in to change notification settings - Fork 421
/
Copy pathactive_mq_event.py
154 lines (120 loc) · 4.33 KB
/
active_mq_event.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
from __future__ import annotations

from functools import cached_property
from typing import TYPE_CHECKING, Any

from aws_lambda_powertools.utilities.data_classes.common import DictWrapper
from aws_lambda_powertools.utilities.data_classes.shared_functions import base64_decode

if TYPE_CHECKING:
    from collections.abc import Callable, Iterator
class ActiveMQMessage(DictWrapper):
    """Wraps a single ActiveMQ message record and exposes its fields as properties."""

    @property
    def message_id(self) -> str:
        """Unique identifier for the message (``messageID`` field)."""
        identifier = self["messageID"]
        return identifier

    @property
    def message_type(self) -> str:
        """Value of the ``messageType`` field."""
        kind = self["messageType"]
        return kind

    @property
    def data(self) -> str:
        """Raw ``data`` field, exactly as stored in the record."""
        payload = self["data"]
        return payload

    @property
    def decoded_data(self) -> str:
        """The ``data`` field passed through ``base64_decode``, as a str."""
        encoded = self.data
        return base64_decode(encoded)

    @cached_property
    def json_data(self) -> Any:
        """``decoded_data`` parsed by the wrapper's JSON deserializer.

        Declared as a ``cached_property``, so the parsing happens on first access only.
        """
        deserialize = self._json_deserializer
        return deserialize(self.decoded_data)

    @property
    def connection_id(self) -> str:
        """Value of the ``connectionId`` field."""
        connection = self["connectionId"]
        return connection

    @property
    def redelivered(self) -> bool:
        """True if the message is being resent to the consumer."""
        resent = self["redelivered"]
        return resent

    @property
    def timestamp(self) -> int:
        """Message timestamp, in milliseconds."""
        millis = self["timestamp"]
        return millis

    @property
    def broker_in_time(self) -> int:
        """Timestamp (in milliseconds) of the message's arrival at the broker."""
        arrived_at = self["brokerInTime"]
        return arrived_at

    @property
    def broker_out_time(self) -> int:
        """Timestamp (in milliseconds) of the message's departure from the broker."""
        left_at = self["brokerOutTime"]
        return left_at

    @property
    def properties(self) -> dict:
        """Custom properties attached to the message."""
        custom = self["properties"]
        return custom

    @property
    def destination_physicalname(self) -> str:
        """The ``physicalName`` entry nested under the ``destination`` field."""
        destination = self["destination"]
        return destination["physicalName"]

    # The remaining fields are looked up with ``get`` rather than by subscript,
    # which is why their return types include ``None``.

    @property
    def delivery_mode(self) -> int | None:
        """Persistent or non-persistent delivery (``deliveryMode`` field)."""
        mode = self.get("deliveryMode")
        return mode

    @property
    def correlation_id(self) -> str | None:
        """User-defined correlation id (``correlationID`` field)."""
        correlation = self.get("correlationID")
        return correlation

    @property
    def reply_to(self) -> str | None:
        """User-defined reply-to value (``replyTo`` field)."""
        target = self.get("replyTo")
        return target

    @property
    def get_type(self) -> str | None:
        """User-defined message type (``type`` field)."""
        user_type = self.get("type")
        return user_type

    @property
    def expiration(self) -> int | None:
        """Expiration attribute, expressed in milliseconds."""
        expires = self.get("expiration")
        return expires

    @property
    def priority(self) -> int | None:
        """Message priority (``priority`` field).

        JMS defines ten priority levels: 0 is the lowest and 9 the highest.
        Clients should treat 0-4 as gradations of normal priority and 5-9 as
        gradations of expedited priority.

        A JMS provider is not required to strictly order messages by priority,
        but it should do its best to deliver expedited messages ahead of
        normal ones.
        """
        level = self.get("priority")
        return level
class ActiveMQEvent(DictWrapper):
    """Represents an Active MQ event sent to Lambda

    Documentation:
    --------------
    - https://docs.aws.amazon.com/lambda/latest/dg/with-mq.html
    - https://aws.amazon.com/blogs/compute/using-amazon-mq-as-an-event-source-for-aws-lambda/
    """

    def __init__(self, data: dict[str, Any], json_deserializer: Callable[[str], Any] | None = None):
        """
        Parameters
        ----------
        data : dict[str, Any]
            The raw event payload to wrap.
        json_deserializer : Callable[[str], Any], optional
            Custom function used to parse JSON payloads. When given, it is
            forwarded to the base wrapper, and from there to every
            ``ActiveMQMessage`` produced by ``messages`` / ``message``.
            When omitted, the base wrapper's own default is used.
        """
        if json_deserializer is None:
            # Preserve the original call shape so the base class picks its default.
            super().__init__(data)
        else:
            super().__init__(data, json_deserializer=json_deserializer)
        # Lazily created by ``message``; holds the iterator position between calls.
        self._messages: Iterator[ActiveMQMessage] | None = None

    @property
    def event_source(self) -> str:
        """Value of the ``eventSource`` field."""
        return self["eventSource"]

    @property
    def event_source_arn(self) -> str:
        """The Amazon Resource Name (ARN) of the event source"""
        return self["eventSourceArn"]

    @property
    def messages(self) -> Iterator[ActiveMQMessage]:
        """Iterate over the entries of the ``messages`` field as ``ActiveMQMessage`` objects.

        Every access returns a fresh generator; each message shares this
        event's JSON deserializer.
        """
        for record in self["messages"]:
            yield ActiveMQMessage(record, json_deserializer=self._json_deserializer)

    @property
    def message(self) -> ActiveMQMessage:
        """
        Returns the next ActiveMQ message using an iterator

        The iterator is created on first access and kept on the instance, so
        successive accesses walk through the messages one by one.

        Returns
        -------
        ActiveMQMessage
            The next activemq message.

        Raises
        ------
        StopIteration
            If there are no more records available.
        """
        if self._messages is None:
            self._messages = self.messages
        return next(self._messages)