26
26
from kpn_senml import SenmlPack
27
27
from kpn_senml import SenmlRecord
28
28
from aiotcloud .umqtt import MQTTClient
29
+
29
30
try :
30
31
import logging
31
32
import asyncio
36
37
import uasyncio as asyncio
37
38
from aiotcloud import ntptime
38
39
from uasyncio .core import CancelledError
40
+
39
41
# MicroPython doesn't have this exception
40
42
class InvalidStateError (Exception ):
41
43
pass
42
44
45
+
43
46
def timestamp ():
44
47
return int (time .time ())
45
48
49
+
46
50
class AIOTObject (SenmlRecord ):
47
51
def __init__ (self , name , ** kwargs ):
48
- self .on_read = kwargs .pop ("on_read" , None )
52
+ self .on_read = kwargs .pop ("on_read" , None )
49
53
self .on_write = kwargs .pop ("on_write" , None )
50
54
self .interval = kwargs .pop ("interval" , 1.0 )
51
55
self ._runnable = kwargs .pop ("runnable" , False )
52
56
value = kwargs .pop ("value" , None )
53
- if keys := kwargs .pop ("keys" , {}): # Create a complex object (with sub-records).
54
- mkrec = lambda k , v : AIOTObject (f"{ name } :{ k } " , value = v , callback = self .senml_callback )
55
- value = {k : mkrec (k , v ) for (k , v ) in {k : kwargs .pop (k , None ) for k in keys }.items ()}
57
+ if keys := kwargs .pop ("keys" , {}):
58
+ value = { # Create a complex object (with sub-records).
59
+ k : AIOTObject (f"{ name } :{ k } " , value = v , callback = self .senml_callback )
60
+ for (k , v ) in {k : kwargs .pop (k , None ) for k in keys }.items ()
61
+ }
56
62
self ._updated = False
57
63
self .on_write_scheduled = False
58
64
self .timestamp = timestamp ()
59
- self .dtype = type (value ) # NOTE: must be set before calling super
60
65
callback = kwargs .pop ("callback" , self .senml_callback )
61
66
for key in kwargs : # kwargs should be empty by now, unless a wrong attr was used.
62
67
raise TypeError (f"'{ self .__class__ .__name__ } ' got an unexpected keyword argument '{ key } '" )
@@ -94,15 +99,17 @@ def runnable(self):
94
99
@SenmlRecord .value .setter
95
100
def value (self , value ):
96
101
if value is not None :
97
- if self .dtype is type ( None ) :
98
- self .dtype = type (value )
99
- elif not isinstance ( value , self . dtype ):
100
- raise TypeError ( f"record: { self .name } invalid data type. Expected { self .dtype } not { type (value )} " )
101
- else :
102
+ if self .value is not None :
103
+ if not isinstance ( self .value , type (value )):
104
+ raise TypeError (
105
+ f"record: { self .name } invalid data type. Expected { type ( self .value ) } not { type (value )} "
106
+ )
102
107
self ._updated = True
103
108
self .timestamp = timestamp ()
104
- logging .debug (f"record: { self .name } %s: { value } ts: { self .timestamp } "
105
- % ("initialized" if self .value is None else "updated" ))
109
+ logging .debug (
110
+ f"record: { self .name } %s: { value } ts: { self .timestamp } "
111
+ % ("initialized" if self .value is None else "updated" )
112
+ )
106
113
self ._value = value
107
114
108
115
def __getattr__ (self , attr ):
@@ -119,7 +126,7 @@ def __setattr__(self, name, value):
119
126
def _build_rec_dict (self , naming_map , appendTo ):
120
127
if isinstance (self .value , dict ):
121
128
for r in self .value .values ():
122
- if r .value is not None : # NOTE: should filter by updated when it's supported.
129
+ if r .value is not None : # NOTE: should filter by updated when it's supported.
123
130
r ._build_rec_dict (naming_map , appendTo )
124
131
else :
125
132
super ()._build_rec_dict (naming_map , appendTo )
@@ -128,7 +135,7 @@ def add_to_pack(self, pack):
128
135
if isinstance (self .value , dict ):
129
136
for r in self .value .values ():
130
137
# NOTE: If record value is None it can still be added to the pack for initialization.
131
- pack .add (r ) # NOTE: should filter by updated when it's supported.
138
+ pack .add (r ) # NOTE: should filter by updated when it's supported.
132
139
else :
133
140
pack .add (self )
134
141
self .updated = False
@@ -143,14 +150,15 @@ def senml_callback(self, record, **kwargs):
143
150
144
151
async def run (self , aiot ):
145
152
while True :
146
- if ( self .on_read is not None ) :
153
+ if self .on_read is not None :
147
154
self .value = self .on_read (aiot )
148
- if ( self .on_write is not None and self .on_write_scheduled ) :
155
+ if self .on_write is not None and self .on_write_scheduled :
149
156
self .on_write_scheduled = False
150
157
self .on_write (aiot , self if isinstance (self .value , dict ) else self .value )
151
158
await asyncio .sleep (self .interval )
152
159
153
- class AIOTClient ():
160
+
161
+ class AIOTClient :
154
162
def __init__ (self , device_id , ssl_params = None , server = "mqtts-sa.iot.oniudra.cc" , port = 8883 , keepalive = 10 ):
155
163
self .tasks = {}
156
164
self .records = {}
@@ -159,7 +167,7 @@ def __init__(self, device_id, ssl_params=None, server="mqtts-sa.iot.oniudra.cc",
159
167
self .update_systime ()
160
168
self .last_ping = timestamp ()
161
169
self .device_topic = b"/a/d/" + device_id + b"/e/i"
162
- self .senmlpack = SenmlPack ("urn:uuid:" + device_id .decode ("utf-8" ), self .senml_generic_callback )
170
+ self .senmlpack = SenmlPack ("urn:uuid:" + device_id .decode ("utf-8" ), self .senml_generic_callback )
163
171
self .mqtt = MQTTClient (device_id , server , port , ssl_params , keepalive = keepalive , callback = self .mqtt_callback )
164
172
# Note: the following internal objects are initialized by the cloud.
165
173
for name in ["thing_id" , "tz_offset" , "tz_dst_until" ]:
@@ -183,11 +191,8 @@ def get(self, key, default=None):
183
191
184
192
def update_systime (self ):
185
193
try :
186
- from aiotcloud import ntptime
187
194
ntptime .settime ()
188
195
logging .info ("RTC time set from NTP." )
189
- except ImportError :
190
- pass
191
196
except Exception as e :
192
197
logging .error (f"Failed to set RTC time from NTP: { e } ." )
193
198
@@ -239,8 +244,8 @@ async def discovery_task(self, interval=0.100):
239
244
self .mqtt .check_msg ()
240
245
if self .records .get ("thing_id" ).value is not None :
241
246
self .thing_id = self .records .pop ("thing_id" ).value
242
- if not self .thing_id : # Empty thing ID should not happen.
243
- raise (Exception ("Device is not linked to a Thing ID." ))
247
+ if not self .thing_id : # Empty thing ID should not happen.
248
+ raise (Exception ("Device is not linked to a Thing ID." ))
244
249
245
250
self .topic_out = self .create_topic ("e" , "o" )
246
251
self .mqtt .subscribe (self .create_topic ("e" , "i" ))
@@ -249,7 +254,7 @@ async def discovery_task(self, interval=0.100):
249
254
lastval_record .add_to_pack (self .senmlpack )
250
255
self .mqtt .subscribe (self .create_topic ("shadow" , "i" ))
251
256
self .mqtt .publish (self .create_topic ("shadow" , "o" ), self .senmlpack .to_cbor (), qos = True )
252
- logging .info (f "Device configured via discovery protocol." )
257
+ logging .info ("Device configured via discovery protocol." )
253
258
await asyncio .sleep (interval )
254
259
255
260
async def mqtt_task (self , interval = 0.100 ):
@@ -258,30 +263,28 @@ async def mqtt_task(self, interval=0.100):
258
263
if self .thing_id is not None :
259
264
self .senmlpack .clear ()
260
265
for record in self .records .values ():
261
- if ( record .updated ) :
266
+ if record .updated :
262
267
record .add_to_pack (self .senmlpack )
263
268
if len (self .senmlpack ._data ):
264
269
logging .debug ("Pushing records to AIoT Cloud:" )
265
- if (self .debug ):
266
- for record in self .senmlpack :
267
- logging .debug (f" ==> record: { record .name } value: { str (record .value )[:48 ]} ..." )
270
+ for record in self .senmlpack :
271
+ logging .debug (f" ==> record: { record .name } value: { str (record .value )[:48 ]} ..." )
268
272
self .mqtt .publish (self .topic_out , self .senmlpack .to_cbor (), qos = True )
269
273
self .last_ping = timestamp ()
270
- elif ( self .keepalive and (timestamp () - self .last_ping ) > self .keepalive ) :
274
+ elif self .keepalive and (timestamp () - self .last_ping ) > self .keepalive :
271
275
self .mqtt .ping ()
272
276
self .last_ping = timestamp ()
273
277
logging .debug ("No records to push, sent a ping request." )
274
278
await asyncio .sleep (interval )
275
-
276
- async def run (self , user_main = None , debug = False ):
277
- self .debug = debug
279
+
280
+ async def run (self , user_main = None ):
278
281
logging .info ("Connecting to AIoT cloud..." )
279
282
if not self .mqtt .connect ():
280
283
logging .error ("Failed to connect AIoT cloud." )
281
284
return
282
285
283
286
self .mqtt .subscribe (self .device_topic )
284
- if ( user_main is not None ) :
287
+ if user_main is not None :
285
288
self .create_task ("user_main" , user_main , self )
286
289
self .create_task ("mqtt_task" , self .mqtt_task )
287
290
self .create_task ("discovery" , self .discovery_task )
@@ -291,8 +294,8 @@ async def run(self, user_main=None, debug=False):
291
294
await asyncio .gather (* self .tasks .values (), return_exceptions = False )
292
295
logging .info ("All tasks finished!" )
293
296
break
294
- except Exception as e :
295
- pass # import traceback; traceback.print_exc()
297
+ except Exception :
298
+ pass # import traceback; traceback.print_exc()
296
299
297
300
for name in list (self .tasks ):
298
301
task = self .tasks [name ]
@@ -301,4 +304,5 @@ async def run(self, user_main=None, debug=False):
301
304
self .tasks .pop (name )
302
305
self .records .pop (name , None )
303
306
logging .error (f"Removed task: { name } . Raised exception: { task .exception ()} ." )
304
- except (CancelledError , InvalidStateError ) as e : pass
307
+ except (CancelledError , InvalidStateError ):
308
+ pass
0 commit comments