Skip to content

Commit a6e76ce

Browse files
committedJul 18, 2022
ucloud: Improved task creation and discovery handling.
1 parent 41c8186 commit a6e76ce

File tree

2 files changed

+52
-57
lines changed

2 files changed

+52
-57
lines changed
 

‎aiotcloud/__init__.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,6 @@ def __init__(self, name, **kwargs):
4848
class Schedule(AIOTObject):
4949
def __init__(self, name, **kwargs):
5050
kwargs.update({("runnable", True)}) # Force task creation.
51-
kwargs.update({("interval", 1.0)}) # Will run on every second
5251
self.on_active = kwargs.pop("on_active", None)
5352
# Uncomment to allow the schedule to change in runtime.
5453
#kwargs["on_write"] = kwargs.get("on_write", lambda aiot, value: None)

‎aiotcloud/ucloud.py

Lines changed: 52 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ def __init__(self, name, **kwargs):
5858
self.timestamp = timestamp()
5959
self.dtype = type(value) # NOTE: must be set before calling super
6060
callback = kwargs.pop("callback", self.senml_callback)
61-
for key in kwargs: # kwargs should be empty unless a wrong attr was used.
61+
for key in kwargs: # kwargs should be empty by now, unless a wrong attr was used.
6262
raise TypeError(f"'{self.__class__.__name__}' got an unexpected keyword argument '{key}'")
6363
super().__init__(name, value=value, callback=callback)
6464

