Skip to content

Commit 41fb821

Browse files
authored
Base class for MqttServiceClients (#2)
Base class for MqttServiceClients. Handles dispatching responses to requests based on "nonce" aka "client token".
1 parent 8bdf3ec commit 41fb821

File tree

4 files changed

+472
-309
lines changed

4 files changed

+472
-309
lines changed

awsiot/__init__.py

Lines changed: 342 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,346 @@
1+
# Copyright 2010-2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License").
4+
# You may not use this file except in compliance with the License.
5+
# A copy of the License is located at
6+
#
7+
# http://aws.amazon.com/apache2.0
8+
#
9+
# or in the "license" file accompanying this file. This file is distributed
10+
# on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
11+
# express or implied. See the License for the specific language governing
12+
# permissions and limitations under the License.
13+
114
__all__ = [
215
'iotjobs',
316
'iotshadow',
417
]
18+
19+
from aws_crt import mqtt
20+
from concurrent.futures import Future
21+
import json
22+
from threading import RLock
23+
from typing import Any, Callable, Dict, Generic, List, Optional, Type, TypeVar, Union
24+
from uuid import uuid4
25+
26+
class MqttServiceClient(object):
27+
"""
28+
Base class for an AWS MQTT Service Client
29+
"""
30+
31+
def __init__(self, mqtt_connection):
32+
# type: (mqtt.Connection) -> None
33+
self._mqtt_connection = mqtt_connection # type: mqtt.Connection
34+
self._nonce_rpc_topics = {} # type: Dict[str, _NonceRpcTopicEntry]
35+
self._nonce_rpc_lock = RLock() # type: RLock
36+
self._fifo_rpc_topics = {} # type: Dict[str, _FifoRpcTopicEntry]
37+
self._fifo_rpc_lock = RLock() # type: RLock
38+
39+
@property
40+
def mqtt_connection(self):
41+
return self._mqtt_connection
42+
43+
def _nonce_rpc_operation(self, request_topic, request_payload, request_payload_nonce_field, subscriptions):
44+
# type: (str, PayloadObj, str, List[_NonceRpcSubscription]) -> Future
45+
"""
46+
Performs a "Remote Procedure Call" style operation for an MQTT service,
47+
where a "nonce" token is used to correlate requests with responses.
48+
49+
Parameters:
50+
request_topic - Topic for request message.
51+
request_payload - Input object to be sent as JSON in the request message.
52+
request_payload_nonce_field - Name of field in the request payload which contains the "nonce",
53+
a unique string that will be copied to the response,
54+
allowing the response to be correlated with this particular request.
55+
If the field's value is empty, a uniqe value will be generated for this operation.
56+
subscriptions - List of _NonceRpcSubscriptions, one for each possible response.
57+
58+
Returns a Future that will contain the outcome of the operation.
59+
A response from a non-error topic becomes a valid result in the Future.
60+
A response from an error topic becomes an Exception in the Future.
61+
Any other exception that occurs as part of the RPC becomes an exception in the Future.
62+
"""
63+
64+
future = Future() # type: Future
65+
66+
# generate a unique nonce, if none was contained in the payload
67+
nonce_value = request_payload.get(request_payload_nonce_field)
68+
if not nonce_value:
69+
nonce_value = str(uuid4())
70+
request_payload[request_payload_nonce_field] = nonce_value
71+
72+
operation = _NonceRpcOperation(future, request_topic, json.dumps(request_payload), nonce_value, subscriptions)
73+
74+
# callback sends request when all subacks received
75+
suback_counter = [] # type: List
76+
def on_suback(packet_id):
77+
# count subacks by popping an entry out of this list
78+
if suback_counter:
79+
suback_counter.pop()
80+
if not suback_counter:
81+
# all subscriptions succeeded
82+
self.mqtt_connection.publish(operation.request_topic, operation.request_payload, 1)
83+
84+
with self._nonce_rpc_lock:
85+
topics_to_subscribe_to = []
86+
for sub in operation.subscriptions:
87+
topic_entry = self._nonce_rpc_topics.get(sub.topic)
88+
if not topic_entry:
89+
topics_to_subscribe_to.append(sub.topic)
90+
topic_entry = _NonceRpcTopicEntry(sub.response_payload_nonce_field)
91+
self._nonce_rpc_topics[sub.topic] = topic_entry
92+
93+
topic_entry.outstanding_operations[operation.nonce_value] = operation
94+
95+
if topics_to_subscribe_to:
96+
suback_counter.extend(['suback'] * len(topics_to_subscribe_to))
97+
for i in topics_to_subscribe_to:
98+
self.mqtt_connection.subscribe(i, 1, self._nonce_response_callback, on_suback)
99+
# lock released
100+
101+
# if we're not waiting on any subscribes to complete, publish the request immediately
102+
if not topics_to_subscribe_to:
103+
self.mqtt_connection.publish(operation.request_topic, operation.request_payload, 1)
104+
105+
return future
106+
107+
def _nonce_response_callback(self, topic, payload_str):
108+
# type: (str, str) -> None
109+
payload_obj = json.loads(payload_str)
110+
operation = None
111+
subscription = None
112+
try:
113+
with self._nonce_rpc_lock:
114+
# find the corresponding operation
115+
topic_entry = self._nonce_rpc_topics[topic]
116+
nonce_value = payload_obj[topic_entry.nonce_field_name_in_response]
117+
operation = topic_entry.outstanding_operations[nonce_value]
118+
119+
# operation found.
120+
# remove it from all associated topics.
121+
# unsubscribe from any topic that has no more outstanding operations.
122+
topics_to_unsubscribe_from = [] # type: List
123+
124+
for sub in operation.subscriptions:
125+
if sub.topic == topic:
126+
subscription = sub
127+
topic_entry = self._nonce_rpc_topics[sub.topic]
128+
del topic_entry.outstanding_operations[nonce_value]
129+
if not topic_entry.outstanding_operations:
130+
del self._nonce_rpc_topics[sub.topic]
131+
topics_to_unsubscribe_from.append(sub.topic)
132+
133+
for i in topics_to_unsubscribe_from:
134+
self.mqtt_connection.unsubscribe(i)
135+
136+
# lock released
137+
if subscription and operation:
138+
result = subscription.response_payload_to_class_fn(payload_obj)
139+
if isinstance(result, Exception):
140+
operation.future.set_exception(result)
141+
else:
142+
operation.future.set_result(result)
143+
else:
144+
raise RuntimeError("Cannot determine response type")
145+
146+
except Exception as e:
147+
if operation:
148+
operation.future.set_exception(e)
149+
else:
150+
raise
151+
152+
def _fifo_rpc_operation(self, request_topic, request_payload, subscriptions):
153+
# type: (str, Optional[PayloadObj], List[_FifoRpcSubscription]) -> Future
154+
"""
155+
Performs a "Remote Procedure Call" style operation for an MQTT service,
156+
where responses are correlated to requests on a first-in-first-out basis.
157+
158+
Parameters:
159+
request_topic - Topic for request message.
160+
request_payload - Input object to be sent as json string in request message.
161+
subscriptions - List of _FifoRpcSubscriptions, one for each possible response.
162+
163+
Returns a Future that will contain the outcome of the operation.
164+
A response from a non-error topic becomes a valid result in the Future.
165+
A response from an error topic becomes an Exception in the Future.
166+
Any other exception that occurs as part of the RPC becomes an exception in the Future.
167+
"""
168+
future = Future() # type: Future
169+
request_payload_str = json.dumps(request_payload) if request_payload else ""
170+
operation = _FifoRpcOperation(future, request_topic, request_payload_str, subscriptions)
171+
172+
# callback sends request when all subacks received
173+
suback_counter = [] # type: List
174+
def on_suback(packet_id):
175+
# count subacks by popping an entry out of this list
176+
if suback_counter:
177+
suback_counter.pop()
178+
if not suback_counter:
179+
# all subscriptions succeeded
180+
self.mqtt_connection.publish(operation.request_topic, operation.request_payload, 1)
181+
182+
with self._fifo_rpc_lock:
183+
topics_to_subscribe_to = []
184+
for sub in operation.subscriptions:
185+
topic_entry = self._fifo_rpc_topics.get(sub.topic)
186+
if not topic_entry:
187+
topics_to_subscribe_to.append(sub.topic)
188+
topic_entry = _FifoRpcTopicEntry()
189+
self._fifo_rpc_topics[sub.topic] = topic_entry
190+
191+
topic_entry.outstanding_operations.append(operation)
192+
193+
if topics_to_subscribe_to:
194+
suback_counter.extend(['suback'] * len(topics_to_subscribe_to))
195+
for i in topics_to_subscribe_to:
196+
self.mqtt_connection.subscribe(i, 1, self._fifo_callback, on_suback)
197+
# lock released.
198+
199+
# if we're not waiting on any subscribes to complete, publish the request immediately
200+
if not topics_to_subscribe_to:
201+
self.mqtt_connection.publish(operation.request_topic, operation.request_payload, 1)
202+
203+
return future
204+
205+
def _fifo_callback(self, topic, payload_str):
206+
# type: (str, str) -> None
207+
operation = None
208+
subscription = None
209+
try:
210+
with self._fifo_rpc_lock:
211+
# find the corresponding operation
212+
topic_entry = self._fifo_rpc_topics[topic]
213+
operation = topic_entry.outstanding_operations[0]
214+
215+
# operation found.
216+
# remove it from all associated topics.
217+
# unsubscribe from any topic that has no more outstanding operations.
218+
topics_to_unsubscribe_from = []
219+
220+
for sub in operation.subscriptions:
221+
if sub.topic == topic:
222+
subscription = sub
223+
topic_entry = self._fifo_rpc_topics[sub.topic]
224+
topic_entry.outstanding_operations.remove(operation)
225+
if not topic_entry.outstanding_operations:
226+
del self._fifo_rpc_topics[sub.topic]
227+
topics_to_unsubscribe_from.append(sub.topic)
228+
229+
for i in topics_to_unsubscribe_from:
230+
self.mqtt_connection.unsubscribe(i)
231+
# lock released
232+
233+
# sometimes the response is an empty string
234+
try:
235+
payload_obj = json.loads(payload_str)
236+
except:
237+
payload_obj = {}
238+
239+
if subscription and operation:
240+
result = subscription.response_payload_to_class_fn(payload_obj)
241+
if isinstance(result, Exception):
242+
operation.future.set_exception(result)
243+
else:
244+
operation.future.set_result(result)
245+
else:
246+
raise RuntimeError("Cannot determine response type")
247+
248+
except Exception as e:
249+
if operation:
250+
operation.future.set_exception(e)
251+
else:
252+
raise
253+
254+
def _subscribe_operation(self, subscriptions):
255+
# type: (List[_SubscriptionInfo]) -> Future
256+
"""
257+
Performs a 'Subscribe' style operation for an MQTT service.
258+
259+
Parameters:
260+
subscriptions - List of _SubscriptionInfos, one for each possible response.
261+
262+
Returns a Future that will contain None when all subscriptions have been acknowledged by the server.
263+
"""
264+
265+
future = Future() # type: Future
266+
267+
# callback informs Future when all subacks received
268+
suback_counter = ['suback'] * len(subscriptions)
269+
def on_suback(packet_id):
270+
# count supacks by popping an entry out of this list
271+
if suback_counter:
272+
suback_counter.pop()
273+
if not suback_counter:
274+
# all subscriptions succeeded
275+
future.set_result(None)
276+
277+
for sub in subscriptions:
278+
def callback_wrapper(topic, json_payload):
279+
try:
280+
payload = json.loads(json_payload)
281+
event = sub.payload_class.from_payload(payload)
282+
sub.callback(event)
283+
except:
284+
# can't deliver payload, invoke callback with None
285+
sub.callback(None)
286+
287+
self.mqtt_connection.subscribe(sub.topic, 1, callback_wrapper, on_suback)
288+
289+
return future
290+
291+
292+
T = TypeVar('T')
293+
294+
PayloadObj = Dict[str, Any]
295+
PayloadToClassFn = Callable[[PayloadObj], T]
296+
297+
class _NonceRpcSubscription(object):
298+
# type: Generic[T]
299+
def __init__(self, topic, response_payload_to_class_fn, response_payload_nonce_field):
300+
# type: (str, PayloadToClassFn, str) -> None
301+
self.topic = topic # type: str
302+
self.response_payload_to_class_fn = response_payload_to_class_fn # type: PayloadToClassFn
303+
self.response_payload_nonce_field = response_payload_nonce_field # type: str
304+
305+
class _NonceRpcOperation(object):
306+
def __init__(self, future, request_topic, request_payload, nonce_value, subscriptions):
307+
# type: (Future, str, str, str, List[_NonceRpcSubscription]) -> None
308+
self.future = future # type: Future
309+
self.request_topic = request_topic # type: str
310+
self.request_payload = request_payload # type: str
311+
self.nonce_value = nonce_value # type: str
312+
self.subscriptions = subscriptions # type: List[_NonceRpcSubscription]
313+
314+
class _FifoRpcSubscription(object):
315+
# type: Generic[T]
316+
def __init__(self, topic, response_payload_to_class_fn):
317+
# type: (str, PayloadToClassFn) -> None
318+
self.topic = topic # type: str
319+
self.response_payload_to_class_fn = response_payload_to_class_fn # type: PayloadToClassFn
320+
321+
class _FifoRpcOperation(object):
322+
def __init__(self, future, request_topic, request_payload, subscriptions):
323+
# type: (Future, str, str, List[_FifoRpcSubscription]) -> None
324+
self.future = future # type: Future
325+
self.request_topic = request_topic # type: str
326+
self.request_payload = request_payload # type: str
327+
self.subscriptions = subscriptions # type: List[_FifoRpcSubscription]
328+
329+
class _NonceRpcTopicEntry(object):
330+
def __init__(self, nonce_field_name_in_response):
331+
# type: (str) -> None
332+
self.nonce_field_name_in_response = nonce_field_name_in_response # type: str
333+
self.outstanding_operations = {} # type: Dict[str, _NonceRpcOperation]
334+
335+
class _FifoRpcTopicEntry(object):
336+
def __init__(self):
337+
# type() -> None
338+
self.outstanding_operations = [] # type: List[_FifoRpcOperation]
339+
340+
class _SubscriptionInfo(object):
341+
# type: Generic[T]
342+
def __init__(self, topic, callback, payload_class):
343+
# type: (str, Callable[[T], None], Type[T]) -> None
344+
self.topic = topic # type: str
345+
self.callback = callback # type: Callable[[T], None]
346+
self.payload_class = payload_class # Type[T]

0 commit comments

Comments
 (0)