Skip to content

Commit a61fad3

Browse files
authored
React to API changes in aws-crt-python v0.2.0 (#15)
1 parent b5f0b76 commit a61fad3

File tree

5 files changed

+126
-165
lines changed

5 files changed

+126
-165
lines changed

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ is provided by code that been generated from a model of the service.
2929

3030
## Build from source
3131
```
32-
git clone --branch v0.1.0 https://github.com/awslabs/aws-crt-python.git --recursive
32+
git clone --branch v0.2.0 https://github.com/awslabs/aws-crt-python.git --recursive
3333
git clone https://github.com/awslabs/aws-iot-device-sdk-python-v2.git
3434
pip install ./aws-crt-python
3535
pip install ./aws-iot-device-sdk-python-v2

awsiot/__init__.py

Lines changed: 25 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -49,10 +49,14 @@ def unsubscribe(self, topic):
4949
"""
5050
future = Future() # type: Future
5151
try:
52-
def on_unsuback(packet_id):
53-
future.set_result(None)
52+
def on_unsuback(unsuback_future):
53+
if unsuback_future.exception():
54+
future.set_exception(unsuback_future.exception())
55+
else:
56+
future.set_result(None)
5457

55-
self.mqtt_connection.unsubscribe(topic, on_unsuback)
58+
unsub_future = self.mqtt_connection.unsubscribe(topic)
59+
unsub_future.add_done_callback(on_unsuback)
5660

5761
except Exception as e:
5862
future.set_exception(e)
@@ -75,19 +79,23 @@ def _publish_operation(self, topic, payload):
7579
"""
7680
future = Future() # type: Future
7781
try:
78-
def on_puback(packet_id):
79-
future.set_result(None)
82+
def on_puback(puback_future):
83+
if puback_future.exception():
84+
future.set_exception(puback_future.exception())
85+
else:
86+
future.set_result(None)
8087

8188
if payload is None:
8289
payload_str = ""
8390
else:
8491
payload_str = json.dumps(payload)
8592

86-
self.mqtt_connection.publish(
93+
pub_future, _ = self.mqtt_connection.publish(
8794
topic=topic,
8895
payload=payload_str,
89-
qos=mqtt.QoS.AtLeastOnce,
90-
puback_callback=on_puback)
96+
qos=mqtt.QoS.AT_LEAST_ONCE,
97+
)
98+
pub_future.add_done_callback(on_puback)
9199

92100
except Exception as e:
93101
future.set_exception(e)
@@ -122,8 +130,11 @@ def _subscribe_operation(self, topic, callback, payload_to_class_fn):
122130

123131
future = Future() # type: Future
124132
try:
125-
def on_suback(packet_id, topic, qos):
126-
future.set_result(None)
133+
def on_suback(suback_future):
134+
if suback_future.exception():
135+
future.set_exception(suback_future.exception())
136+
else:
137+
future.set_result(None)
127138

128139
def callback_wrapper(topic, payload_str):
129140
try:
@@ -134,11 +145,12 @@ def callback_wrapper(topic, payload_str):
134145
event = None
135146
callback(event)
136147

137-
self.mqtt_connection.subscribe(
148+
sub_future, _ = self.mqtt_connection.subscribe(
138149
topic=topic,
139-
qos=mqtt.QoS.AtLeastOnce,
150+
qos=mqtt.QoS.AT_LEAST_ONCE,
140151
callback=callback_wrapper,
141-
suback_callback=on_suback)
152+
)
153+
sub_future.add_done_callback(on_suback)
142154

143155
except Exception as e:
144156
future.set_exception(e)

samples/jobs.py

Lines changed: 14 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
import argparse
1717
from aws_crt import io, mqtt
1818
from awsiot import iotjobs
19-
from concurrent import futures
19+
from concurrent.futures import Future
2020
import sys
2121
import threading
2222
import time
@@ -56,7 +56,6 @@
5656
parser.add_argument('--job-time', default=5, type=float, help="Emulate working on job by sleeping this many seconds.")
5757

5858
# Using globals to simplify sample code
59-
connected_future = futures.Future()
6059
is_sample_done = threading.Event()
6160

6261
mqtt_connection = None
@@ -84,7 +83,8 @@ def exit(msg_or_exception):
8483
if not locked_data.disconnect_called:
8584
print("Disconnecting...")
8685
locked_data.disconnect_called = True
87-
mqtt_connection.disconnect()
86+
future = mqtt_connection.disconnect()
87+
future.add_done_callback(on_disconnected)
8888

8989
def try_start_next_job():
9090
print("Trying to start the next job...")
@@ -113,26 +113,12 @@ def done_working_on_job():
113113
if try_again:
114114
try_start_next_job()
115115

116-
def on_connected(return_code, session_present):
117-
# type: (int, bool) -> None
118-
print("Connect completed with code: {}".format(return_code))
119-
if return_code == 0:
120-
connected_future.set_result(None)
121-
else:
122-
connected_future.set_exception(RuntimeError("Connection failed with code: {}".format(return_code)))
116+
def on_disconnected(disconnect_future):
117+
# type: (Future) -> None
118+
print("Disconnected.")
123119

124-
def on_disconnected(return_code):
125-
# type: (int) -> bool
126-
print("Disconnected with code: {}".format(return_code))
127-
with locked_data.lock:
128-
if locked_data.disconnect_called:
129-
# Signal that sample is finished
130-
is_sample_done.set()
131-
# Don't attempt to reconnect
132-
return False
133-
else:
134-
# Attempt to reconnect
135-
return True
120+
# Signal that sample is finished
121+
is_sample_done.set()
136122

137123
def on_next_job_execution_changed(event):
138124
# type: (iotjobs.NextJobExecutionChangedEvent) -> None
@@ -160,7 +146,7 @@ def on_next_job_execution_changed(event):
160146
exit(e)
161147

162148
def on_publish_start_next_pending_job_execution(future):
163-
# type: (futures.Future) -> None
149+
# type: (Future) -> None
164150
try:
165151
future.result() # raises exception if publish failed
166152

@@ -212,7 +198,7 @@ def job_thread_fn(job_id, job_document):
212198
exit(e)
213199

214200
def on_publish_update_job_execution(future):
215-
# type: (futures.Future) -> None
201+
# type: (Future) -> None
216202
try:
217203
future.result() # raises exception if publish failed
218204
print("Published request to update job.")
@@ -252,15 +238,12 @@ def on_update_job_execution_rejected(rejected):
252238
port = 443 if io.is_alpn_available() else 8883
253239
print("Connecting to {} on port {}...".format(args.endpoint, port))
254240
mqtt_connection = mqtt.Connection(
255-
client=mqtt_client,
256-
client_id=args.client_id)
257-
mqtt_connection.connect(
241+
client=mqtt_client)
242+
connected_future = mqtt_connection.connect(
243+
client_id=args.client_id,
258244
host_name = args.endpoint,
259245
port = port,
260-
on_connect=on_connected,
261-
on_disconnect=on_disconnected,
262246
use_websocket=False,
263-
alpn=None,
264247
clean_session=True,
265248
keep_alive=6000)
266249

@@ -272,6 +255,7 @@ def on_update_job_execution_rejected(rejected):
272255
# But this sample waits here so it's obvious when a connection
273256
# fails or succeeds.
274257
connected_future.result()
258+
print("Connected!")
275259

276260
try:
277261
# Subscribe to necessary topics.

samples/pubsub.py

Lines changed: 23 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -41,45 +41,16 @@
4141
# Using globals to simplify sample code
4242
args = parser.parse_args()
4343

44-
connected_results = {}
45-
connected_event = threading.Event()
46-
47-
disconnect_called = False
48-
disconnected_results = {}
49-
disconnected_event = threading.Event()
50-
51-
subscribed_event = threading.Event()
52-
5344
received_count = 0
5445
received_all_event = threading.Event()
5546

56-
# Callback when connection is established.
57-
# This may be an initial connection, or a reconnect
58-
# after an accidentally losing the connection.
59-
def on_connected(return_code, session_present):
60-
if not connected_results:
61-
connected_results['return_code'] = return_code
62-
connected_results['session_present'] = session_present
63-
connected_event.set()
64-
else:
65-
print("Connection re-established!")
66-
67-
# Callback when connection is lost.
68-
# This may be due to a purposeful disconnect() call,
69-
# or it may be an accidental loss of the connection.
70-
# Return True to attempt reconnecting.
71-
def on_disconnected(return_code):
72-
if disconnect_called:
73-
disconnected_results['return_code'] = return_code
74-
disconnected_event.set()
75-
return False
76-
else:
77-
print("Connection lost. Attempting to reconnect...")
78-
return True
79-
80-
# Callback when async subscribe operation completes
81-
def on_subscribed(packet_id, topic, qos):
82-
subscribed_event.set()
47+
# Callback when connection is accidentally lost.
48+
def on_connection_interrupted(error_code):
49+
print("Connection interrupted. error_code:{}".format(error_code))
50+
51+
# Callback when an interrupted connection is re-established.
52+
def on_connection_resumed(error_code, session_present):
53+
print("Connection resumed. error_code:{} session_present:{}".format(error_code, session_present))
8354

8455
# Callback when the subscribed topic receives a message
8556
def on_message_received(topic, message):
@@ -109,29 +80,29 @@ def on_message_received(topic, message):
10980

11081
mqtt_connection = mqtt.Connection(
11182
client=mqtt_client,
112-
client_id=args.client_id)
113-
mqtt_connection.connect(
83+
on_connection_interrupted=on_connection_interrupted,
84+
on_connection_resumed=on_connection_resumed)
85+
86+
connect_future = mqtt_connection.connect(
87+
client_id=args.client_id,
11488
host_name = args.endpoint,
11589
port = port,
116-
on_connect=on_connected,
117-
on_disconnect=on_disconnected,
11890
use_websocket=False,
119-
alpn=None,
12091
clean_session=True,
12192
keep_alive=6000)
12293

123-
connected_event.wait()
124-
connected_code = connected_results['return_code']
125-
if connected_code != 0:
126-
raise RuntimeError("Connection failed with return code: {}".format(connected_code))
127-
print('Connected!')
94+
# Future.result() waits until a result is available
95+
connect_future.result()
96+
print("Connected!")
12897

12998
# Subscribe
13099
print("Subscribing to topic '{}'...".format(args.topic))
131-
mqtt_connection.subscribe(topic=args.topic, qos=1,
132-
suback_callback=on_subscribed, callback=on_message_received)
100+
subscribe_future, packet_id = mqtt_connection.subscribe(
101+
topic=args.topic,
102+
qos=mqtt.QoS.AT_LEAST_ONCE,
103+
callback=on_message_received)
133104

134-
subscribed_event.wait()
105+
subscribe_future.result()
135106
print("Subscribed!")
136107

137108
# Publish message to server desired number of times.
@@ -150,7 +121,7 @@ def on_message_received(topic, message):
150121
mqtt_connection.publish(
151122
topic=args.topic,
152123
payload=message,
153-
qos=1)
124+
qos=mqtt.QoS.AT_LEAST_ONCE)
154125
time.sleep(1)
155126
publish_count += 1
156127

@@ -164,10 +135,6 @@ def on_message_received(topic, message):
164135

165136
# Disconnect
166137
print("Disconnecting...")
167-
disconnect_called = True
168-
mqtt_connection.disconnect()
169-
disconnected_event.wait()
170-
disconnected_code = disconnected_results['return_code']
171-
if disconnected_code != 0:
172-
raise RuntimeError("Disconnected with return code: {}".format(disconnected_code))
138+
disconnect_future = mqtt_connection.disconnect()
139+
disconnect_future.result()
173140
print("Disconnected!")

0 commit comments

Comments
 (0)