diff --git a/.github/workflows/client-test.yml b/.github/workflows/client-test.yml index f91a95d..2e08c61 100644 --- a/.github/workflows/client-test.yml +++ b/.github/workflows/client-test.yml @@ -71,14 +71,21 @@ jobs: run: | source tests/ci.sh && ci_configure_softhsm - - name: '☁️ Connect to IoT cloud (CPython / Basic Auth)' + - name: '☁️ Connect to IoT cloud (CPython / Basic Auth / Async)' env: DEVICE_ID: ${{ secrets.DEVICE_ID1 }} SECRET_KEY: ${{ secrets.SECRET_KEY }} run: | python tests/ci.py --basic-auth - - name: '☁️ Connect to IoT cloud (CPython / Key/Cert Auth)' + - name: '☁️ Connect to IoT cloud (CPython / Basic Auth / Sync)' + env: + DEVICE_ID: ${{ secrets.DEVICE_ID1 }} + SECRET_KEY: ${{ secrets.SECRET_KEY }} + run: | + python tests/ci.py --basic-auth --sync + + - name: '☁️ Connect to IoT cloud (CPython / Key-Cert Auth / Async)' env: DEVICE_ID: ${{ secrets.DEVICE_ID2 }} SECRET_KEY: ${{ secrets.SECRET_KEY }} @@ -86,7 +93,7 @@ jobs: python tests/ci.py --file-auth - - name: '☁️ Connect to IoT cloud (CPython / Crypto Auth)' + - name: '☁️ Connect to IoT cloud (CPython / Crypto Auth / Async)' env: DEVICE_ID: ${{ secrets.DEVICE_ID2 }} SECRET_KEY: ${{ secrets.SECRET_KEY }} @@ -94,7 +101,7 @@ jobs: export SOFTHSM2_CONF="${HOME}/softhsm/tokens/softhsm2.conf" python tests/ci.py --crypto-device - - name: '☁️ Connect to IoT cloud (MicroPython / Basic Auth)' + - name: '☁️ Connect to IoT cloud (MicroPython / Basic Auth / Async)' env: DEVICE_ID: ${{ secrets.DEVICE_ID1 }} SECRET_KEY: ${{ secrets.SECRET_KEY }} @@ -102,3 +109,12 @@ jobs: export PATH="${HOME}/cache/bin:${PATH}" micropython -c "import sys; print(sys.path)" micropython tests/ci.py --basic-auth + + - name: '☁️ Connect to IoT cloud (MicroPython / Basic Auth / Sync)' + env: + DEVICE_ID: ${{ secrets.DEVICE_ID1 }} + SECRET_KEY: ${{ secrets.SECRET_KEY }} + run: | + export PATH="${HOME}/cache/bin:${PATH}" + micropython -c "import sys; print(sys.path)" + micropython tests/ci.py --basic-auth --sync diff --git a/src/arduino_iot_cloud/ucloud.py b/src/arduino_iot_cloud/ucloud.py index 2208a57..36e5e3b 100644 --- a/src/arduino_iot_cloud/ucloud.py +++ b/src/arduino_iot_cloud/ucloud.py @@ -63,7 +63,7 @@ def __init__(self, name, **kwargs): self._updated = False self.on_write_scheduled = False self.timestamp = timestamp() - self.last_run = timestamp_ms() + self.last_poll = timestamp_ms() self.runnable = any((self.on_run, self.on_read, self.on_write)) callback = kwargs.pop("callback", self.senml_callback) for key in kwargs: # kwargs should be empty by now, unless a wrong attr was used. @@ -193,7 +193,6 @@ def __init__( self.thing_id = None self.keepalive = keepalive self.last_ping = timestamp() - self.last_run = timestamp() self.senmlpack = SenmlPack("", self.senml_generic_callback) self.ntp_server = ntp_server self.ntp_timeout = ntp_timeout @@ -322,8 +321,20 @@ def mqtt_callback(self, topic, message): self.senmlpack.from_cbor(message) self.senmlpack.clear() - def ts_expired(self, record, ts): - return (ts - record.last_run) > int(record.interval * 1000) + def ts_expired(self, ts, last_ts_ms, interval_s): + return last_ts_ms == 0 or (ts - last_ts_ms) > int(interval_s * 1000) + + def poll_records(self): + ts = timestamp_ms() + try: + for record in self.records.values(): + if record.runnable and self.ts_expired(ts, record.last_poll, record.interval): + record.run_sync(self) + record.last_poll = ts + except Exception as e: + self.records.pop(record.name) + if log_level_enabled(logging.ERROR): + logging.error(f"task: {record.name} raised exception: {str(e)}.") def poll_connect(self, aiot=None): logging.info("Connecting to Arduino IoT cloud...") @@ -332,7 +343,7 @@ def poll_connect(self, aiot=None): except Exception as e: if log_level_enabled(logging.WARNING): logging.warning(f"Connection failed {e}, retrying...") - return False + return if self.thing_id is None: self.mqtt.subscribe(self.device_topic, qos=1) @@ -341,10 +352,10 @@ def poll_connect(self, aiot=None): if self.async_mode: if self.thing_id is None: - self.register("discovery", on_run=self.poll_discovery, interval=0.100) + self.register("discovery", on_run=self.poll_discovery, interval=0.200) self.register("mqtt_task", on_run=self.poll_mqtt, interval=0.100) raise DoneException() - return True + self.connected = True def poll_discovery(self, aiot=None): self.mqtt.check_msg() @@ -429,17 +440,26 @@ async def run(self, interval, backoff): def start(self, interval=1.0, backoff=1.2): if self.async_mode: asyncio.run(self.run(interval, backoff)) - else: - # Synchronous mode. - while not self.poll_connect(): - time.sleep(interval) - interval = min(interval * backoff, 5.0) + return - while self.thing_id is None: + last_conn_ms = 0 + last_disc_ms = 0 + + while True: + ts = timestamp_ms() + if not self.connected and self.ts_expired(ts, last_conn_ms, interval): + self.poll_connect() + if last_conn_ms != 0: + interval = min(interval * backoff, 5.0) + last_conn_ms = ts + + if self.connected and self.thing_id is None and self.ts_expired(ts, last_disc_ms, 0.250): self.poll_discovery() - time.sleep(0.100) + last_disc_ms = ts - self.connected = True + if self.connected and self.thing_id is not None: + break + self.poll_records() def update(self): if self.async_mode: @@ -448,20 +468,10 @@ def update(self): if not self.connected: try: self.start() - self.connected = True except Exception as e: raise e - try: - ts = timestamp_ms() - for record in self.records.values(): - if record.runnable and self.ts_expired(record, ts): - record.run_sync(self) - record.last_run = ts - except Exception as e: - self.records.pop(record.name) - if log_level_enabled(logging.ERROR): - logging.error(f"task: {record.name} raised exception: {str(e)}.") + self.poll_records() try: self.poll_mqtt() @@ -469,4 +479,3 @@ def update(self): self.connected = False if log_level_enabled(logging.WARNING): logging.warning(f"Connection lost {e}") - raise e diff --git a/tests/ci.py b/tests/ci.py index 41353e3..f639c49 100644 --- a/tests/ci.py +++ b/tests/ci.py @@ -3,9 +3,11 @@ # https://creativecommons.org/publicdomain/zero/1.0/ import logging import os +import time import sys import asyncio from arduino_iot_cloud import ArduinoCloudClient +from arduino_iot_cloud import Task import argparse @@ -20,6 +22,16 @@ def on_value_changed(client, value): sys.exit(0) +def wdt_task(client, ts=[None]): + if ts[0] is None: + ts[0] = time.time() + if time.time() - ts[0] > 5: + loop = asyncio.get_event_loop() + loop.set_exception_handler(exception_handler) + logging.error("Timeout waiting for variable") + sys.exit(1) + + if __name__ == "__main__": # Parse command line args. parser = argparse.ArgumentParser(description="arduino_iot_cloud.py") @@ -35,6 +47,9 @@ def on_value_changed(client, value): parser.add_argument( "-f", "--file-auth", action="store_true", help="Use key/cert files" ) + parser.add_argument( + "-s", "--sync", action="store_true", help="Run in synchronous mode" + ) args = parser.parse_args() # Configure the logger. @@ -56,6 +71,7 @@ def on_value_changed(client, value): device_id=os.getenv("DEVICE_ID"), username=os.getenv("DEVICE_ID"), password=os.getenv("SECRET_KEY"), + sync_mode=args.sync, ) elif args.file_auth: import ssl @@ -67,6 +83,7 @@ def on_value_changed(client, value): "ca_certs": "ca-root.pem", "cert_reqs": ssl.CERT_REQUIRED, }, + sync_mode=args.sync, ) elif args.crypto_device: import ssl @@ -82,16 +99,17 @@ def on_value_changed(client, value): "engine_path": "/lib/x86_64-linux-gnu/engines-3/libpkcs11.so", "module_path": "/lib/x86_64-linux-gnu/softhsm/libsofthsm2.so", }, + sync_mode=args.sync, ) else: parser.print_help() sys.exit(1) # Register cloud objects. - # Note: The following objects must be created first in the dashboard and linked to the device. - # This cloud object is initialized with its last known value from the cloud. When this object is updated - # from the dashboard, the on_switch_changed function is called with the client object and the new value. + # When this object gets initialized from the cloud the test is complete. client.register("answer", value=None, on_write=on_value_changed) + # This task will exist with failure after a timeout. + client.register(Task("wdt_task", on_run=wdt_task, interval=1.0)) # Start the Arduino IoT cloud client. client.start()