Skip to content

misc: Sync mode and workflow updates. #82

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
May 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 20 additions & 4 deletions .github/workflows/client-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -71,34 +71,50 @@ jobs:
run: |
source tests/ci.sh && ci_configure_softhsm

- name: '☁️ Connect to IoT cloud (CPython / Basic Auth)'
- name: '☁️ Connect to IoT cloud (CPython / Basic Auth / Async)'
env:
DEVICE_ID: ${{ secrets.DEVICE_ID1 }}
SECRET_KEY: ${{ secrets.SECRET_KEY }}
run: |
python tests/ci.py --basic-auth

- name: '☁️ Connect to IoT cloud (CPython / Key/Cert Auth)'
- name: '☁️ Connect to IoT cloud (CPython / Basic Auth / Sync)'
env:
DEVICE_ID: ${{ secrets.DEVICE_ID1 }}
SECRET_KEY: ${{ secrets.SECRET_KEY }}
run: |
python tests/ci.py --basic-auth --sync

- name: '☁️ Connect to IoT cloud (CPython / Key-Cert Auth / Async)'
env:
DEVICE_ID: ${{ secrets.DEVICE_ID2 }}
SECRET_KEY: ${{ secrets.SECRET_KEY }}
run: |
python tests/ci.py --file-auth


- name: '☁️ Connect to IoT cloud (CPython / Crypto Auth)'
- name: '☁️ Connect to IoT cloud (CPython / Crypto Auth / Async)'
env:
DEVICE_ID: ${{ secrets.DEVICE_ID2 }}
SECRET_KEY: ${{ secrets.SECRET_KEY }}
run: |
export SOFTHSM2_CONF="${HOME}/softhsm/tokens/softhsm2.conf"
python tests/ci.py --crypto-device

- name: '☁️ Connect to IoT cloud (MicroPython / Basic Auth)'
- name: '☁️ Connect to IoT cloud (MicroPython / Basic Auth / Async)'
env:
DEVICE_ID: ${{ secrets.DEVICE_ID1 }}
SECRET_KEY: ${{ secrets.SECRET_KEY }}
run: |
export PATH="${HOME}/cache/bin:${PATH}"
micropython -c "import sys; print(sys.path)"
micropython tests/ci.py --basic-auth

