Skip to content

Commit 0eb4dd8

Browse files
committed
ucloud: Poll records while connecting in sync mode.
Signed-off-by: iabdalkader <[email protected]>
1 parent a8bf94c commit 0eb4dd8

File tree

1 file changed

+36
-27
lines changed

1 file changed

+36
-27
lines changed

src/arduino_iot_cloud/ucloud.py

+36-27
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ def __init__(self, name, **kwargs):
6363
self._updated = False
6464
self.on_write_scheduled = False
6565
self.timestamp = timestamp()
66-
self.last_run = timestamp_ms()
66+
self.last_poll = timestamp_ms()
6767
self.runnable = any((self.on_run, self.on_read, self.on_write))
6868
callback = kwargs.pop("callback", self.senml_callback)
6969
for key in kwargs: # kwargs should be empty by now, unless a wrong attr was used.
@@ -193,7 +193,6 @@ def __init__(
193193
self.thing_id = None
194194
self.keepalive = keepalive
195195
self.last_ping = timestamp()
196-
self.last_run = timestamp()
197196
self.senmlpack = SenmlPack("", self.senml_generic_callback)
198197
self.ntp_server = ntp_server
199198
self.ntp_timeout = ntp_timeout
@@ -322,8 +321,20 @@ def mqtt_callback(self, topic, message):
322321
self.senmlpack.from_cbor(message)
323322
self.senmlpack.clear()
324323

325-
def ts_expired(self, record, ts):
326-
return (ts - record.last_run) > int(record.interval * 1000)
324+
def ts_expired(self, ts, last_ts_ms, interval_s):
325+
return last_ts_ms == 0 or (ts - last_ts_ms) > int(interval_s * 1000)
326+
327+
def poll_records(self):
328+
ts = timestamp_ms()
329+
try:
330+
for record in self.records.values():
331+
if record.runnable and self.ts_expired(ts, record.last_poll, record.interval):
332+
record.run_sync(self)
333+
record.last_poll = ts
334+
except Exception as e:
335+
self.records.pop(record.name)
336+
if log_level_enabled(logging.ERROR):
337+
logging.error(f"task: {record.name} raised exception: {str(e)}.")
327338

328339
def poll_connect(self, aiot=None):
329340
logging.info("Connecting to Arduino IoT cloud...")
@@ -332,7 +343,7 @@ def poll_connect(self, aiot=None):
332343
except Exception as e:
333344
if log_level_enabled(logging.WARNING):
334345
logging.warning(f"Connection failed {e}, retrying...")
335-
return False
346+
return
336347

337348
if self.thing_id is None:
338349
self.mqtt.subscribe(self.device_topic, qos=1)
@@ -341,10 +352,10 @@ def poll_connect(self, aiot=None):
341352

342353
if self.async_mode:
343354
if self.thing_id is None:
344-
self.register("discovery", on_run=self.poll_discovery, interval=0.100)
355+
self.register("discovery", on_run=self.poll_discovery, interval=0.200)
345356
self.register("mqtt_task", on_run=self.poll_mqtt, interval=0.100)
346357
raise DoneException()
347-
return True
358+
self.connected = True
348359

349360
def poll_discovery(self, aiot=None):
350361
self.mqtt.check_msg()
@@ -429,17 +440,26 @@ async def run(self, interval, backoff):
429440
def start(self, interval=1.0, backoff=1.2):
430441
if self.async_mode:
431442
asyncio.run(self.run(interval, backoff))
432-
else:
433-
# Synchronous mode.
434-
while not self.poll_connect():
435-
time.sleep(interval)
436-
interval = min(interval * backoff, 5.0)
443+
return
437444

438-
while self.thing_id is None:
445+
last_conn_ms = 0
446+
last_disc_ms = 0
447+
448+
while True:
449+
ts = timestamp_ms()
450+
if not self.connected and self.ts_expired(ts, last_conn_ms, interval):
451+
self.poll_connect()
452+
if last_conn_ms != 0:
453+
interval = min(interval * backoff, 5.0)
454+
last_conn_ms = ts
455+
456+
if self.connected and self.thing_id is None and self.ts_expired(ts, last_disc_ms, 0.250):
439457
self.poll_discovery()
440-
time.sleep(0.100)
458+
last_disc_ms = ts
441459

442-
self.connected = True
460+
if self.connected and self.thing_id is not None:
461+
break
462+
self.poll_records()
443463

444464
def update(self):
445465
if self.async_mode:
@@ -448,25 +468,14 @@ def update(self):
448468
if not self.connected:
449469
try:
450470
self.start()
451-
self.connected = True
452471
except Exception as e:
453472
raise e
454473

455-
try:
456-
ts = timestamp_ms()
457-
for record in self.records.values():
458-
if record.runnable and self.ts_expired(record, ts):
459-
record.run_sync(self)
460-
record.last_run = ts
461-
except Exception as e:
462-
self.records.pop(record.name)
463-
if log_level_enabled(logging.ERROR):
464-
logging.error(f"task: {record.name} raised exception: {str(e)}.")
474+
self.poll_records()
465475

466476
try:
467477
self.poll_mqtt()
468478
except Exception as e:
469479
self.connected = False
470480
if log_level_enabled(logging.WARNING):
471481
logging.warning(f"Connection lost {e}")
472-
raise e

0 commit comments

Comments
 (0)