Skip to content

React to API changes in aws-crt-python v0.2.0 #15

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jan 17, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ is provided by code that been generated from a model of the service.

## Build from source
```
git clone --branch v0.1.0 https://github.com/awslabs/aws-crt-python.git --recursive
git clone --branch v0.2.0 https://github.com/awslabs/aws-crt-python.git --recursive
git clone https://github.com/awslabs/aws-iot-device-sdk-python-v2.git
pip install ./aws-crt-python
pip install ./aws-iot-device-sdk-python-v2
Expand Down
38 changes: 25 additions & 13 deletions awsiot/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,14 @@ def unsubscribe(self, topic):
"""
future = Future() # type: Future
try:
def on_unsuback(packet_id):
future.set_result(None)
def on_unsuback(unsuback_future):
if unsuback_future.exception():
future.set_exception(unsuback_future.exception())
else:
future.set_result(None)

self.mqtt_connection.unsubscribe(topic, on_unsuback)
unsub_future = self.mqtt_connection.unsubscribe(topic)
unsub_future.add_done_callback(on_unsuback)

except Exception as e:
future.set_exception(e)
Expand All @@ -75,19 +79,23 @@ def _publish_operation(self, topic, payload):
"""
future = Future() # type: Future
try:
def on_puback(packet_id):
future.set_result(None)
def on_puback(puback_future):
if puback_future.exception():
future.set_exception(puback_future.exception())
else:
future.set_result(None)

if payload is None:
payload_str = ""
else:
payload_str = json.dumps(payload)

self.mqtt_connection.publish(
pub_future, _ = self.mqtt_connection.publish(
topic=topic,
payload=payload_str,
qos=mqtt.QoS.AtLeastOnce,
puback_callback=on_puback)
qos=mqtt.QoS.AT_LEAST_ONCE,
)
pub_future.add_done_callback(on_puback)

except Exception as e:
future.set_exception(e)
Expand Down Expand Up @@ -122,8 +130,11 @@ def _subscribe_operation(self, topic, callback, payload_to_class_fn):

future = Future() # type: Future
try:
def on_suback(packet_id, topic, qos):
future.set_result(None)
def on_suback(suback_future):
if suback_future.exception():
future.set_exception(suback_future.exception())
else:
future.set_result(None)

def callback_wrapper(topic, payload_str):
try:
Expand All @@ -134,11 +145,12 @@ def callback_wrapper(topic, payload_str):
event = None
callback(event)

self.mqtt_connection.subscribe(
sub_future, _ = self.mqtt_connection.subscribe(
topic=topic,
qos=mqtt.QoS.AtLeastOnce,
qos=mqtt.QoS.AT_LEAST_ONCE,
callback=callback_wrapper,
suback_callback=on_suback)
)
sub_future.add_done_callback(on_suback)

except Exception as e:
future.set_exception(e)
Expand Down
44 changes: 14 additions & 30 deletions samples/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
import argparse
from aws_crt import io, mqtt
from awsiot import iotjobs
from concurrent import futures
from concurrent.futures import Future
import sys
import threading
import time
Expand Down Expand Up @@ -56,7 +56,6 @@
parser.add_argument('--job-time', default=5, type=float, help="Emulate working on job by sleeping this many seconds.")

# Using globals to simplify sample code
connected_future = futures.Future()
is_sample_done = threading.Event()

mqtt_connection = None
Expand Down Expand Up @@ -84,7 +83,8 @@ def exit(msg_or_exception):
if not locked_data.disconnect_called:
print("Disconnecting...")
locked_data.disconnect_called = True
mqtt_connection.disconnect()
future = mqtt_connection.disconnect()
future.add_done_callback(on_disconnected)

def try_start_next_job():
print("Trying to start the next job...")
Expand Down Expand Up @@ -113,26 +113,12 @@ def done_working_on_job():
if try_again:
try_start_next_job()

def on_connected(return_code, session_present):
# type: (int, bool) -> None
print("Connect completed with code: {}".format(return_code))
if return_code == 0:
connected_future.set_result(None)
else:
connected_future.set_exception(RuntimeError("Connection failed with code: {}".format(return_code)))
def on_disconnected(disconnect_future):
# type: (Future) -> None
print("Disconnected.")

def on_disconnected(return_code):
# type: (int) -> bool
print("Disconnected with code: {}".format(return_code))
with locked_data.lock:
if locked_data.disconnect_called:
# Signal that sample is finished
is_sample_done.set()
# Don't attempt to reconnect
return False
else:
# Attempt to reconnect
return True
# Signal that sample is finished
is_sample_done.set()

def on_next_job_execution_changed(event):
# type: (iotjobs.NextJobExecutionChangedEvent) -> None
Expand Down Expand Up @@ -160,7 +146,7 @@ def on_next_job_execution_changed(event):
exit(e)

def on_publish_start_next_pending_job_execution(future):
# type: (futures.Future) -> None
# type: (Future) -> None
try:
future.result() # raises exception if publish failed

Expand Down Expand Up @@ -212,7 +198,7 @@ def job_thread_fn(job_id, job_document):
exit(e)

def on_publish_update_job_execution(future):
# type: (futures.Future) -> None
# type: (Future) -> None
try:
future.result() # raises exception if publish failed
print("Published request to update job.")
Expand Down Expand Up @@ -252,15 +238,12 @@ def on_update_job_execution_rejected(rejected):
port = 443 if io.is_alpn_available() else 8883
print("Connecting to {} on port {}...".format(args.endpoint, port))
mqtt_connection = mqtt.Connection(
client=mqtt_client,
client_id=args.client_id)
mqtt_connection.connect(
client=mqtt_client)
connected_future = mqtt_connection.connect(
client_id=args.client_id,
host_name = args.endpoint,
port = port,
on_connect=on_connected,
on_disconnect=on_disconnected,
use_websocket=False,
alpn=None,
clean_session=True,
keep_alive=6000)

Expand All @@ -272,6 +255,7 @@ def on_update_job_execution_rejected(rejected):
# But this sample waits here so it's obvious when a connection
# fails or succeeds.
connected_future.result()
print("Connected!")

try:
# Subscribe to necessary topics.
Expand Down
79 changes: 23 additions & 56 deletions samples/pubsub.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,45 +41,16 @@
# Using globals to simplify sample code
args = parser.parse_args()

connected_results = {}
connected_event = threading.Event()

disconnect_called = False
disconnected_results = {}
disconnected_event = threading.Event()

subscribed_event = threading.Event()

received_count = 0
received_all_event = threading.Event()

# Callback when connection is established.
# This may be an initial connection, or a reconnect
# after an accidentally losing the connection.
def on_connected(return_code, session_present):
if not connected_results:
connected_results['return_code'] = return_code
connected_results['session_present'] = session_present
connected_event.set()
else:
print("Connection re-established!")

# Callback when connection is lost.
# This may be due to a purposeful disconnect() call,
# or it may be an accidental loss of the connection.
# Return True to attempt reconnecting.
def on_disconnected(return_code):
if disconnect_called:
disconnected_results['return_code'] = return_code
disconnected_event.set()
return False
else:
print("Connection lost. Attempting to reconnect...")
return True

# Callback when async subscribe operation completes
def on_subscribed(packet_id, topic, qos):
subscribed_event.set()
# Callback when connection is accidentally lost.
def on_connection_interrupted(error_code):
print("Connection interrupted. error_code:{}".format(error_code))

# Callback when an interrupted connection is re-established.
def on_connection_resumed(error_code, session_present):
print("Connection resumed. error_code:{} session_present:{}".format(error_code, session_present))

# Callback when the subscribed topic receives a message
def on_message_received(topic, message):
Expand Down Expand Up @@ -109,29 +80,29 @@ def on_message_received(topic, message):

mqtt_connection = mqtt.Connection(
client=mqtt_client,
client_id=args.client_id)
mqtt_connection.connect(
on_connection_interrupted=on_connection_interrupted,
on_connection_resumed=on_connection_resumed)

connect_future = mqtt_connection.connect(
client_id=args.client_id,
host_name = args.endpoint,
port = port,
on_connect=on_connected,
on_disconnect=on_disconnected,
use_websocket=False,
alpn=None,
clean_session=True,
keep_alive=6000)

connected_event.wait()
connected_code = connected_results['return_code']
if connected_code != 0:
raise RuntimeError("Connection failed with return code: {}".format(connected_code))
print('Connected!')
# Future.result() waits until a result is available
connect_future.result()
print("Connected!")

# Subscribe
print("Subscribing to topic '{}'...".format(args.topic))
mqtt_connection.subscribe(topic=args.topic, qos=1,
suback_callback=on_subscribed, callback=on_message_received)
subscribe_future, packet_id = mqtt_connection.subscribe(
topic=args.topic,
qos=mqtt.QoS.AT_LEAST_ONCE,
callback=on_message_received)

subscribed_event.wait()
subscribe_future.result()
print("Subscribed!")

# Publish message to server desired number of times.
Expand All @@ -150,7 +121,7 @@ def on_message_received(topic, message):
mqtt_connection.publish(
topic=args.topic,
payload=message,
qos=1)
qos=mqtt.QoS.AT_LEAST_ONCE)
time.sleep(1)
publish_count += 1

Expand All @@ -164,10 +135,6 @@ def on_message_received(topic, message):

# Disconnect
print("Disconnecting...")
disconnect_called = True
mqtt_connection.disconnect()
disconnected_event.wait()
disconnected_code = disconnected_results['return_code']
if disconnected_code != 0:
raise RuntimeError("Disconnected with return code: {}".format(disconnected_code))
disconnect_future = mqtt_connection.disconnect()
disconnect_future.result()
print("Disconnected!")
Loading