Skip to content

Unsubscribe() #12

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Jan 5, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 32 additions & 6 deletions awsiot/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
from aws_crt import mqtt
from concurrent.futures import Future
import json
from typing import Any, Callable, Dict, Optional, TypeVar
from typing import Any, Callable, Dict, Optional, Tuple, TypeVar

T = TypeVar('T')

Expand All @@ -39,6 +39,26 @@ def __init__(self, mqtt_connection):
def mqtt_connection(self):
return self._mqtt_connection

def unsubscribe(self, topic):
# type: (str) -> Future
"""
Tell the MQTT server to stop sending messages to this topic.

Returns a `Future` whose result will be `None` when the server
has acknowledged the unsubscribe.
"""
future = Future() # type: Future
try:
def on_unsuback(packet_id):
future.set_result(None)

self.mqtt_connection.unsubscribe(topic, on_unsuback)

except Exception as e:
future.set_exception(e)

return future

def _publish_operation(self, topic, payload):
# type(str, Optional[PayloadObj]) -> Future
"""
Expand All @@ -48,6 +68,10 @@ def _publish_operation(self, topic, payload):
topic - The topic to publish this message to.
payload - (Optional) If set, the message will be a string of JSON, built from this object.
If unset, an empty message is sent.

Returns a `Future` which will contain a result of `None` when the
server has acknowledged the message, or an exception if the
publish fails.
"""
future = Future() # type: Future
try:
Expand All @@ -71,7 +95,7 @@ def on_puback(packet_id):
return future

def _subscribe_operation(self, topic, callback, payload_to_class_fn):
# type: (str, Callable[[T], None], PayloadToClassFn) -> Future
# type: (str, Callable[[T], None], PayloadToClassFn) -> Tuple[Future, str]
"""
Performs a 'Subscribe' style operation for an MQTT service.
Messages received from this topic are processed as JSON,
Expand All @@ -89,9 +113,11 @@ def _subscribe_operation(self, topic, callback, payload_to_class_fn):
`callback`. The dict comes from parsing the received
message as JSON.

Returns a Future whose result will be None when the subscription
is accepted by the server. If the subscription cannot be established,
the Future's result will be an exception.
Returns two values. The first is a `Future` which will contain a result
of `None` when the server has acknowledged the subscription, or an
exception if the subscription fails. The second value is a topic which
may be passed to `unsubscribe()` to stop receiving messages.
Note that messages may arrive before the subscription is acknowledged.
"""

future = Future() # type: Future
Expand All @@ -117,7 +143,7 @@ def callback_wrapper(topic, payload_str):
except Exception as e:
future.set_exception(e)

return future
return future, topic

class ModeledClass(object):
"""
Expand Down
110 changes: 70 additions & 40 deletions awsiot/iotjobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ def publish_update_job_execution(self, request):
payload=request.to_payload())

def subscribe_to_describe_job_execution_accepted(self, request, on_accepted):
# type: (DescribeJobExecutionSubscriptionRequest, typing.Callable[[DescribeJobExecutionResponse], None]) -> concurrent.futures.Future
# type: (DescribeJobExecutionSubscriptionRequest, typing.Callable[[DescribeJobExecutionResponse], None]) -> typing.Tuple[concurrent.futures.Future, str]
"""
API Docs: https://docs.aws.amazon.com/iot/latest/developerguide/jobs-api.html#mqtt-describejobexecution

Expand All @@ -112,9 +112,12 @@ def subscribe_to_describe_job_execution_accepted(self, request, on_accepted):
The callback should take 1 argument of type `DescribeJobExecutionResponse`.
The callback is not expected to return anything.

Returns a concurrent.futures.Future, whose result will be None if the
subscription is successful. The Future's result will be an exception
if the subscription is unsuccessful.
Returns two values immediately. The first is a `concurrent.futures.Future`
which will contain a result of `None` when the server has acknowledged
the subscription, or an exception if the subscription fails. The second
value is a topic which may be passed to `unsubscribe()` to stop
receiving messages. Note that messages may arrive before the
subscription is acknowledged.
"""
if not request.thing_name:
raise ValueError("request.thing_name is required")
Expand All @@ -130,7 +133,7 @@ def subscribe_to_describe_job_execution_accepted(self, request, on_accepted):
payload_to_class_fn=DescribeJobExecutionResponse.from_payload)

def subscribe_to_describe_job_execution_rejected(self, request, on_rejected):
# type: (DescribeJobExecutionSubscriptionRequest, typing.Callable[[RejectedError], None]) -> concurrent.futures.Future
# type: (DescribeJobExecutionSubscriptionRequest, typing.Callable[[RejectedError], None]) -> typing.Tuple[concurrent.futures.Future, str]
"""
API Docs: https://docs.aws.amazon.com/iot/latest/developerguide/jobs-api.html#mqtt-describejobexecution

