Skip to content

Commit 4fe1db0

Browse files
authored
Use token to correlate "requests" and "responses". (#210)
**Issue:** MQTT is not a request/response protocol. Messages for the Shadow service have a "client token" field to help users correlate "request" messages to "response" messages, but this sample script wasn't using the token field. This led to confusion when running Device Advisor tests which send several `shadow/update` "request" messages during setup. The sample was receiving the `shadow/update/accepted` "response" messages unexpectedly leading to crashes. **Changes:** Set token on all "request" messages. Ignore any "response" message with an unexpected token.
1 parent 8b60066 commit 4fe1db0

File tree

1 file changed

+87
-27
lines changed

1 file changed

+87
-27
lines changed

samples/shadow.py

Lines changed: 87 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@ def __init__(self):
6666
self.lock = threading.Lock()
6767
self.shadow_value = None
6868
self.disconnect_called = False
69+
self.request_tokens = set()
6970

7071
locked_data = LockedData()
7172

@@ -95,9 +96,15 @@ def on_disconnected(disconnect_future):
9596
def on_get_shadow_accepted(response):
9697
# type: (iotshadow.GetShadowResponse) -> None
9798
try:
98-
print("Finished getting initial shadow state.")
99-
10099
with locked_data.lock:
100+
# check that this is a response to a request from this session
101+
try:
102+
locked_data.request_tokens.remove(response.client_token)
103+
except KeyError:
104+
print("Ignoring get_shadow_accepted message due to unexpected token.")
105+
return
106+
107+
print("Finished getting initial shadow state.")
101108
if locked_data.shadow_value is not None:
102109
print(" Ignoring initial query because a delta event has already been received.")
103110
return
@@ -126,12 +133,24 @@ def on_get_shadow_accepted(response):
126133

127134
def on_get_shadow_rejected(error):
128135
# type: (iotshadow.ErrorResponse) -> None
129-
if error.code == 404:
130-
print("Thing has no shadow document. Creating with defaults...")
131-
change_shadow_value(SHADOW_VALUE_DEFAULT)
132-
else:
133-
exit("Get request was rejected. code:{} message:'{}'".format(
134-
error.code, error.message))
136+
try:
137+
# check that this is a response to a request from this session
138+
with locked_data.lock:
139+
try:
140+
locked_data.request_tokens.remove(error.client_token)
141+
except KeyError:
142+
print("Ignoring get_shadow_rejected message due to unexpected token.")
143+
return
144+
145+
if error.code == 404:
146+
print("Thing has no shadow document. Creating with defaults...")
147+
change_shadow_value(SHADOW_VALUE_DEFAULT)
148+
else:
149+
exit("Get request was rejected. code:{} message:'{}'".format(
150+
error.code, error.message))
151+
152+
except Exception as e:
153+
exit(e)
135154

136155
def on_shadow_delta_updated(delta):
137156
# type: (iotshadow.ShadowDeltaUpdatedEvent) -> None
@@ -164,15 +183,39 @@ def on_publish_update_shadow(future):
164183
def on_update_shadow_accepted(response):
165184
# type: (iotshadow.UpdateShadowResponse) -> None
166185
try:
167-
print("Finished updating reported shadow value to '{}'.".format(response.state.reported[shadow_property])) # type: ignore
168-
print("Enter desired value: ") # remind user they can input new values
169-
except:
170-
exit("Updated shadow is missing the target property.")
186+
# check that this is a response to a request from this session
187+
with locked_data.lock:
188+
try:
189+
locked_data.request_tokens.remove(response.client_token)
190+
except KeyError:
191+
print("Ignoring update_shadow_accepted message due to unexpected token.")
192+
return
193+
194+
try:
195+
print("Finished updating reported shadow value to '{}'.".format(response.state.reported[shadow_property])) # type: ignore
196+
print("Enter desired value: ") # remind user they can input new values
197+
except:
198+
exit("Updated shadow is missing the target property.")
199+
200+
except Exception as e:
201+
exit(e)
171202

172203
def on_update_shadow_rejected(error):
173204
# type: (iotshadow.ErrorResponse) -> None
174-
exit("Update request was rejected. code:{} message:'{}'".format(
175-
error.code, error.message))
205+
try:
206+
# check that this is a response to a request from this session
207+
with locked_data.lock:
208+
try:
209+
locked_data.request_tokens.remove(error.client_token)
210+
except KeyError:
211+
print("Ignoring update_shadow_rejected message due to unexpected token.")
212+
return
213+
214+
exit("Update request was rejected. code:{} message:'{}'".format(
215+
error.code, error.message))
216+
217+
except Exception as e:
218+
exit(e)
176219

177220
def set_local_value_due_to_initial_query(reported_value):
178221
with locked_data.lock:
@@ -189,16 +232,25 @@ def change_shadow_value(value):
189232
print("Changed local shadow value to '{}'.".format(value))
190233
locked_data.shadow_value = value
191234

192-
print("Updating reported shadow value to '{}'...".format(value))
193-
request = iotshadow.UpdateShadowRequest(
194-
thing_name=thing_name,
195-
state=iotshadow.ShadowState(
196-
reported={ shadow_property: value },
197-
desired={ shadow_property: value },
235+
print("Updating reported shadow value to '{}'...".format(value))
236+
237+
# use a unique token so we can correlate this "request" message to
238+
# any "response" messages received on the /accepted and /rejected topics
239+
token = str(uuid4())
240+
241+
request = iotshadow.UpdateShadowRequest(
242+
thing_name=thing_name,
243+
state=iotshadow.ShadowState(
244+
reported={ shadow_property: value },
245+
desired={ shadow_property: value },
246+
),
247+
client_token=token,
198248
)
199-
)
200-
future = shadow_client.publish_update_shadow(request, mqtt.QoS.AT_LEAST_ONCE)
201-
future.add_done_callback(on_publish_update_shadow)
249+
future = shadow_client.publish_update_shadow(request, mqtt.QoS.AT_LEAST_ONCE)
250+
251+
locked_data.request_tokens.add(token)
252+
253+
future.add_done_callback(on_publish_update_shadow)
202254

203255
def user_input_thread_fn():
204256
while True:
@@ -318,14 +370,22 @@ def user_input_thread_fn():
318370
# Wait for subscription to succeed
319371
delta_subscribed_future.result()
320372

321-
# The rest of the sample runs asyncronously.
373+
# The rest of the sample runs asynchronously.
322374

323375
# Issue request for shadow's current state.
324376
# The response will be received by the on_get_accepted() callback
325377
print("Requesting current shadow state...")
326-
publish_get_future = shadow_client.publish_get_shadow(
327-
request=iotshadow.GetShadowRequest(thing_name=args.thing_name),
328-
qos=mqtt.QoS.AT_LEAST_ONCE)
378+
379+
with locked_data.lock:
380+
# use a unique token so we can correlate this "request" message to
381+
# any "response" messages received on the /accepted and /rejected topics
382+
token = str(uuid4())
383+
384+
publish_get_future = shadow_client.publish_get_shadow(
385+
request=iotshadow.GetShadowRequest(thing_name=args.thing_name, client_token=token),
386+
qos=mqtt.QoS.AT_LEAST_ONCE)
387+
388+
locked_data.request_tokens.add(token)
329389

330390
# Ensure that publish succeeds
331391
publish_get_future.result()

0 commit comments

Comments
 (0)