@@ -325,14 +325,26 @@ def mqtt_callback(self, topic, message):
325
325
def ts_expired (self , record , ts ):
326
326
return (ts - record .last_run ) > int (record .interval * 1000 )
327
327
328
+ def poll_records (self ):
329
+ ts = timestamp_ms ()
330
+ try :
331
+ for record in self .records .values ():
332
+ if record .runnable and self .ts_expired (record , ts ):
333
+ record .run_sync (self )
334
+ record .last_run = ts
335
+ except Exception as e :
336
+ self .records .pop (record .name )
337
+ if log_level_enabled (logging .ERROR ):
338
+ logging .error (f"task: { record .name } raised exception: { str (e )} ." )
339
+
328
340
def poll_connect (self , aiot = None ):
329
341
logging .info ("Connecting to Arduino IoT cloud..." )
330
342
try :
331
343
self .mqtt .connect ()
332
344
except Exception as e :
333
345
if log_level_enabled (logging .WARNING ):
334
346
logging .warning (f"Connection failed { e } , retrying..." )
335
- return False
347
+ return
336
348
337
349
if self .thing_id is None :
338
350
self .mqtt .subscribe (self .device_topic , qos = 1 )
@@ -341,10 +353,10 @@ def poll_connect(self, aiot=None):
341
353
342
354
if self .async_mode :
343
355
if self .thing_id is None :
344
- self .register ("discovery" , on_run = self .poll_discovery , interval = 0.100 )
356
+ self .register ("discovery" , on_run = self .poll_discovery , interval = 0.200 )
345
357
self .register ("mqtt_task" , on_run = self .poll_mqtt , interval = 0.100 )
346
358
raise DoneException ()
347
- return True
359
+ self . connected = True
348
360
349
361
def poll_discovery (self , aiot = None ):
350
362
self .mqtt .check_msg ()
@@ -429,17 +441,29 @@ async def run(self, interval, backoff):
429
441
def start (self , interval = 1.0 , backoff = 1.2 ):
430
442
if self .async_mode :
431
443
asyncio .run (self .run (interval , backoff ))
432
- else :
433
- # Synchronous mode.
434
- while not self .poll_connect ():
435
- time .sleep (interval )
436
- interval = min (interval * backoff , 5.0 )
444
+ return
445
+
446
+ def poll_needed (ts , last_poll , interval ):
447
+ return last_poll == 0 or (ts - last_poll ) > interval
437
448
438
- while self .thing_id is None :
449
+ last_conn_poll = 0
450
+ last_disc_poll = 0
451
+
452
+ while True :
453
+ ts = time .time ()
454
+ if not self .connected and poll_needed (ts , last_conn_poll , interval ):
455
+ self .poll_connect ()
456
+ if last_conn_poll != 0 :
457
+ interval = min (interval * backoff , 5.0 )
458
+ last_conn_poll = ts
459
+
460
+ if self .connected and self .thing_id is None and poll_needed (ts , last_disc_poll , 0.200 ):
439
461
self .poll_discovery ()
440
- time . sleep ( 0.100 )
462
+ last_disc_poll = ts
441
463
442
- self .connected = True
464
+ if self .connected and self .thing_id is not None :
465
+ break
466
+ self .poll_records ()
443
467
444
468
def update (self ):
445
469
if self .async_mode :
@@ -448,25 +472,14 @@ def update(self):
448
472
if not self .connected :
449
473
try :
450
474
self .start ()
451
- self .connected = True
452
475
except Exception as e :
453
476
raise e
454
477
455
- try :
456
- ts = timestamp_ms ()
457
- for record in self .records .values ():
458
- if record .runnable and self .ts_expired (record , ts ):
459
- record .run_sync (self )
460
- record .last_run = ts
461
- except Exception as e :
462
- self .records .pop (record .name )
463
- if log_level_enabled (logging .ERROR ):
464
- logging .error (f"task: { record .name } raised exception: { str (e )} ." )
478
+ self .poll_records ()
465
479
466
480
try :
467
481
self .poll_mqtt ()
468
482
except Exception as e :
469
483
self .connected = False
470
484
if log_level_enabled (logging .WARNING ):
471
485
logging .warning (f"Connection lost { e } " )
472
- raise e
0 commit comments