@@ -97,11 +97,11 @@ def value(self, value):
9797
if self.dtype is type(None):
9898
self.dtype = type(value)
9999
elif not isinstance(value, self.dtype):
100-
raise TypeError(f"task: {self.name} invalid data type. Expected {self.dtype} not {type(value)}")
100+
raise TypeError(f"record: {self.name} invalid data type. Expected {self.dtype} not {type(value)}")
101101
else:
102102
self._updated = True
103103
self.timestamp = timestamp()
104-
logging.debug(f"task: {self.name} %s: {value} ts: {self.timestamp}"
104+
logging.debug(f"record: {self.name} %s: {value} ts: {self.timestamp}"
105105
%("initialized" if self.value is None else "updated"))
106106
self._value = value
107107

@@ -135,9 +135,8 @@ def add_to_pack(self, pack):
135135

136136
def senml_callback(self, record, **kwargs):
137137
"""
138-
This is called after the record is updated from the senml pack.
139-
Sets updated flag to False, to avoid sending the same value back,
140-
and schedules on_write callback on the next run.
138+
This is called after the record is updated from the cloud. Clear the updated flag to
139+
avoid sending the same value back to the cloud, and schedule the on_write callback.
141140
"""
142141
self.updated = False
143142
self.on_write_scheduled = True
@@ -153,18 +152,18 @@ async def run(self, aiot):
153152

154153
class AIOTClient():
155154
def __init__(self, device_id, ssl_params=None, server="mqtts-sa.iot.oniudra.cc", port=8883, keepalive=10):
156-
self.tasks = []
155+
self.tasks = {}
157156
self.records = {}
158157
self.thing_id = None
159158
self.keepalive = keepalive
160159
self.update_systime()
161160
self.last_ping = timestamp()
162161
self.device_topic = b"/a/d/" + device_id + b"/e/i"
163162
self.senmlpack = SenmlPack("urn:uuid:"+device_id.decode("utf-8"), self.senml_generic_callback)
164-
self.mqtt_client = MQTTClient(device_id, server, port, ssl_params, keepalive=keepalive, callback=self.mqtt_callback)
165-
# Note: the following objects are initialized by the cloud.
166-
self.register("thing_id", value=None, on_write=self.discovery_callback)
167-
self.register("tz_offset", value=None)
163+
self.mqtt = MQTTClient(device_id, server, port, ssl_params, keepalive=keepalive, callback=self.mqtt_callback)
164+
# Note: the following internal objects are initialized by the cloud.
165+
for name in ["thing_id", "tz_offset", "tz_dst_until"]:
166+
self.register(name, value=None)
168167

169168
def __getitem__(self, key):
170169
if isinstance(self.records[key].value, dict):
@@ -192,13 +191,13 @@ def update_systime(self):
192191
except Exception as e:
193192
logging.error(f"Failed to set RTC time from NTP: {e}.")
194193

195-
def create_new_task(self, coro, *args, name=None):
196-
if hasattr(asyncio.Task, "get_name"):
197-
self.tasks.append(asyncio.create_task(coro(*args), name=name))
198-
else:
199-
self.tasks.append(asyncio.create_task(coro(*args)))
194+
def create_task(self, name, coro, *args, **kwargs):
195+
self.tasks[name] = asyncio.create_task(coro(*args))
200196
logging.debug(f"task: {name} created.")
201197

198+
def create_topic(self, topic, inout):
199+
return bytes(f"/a/t/{self.thing_id}/{topic}/{inout}", "utf-8")
200+
202201
def register(self, aiotobj, **kwargs):
203202
if isinstance(aiotobj, str):
204203
if kwargs.get("value", None) is None and kwargs.get("on_read", None) is not None:
@@ -210,39 +209,21 @@ def register(self, aiotobj, **kwargs):
210209

211210
# Create a task for this object if it has any callbacks.
212211
if aiotobj.runnable:
213-
self.create_new_task(aiotobj.run, self, name=aiotobj.name)
212+
self.create_task(aiotobj.name, aiotobj.run, self)
214213

215214
# Check if object needs to be initialized from the cloud.
216215
if not aiotobj.initialized and "r:m" not in self.records:
217216
self.register("r:m", value="getLastValues")
218217

219218
def senml_generic_callback(self, record, **kwargs):
220219
"""
221-
This callback catches all unknown/umatched records.
220+
This callback catches all unknown/umatched records that were not part the pack.
222221
"""
223-
rname, sname = record.name.split(":") if ":" in record.name else [record.name, ""]
222+
rname, sname = record.name.split(":") if ":" in record.name else [record.name, None]
224223
if rname in self.records:
225224
logging.debug(f"Ignoring cloud initialization for record: {record.name}")
226225
else:
227-
logging.debug(f"Unkown record: {record.name} value: {record.value}")
228-
229-
def discovery_callback(self, aiot, thing_id):
230-
logging.info(f"Device configured via discovery protocol.")
231-
if not thing_id:
232-
raise(Exception("Device is not linked to a Thing ID."))
233-
self.thing_id = bytes(thing_id, "utf-8")
234-
self.topic_in = b"/a/t/" + self.thing_id + b"/e/i"
235-
self.topic_out = b"/a/t/" + self.thing_id + b"/e/o"
236-
logging.info(f"Subscribing to thing topic {self.topic_in}.")
237-
self.mqtt_client.subscribe(self.topic_in)
238-
239-
if lastval_record := self.records.pop("r:m", None):
240-
shadow_in = b"/a/t/" + self.thing_id + b"/shadow/i"
241-
shadow_out= b"/a/t/" + self.thing_id + b"/shadow/o"
242-
lastval_record.add_to_pack(self.senmlpack)
243-
logging.info(f"Subscribing to shadow topic {shadow_in}.")
244-
self.mqtt_client.subscribe(shadow_in)
245-
self.mqtt_client.publish(shadow_out, self.senmlpack.to_cbor(), qos=True)
226+
logging.info(f"Unkown record found: {record.name} value: {record.value}")
246227

247228
def mqtt_callback(self, topic, message):
248229
logging.debug(f"mqtt topic: {topic[-8:]}... message: {message[:8]}...")
@@ -253,9 +234,27 @@ def mqtt_callback(self, topic, message):
253234
self.senmlpack.from_cbor(message)
254235
self.senmlpack.clear()
255236

237+
async def discovery_task(self, interval=0.100):
238+
while self.thing_id is None:
239+
self.mqtt.check_msg()
240+
if self.records.get("thing_id").value is not None:
241+
self.thing_id = self.records.pop("thing_id").value
242+
if not self.thing_id: # Empty thing ID should not happen.
243+
raise(Exception("Device is not linked to a Thing ID."))
244+
245+
self.topic_out = self.create_topic("e", "o")
246+
self.mqtt.subscribe(self.create_topic("e", "i"))
247+
248+
if lastval_record := self.records.pop("r:m", None):
249+
lastval_record.add_to_pack(self.senmlpack)
250+
self.mqtt.subscribe(self.create_topic("shadow", "i"))
251+
self.mqtt.publish(self.create_topic("shadow", "o"), self.senmlpack.to_cbor(), qos=True)
252+
logging.info(f"Device configured via discovery protocol.")
253+
await asyncio.sleep(interval)
254+
256255
async def mqtt_task(self, interval=0.100):
257256
while True:
258-
self.mqtt_client.check_msg()
257+
self.mqtt.check_msg()
259258
if self.thing_id is not None:
260259
self.senmlpack.clear()
261260
for record in self.records.values():
@@ -266,43 +265,40 @@ async def mqtt_task(self, interval=0.100):
266265
if (self.debug):
267266
for record in self.senmlpack:
268267
logging.debug(f" ==> record: {record.name} value: {str(record.value)[:48]}...")
269-
self.mqtt_client.publish(self.topic_out, self.senmlpack.to_cbor(), qos=True)
268+
self.mqtt.publish(self.topic_out, self.senmlpack.to_cbor(), qos=True)
270269
self.last_ping = timestamp()
271270
elif (self.keepalive and (timestamp() - self.last_ping) > self.keepalive):
272-
self.mqtt_client.ping()
271+
self.mqtt.ping()
273272
self.last_ping = timestamp()
274273
logging.debug("No records to push, sent a ping request.")
275274
await asyncio.sleep(interval)
276275

277276
async def run(self, user_main=None, debug=False):
278277
self.debug = debug
279-
if (user_main is not None):
280-
# If user code is provided, append to tasks list.
281-
self.create_new_task(user_main, self, name="user code")
282-
283278
logging.info("Connecting to AIoT cloud...")
284-
if not self.mqtt_client.connect():
279+
if not self.mqtt.connect():
285280
logging.error("Failed to connect AIoT cloud.")
286281
return
287282

288-
logging.info("Subscribing to device topic.")
289-
self.mqtt_client.subscribe(self.device_topic)
290-
self.create_new_task(self.mqtt_task, name="mqtt")
283+
self.mqtt.subscribe(self.device_topic)
284+
if (user_main is not None):
285+
self.create_task("user_main", user_main, self)
286+
self.create_task("mqtt_task", self.mqtt_task)
287+
self.create_task("discovery", self.discovery_task)
291288

292289
while True:
293290
try:
294-
await asyncio.gather(*self.tasks, return_exceptions=False)
291+
await asyncio.gather(*self.tasks.values(), return_exceptions=False)
295292
logging.info("All tasks finished!")
296293
break
297294
except Exception as e:
298295
pass #import traceback; traceback.print_exc()
299296

300-
for task in self.tasks:
297+
for name in list(self.tasks):
298+
task = self.tasks[name]
301299
try:
302300
if task.done():
303-
self.tasks.remove(task)
304-
if hasattr(asyncio.Task, "get_name"):
305-
logging.error(f"Removed task: {task.get_name()}. Raised exception: {task.exception()}.")
306-
else:
307-
logging.error(f"Removed task.")
301+
self.tasks.pop(name)
302+
self.records.pop(name, None)
303+
logging.error(f"Removed task: {name}. Raised exception: {task.exception()}.")
308304
except (CancelledError, InvalidStateError) as e: pass

0 commit comments

Comments
 (0)
Please sign in to comment.