@@ -35,6 +35,10 @@ def timestamp():
35
35
return int (time .time ())
36
36
37
37
38
+ def timestamp_ms ():
39
+ return time .time_ns ()// 1000000
40
+
41
+
38
42
def log_level_enabled (level ):
39
43
return logging .getLogger ().isEnabledFor (level )
40
44
@@ -43,8 +47,9 @@ class ArduinoCloudObject(SenmlRecord):
43
47
def __init__ (self , name , ** kwargs ):
44
48
self .on_read = kwargs .pop ("on_read" , None )
45
49
self .on_write = kwargs .pop ("on_write" , None )
50
+ self .on_run = kwargs .pop ("on_run" , None )
46
51
self .interval = kwargs .pop ("interval" , 1.0 )
47
- self ._runnable = kwargs .pop ("runnable " , False )
52
+ self .backoff = kwargs .pop ("backoff " , None )
48
53
value = kwargs .pop ("value" , None )
49
54
if keys := kwargs .pop ("keys" , {}):
50
55
value = { # Create a complex object (with sub-records).
@@ -54,6 +59,8 @@ def __init__(self, name, **kwargs):
54
59
self ._updated = False
55
60
self .on_write_scheduled = False
56
61
self .timestamp = timestamp ()
62
+ self .last_run = timestamp_ms ()
63
+ self .runnable = any ((self .on_run , self .on_read , self .on_write ))
57
64
callback = kwargs .pop ("callback" , self .senml_callback )
58
65
for key in kwargs : # kwargs should be empty by now, unless a wrong attr was used.
59
66
raise TypeError (f"'{ self .__class__ .__name__ } ' got an unexpected keyword argument '{ key } '" )
@@ -84,10 +91,6 @@ def initialized(self):
84
91
return all (r .initialized for r in self .value .values ())
85
92
return self .value is not None
86
93
87
- @property
88
- def runnable (self ):
89
- return self .on_read is not None or self .on_write is not None or self ._runnable
90
-
91
94
@SenmlRecord .value .setter
92
95
def value (self , value ):
93
96
if value is not None :
@@ -152,12 +155,19 @@ def senml_callback(self, record, **kwargs):
152
155
153
156
async def run (self , client ):
154
157
while True :
155
- if self .on_read is not None :
156
- self .value = self .on_read (client )
157
- if self .on_write is not None and self .on_write_scheduled :
158
- self .on_write_scheduled = False
159
- self .on_write (client , self if isinstance (self .value , dict ) else self .value )
158
+ self .run_sync (client )
160
159
await asyncio .sleep (self .interval )
160
+ if self .backoff is not None :
161
+ self .interval = min (self .interval * self .backoff , 5.0 )
162
+
163
+ def run_sync (self , client ):
164
+ if self .on_run is not None :
165
+ self .on_run (client )
166
+ if self .on_read is not None :
167
+ self .value = self .on_read (client )
168
+ if self .on_write is not None and self .on_write_scheduled :
169
+ self .on_write_scheduled = False
170
+ self .on_write (client , self if isinstance (self .value , dict ) else self .value )
161
171
162
172
163
173
class ArduinoCloudClient :
@@ -171,17 +181,20 @@ def __init__(
171
181
port = None ,
172
182
keepalive = 10 ,
173
183
ntp_server = "pool.ntp.org" ,
174
- ntp_timeout = 3
184
+ ntp_timeout = 3 ,
185
+ sync_mode = False
175
186
):
176
187
self .tasks = {}
177
188
self .records = {}
178
189
self .thing_id = None
179
190
self .keepalive = keepalive
180
191
self .last_ping = timestamp ()
192
+ self .last_run = timestamp ()
181
193
self .senmlpack = SenmlPack ("" , self .senml_generic_callback )
182
- self .started = False
183
194
self .ntp_server = ntp_server
184
195
self .ntp_timeout = ntp_timeout
196
+ self .async_mode = not sync_mode
197
+ self .connected = False
185
198
186
199
if "pin" in ssl_params :
187
200
try :
@@ -213,7 +226,7 @@ def __init__(
213
226
214
227
# Create MQTT client.
215
228
self .mqtt = MQTTClient (
216
- device_id , server , port , ssl_params , username , password , keepalive , self .mqtt_callback
229
+ device_id , server , port , ssl_params , username , password , keepalive , self .mqtt_callback
217
230
)
218
231
219
232
# Add internal objects initialized by the cloud.
@@ -252,11 +265,12 @@ def update_systime(self, server=None, timeout=None):
252
265
def create_task (self , name , coro , * args , ** kwargs ):
253
266
if callable (coro ):
254
267
coro = coro (* args )
255
- if self .started :
268
+ try :
269
+ asyncio .get_event_loop ()
256
270
self .tasks [name ] = asyncio .create_task (coro )
257
271
if log_level_enabled (logging .INFO ):
258
272
logging .info (f"task: { name } created." )
259
- else :
273
+ except Exception :
260
274
# Defer task creation until there's a running event loop.
261
275
self .tasks [name ] = coro
262
276
@@ -272,14 +286,14 @@ def register(self, aiotobj, coro=None, **kwargs):
272
286
# Register the ArduinoCloudObject
273
287
self .records [aiotobj .name ] = aiotobj
274
288
275
- # Create a task for this object if it has any callbacks.
276
- if aiotobj .runnable :
277
- self .create_task (aiotobj .name , aiotobj .run , self )
278
-
279
289
# Check if object needs to be initialized from the cloud.
280
290
if not aiotobj .initialized and "r:m" not in self .records :
281
291
self .register ("r:m" , value = "getLastValues" )
282
292
293
+ # Create a task for this object if it has any callbacks.
294
+ if self .async_mode and aiotobj .runnable :
295
+ self .create_task (aiotobj .name , aiotobj .run , self )
296
+
283
297
def senml_generic_callback (self , record , ** kwargs ):
284
298
# This callback catches all unknown/umatched sub/records that were not part of the pack.
285
299
rname , sname = record .name .split (":" ) if ":" in record .name else [record .name , None ]
@@ -303,76 +317,75 @@ def mqtt_callback(self, topic, message):
303
317
self .senmlpack .from_cbor (message )
304
318
self .senmlpack .clear ()
305
319
306
- async def discovery_task (self , interval = 0.100 ):
307
- self .mqtt .subscribe (self .device_topic , qos = 1 )
308
- while self .thing_id is None :
309
- self .mqtt .check_msg ()
310
- if self .records .get ("thing_id" ).value is not None :
311
- self .thing_id = self .records .pop ("thing_id" ).value
312
- if not self .thing_id : # Empty thing ID should not happen.
313
- raise (Exception ("Device is not linked to a Thing ID." ))
314
-
315
- self .topic_out = self .create_topic ("e" , "o" )
316
- self .mqtt .subscribe (self .create_topic ("e" , "i" ))
317
-
318
- if lastval_record := self .records .pop ("r:m" , None ):
319
- lastval_record .add_to_pack (self .senmlpack )
320
- self .mqtt .subscribe (self .create_topic ("shadow" , "i" ), qos = 1 )
321
- self .mqtt .publish (self .create_topic ("shadow" , "o" ), self .senmlpack .to_cbor (), qos = 1 )
322
- logging .info ("Device configured via discovery protocol." )
323
- await asyncio .sleep (interval )
324
- raise DoneException ()
325
-
326
- async def conn_task (self , interval = 1.0 , backoff = 1.2 ):
320
+ def ts_expired (self , record , ts ):
321
+ return (ts - record .last_run ) > int (record .interval * 1000 )
322
+
323
+ def poll_connect (self , aiot = None ):
327
324
logging .info ("Connecting to Arduino IoT cloud..." )
328
- while True :
329
- try :
330
- self .mqtt .connect ()
331
- break
332
- except Exception as e :
333
- if log_level_enabled (logging .WARNING ):
334
- logging .warning (f"Connection failed { e } , retrying after { interval } s" )
335
- await asyncio .sleep (interval )
336
- interval = min (interval * backoff , 4.0 )
325
+ try :
326
+ self .mqtt .connect ()
327
+ except Exception as e :
328
+ if log_level_enabled (logging .WARNING ):
329
+ logging .warning (f"Connection failed { e } , retrying..." )
330
+ return False
337
331
338
332
if self .thing_id is None :
339
- self .create_task ( "discovery" , self .discovery_task )
333
+ self .mqtt . subscribe ( self .device_topic , qos = 1 )
340
334
else :
341
335
self .mqtt .subscribe (self .create_topic ("e" , "i" ))
342
- self .create_task ("mqtt_task" , self .mqtt_task )
343
- raise DoneException ()
344
336
345
- async def mqtt_task (self , interval = 0.100 ):
346
- while True :
347
- self .mqtt .check_msg ()
348
- if self .thing_id is not None :
349
- self .senmlpack .clear ()
350
- for record in self .records .values ():
351
- if record .updated :
352
- record .add_to_pack (self .senmlpack , push = True )
353
- if len (self .senmlpack ._data ):
354
- logging .debug ("Pushing records to Arduino IoT cloud:" )
355
- if log_level_enabled (logging .DEBUG ):
356
- for record in self .senmlpack ._data :
357
- logging .debug (f" ==> record: { record .name } value: { str (record .value )[:48 ]} ..." )
358
- self .mqtt .publish (self .topic_out , self .senmlpack .to_cbor (), qos = 1 )
359
- self .last_ping = timestamp ()
360
- elif self .keepalive and (timestamp () - self .last_ping ) > self .keepalive :
361
- self .mqtt .ping ()
362
- self .last_ping = timestamp ()
363
- logging .debug ("No records to push, sent a ping request." )
364
- await asyncio .sleep (interval )
365
- raise DoneException ()
366
-
367
- async def run (self ):
368
- self .started = True
337
+ if self .async_mode :
338
+ if self .thing_id is None :
339
+ self .register ("discovery" , on_run = self .poll_discovery , interval = 0.100 )
340
+ self .register ("mqtt_task" , on_run = self .poll_mqtt , interval = 0.100 )
341
+ raise DoneException ()
342
+ return True
343
+
344
+ def poll_discovery (self , aiot = None ):
345
+ self .mqtt .check_msg ()
346
+ if self .records .get ("thing_id" ).value is not None :
347
+ self .thing_id = self .records .pop ("thing_id" ).value
348
+ if not self .thing_id : # Empty thing ID should not happen.
349
+ raise Exception ("Device is not linked to a Thing ID." )
350
+
351
+ self .topic_out = self .create_topic ("e" , "o" )
352
+ self .mqtt .subscribe (self .create_topic ("e" , "i" ))
353
+
354
+ if lastval_record := self .records .pop ("r:m" , None ):
355
+ lastval_record .add_to_pack (self .senmlpack )
356
+ self .mqtt .subscribe (self .create_topic ("shadow" , "i" ), qos = 1 )
357
+ self .mqtt .publish (self .create_topic ("shadow" , "o" ), self .senmlpack .to_cbor (), qos = 1 )
358
+ logging .info ("Device configured via discovery protocol." )
359
+ if self .async_mode :
360
+ raise DoneException ()
361
+
362
+ def poll_mqtt (self , aiot = None ):
363
+ self .mqtt .check_msg ()
364
+ if self .thing_id is not None :
365
+ self .senmlpack .clear ()
366
+ for record in self .records .values ():
367
+ if record .updated :
368
+ record .add_to_pack (self .senmlpack , push = True )
369
+ if len (self .senmlpack ._data ):
370
+ logging .debug ("Pushing records to Arduino IoT cloud:" )
371
+ if log_level_enabled (logging .DEBUG ):
372
+ for record in self .senmlpack ._data :
373
+ logging .debug (f" ==> record: { record .name } value: { str (record .value )[:48 ]} ..." )
374
+ self .mqtt .publish (self .topic_out , self .senmlpack .to_cbor (), qos = 1 )
375
+ self .last_ping = timestamp ()
376
+ elif self .keepalive and (timestamp () - self .last_ping ) > self .keepalive :
377
+ self .mqtt .ping ()
378
+ self .last_ping = timestamp ()
379
+ logging .debug ("No records to push, sent a ping request." )
380
+
381
+ async def run (self , interval , backoff ):
369
382
# Creates tasks from coros here manually before calling
370
383
# gather, so we can keep track of tasks in self.tasks dict.
371
384
for name , coro in self .tasks .items ():
372
385
self .create_task (name , coro )
373
386
374
387
# Create connection task.
375
- self .create_task ( "conn_task " , self .conn_task )
388
+ self .register ( "connection_task " , on_run = self .poll_connect , interval = interval , backoff = backoff )
376
389
377
390
while True :
378
391
task_except = None
@@ -394,10 +407,57 @@ async def run(self):
394
407
elif task_except is not None and log_level_enabled (logging .ERROR ):
395
408
logging .error (f"task: { name } raised exception: { str (task_except )} ." )
396
409
if name == "mqtt_task" :
397
- self .create_task ("conn_task" , self .conn_task )
410
+ self .register (
411
+ "connection_task" ,
412
+ on_run = self .poll_connect ,
413
+ interval = interval ,
414
+ backoff = backoff
415
+ )
398
416
break # Break after the first task is removed.
399
417
except (CancelledError , InvalidStateError ):
400
418
pass
401
419
402
- def start (self ):
403
- asyncio .run (self .run ())
420
+ def start (self , interval = 1.0 , backoff = 1.2 ):
421
+ if self .async_mode :
422
+ asyncio .run (self .run (interval , backoff ))
423
+ else :
424
+ # Synchronous mode.
425
+ while not self .poll_connect ():
426
+ time .sleep (interval )
427
+ interval = min (interval * backoff , 5.0 )
428
+
429
+ while self .thing_id is None :
430
+ self .poll_discovery ()
431
+ time .sleep (0.100 )
432
+
433
+ self .connected = True
434
+
435
+ def update (self ):
436
+ if self .async_mode :
437
+ raise RuntimeError ("This function can't be called in asyncio mode." )
438
+
439
+ if not self .connected :
440
+ try :
441
+ self .start ()
442
+ self .connected = True
443
+ except Exception as e :
444
+ raise e
445
+
446
+ try :
447
+ ts = timestamp_ms ()
448
+ for record in self .records .values ():
449
+ if record .runnable and self .ts_expired (record , ts ):
450
+ record .run_sync (self )
451
+ record .last_run = ts
452
+ except Exception as e :
453
+ self .records .pop (record .name )
454
+ if log_level_enabled (logging .ERROR ):
455
+ logging .error (f"task: { record .name } raised exception: { str (e )} ." )
456
+
457
+ try :
458
+ self .poll_mqtt ()
459
+ except Exception as e :
460
+ self .connected = False
461
+ if log_level_enabled (logging .WARNING ):
462
+ logging .warning (f"Connection lost { e } " )
463
+ raise e
0 commit comments