@@ -132,7 +132,6 @@ def _create_mqtt_client(self) -> None:
132
132
133
133
# set actions to take throughout connection lifecycle
134
134
self ._mqtts .on_connect = self ._on_connect
135
- self ._mqtts .on_message = self ._on_message
136
135
self ._mqtts .on_log = self ._on_log
137
136
self ._mqtts .on_publish = self ._on_publish
138
137
self ._mqtts .on_disconnect = self ._on_disconnect
@@ -172,7 +171,7 @@ def _on_publish(self, client, data, topic, msg_id) -> None:
172
171
self ._logger .info ("- iot_mqtt :: _on_publish :: " + str (data ) + " on topic " + str (topic ))
173
172
174
173
# pylint: disable=W0703
175
- def _handle_device_twin_update (self , msg : str , topic : str ) -> None :
174
+ def _handle_device_twin_update (self , client , topic : str , msg : str ) -> None :
176
175
self ._logger .debug ("- iot_mqtt :: _echo_desired :: " + topic )
177
176
twin = None
178
177
desired = None
@@ -213,7 +212,7 @@ def _handle_device_twin_update(self, msg: str, topic: str) -> None:
213
212
for property_name , value in desired .items ():
214
213
self ._callback .device_twin_desired_updated (property_name , value , desired_version )
215
214
216
- def _handle_direct_method (self , msg : str , topic : str ) -> None :
215
+ def _handle_direct_method (self , client , topic : str , msg : str ) -> None :
217
216
index = topic .find ("$rid=" )
218
217
method_id = 1
219
218
method_name = "None"
@@ -244,7 +243,7 @@ def _handle_direct_method(self, msg: str, topic: str) -> None:
244
243
self ._logger .info ("C2D: => " + next_topic + " with data " + ret_message + " and name => " + method_name )
245
244
self ._send_common (next_topic , ret_message )
246
245
247
- def _handle_cloud_to_device_message (self , msg : str , topic : str ) -> None :
246
+ def _handle_cloud_to_device_message (self , client , topic : str , msg : str ) -> None :
248
247
parts = topic .split ("&" )[1 :]
249
248
250
249
properties = {}
@@ -255,38 +254,6 @@ def _handle_cloud_to_device_message(self, msg: str, topic: str) -> None:
255
254
self ._callback .cloud_to_device_message_received (msg , properties )
256
255
gc .collect ()
257
256
258
- # pylint: disable=W0702, R0912
259
- def _on_message (self , client , msg_topic , payload ) -> None :
260
- topic = ""
261
- msg = None
262
-
263
- self ._logger .info ("- iot_mqtt :: _on_message" )
264
-
265
- if payload is not None :
266
- try :
267
- msg = payload .decode ("utf-8" )
268
- except :
269
- msg = str (payload )
270
-
271
- if msg_topic is not None :
272
- try :
273
- topic = msg_topic .decode ("utf-8" )
274
- except :
275
- topic = str (msg_topic )
276
-
277
- if topic .startswith ("$iothub/" ):
278
- if topic .startswith ("$iothub/twin/PATCH/properties/desired/" ) or topic .startswith ("$iothub/twin/res/200/?$rid=" ):
279
- self ._handle_device_twin_update (str (msg ), topic )
280
- elif topic .startswith ("$iothub/methods" ):
281
- self ._handle_direct_method (str (msg ), topic )
282
- else :
283
- if not topic .startswith ("$iothub/twin/res/" ): # not twin response
284
- self ._logger .error ("ERROR: unknown twin! - {}" .format (msg ))
285
- elif topic .startswith ("devices/{}/messages/devicebound" .format (self ._device_id )):
286
- self ._handle_cloud_to_device_message (str (msg ), topic )
287
- else :
288
- self ._logger .error ("ERROR: (unknown message) - {}" .format (msg ))
289
-
290
257
def _send_common (self , topic : str , data ) -> None :
291
258
# Convert data to a string
292
259
if isinstance (data , dict ):
@@ -363,13 +330,19 @@ def __init__(
363
330
self ._is_subscribed_to_twins = False
364
331
365
332
def _subscribe_to_core_topics (self ):
366
- self ._mqtts .subscribe ("devices/{}/messages/events/#" .format (self ._device_id ))
367
- self ._mqtts .subscribe ("devices/{}/messages/devicebound/#" .format (self ._device_id ))
333
+ device_bound_topic = "devices/{}/messages/devicebound/#" .format (self ._device_id )
334
+ self ._mqtts .add_topic_callback (device_bound_topic , self ._handle_cloud_to_device_message )
335
+ self ._mqtts .subscribe (device_bound_topic )
336
+
337
+ self ._mqtts .add_topic_callback ("$iothub/methods/#" , self ._handle_direct_method )
368
338
self ._mqtts .subscribe ("$iothub/methods/#" )
369
339
370
340
def _subscribe_to_twin_topics (self ):
341
+ self ._mqtts .add_topic_callback ("$iothub/twin/PATCH/properties/desired/#" , self ._handle_device_twin_update )
371
342
self ._mqtts .subscribe ("$iothub/twin/PATCH/properties/desired/#" ) # twin desired property changes
372
- self ._mqtts .subscribe ("$iothub/twin/res/#" ) # twin properties response
343
+
344
+ self ._mqtts .add_topic_callback ("$iothub/twin/res/200/#" , self ._handle_device_twin_update )
345
+ self ._mqtts .subscribe ("$iothub/twin/res/200/#" ) # twin properties response
373
346
374
347
def connect (self ) -> bool :
375
348
"""Connects to the MQTT broker
0 commit comments