Skip to content

Commit e86512a

Browse files
authored
Unsubscribe() (#12)
Unsubscribe for service clients.
1 parent 1c504f6 commit e86512a

File tree

5 files changed

+168
-88
lines changed

5 files changed

+168
-88
lines changed

awsiot/__init__.py

Lines changed: 32 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
from aws_crt import mqtt
2020
from concurrent.futures import Future
2121
import json
22-
from typing import Any, Callable, Dict, Optional, TypeVar
22+
from typing import Any, Callable, Dict, Optional, Tuple, TypeVar
2323

2424
T = TypeVar('T')
2525

@@ -39,6 +39,26 @@ def __init__(self, mqtt_connection):
3939
def mqtt_connection(self):
4040
return self._mqtt_connection
4141

42+
def unsubscribe(self, topic):
43+
# type: (str) -> Future
44+
"""
45+
Tell the MQTT server to stop sending messages to this topic.
46+
47+
Returns a `Future` whose result will be `None` when the server
48+
has acknowledged the unsubscribe.
49+
"""
50+
future = Future() # type: Future
51+
try:
52+
def on_unsuback(packet_id):
53+
future.set_result(None)
54+
55+
self.mqtt_connection.unsubscribe(topic, on_unsuback)
56+
57+
except Exception as e:
58+
future.set_exception(e)
59+
60+
return future
61+
4262
def _publish_operation(self, topic, payload):
4363
# type(str, Optional[PayloadObj]) -> Future
4464
"""
@@ -48,6 +68,10 @@ def _publish_operation(self, topic, payload):
4868
topic - The topic to publish this message to.
4969
payload - (Optional) If set, the message will be a string of JSON, built from this object.
5070
If unset, an empty message is sent.
71+
72+
Returns a `Future` which will contain a result of `None` when the
73+
server has acknowledged the message, or an exception if the
74+
publish fails.
5175
"""
5276
future = Future() # type: Future
5377
try:
@@ -71,7 +95,7 @@ def on_puback(packet_id):
7195
return future
7296

7397
def _subscribe_operation(self, topic, callback, payload_to_class_fn):
74-
# type: (str, Callable[[T], None], PayloadToClassFn) -> Future
98+
# type: (str, Callable[[T], None], PayloadToClassFn) -> Tuple[Future, str]
7599
"""
76100
Performs a 'Subscribe' style operation for an MQTT service.
77101
Messages received from this topic are processed as JSON,
@@ -89,9 +113,11 @@ def _subscribe_operation(self, topic, callback, payload_to_class_fn):
89113
`callback`. The dict comes from parsing the received
90114
message as JSON.
91115
92-
Returns a Future whose result will be None when the subscription
93-
is accepted by the server. If the subscription cannot be established,
94-
the Future's result will be an exception.
116+
Returns two values. The first is a `Future` which will contain a result
117+
of `None` when the server has acknowledged the subscription, or an
118+
exception if the subscription fails. The second value is a topic which
119+
may be passed to `unsubscribe()` to stop receiving messages.
120+
Note that messages may arrive before the subscription is acknowledged.
95121
"""
96122

97123
future = Future() # type: Future
@@ -117,7 +143,7 @@ def callback_wrapper(topic, payload_str):
117143
except Exception as e:
118144
future.set_exception(e)
119145

120-
return future
146+
return future, topic
121147

122148
class ModeledClass(object):
123149
"""

awsiot/iotjobs.py

Lines changed: 70 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,7 @@ def publish_update_job_execution(self, request):
102102
payload=request.to_payload())
103103

104104
def subscribe_to_describe_job_execution_accepted(self, request, on_accepted):
105-
# type: (DescribeJobExecutionSubscriptionRequest, typing.Callable[[DescribeJobExecutionResponse], None]) -> concurrent.futures.Future
105+
# type: (DescribeJobExecutionSubscriptionRequest, typing.Callable[[DescribeJobExecutionResponse], None]) -> typing.Tuple[concurrent.futures.Future, str]
106106
"""
107107
API Docs: https://docs.aws.amazon.com/iot/latest/developerguide/jobs-api.html#mqtt-describejobexecution
108108
@@ -112,9 +112,12 @@ def subscribe_to_describe_job_execution_accepted(self, request, on_accepted):
112112
The callback should take 1 argument of type `DescribeJobExecutionResponse`.
113113
The callback is not expected to return anything.
114114
115-
Returns a concurrent.futures.Future, whose result will be None if the
116-
subscription is successful. The Future's result will be an exception
117-
if the subscription is unsuccessful.
115+
Returns two values immediately. The first is a `concurrent.futures.Future`
116+
which will contain a result of `None` when the server has acknowledged
117+
the subscription, or an exception if the subscription fails. The second
118+
value is a topic which may be passed to `unsubscribe()` to stop
119+
receiving messages. Note that messages may arrive before the
120+
subscription is acknowledged.
118121
"""
119122
if not request.thing_name:
120123
raise ValueError("request.thing_name is required")
@@ -130,7 +133,7 @@ def subscribe_to_describe_job_execution_accepted(self, request, on_accepted):
130133
payload_to_class_fn=DescribeJobExecutionResponse.from_payload)
131134

132135
def subscribe_to_describe_job_execution_rejected(self, request, on_rejected):
133-
# type: (DescribeJobExecutionSubscriptionRequest, typing.Callable[[RejectedError], None]) -> concurrent.futures.Future
136+
# type: (DescribeJobExecutionSubscriptionRequest, typing.Callable[[RejectedError], None]) -> typing.Tuple[concurrent.futures.Future, str]
134137
"""
135138
API Docs: https://docs.aws.amazon.com/iot/latest/developerguide/jobs-api.html#mqtt-describejobexecution
136139
@@ -140,9 +143,12 @@ def subscribe_to_describe_job_execution_rejected(self, request, on_rejected):
140143
The callback should take 1 argument of type `RejectedError`.
141144
The callback is not expected to return anything.
142145
143-
Returns a concurrent.futures.Future, whose result will be None if the
144-
subscription is successful. The Future's result will be an exception
145-
if the subscription is unsuccessful.
146+
Returns two values immediately. The first is a `concurrent.futures.Future`
147+
which will contain a result of `None` when the server has acknowledged
148+
the subscription, or an exception if the subscription fails. The second
149+
value is a topic which may be passed to `unsubscribe()` to stop
150+
receiving messages. Note that messages may arrive before the
151+
subscription is acknowledged.
146152
"""
147153
if not request.thing_name:
148154
raise ValueError("request.thing_name is required")
@@ -158,7 +164,7 @@ def subscribe_to_describe_job_execution_rejected(self, request, on_rejected):
158164
payload_to_class_fn=RejectedError.from_payload)
159165

160166
def subscribe_to_get_pending_job_executions_accepted(self, request, on_accepted):
161-
# type: (GetPendingJobExecutionsSubscriptionRequest, typing.Callable[[GetPendingJobExecutionsResponse], None]) -> concurrent.futures.Future
167+
# type: (GetPendingJobExecutionsSubscriptionRequest, typing.Callable[[GetPendingJobExecutionsResponse], None]) -> typing.Tuple[concurrent.futures.Future, str]
162168
"""
163169
API Docs: https://docs.aws.amazon.com/iot/latest/developerguide/jobs-api.html#mqtt-getpendingjobexecutions
164170
@@ -168,9 +174,12 @@ def subscribe_to_get_pending_job_executions_accepted(self, request, on_accepted)
168174
The callback should take 1 argument of type `GetPendingJobExecutionsResponse`.
169175
The callback is not expected to return anything.
170176
171-
Returns a concurrent.futures.Future, whose result will be None if the
172-
subscription is successful. The Future's result will be an exception
173-
if the subscription is unsuccessful.
177+
Returns two values immediately. The first is a `concurrent.futures.Future`
178+
which will contain a result of `None` when the server has acknowledged
179+
the subscription, or an exception if the subscription fails. The second
180+
value is a topic which may be passed to `unsubscribe()` to stop
181+
receiving messages. Note that messages may arrive before the
182+
subscription is acknowledged.
174183
"""
175184
if not request.thing_name:
176185
raise ValueError("request.thing_name is required")
@@ -184,7 +193,7 @@ def subscribe_to_get_pending_job_executions_accepted(self, request, on_accepted)
184193
payload_to_class_fn=GetPendingJobExecutionsResponse.from_payload)
185194

186195
def subscribe_to_get_pending_job_executions_rejected(self, request, on_rejected):
187-
# type: (GetPendingJobExecutionsSubscriptionRequest, typing.Callable[[RejectedError], None]) -> concurrent.futures.Future
196+
# type: (GetPendingJobExecutionsSubscriptionRequest, typing.Callable[[RejectedError], None]) -> typing.Tuple[concurrent.futures.Future, str]
188197
"""
189198
API Docs: https://docs.aws.amazon.com/iot/latest/developerguide/jobs-api.html#mqtt-getpendingjobexecutions
190199
@@ -194,9 +203,12 @@ def subscribe_to_get_pending_job_executions_rejected(self, request, on_rejected)
194203
The callback should take 1 argument of type `RejectedError`.
195204
The callback is not expected to return anything.
196205
197-
Returns a concurrent.futures.Future, whose result will be None if the
198-
subscription is successful. The Future's result will be an exception
199-
if the subscription is unsuccessful.
206+
Returns two values immediately. The first is a `concurrent.futures.Future`
207+
which will contain a result of `None` when the server has acknowledged
208+
the subscription, or an exception if the subscription fails. The second
209+
value is a topic which may be passed to `unsubscribe()` to stop
210+
receiving messages. Note that messages may arrive before the
211+
subscription is acknowledged.
200212
"""
201213
if not request.thing_name:
202214
raise ValueError("request.thing_name is required")
@@ -210,7 +222,7 @@ def subscribe_to_get_pending_job_executions_rejected(self, request, on_rejected)
210222
payload_to_class_fn=RejectedError.from_payload)
211223

212224
def subscribe_to_job_executions_changed_events(self, request, on_event):
213-
# type: (JobExecutionsChangedSubscriptionRequest, typing.Callable[[JobExecutionsChangedEvent], None]) -> concurrent.futures.Future
225+
# type: (JobExecutionsChangedSubscriptionRequest, typing.Callable[[JobExecutionsChangedEvent], None]) -> typing.Tuple[concurrent.futures.Future, str]
214226
"""
215227
API Docs: https://docs.aws.amazon.com/iot/latest/developerguide/jobs-api.html#mqtt-jobexecutionschanged
216228
@@ -220,9 +232,12 @@ def subscribe_to_job_executions_changed_events(self, request, on_event):
220232
The callback should take 1 argument of type `JobExecutionsChangedEvent`.
221233
The callback is not expected to return anything.
222234
223-
Returns a concurrent.futures.Future, whose result will be None if the
224-
subscription is successful. The Future's result will be an exception
225-
if the subscription is unsuccessful.
235+
Returns two values immediately. The first is a `concurrent.futures.Future`
236+
which will contain a result of `None` when the server has acknowledged
237+
the subscription, or an exception if the subscription fails. The second
238+
value is a topic which may be passed to `unsubscribe()` to stop
239+
receiving messages. Note that messages may arrive before the
240+
subscription is acknowledged.
226241
"""
227242
if not request.thing_name:
228243
raise ValueError("request.thing_name is required")
@@ -236,7 +251,7 @@ def subscribe_to_job_executions_changed_events(self, request, on_event):
236251
payload_to_class_fn=JobExecutionsChangedEvent.from_payload)
237252

238253
def subscribe_to_next_job_execution_changed_events(self, request, on_event):
239-
# type: (NextJobExecutionChangedSubscriptionRequest, typing.Callable[[NextJobExecutionChangedEvent], None]) -> concurrent.futures.Future
254+
# type: (NextJobExecutionChangedSubscriptionRequest, typing.Callable[[NextJobExecutionChangedEvent], None]) -> typing.Tuple[concurrent.futures.Future, str]
240255
"""
241256
API Docs: https://docs.aws.amazon.com/iot/latest/developerguide/jobs-api.html#mqtt-nextjobexecutionchanged
242257
@@ -246,9 +261,12 @@ def subscribe_to_next_job_execution_changed_events(self, request, on_event):
246261
The callback should take 1 argument of type `NextJobExecutionChangedEvent`.
247262
The callback is not expected to return anything.
248263
249-
Returns a concurrent.futures.Future, whose result will be None if the
250-
subscription is successful. The Future's result will be an exception
251-
if the subscription is unsuccessful.
264+
Returns two values immediately. The first is a `concurrent.futures.Future`
265+
which will contain a result of `None` when the server has acknowledged
266+
the subscription, or an exception if the subscription fails. The second
267+
value is a topic which may be passed to `unsubscribe()` to stop
268+
receiving messages. Note that messages may arrive before the
269+
subscription is acknowledged.
252270
"""
253271
if not request.thing_name:
254272
raise ValueError("request.thing_name is required")
@@ -262,7 +280,7 @@ def subscribe_to_next_job_execution_changed_events(self, request, on_event):
262280
payload_to_class_fn=NextJobExecutionChangedEvent.from_payload)
263281

264282
def subscribe_to_start_next_pending_job_execution_accepted(self, request, on_accepted):
265-
# type: (StartNextPendingJobExecutionSubscriptionRequest, typing.Callable[[StartNextJobExecutionResponse], None]) -> concurrent.futures.Future
283+
# type: (StartNextPendingJobExecutionSubscriptionRequest, typing.Callable[[StartNextJobExecutionResponse], None]) -> typing.Tuple[concurrent.futures.Future, str]
266284
"""
267285
API Docs: https://docs.aws.amazon.com/iot/latest/developerguide/jobs-api.html#mqtt-startnextpendingjobexecution
268286
@@ -272,9 +290,12 @@ def subscribe_to_start_next_pending_job_execution_accepted(self, request, on_acc
272290
The callback should take 1 argument of type `StartNextJobExecutionResponse`.
273291
The callback is not expected to return anything.
274292
275-
Returns a concurrent.futures.Future, whose result will be None if the
276-
subscription is successful. The Future's result will be an exception
277-
if the subscription is unsuccessful.
293+
Returns two values immediately. The first is a `concurrent.futures.Future`
294+
which will contain a result of `None` when the server has acknowledged
295+
the subscription, or an exception if the subscription fails. The second
296+
value is a topic which may be passed to `unsubscribe()` to stop
297+
receiving messages. Note that messages may arrive before the
298+
subscription is acknowledged.
278299
"""
279300
if not request.thing_name:
280301
raise ValueError("request.thing_name is required")
@@ -288,7 +309,7 @@ def subscribe_to_start_next_pending_job_execution_accepted(self, request, on_acc
288309
payload_to_class_fn=StartNextJobExecutionResponse.from_payload)
289310

290311
def subscribe_to_start_next_pending_job_execution_rejected(self, request, on_rejected):
291-
# type: (StartNextPendingJobExecutionSubscriptionRequest, typing.Callable[[RejectedError], None]) -> concurrent.futures.Future
312+
# type: (StartNextPendingJobExecutionSubscriptionRequest, typing.Callable[[RejectedError], None]) -> typing.Tuple[concurrent.futures.Future, str]
292313
"""
293314
API Docs: https://docs.aws.amazon.com/iot/latest/developerguide/jobs-api.html#mqtt-startnextpendingjobexecution
294315
@@ -298,9 +319,12 @@ def subscribe_to_start_next_pending_job_execution_rejected(self, request, on_rej
298319
The callback should take 1 argument of type `RejectedError`.
299320
The callback is not expected to return anything.
300321
301-
Returns a concurrent.futures.Future, whose result will be None if the
302-
subscription is successful. The Future's result will be an exception
303-
if the subscription is unsuccessful.
322+
Returns two values immediately. The first is a `concurrent.futures.Future`
323+
which will contain a result of `None` when the server has acknowledged
324+
the subscription, or an exception if the subscription fails. The second
325+
value is a topic which may be passed to `unsubscribe()` to stop
326+
receiving messages. Note that messages may arrive before the
327+
subscription is acknowledged.
304328
"""
305329
if not request.thing_name:
306330
raise ValueError("request.thing_name is required")
@@ -314,7 +338,7 @@ def subscribe_to_start_next_pending_job_execution_rejected(self, request, on_rej
314338
payload_to_class_fn=RejectedError.from_payload)
315339

316340
def subscribe_to_update_job_execution_accepted(self, request, on_accepted):
317-
# type: (UpdateJobExecutionSubscriptionRequest, typing.Callable[[UpdateJobExecutionResponse], None]) -> concurrent.futures.Future
341+
# type: (UpdateJobExecutionSubscriptionRequest, typing.Callable[[UpdateJobExecutionResponse], None]) -> typing.Tuple[concurrent.futures.Future, str]
318342
"""
319343
API Docs: https://docs.aws.amazon.com/iot/latest/developerguide/jobs-api.html#mqtt-updatejobexecution
320344
@@ -324,9 +348,12 @@ def subscribe_to_update_job_execution_accepted(self, request, on_accepted):
324348
The callback should take 1 argument of type `UpdateJobExecutionResponse`.
325349
The callback is not expected to return anything.
326350
327-
Returns a concurrent.futures.Future, whose result will be None if the
328-
subscription is successful. The Future's result will be an exception
329-
if the subscription is unsuccessful.
351+
Returns two values immediately. The first is a `concurrent.futures.Future`
352+
which will contain a result of `None` when the server has acknowledged
353+
the subscription, or an exception if the subscription fails. The second
354+
value is a topic which may be passed to `unsubscribe()` to stop
355+
receiving messages. Note that messages may arrive before the
356+
subscription is acknowledged.
330357
"""
331358
if not request.thing_name:
332359
raise ValueError("request.thing_name is required")
@@ -342,7 +369,7 @@ def subscribe_to_update_job_execution_accepted(self, request, on_accepted):
342369
payload_to_class_fn=UpdateJobExecutionResponse.from_payload)
343370

344371
def subscribe_to_update_job_execution_rejected(self, request, on_rejected):
345-
# type: (UpdateJobExecutionSubscriptionRequest, typing.Callable[[RejectedError], None]) -> concurrent.futures.Future
372+
# type: (UpdateJobExecutionSubscriptionRequest, typing.Callable[[RejectedError], None]) -> typing.Tuple[concurrent.futures.Future, str]
346373
"""
347374
API Docs: https://docs.aws.amazon.com/iot/latest/developerguide/jobs-api.html#mqtt-updatejobexecution
348375
@@ -352,9 +379,12 @@ def subscribe_to_update_job_execution_rejected(self, request, on_rejected):
352379
The callback should take 1 argument of type `RejectedError`.
353380
The callback is not expected to return anything.
354381
355-
Returns a concurrent.futures.Future, whose result will be None if the
356-
subscription is successful. The Future's result will be an exception
357-
if the subscription is unsuccessful.
382+
Returns two values immediately. The first is a `concurrent.futures.Future`
383+
which will contain a result of `None` when the server has acknowledged
384+
the subscription, or an exception if the subscription fails. The second
385+
value is a topic which may be passed to `unsubscribe()` to stop
386+
receiving messages. Note that messages may arrive before the
387+
subscription is acknowledged.
358388
"""
359389
if not request.thing_name:
360390
raise ValueError("request.thing_name is required")

0 commit comments

Comments
 (0)