Expand All @@ -140,9 +143,12 @@ def subscribe_to_describe_job_execution_rejected(self, request, on_rejected):
The callback should take 1 argument of type `RejectedError`.
The callback is not expected to return anything.

Returns a concurrent.futures.Future, whose result will be None if the
subscription is successful. The Future's result will be an exception
if the subscription is unsuccessful.
Returns two values immediately. The first is a `concurrent.futures.Future`
which will contain a result of `None` when the server has acknowledged
the subscription, or an exception if the subscription fails. The second
value is a topic which may be passed to `unsubscribe()` to stop
receiving messages. Note that messages may arrive before the
subscription is acknowledged.
"""
if not request.thing_name:
raise ValueError("request.thing_name is required")
Expand All @@ -158,7 +164,7 @@ def subscribe_to_describe_job_execution_rejected(self, request, on_rejected):
payload_to_class_fn=RejectedError.from_payload)

def subscribe_to_get_pending_job_executions_accepted(self, request, on_accepted):
# type: (GetPendingJobExecutionsSubscriptionRequest, typing.Callable[[GetPendingJobExecutionsResponse], None]) -> concurrent.futures.Future
# type: (GetPendingJobExecutionsSubscriptionRequest, typing.Callable[[GetPendingJobExecutionsResponse], None]) -> typing.Tuple[concurrent.futures.Future, str]
"""
API Docs: https://docs.aws.amazon.com/iot/latest/developerguide/jobs-api.html#mqtt-getpendingjobexecutions

Expand All @@ -168,9 +174,12 @@ def subscribe_to_get_pending_job_executions_accepted(self, request, on_accepted)
The callback should take 1 argument of type `GetPendingJobExecutionsResponse`.
The callback is not expected to return anything.

Returns a concurrent.futures.Future, whose result will be None if the
subscription is successful. The Future's result will be an exception
if the subscription is unsuccessful.
Returns two values immediately. The first is a `concurrent.futures.Future`
which will contain a result of `None` when the server has acknowledged
the subscription, or an exception if the subscription fails. The second
value is a topic which may be passed to `unsubscribe()` to stop
receiving messages. Note that messages may arrive before the
subscription is acknowledged.
"""
if not request.thing_name:
raise ValueError("request.thing_name is required")
Expand All @@ -184,7 +193,7 @@ def subscribe_to_get_pending_job_executions_accepted(self, request, on_accepted)
payload_to_class_fn=GetPendingJobExecutionsResponse.from_payload)

def subscribe_to_get_pending_job_executions_rejected(self, request, on_rejected):
# type: (GetPendingJobExecutionsSubscriptionRequest, typing.Callable[[RejectedError], None]) -> concurrent.futures.Future
# type: (GetPendingJobExecutionsSubscriptionRequest, typing.Callable[[RejectedError], None]) -> typing.Tuple[concurrent.futures.Future, str]
"""
API Docs: https://docs.aws.amazon.com/iot/latest/developerguide/jobs-api.html#mqtt-getpendingjobexecutions

Expand All @@ -194,9 +203,12 @@ def subscribe_to_get_pending_job_executions_rejected(self, request, on_rejected)
The callback should take 1 argument of type `RejectedError`.
The callback is not expected to return anything.

Returns a concurrent.futures.Future, whose result will be None if the
subscription is successful. The Future's result will be an exception
if the subscription is unsuccessful.
Returns two values immediately. The first is a `concurrent.futures.Future`
which will contain a result of `None` when the server has acknowledged
the subscription, or an exception if the subscription fails. The second
value is a topic which may be passed to `unsubscribe()` to stop
receiving messages. Note that messages may arrive before the
subscription is acknowledged.
"""
if not request.thing_name:
raise ValueError("request.thing_name is required")
Expand All @@ -210,7 +222,7 @@ def subscribe_to_get_pending_job_executions_rejected(self, request, on_rejected)
payload_to_class_fn=RejectedError.from_payload)

def subscribe_to_job_executions_changed_events(self, request, on_event):
# type: (JobExecutionsChangedSubscriptionRequest, typing.Callable[[JobExecutionsChangedEvent], None]) -> concurrent.futures.Future
# type: (JobExecutionsChangedSubscriptionRequest, typing.Callable[[JobExecutionsChangedEvent], None]) -> typing.Tuple[concurrent.futures.Future, str]
"""
API Docs: https://docs.aws.amazon.com/iot/latest/developerguide/jobs-api.html#mqtt-jobexecutionschanged

