Skip to content

Commit 1dc1891

Browse files
authored
Merge pull request #32 from bcmi-labs/reconnects
ucloud: Restart MQTT tasks on disconnects.
2 parents 5ed94d3 + c03c4f5 commit 1dc1891

File tree

3 files changed

+46
-30
lines changed

3 files changed

+46
-30
lines changed

.github/workflows/python-linter.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,4 +37,4 @@ jobs:
3737
run: |
3838
# stop the build if there are Python syntax errors or undefined names
3939
flake8 . --count --select=E9,F63,F7,F82 --show-source --statistics
40-
flake8 . --count --max-complexity=10 --max-line-length=120 --statistics
40+
flake8 . --count --max-complexity=15 --max-line-length=120 --statistics

arduino_iot_cloud/ucloud.py

Lines changed: 39 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,10 @@ class InvalidStateError(Exception):
4949
_DEFAULT_PORT = (8883, 8884)
5050

5151

52+
class DoneException(Exception):
53+
pass
54+
55+
5256
def timestamp():
5357
return int(time.time())
5458

@@ -278,12 +282,16 @@ def mqtt_callback(self, topic, message):
278282
logging.debug(f"mqtt topic: {topic[-8:]}... message: {message[:8]}...")
279283
self.senmlpack.clear()
280284
for record in self.records.values():
285+
# If the object is uninitialized, updates are always allowed even if it's a read-only
286+
# object. Otherwise, for initialized objects, updates are only allowed if the object
287+
# is writable (on_write function is set) and the value is received from the out topic.
281288
if not record.initialized or (record.on_write is not None and b"shadow" not in topic):
282289
record.add_to_pack(self.senmlpack)
283290
self.senmlpack.from_cbor(message)
284291
self.senmlpack.clear()
285292

286293
async def discovery_task(self, interval=0.100):
294+
self.mqtt.subscribe(self.device_topic, qos=1)
287295
while self.thing_id is None:
288296
self.mqtt.check_msg()
289297
if self.records.get("thing_id").value is not None:
@@ -300,6 +308,25 @@ async def discovery_task(self, interval=0.100):
300308
self.mqtt.publish(self.create_topic("shadow", "o"), self.senmlpack.to_cbor(), qos=1)
301309
logging.info("Device configured via discovery protocol.")
302310
await asyncio.sleep(interval)
311+
raise DoneException()
312+
313+
async def conn_task(self, interval=1.0, backoff=1.2):
314+
logging.info("Connecting to Arduino IoT cloud...")
315+
while True:
316+
try:
317+
self.mqtt.connect()
318+
break
319+
except Exception as e:
320+
logging.warning(f"Connection failed {e}, retrying after {interval}s")
321+
await asyncio.sleep(interval)
322+
interval = min(interval * backoff, 4.0)
323+
324+
if self.thing_id is None:
325+
self.create_task("discovery", self.discovery_task)
326+
else:
327+
self.mqtt.subscribe(self.create_topic("e", "i"))
328+
self.create_task("mqtt_task", self.mqtt_task)
329+
raise DoneException()
303330

304331
async def mqtt_task(self, interval=0.100):
305332
while True:
@@ -320,34 +347,33 @@ async def mqtt_task(self, interval=0.100):
320347
self.last_ping = timestamp()
321348
logging.debug("No records to push, sent a ping request.")
322349
await asyncio.sleep(interval)
350+
raise DoneException()
323351

324352
async def run(self, user_main=None):
325-
logging.info("Connecting to Arduino IoT cloud...")
326-
if not self.mqtt.connect():
327-
logging.error("Failed to connect Arduino IoT cloud.")
328-
return
329-
330-
self.mqtt.subscribe(self.device_topic, qos=1)
353+
self.create_task("conn_task", self.conn_task)
331354
if user_main is not None:
332355
self.create_task("user_main", user_main, self)
333-
self.create_task("mqtt_task", self.mqtt_task)
334-
self.create_task("discovery", self.discovery_task)
335356

336357
while True:
358+
task_except = None
337359
try:
338360
await asyncio.gather(*self.tasks.values(), return_exceptions=False)
339-
logging.info("All tasks finished!")
340-
break
361+
break # All tasks are done, not likely.
341362
except Exception as e:
342-
except_msg = str(e)
343-
pass # import traceback; traceback.print_exc()
363+
task_except = e
364+
pass # import traceback; traceback.print_exc()
344365

345366
for name in list(self.tasks):
346367
task = self.tasks[name]
347368
try:
348369
if task.done():
349370
self.tasks.pop(name)
350371
self.records.pop(name, None)
351-
logging.error(f"Removed task: {name}. Raised exception: {except_msg}.")
372+
if isinstance(task_except, DoneException):
373+
logging.error(f"task: {name} complete.")
374+
elif task_except is not None:
375+
logging.error(f"task: {name} raised exception: {str(task_except)}.")
376+
if name == "mqtt_task":
377+
self.create_task("conn_task", self.conn_task)
352378
except (CancelledError, InvalidStateError):
353379
pass

arduino_iot_cloud/umqtt.py

Lines changed: 6 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,6 @@
2222
#
2323
# Based on: https://github.com/micropython/micropython-lib/tree/master/micropython/umqtt.simple
2424

25-
import time
26-
2725
try:
2826
from ussl import wrap_socket
2927
import usocket as socket
@@ -92,8 +90,13 @@ def set_last_will(self, topic, msg, retain=False, qos=0):
9290
self.lw_qos = qos
9391
self.lw_retain = retain
9492

95-
def _connect(self, clean_session=True):
93+
def connect(self, clean_session=True):
9694
addr = socket.getaddrinfo(self.server, self.port)[0][-1]
95+
96+
if self.sock is not None:
97+
self.sock.close()
98+
self.sock = None
99+
97100
try:
98101
self.sock = socket.socket()
99102
self.sock = wrap_socket(self.sock, **self.ssl_params)
@@ -130,7 +133,6 @@ def _connect(self, clean_session=True):
130133

131134
self.sock.write(premsg[0:i + 2])
132135
self.sock.write(msg)
133-
# print(hex(len(msg)), hexlify(msg, ":"))
134136
self._send_str(self.client_id)
135137
if self.lw_topic:
136138
self._send_str(self.lw_topic)
@@ -144,18 +146,6 @@ def _connect(self, clean_session=True):
144146
raise MQTTException(resp[3])
145147
return resp[2] & 1
146148

147-
def connect(self, retry=10, interval=1.0, clean_session=True):
148-
for i in range(0, retry):
149-
try:
150-
self._connect(clean_session)
151-
return True
152-
except Exception as e:
153-
if self.sock is not None:
154-
self.sock.close()
155-
logging.warning(f"Connection failed {e}, retrying after {interval}s")
156-
time.sleep(interval)
157-
return False
158-
159149
def disconnect(self):
160150
self.sock.write(b"\xe0\0")
161151
self.sock.close()

0 commit comments

Comments
 (0)