- name: '☁️ Connect to IoT cloud (MicroPython / Basic Auth / Sync)'
env:
DEVICE_ID: ${{ secrets.DEVICE_ID1 }}
SECRET_KEY: ${{ secrets.SECRET_KEY }}
run: |
export PATH="${HOME}/cache/bin:${PATH}"
micropython -c "import sys; print(sys.path)"
micropython tests/ci.py --basic-auth --sync
63 changes: 36 additions & 27 deletions src/arduino_iot_cloud/ucloud.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ def __init__(self, name, **kwargs):
self._updated = False
self.on_write_scheduled = False
self.timestamp = timestamp()
self.last_run = timestamp_ms()
self.last_poll = timestamp_ms()
self.runnable = any((self.on_run, self.on_read, self.on_write))
callback = kwargs.pop("callback", self.senml_callback)
for key in kwargs: # kwargs should be empty by now, unless a wrong attr was used.
Expand Down Expand Up @@ -193,7 +193,6 @@ def __init__(
self.thing_id = None
self.keepalive = keepalive
self.last_ping = timestamp()
self.last_run = timestamp()
self.senmlpack = SenmlPack("", self.senml_generic_callback)
self.ntp_server = ntp_server
self.ntp_timeout = ntp_timeout
Expand Down Expand Up @@ -322,8 +321,20 @@ def mqtt_callback(self, topic, message):
self.senmlpack.from_cbor(message)
self.senmlpack.clear()

def ts_expired(self, record, ts):
return (ts - record.last_run) > int(record.interval * 1000)
def ts_expired(self, ts, last_ts_ms, interval_s):
return last_ts_ms == 0 or (ts - last_ts_ms) > int(interval_s * 1000)

def poll_records(self):
ts = timestamp_ms()
try:
for record in self.records.values():
if record.runnable and self.ts_expired(ts, record.last_poll, record.interval):
record.run_sync(self)
record.last_poll = ts
except Exception as e:
self.records.pop(record.name)
if log_level_enabled(logging.ERROR):
logging.error(f"task: {record.name} raised exception: {str(e)}.")

def poll_connect(self, aiot=None):
logging.info("Connecting to Arduino IoT cloud...")
Expand All @@ -332,7 +343,7 @@ def poll_connect(self, aiot=None):
except Exception as e:
if log_level_enabled(logging.WARNING):
logging.warning(f"Connection failed {e}, retrying...")
return False
return

if self.thing_id is None:
self.mqtt.subscribe(self.device_topic, qos=1)
Expand All @@ -341,10 +352,10 @@ def poll_connect(self, aiot=None):

if self.async_mode:
if self.thing_id is None:
self.register("discovery", on_run=self.poll_discovery, interval=0.100)
self.register("discovery", on_run=self.poll_discovery, interval=0.200)
self.register("mqtt_task", on_run=self.poll_mqtt, interval=0.100)
raise DoneException()
return True
self.connected = True

def poll_discovery(self, aiot=None):
self.mqtt.check_msg()
Expand Down Expand Up @@ -429,17 +440,26 @@ async def run(self, interval, backoff):
def start(self, interval=1.0, backoff=1.2):
if self.async_mode:
asyncio.run(self.run(interval, backoff))
else:
# Synchronous mode.
while not self.poll_connect():
time.sleep(interval)
interval = min(interval * backoff, 5.0)
return

while self.thing_id is None:
last_conn_ms = 0
last_disc_ms = 0

while True:
ts = timestamp_ms()
if not self.connected and self.ts_expired(ts, last_conn_ms, interval):
self.poll_connect()
if last_conn_ms != 0:
interval = min(interval * backoff, 5.0)
last_conn_ms = ts

if self.connected and self.thing_id is None and self.ts_expired(ts, last_disc_ms, 0.250):
self.poll_discovery()
time.sleep(0.100)
last_disc_ms = ts

self.connected = True
if self.connected and self.thing_id is not None:
break
self.poll_records()

def update(self):
if self.async_mode:
Expand All @@ -448,25 +468,14 @@ def update(self):
if not self.connected:
try:
self.start()
self.connected = True
except Exception as e:
raise e

try:
ts = timestamp_ms()
for record in self.records.values():
if record.runnable and self.ts_expired(record, ts):
record.run_sync(self)
record.last_run = ts
except Exception as e:
self.records.pop(record.name)
if log_level_enabled(logging.ERROR):
logging.error(f"task: {record.name} raised exception: {str(e)}.")
self.poll_records()

try:
self.poll_mqtt()
except Exception as e:
self.connected = False
if log_level_enabled(logging.WARNING):
logging.warning(f"Connection lost {e}")
raise e
24 changes: 21 additions & 3 deletions tests/ci.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@
# https://creativecommons.org/publicdomain/zero/1.0/
import logging
import os
import time
import sys
import asyncio
from arduino_iot_cloud import ArduinoCloudClient
from arduino_iot_cloud import Task
import argparse


Expand All @@ -20,6 +22,16 @@ def on_value_changed(client, value):
sys.exit(0)


def wdt_task(client, ts=[None]):
if ts[0] is None:
ts[0] = time.time()
if time.time() - ts[0] > 5:
loop = asyncio.get_event_loop()
loop.set_exception_handler(exception_handler)
logging.error("Timeout waiting for variable")
sys.exit(1)


if __name__ == "__main__":
# Parse command line args.
parser = argparse.ArgumentParser(description="arduino_iot_cloud.py")
Expand All @@ -35,6 +47,9 @@ def on_value_changed(client, value):
parser.add_argument(
"-f", "--file-auth", action="store_true", help="Use key/cert files"
)
parser.add_argument(
"-s", "--sync", action="store_true", help="Run in synchronous mode"
)
args = parser.parse_args()

# Configure the logger.
Expand All @@ -56,6 +71,7 @@ def on_value_changed(client, value):
device_id=os.getenv("DEVICE_ID"),
username=os.getenv("DEVICE_ID"),
password=os.getenv("SECRET_KEY"),
sync_mode=args.sync,
)
elif args.file_auth:
import ssl
Expand All @@ -67,6 +83,7 @@ def on_value_changed(client, value):
"ca_certs": "ca-root.pem",
"cert_reqs": ssl.CERT_REQUIRED,
},
sync_mode=args.sync,
)
elif args.crypto_device:
import ssl
Expand All @@ -82,16 +99,17 @@ def on_value_changed(client, value):
"engine_path": "/lib/x86_64-linux-gnu/engines-3/libpkcs11.so",
"module_path": "/lib/x86_64-linux-gnu/softhsm/libsofthsm2.so",
},
sync_mode=args.sync,
)
else:
parser.print_help()
sys.exit(1)

# Register cloud objects.
# Note: The following objects must be created first in the dashboard and linked to the device.
# This cloud object is initialized with its last known value from the cloud. When this object is updated
# from the dashboard, the on_switch_changed function is called with the client object and the new value.
# When this object gets initialized from the cloud the test is complete.
client.register("answer", value=None, on_write=on_value_changed)
# This task will exist with failure after a timeout.
client.register(Task("wdt_task", on_run=wdt_task, interval=1.0))

# Start the Arduino IoT cloud client.
client.start()
Loading