Expand All @@ -220,9 +232,12 @@ def subscribe_to_job_executions_changed_events(self, request, on_event):
The callback should take 1 argument of type `JobExecutionsChangedEvent`.
The callback is not expected to return anything.

Returns a concurrent.futures.Future, whose result will be None if the
subscription is successful. The Future's result will be an exception
if the subscription is unsuccessful.
Returns two values immediately. The first is a `concurrent.futures.Future`
which will contain a result of `None` when the server has acknowledged
the subscription, or an exception if the subscription fails. The second
value is a topic which may be passed to `unsubscribe()` to stop
receiving messages. Note that messages may arrive before the
subscription is acknowledged.
"""
if not request.thing_name:
raise ValueError("request.thing_name is required")
Expand All @@ -236,7 +251,7 @@ def subscribe_to_job_executions_changed_events(self, request, on_event):
payload_to_class_fn=JobExecutionsChangedEvent.from_payload)

def subscribe_to_next_job_execution_changed_events(self, request, on_event):
# type: (NextJobExecutionChangedSubscriptionRequest, typing.Callable[[NextJobExecutionChangedEvent], None]) -> concurrent.futures.Future
# type: (NextJobExecutionChangedSubscriptionRequest, typing.Callable[[NextJobExecutionChangedEvent], None]) -> typing.Tuple[concurrent.futures.Future, str]
"""
API Docs: https://docs.aws.amazon.com/iot/latest/developerguide/jobs-api.html#mqtt-nextjobexecutionchanged

Expand All @@ -246,9 +261,12 @@ def subscribe_to_next_job_execution_changed_events(self, request, on_event):
The callback should take 1 argument of type `NextJobExecutionChangedEvent`.
The callback is not expected to return anything.

Returns a concurrent.futures.Future, whose result will be None if the
subscription is successful. The Future's result will be an exception
if the subscription is unsuccessful.
Returns two values immediately. The first is a `concurrent.futures.Future`
which will contain a result of `None` when the server has acknowledged
the subscription, or an exception if the subscription fails. The second
value is a topic which may be passed to `unsubscribe()` to stop
receiving messages. Note that messages may arrive before the
subscription is acknowledged.
"""
if not request.thing_name:
raise ValueError("request.thing_name is required")
Expand All @@ -262,7 +280,7 @@ def subscribe_to_next_job_execution_changed_events(self, request, on_event):
payload_to_class_fn=NextJobExecutionChangedEvent.from_payload)

def subscribe_to_start_next_pending_job_execution_accepted(self, request, on_accepted):
# type: (StartNextPendingJobExecutionSubscriptionRequest, typing.Callable[[StartNextJobExecutionResponse], None]) -> concurrent.futures.Future
# type: (StartNextPendingJobExecutionSubscriptionRequest, typing.Callable[[StartNextJobExecutionResponse], None]) -> typing.Tuple[concurrent.futures.Future, str]
"""
API Docs: https://docs.aws.amazon.com/iot/latest/developerguide/jobs-api.html#mqtt-startnextpendingjobexecution

