diff --git a/examples/example.py b/examples/example.py index 8c23137..62f3b71 100644 --- a/examples/example.py +++ b/examples/example.py @@ -44,6 +44,7 @@ def user_task(client): # Parse command line args. parser = argparse.ArgumentParser(description="arduino_iot_cloud.py") parser.add_argument("-d", "--debug", action="store_true", help="Enable debugging messages") + parser.add_argument("-s", "--sync", action="store_true", help="Run in synchronous mode") args = parser.parse_args() # Assume the host has an active Internet connection. @@ -60,7 +61,7 @@ def user_task(client): # Create a client object to connect to the Arduino IoT cloud. # The most basic authentication method uses a username and password. The username is the device # ID, and the password is the secret key obtained from the IoT cloud when provisioning a device. - client = ArduinoCloudClient(device_id=DEVICE_ID, username=DEVICE_ID, password=SECRET_KEY) + client = ArduinoCloudClient(device_id=DEVICE_ID, username=DEVICE_ID, password=SECRET_KEY, sync_mode=args.sync) # Alternatively, the client also supports key and certificate-based authentication. To use this # mode, set "keyfile" and "certfile", and the CA certificate (if any) in "ssl_params". @@ -73,6 +74,7 @@ def user_task(client): # "keyfile": KEY_PATH, "certfile": CERT_PATH, "cafile": CA_PATH, # "verify_mode": ssl.CERT_REQUIRED, "server_hostname" : "iot.arduino.cc" # }, + # sync_mode=args.sync, # ) # Register cloud objects. @@ -107,5 +109,11 @@ def user_task(client): # to client.register(). client.register(Task("user_task", on_run=user_task, interval=1.0)) - # Start the Arduino IoT cloud client. + # Start the Arduino IoT cloud client. In synchronous mode, this function returns immediately + # after connecting to the cloud. client.start() + + # In sync mode, start returns after connecting, and the client must be polled periodically. + while True: + client.update() + time.sleep(0.100) diff --git a/examples/micropython_basic.py b/examples/micropython_basic.py index 50eff0c..c2871de 100644 --- a/examples/micropython_basic.py +++ b/examples/micropython_basic.py @@ -66,7 +66,7 @@ def wifi_connect(): logging.basicConfig( datefmt="%H:%M:%S", format="%(asctime)s.%(msecs)03d %(message)s", - level=logging.INFO, + level=logging.DEBUG, ) # NOTE: Add networking code here or in boot.py @@ -75,7 +75,7 @@ def wifi_connect(): # Create a client object to connect to the Arduino IoT cloud. # The most basic authentication method uses a username and password. The username is the device # ID, and the password is the secret key obtained from the IoT cloud when provisioning a device. - client = ArduinoCloudClient(device_id=DEVICE_ID, username=DEVICE_ID, password=SECRET_KEY) + client = ArduinoCloudClient(device_id=DEVICE_ID, username=DEVICE_ID, password=SECRET_KEY, sync_mode=False) # Alternatively, the client also supports key and certificate-based authentication. To use this # mode, set "keyfile" and "certfile", and the CA certificate (if any) in "ssl_params". @@ -86,6 +86,7 @@ def wifi_connect(): # "keyfile": KEY_PATH, "certfile": CERT_PATH, "cadata": CADATA, # "verify_mode": ssl.CERT_REQUIRED, "server_hostname" : "iot.arduino.cc" # }, + # sync_mode=False, # ) # Register cloud objects. @@ -133,5 +134,11 @@ def wifi_connect(): except (ImportError, AttributeError): pass - # Start the Arduino IoT cloud client. + # Start the Arduino IoT cloud client. In synchronous mode, this function returns immediately + # after connecting to the cloud. client.start() + + # In sync mode, start returns after connecting, and the client must be polled periodically. + while True: + client.update() + time.sleep(0.100) diff --git a/src/arduino_iot_cloud/__init__.py b/src/arduino_iot_cloud/__init__.py index 04e09cb..02661c8 100644 --- a/src/arduino_iot_cloud/__init__.py +++ b/src/arduino_iot_cloud/__init__.py @@ -4,10 +4,10 @@ # License, v. 2.0. If a copy of the MPL was not distributed with this # file, You can obtain one at https://mozilla.org/MPL/2.0/. -import asyncio import binascii from .ucloud import ArduinoCloudClient # noqa from .ucloud import ArduinoCloudObject +from .ucloud import ArduinoCloudObject as Task # noqa from .ucloud import timestamp @@ -30,33 +30,6 @@ b"8d6444ffe82217304ff2b89aafca8ecf" ) -async def coro(): # noqa - pass - - -def is_async(obj): - if hasattr(asyncio, "iscoroutinefunction"): - return asyncio.iscoroutinefunction(obj) - else: - return isinstance(obj, type(coro)) - - -class Task(ArduinoCloudObject): - def __init__(self, name, **kwargs): - kwargs.update({("runnable", True)}) # Force task creation. - self.on_run = kwargs.pop("on_run", None) - if not callable(self.on_run): - raise TypeError("Expected a callable object") - super().__init__(name, **kwargs) - - async def run(self, aiot): - if is_async(self.on_run): - await self.on_run(aiot) - else: - while True: - self.on_run(aiot) - await asyncio.sleep(self.interval) - class Location(ArduinoCloudObject): def __init__(self, name, **kwargs): @@ -80,24 +53,22 @@ def __init__(self, name, **kwargs): class Schedule(ArduinoCloudObject): def __init__(self, name, **kwargs): - kwargs.update({("runnable", True)}) # Force task creation. + kwargs.update({("on_run", self.on_run)}) self.on_active = kwargs.pop("on_active", None) # Uncomment to allow the schedule to change in runtime. # kwargs["on_write"] = kwargs.get("on_write", lambda aiot, value: None) self.active = False super().__init__(name, keys={"frm", "to", "len", "msk"}, **kwargs) - async def run(self, aiot): - while True: - if self.initialized: - ts = timestamp() + aiot.get("tz_offset", 0) - if ts > self.frm and ts < (self.frm + self.len): - if not self.active and self.on_active is not None: - self.on_active(aiot, self.value) - self.active = True - else: - self.active = False - await asyncio.sleep(self.interval) + def on_run(self, aiot): + if self.initialized: + ts = timestamp() + aiot.get("tz_offset", 0) + if ts > self.frm and ts < (self.frm + self.len): + if not self.active and self.on_active is not None: + self.on_active(aiot, self.value) + self.active = True + else: + self.active = False class Television(ArduinoCloudObject): diff --git a/src/arduino_iot_cloud/ucloud.py b/src/arduino_iot_cloud/ucloud.py index cff1933..2ff31cc 100644 --- a/src/arduino_iot_cloud/ucloud.py +++ b/src/arduino_iot_cloud/ucloud.py @@ -35,6 +35,10 @@ def timestamp(): return int(time.time()) +def timestamp_ms(): + return time.time_ns()//1000000 + + def log_level_enabled(level): return logging.getLogger().isEnabledFor(level) @@ -43,8 +47,9 @@ class ArduinoCloudObject(SenmlRecord): def __init__(self, name, **kwargs): self.on_read = kwargs.pop("on_read", None) self.on_write = kwargs.pop("on_write", None) + self.on_run = kwargs.pop("on_run", None) self.interval = kwargs.pop("interval", 1.0) - self._runnable = kwargs.pop("runnable", False) + self.backoff = kwargs.pop("backoff", None) value = kwargs.pop("value", None) if keys := kwargs.pop("keys", {}): value = { # Create a complex object (with sub-records). @@ -54,6 +59,8 @@ def __init__(self, name, **kwargs): self._updated = False self.on_write_scheduled = False self.timestamp = timestamp() + self.last_run = timestamp_ms() + self.runnable = any((self.on_run, self.on_read, self.on_write)) callback = kwargs.pop("callback", self.senml_callback) for key in kwargs: # kwargs should be empty by now, unless a wrong attr was used. raise TypeError(f"'{self.__class__.__name__}' got an unexpected keyword argument '{key}'") @@ -84,10 +91,6 @@ def initialized(self): return all(r.initialized for r in self.value.values()) return self.value is not None - @property - def runnable(self): - return self.on_read is not None or self.on_write is not None or self._runnable - @SenmlRecord.value.setter def value(self, value): if value is not None: @@ -152,12 +155,19 @@ def senml_callback(self, record, **kwargs): async def run(self, client): while True: - if self.on_read is not None: - self.value = self.on_read(client) - if self.on_write is not None and self.on_write_scheduled: - self.on_write_scheduled = False - self.on_write(client, self if isinstance(self.value, dict) else self.value) + self.run_sync(client) await asyncio.sleep(self.interval) + if self.backoff is not None: + self.interval = min(self.interval * self.backoff, 5.0) + + def run_sync(self, client): + if self.on_run is not None: + self.on_run(client) + if self.on_read is not None: + self.value = self.on_read(client) + if self.on_write is not None and self.on_write_scheduled: + self.on_write_scheduled = False + self.on_write(client, self if isinstance(self.value, dict) else self.value) class ArduinoCloudClient: @@ -171,17 +181,20 @@ def __init__( port=None, keepalive=10, ntp_server="pool.ntp.org", - ntp_timeout=3 + ntp_timeout=3, + sync_mode=False ): self.tasks = {} self.records = {} self.thing_id = None self.keepalive = keepalive self.last_ping = timestamp() + self.last_run = timestamp() self.senmlpack = SenmlPack("", self.senml_generic_callback) - self.started = False self.ntp_server = ntp_server self.ntp_timeout = ntp_timeout + self.async_mode = not sync_mode + self.connected = False if "pin" in ssl_params: try: @@ -213,7 +226,7 @@ def __init__( # Create MQTT client. self.mqtt = MQTTClient( - device_id, server, port, ssl_params, username, password, keepalive, self.mqtt_callback + device_id, server, port, ssl_params, username, password, keepalive, self.mqtt_callback ) # Add internal objects initialized by the cloud. @@ -252,11 +265,12 @@ def update_systime(self, server=None, timeout=None): def create_task(self, name, coro, *args, **kwargs): if callable(coro): coro = coro(*args) - if self.started: + try: + asyncio.get_event_loop() self.tasks[name] = asyncio.create_task(coro) if log_level_enabled(logging.INFO): logging.info(f"task: {name} created.") - else: + except Exception: # Defer task creation until there's a running event loop. self.tasks[name] = coro @@ -272,14 +286,14 @@ def register(self, aiotobj, coro=None, **kwargs): # Register the ArduinoCloudObject self.records[aiotobj.name] = aiotobj - # Create a task for this object if it has any callbacks. - if aiotobj.runnable: - self.create_task(aiotobj.name, aiotobj.run, self) - # Check if object needs to be initialized from the cloud. if not aiotobj.initialized and "r:m" not in self.records: self.register("r:m", value="getLastValues") + # Create a task for this object if it has any callbacks. + if self.async_mode and aiotobj.runnable: + self.create_task(aiotobj.name, aiotobj.run, self) + def senml_generic_callback(self, record, **kwargs): # This callback catches all unknown/umatched sub/records that were not part of the pack. rname, sname = record.name.split(":") if ":" in record.name else [record.name, None] @@ -303,76 +317,75 @@ def mqtt_callback(self, topic, message): self.senmlpack.from_cbor(message) self.senmlpack.clear() - async def discovery_task(self, interval=0.100): - self.mqtt.subscribe(self.device_topic, qos=1) - while self.thing_id is None: - self.mqtt.check_msg() - if self.records.get("thing_id").value is not None: - self.thing_id = self.records.pop("thing_id").value - if not self.thing_id: # Empty thing ID should not happen. - raise (Exception("Device is not linked to a Thing ID.")) - - self.topic_out = self.create_topic("e", "o") - self.mqtt.subscribe(self.create_topic("e", "i")) - - if lastval_record := self.records.pop("r:m", None): - lastval_record.add_to_pack(self.senmlpack) - self.mqtt.subscribe(self.create_topic("shadow", "i"), qos=1) - self.mqtt.publish(self.create_topic("shadow", "o"), self.senmlpack.to_cbor(), qos=1) - logging.info("Device configured via discovery protocol.") - await asyncio.sleep(interval) - raise DoneException() - - async def conn_task(self, interval=1.0, backoff=1.2): + def ts_expired(self, record, ts): + return (ts - record.last_run) > int(record.interval * 1000) + + def poll_connect(self, aiot=None): logging.info("Connecting to Arduino IoT cloud...") - while True: - try: - self.mqtt.connect() - break - except Exception as e: - if log_level_enabled(logging.WARNING): - logging.warning(f"Connection failed {e}, retrying after {interval}s") - await asyncio.sleep(interval) - interval = min(interval * backoff, 4.0) + try: + self.mqtt.connect() + except Exception as e: + if log_level_enabled(logging.WARNING): + logging.warning(f"Connection failed {e}, retrying...") + return False if self.thing_id is None: - self.create_task("discovery", self.discovery_task) + self.mqtt.subscribe(self.device_topic, qos=1) else: self.mqtt.subscribe(self.create_topic("e", "i")) - self.create_task("mqtt_task", self.mqtt_task) - raise DoneException() - async def mqtt_task(self, interval=0.100): - while True: - self.mqtt.check_msg() - if self.thing_id is not None: - self.senmlpack.clear() - for record in self.records.values(): - if record.updated: - record.add_to_pack(self.senmlpack, push=True) - if len(self.senmlpack._data): - logging.debug("Pushing records to Arduino IoT cloud:") - if log_level_enabled(logging.DEBUG): - for record in self.senmlpack._data: - logging.debug(f" ==> record: {record.name} value: {str(record.value)[:48]}...") - self.mqtt.publish(self.topic_out, self.senmlpack.to_cbor(), qos=1) - self.last_ping = timestamp() - elif self.keepalive and (timestamp() - self.last_ping) > self.keepalive: - self.mqtt.ping() - self.last_ping = timestamp() - logging.debug("No records to push, sent a ping request.") - await asyncio.sleep(interval) - raise DoneException() - - async def run(self): - self.started = True + if self.async_mode: + if self.thing_id is None: + self.register("discovery", on_run=self.poll_discovery, interval=0.100) + self.register("mqtt_task", on_run=self.poll_mqtt, interval=0.100) + raise DoneException() + return True + + def poll_discovery(self, aiot=None): + self.mqtt.check_msg() + if self.records.get("thing_id").value is not None: + self.thing_id = self.records.pop("thing_id").value + if not self.thing_id: # Empty thing ID should not happen. + raise Exception("Device is not linked to a Thing ID.") + + self.topic_out = self.create_topic("e", "o") + self.mqtt.subscribe(self.create_topic("e", "i")) + + if lastval_record := self.records.pop("r:m", None): + lastval_record.add_to_pack(self.senmlpack) + self.mqtt.subscribe(self.create_topic("shadow", "i"), qos=1) + self.mqtt.publish(self.create_topic("shadow", "o"), self.senmlpack.to_cbor(), qos=1) + logging.info("Device configured via discovery protocol.") + if self.async_mode: + raise DoneException() + + def poll_mqtt(self, aiot=None): + self.mqtt.check_msg() + if self.thing_id is not None: + self.senmlpack.clear() + for record in self.records.values(): + if record.updated: + record.add_to_pack(self.senmlpack, push=True) + if len(self.senmlpack._data): + logging.debug("Pushing records to Arduino IoT cloud:") + if log_level_enabled(logging.DEBUG): + for record in self.senmlpack._data: + logging.debug(f" ==> record: {record.name} value: {str(record.value)[:48]}...") + self.mqtt.publish(self.topic_out, self.senmlpack.to_cbor(), qos=1) + self.last_ping = timestamp() + elif self.keepalive and (timestamp() - self.last_ping) > self.keepalive: + self.mqtt.ping() + self.last_ping = timestamp() + logging.debug("No records to push, sent a ping request.") + + async def run(self, interval, backoff): # Creates tasks from coros here manually before calling # gather, so we can keep track of tasks in self.tasks dict. for name, coro in self.tasks.items(): self.create_task(name, coro) # Create connection task. - self.create_task("conn_task", self.conn_task) + self.register("connection_task", on_run=self.poll_connect, interval=interval, backoff=backoff) while True: task_except = None @@ -394,10 +407,57 @@ async def run(self): elif task_except is not None and log_level_enabled(logging.ERROR): logging.error(f"task: {name} raised exception: {str(task_except)}.") if name == "mqtt_task": - self.create_task("conn_task", self.conn_task) + self.register( + "connection_task", + on_run=self.poll_connect, + interval=interval, + backoff=backoff + ) break # Break after the first task is removed. except (CancelledError, InvalidStateError): pass - def start(self): - asyncio.run(self.run()) + def start(self, interval=1.0, backoff=1.2): + if self.async_mode: + asyncio.run(self.run(interval, backoff)) + else: + # Synchronous mode. + while not self.poll_connect(): + time.sleep(interval) + interval = min(interval * backoff, 5.0) + + while self.thing_id is None: + self.poll_discovery() + time.sleep(0.100) + + self.connected = True + + def update(self): + if self.async_mode: + raise RuntimeError("This function can't be called in asyncio mode.") + + if not self.connected: + try: + self.start() + self.connected = True + except Exception as e: + raise e + + try: + ts = timestamp_ms() + for record in self.records.values(): + if record.runnable and self.ts_expired(record, ts): + record.run_sync(self) + record.last_run = ts + except Exception as e: + self.records.pop(record.name) + if log_level_enabled(logging.ERROR): + logging.error(f"task: {record.name} raised exception: {str(e)}.") + + try: + self.poll_mqtt() + except Exception as e: + self.connected = False + if log_level_enabled(logging.WARNING): + logging.warning(f"Connection lost {e}") + raise e