Expand All @@ -272,9 +290,12 @@ def subscribe_to_start_next_pending_job_execution_accepted(self, request, on_acc
The callback should take 1 argument of type `StartNextJobExecutionResponse`.
The callback is not expected to return anything.

Returns a concurrent.futures.Future, whose result will be None if the
subscription is successful. The Future's result will be an exception
if the subscription is unsuccessful.
Returns two values immediately. The first is a `concurrent.futures.Future`
which will contain a result of `None` when the server has acknowledged
the subscription, or an exception if the subscription fails. The second
value is a topic which may be passed to `unsubscribe()` to stop
receiving messages. Note that messages may arrive before the
subscription is acknowledged.
"""
if not request.thing_name:
raise ValueError("request.thing_name is required")
Expand All @@ -288,7 +309,7 @@ def subscribe_to_start_next_pending_job_execution_accepted(self, request, on_acc
payload_to_class_fn=StartNextJobExecutionResponse.from_payload)

def subscribe_to_start_next_pending_job_execution_rejected(self, request, on_rejected):
# type: (StartNextPendingJobExecutionSubscriptionRequest, typing.Callable[[RejectedError], None]) -> concurrent.futures.Future
# type: (StartNextPendingJobExecutionSubscriptionRequest, typing.Callable[[RejectedError], None]) -> typing.Tuple[concurrent.futures.Future, str]
"""
API Docs: https://docs.aws.amazon.com/iot/latest/developerguide/jobs-api.html#mqtt-startnextpendingjobexecution

Expand All @@ -298,9 +319,12 @@ def subscribe_to_start_next_pending_job_execution_rejected(self, request, on_rej
The callback should take 1 argument of type `RejectedError`.
The callback is not expected to return anything.

Returns a concurrent.futures.Future, whose result will be None if the
subscription is successful. The Future's result will be an exception
if the subscription is unsuccessful.
Returns two values immediately. The first is a `concurrent.futures.Future`
which will contain a result of `None` when the server has acknowledged
the subscription, or an exception if the subscription fails. The second
value is a topic which may be passed to `unsubscribe()` to stop
receiving messages. Note that messages may arrive before the
subscription is acknowledged.
"""
if not request.thing_name:
raise ValueError("request.thing_name is required")
Expand All @@ -314,7 +338,7 @@ def subscribe_to_start_next_pending_job_execution_rejected(self, request, on_rej
payload_to_class_fn=RejectedError.from_payload)

def subscribe_to_update_job_execution_accepted(self, request, on_accepted):
# type: (UpdateJobExecutionSubscriptionRequest, typing.Callable[[UpdateJobExecutionResponse], None]) -> concurrent.futures.Future
# type: (UpdateJobExecutionSubscriptionRequest, typing.Callable[[UpdateJobExecutionResponse], None]) -> typing.Tuple[concurrent.futures.Future, str]
"""
API Docs: https://docs.aws.amazon.com/iot/latest/developerguide/jobs-api.html#mqtt-updatejobexecution

Expand All @@ -324,9 +348,12 @@ def subscribe_to_update_job_execution_accepted(self, request, on_accepted):
The callback should take 1 argument of type `UpdateJobExecutionResponse`.
The callback is not expected to return anything.

Returns a concurrent.futures.Future, whose result will be None if the
subscription is successful. The Future's result will be an exception
if the subscription is unsuccessful.
Returns two values immediately. The first is a `concurrent.futures.Future`
which will contain a result of `None` when the server has acknowledged
the subscription, or an exception if the subscription fails. The second
value is a topic which may be passed to `unsubscribe()` to stop
receiving messages. Note that messages may arrive before the
subscription is acknowledged.
"""
if not request.thing_name:
raise ValueError("request.thing_name is required")
Expand All @@ -342,7 +369,7 @@ def subscribe_to_update_job_execution_accepted(self, request, on_accepted):
payload_to_class_fn=UpdateJobExecutionResponse.from_payload)

def subscribe_to_update_job_execution_rejected(self, request, on_rejected):
# type: (UpdateJobExecutionSubscriptionRequest, typing.Callable[[RejectedError], None]) -> concurrent.futures.Future
# type: (UpdateJobExecutionSubscriptionRequest, typing.Callable[[RejectedError], None]) -> typing.Tuple[concurrent.futures.Future, str]
"""
API Docs: https://docs.aws.amazon.com/iot/latest/developerguide/jobs-api.html#mqtt-updatejobexecution

Expand All @@ -352,9 +379,12 @@ def subscribe_to_update_job_execution_rejected(self, request, on_rejected):
The callback should take 1 argument of type `RejectedError`.
The callback is not expected to return anything.

Returns a concurrent.futures.Future, whose result will be None if the
subscription is successful. The Future's result will be an exception
if the subscription is unsuccessful.
Returns two values immediately. The first is a `concurrent.futures.Future`
which will contain a result of `None` when the server has acknowledged
the subscription, or an exception if the subscription fails. The second
value is a topic which may be passed to `unsubscribe()` to stop
receiving messages. Note that messages may arrive before the
subscription is acknowledged.
"""
if not request.thing_name:
raise ValueError("request.thing_name is required")
Expand Down
Loading